# Start spark application

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /home/jovyan/postgresql-42.2.5.jar --jars /home/jovyan/postgresql-42.2.5.jar pyspark-shell'

import pyspark

spark = pyspark.sql.SparkSession.builder \
        .master("local[*]") \
        .getOrCreate()

# Warm-up spark

In [10]:
spark.sparkContext.range(1000).sum()

499500

# Read some PostgreSQL Data

In [92]:
customers = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='customers'
    ).load()

customers.show()

+----+----------+---------+--------------------+
|  id|first_name|last_name|               email|
+----+----------+---------+--------------------+
|1002|    George|   Bailey|  gbailey@foobar.com|
|1009|     Keith|    Doyle|aparks@anderson-b...|
|1004|      Anne|Kretchmar|allenkaren@hotmai...|
|1018|    Daniel|    Smith|youngadrian@gmail...|
|1022|   Caitlyn|   Ingram| aramirez@wiley.info|
|1001|     Sally|   Thomas|megan39@robertson...|
|1003|    Edward|   Walker| noahharvey@kemp.biz|
|1026|     Peter|Rodriguez|williamtrujillo@h...|
|1028| Dominique|  Ramirez|mpeterson@sandova...|
|1029|   Anthony|   Graham|   tracy26@gmail.com|
|1010|   Michael| Crawford|jefferyestrada@ho...|
|1030|     Cindy|  Jackson|michael10@jackson...|
|1020|   Jessica|    Mejia|     hhess@perez.com|
|1032|    Leslie|  Spencer|mcgeerobert@gmail...|
|1023|      Sara|    Allen|  kjohnson@yahoo.com|
|1035|   Timothy|   Potter|vstevens@castillo...|
|1037|     James|   Barron|sethmcgrath@henry...|
|1038|    Alexis|   

# Write snapshot to Data Lake

In [8]:
customers.write.format("parquet").save("/home/jovyan/customers_data")

# Generate some updates on the table

In [13]:
! pip install psycopg2-binary

Collecting psycopg2-binary
[?25l  Downloading https://files.pythonhosted.org/packages/f3/21/b7ccc8ae35e5b6ae62bfe47181353628bae52489c7798f00efd7916de543/psycopg2_binary-2.8.3-cp37-cp37m-manylinux1_x86_64.whl (2.9MB)
[K     |████████████████████████████████| 2.9MB 1.3MB/s eta 0:00:01
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.8.3


In [19]:
! pip install Faker

Collecting Faker
[?25l  Downloading https://files.pythonhosted.org/packages/52/1a/930431923062857520bae512101a648ef528cd327583fda38d9e76fab5ce/Faker-1.0.7-py2.py3-none-any.whl (874kB)
[K     |████████████████████████████████| 880kB 1.4MB/s eta 0:00:01
Collecting text-unidecode==1.2 (from Faker)
[?25l  Downloading https://files.pythonhosted.org/packages/79/42/d717cc2b4520fb09e45b344b1b0b4e81aa672001dd128c180fabc655c341/text_unidecode-1.2-py2.py3-none-any.whl (77kB)
[K     |████████████████████████████████| 81kB 20.6MB/s eta 0:00:01
Installing collected packages: text-unidecode, Faker
Successfully installed Faker-1.0.7 text-unidecode-1.2


In [35]:
import psycopg2

def connect_to_postgres():
    conn = psycopg2.connect(
        dbname="postgres",
        user="postgres",
        host="postgres",
        password="postgres",
        port=5432,
        options=f'-c search_path=inventory')
    return conn

connection = connect_to_postgres()
cursor = connection.cursor()


In [82]:
from faker import Faker
fake = Faker()

def insert_new_customer(connection, cursor):
    cursor.execute("INSERT INTO customers (first_name, last_name, email) VALUES(%s, %s, %s)", (fake.first_name(),fake.last_name(),fake.email()))
    print("Inserting new customer")
    connection.commit()

def update_old_customer(connection, cursor):
    cursor.execute("select id from customers ORDER BY random()")
    customer_id = cursor.fetchone()[0]
    cursor.execute("UPDATE customers SET email = %s where id = %s", (fake.email(),customer_id))
    print("Updating old customer with id = %s" % customer_id)
    connection.commit()
    
def delete_customer(connection, cursor):
    cursor.execute("select id from customers where id>1005 ORDER BY random()")
    customer_id = cursor.fetchone()[0]
    cursor.execute("DELETE FROM customers where id = %s", (customer_id,))
    print("Deleting customer with id = %s" % customer_id  )
    connection.commit()
    
    
insert_new_customer(connection, cursor)
update_old_customer(connection, cursor)
delete_customer(connection, cursor)





Inserting new customer
Updating old customer with id = 1004
Deleting customer with id = 1006


In [86]:
import numpy as np

def generate_operation():
    randomizer = np.random.uniform()
    if randomizer>0.9:
        delete_customer(connection, cursor)
    elif (randomizer>0.7 and randomizer<=0.9):
        update_old_customer(connection, cursor)
    else:
        insert_new_customer(connection, cursor)

In [87]:
import time 

ops_counter = 0

while True:
    generate_operation()
    ops_counter = ops_counter + 1
    if ops_counter % 10 == 0:
        time.sleep(5)

Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Updating old customer with id = 1001
Inserting new customer
Inserting new customer
Updating old customer with id = 1003
Updating old customer with id = 1025
Inserting new customer
Deleting customer with id = 1019
Updating old customer with id = 1026
Inserting new customer
Inserting new customer
Inserting new customer
Updating old customer with id = 1020
Updating old customer with id = 1010
Inserting new customer
Updating old customer with id = 1020
Updating old customer with id = 1013
Inserting new customer
Inserting new customer
Updating old customer with id = 1013
Updating old customer with id = 1023
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Inserting new customer
Updating old customer with id = 1033
Inserting new customer
Insertin

KeyboardInterrupt: 