In [4]:
pip install pyspark==3.0.0

Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install psycopg2-binary randomuser

Note: you may need to restart the kernel to use updated packages.


In [6]:
import psycopg2

In [7]:
def connect_db():
    connection = psycopg2.connect(user="datascience",
                                  password="datascience",
                                  host="datascience",
                                  port="5432",
                                  database="datascience")
    return connection

In [8]:
create_tables = '''
DROP Table IF EXISTS "ecommerce_customer";
CREATE TABLE "ecommerce_customer" (
  "id" SERIAL PRIMARY KEY NOT NULL,
  "email" varchar(255) NOT NULL,
  "email_canonical" varchar(255) NOT NULL,
  "first_name" varchar(255) DEFAULT NULL,
  "last_name" varchar(255) DEFAULT NULL,
  "gender" varchar(1) NOT NULL DEFAULT 'u',
  "created_at" timestamp NOT NULL,
  "updated_at" timestamp DEFAULT NULL,
  "phone_number" varchar(255) DEFAULT NULL,
  "subscribed_to_newsletter" varchar(1) NOT NULL
);

CREATE UNIQUE INDEX "UNIQ_7E82D5E6E7927C74" ON "ecommerce_customer" ("email");
CREATE UNIQUE INDEX "UNIQ_7E82D5E6A0D96FBF" ON "ecommerce_customer" ("email_canonical");

'''

In [9]:
connection = connect_db()
cursor = connection.cursor()
cursor.execute(create_tables)
connection.commit()

In [10]:
from random import randint
import random
def create_account(mycursor, user):
    sql = '''
        INSERT INTO ecommerce_customer (email, email_canonical, first_name, last_name, gender, created_at, updated_at, phone_number, subscribed_to_newsletter)
        VALUES
        (%s, %s, %s, %s, %s, now(), now(), %s, %s);
    '''
    args = (user.get_email(), user.get_email(), user.get_first_name(), user.get_last_name(), random.choice(['m', 'f']), user.get_phone(), randint(0, 1))
    
    mycursor.execute(sql, args)


In [14]:
from randomuser import RandomUser
def random_user_exists(mycursor):
    user = RandomUser()
    create_account(mycursor, user)

In [15]:
connection = connect_db()
cursor = connection.cursor()
for x in range(10):
    random_user_exists(cursor)
connection.commit()

In [16]:
from pyspark.sql import SparkSession

working_directory = 'jars/*'

spark = SparkSession.\
        builder.\
        appName("pyspark-with-mongo").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "1g").\
        config("spark.mongodb.input.uri","mongodb://mongodb:27017/ecommerce.orders").\
        config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,org.postgresql:postgresql:42.2.18").\
        config('spark.driver.extraClassPath', working_directory). \
        config('spark.jars', working_directory).\
        getOrCreate()

sc = spark.sparkContext

In [17]:
sc

In [18]:
customers = spark.read.format('jdbc')\
    .option("url", "jdbc:postgresql://datascience:5432/datascience")\
    .option("user", "datascience")\
    .option("password", "datascience")\
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "ecommerce_customer")\
    .load()
    
    

In [19]:
customers.printSchema()

root
 |-- id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- email_canonical: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- subscribed_to_newsletter: string (nullable = true)



In [20]:
customers.select('email', 'id').show()

+--------------------+---+
|               email| id|
+--------------------+---+
|enzo.jean@example...|  1|
|hillebrand.vander...|  2|
|isabella.freeman@...|  3|
|louka.noel@exampl...|  4|
|kjell.hoffmann@ex...|  5|
|jessica.perkins@e...|  6|
|alix.dupuis@examp...|  7|
|miriam.barros@exa...|  8|
|ines.gil@example.com|  9|
|maxime.garnier@ex...| 10|
+--------------------+---+



In [21]:
pip install pymongo

Note: you may need to restart the kernel to use updated packages.


In [22]:
from pymongo import MongoClient
from pprint import pprint as pp
client = MongoClient('mongodb://mongodb:27017')
db = client.ecommerce

In [23]:
import random
def generate_order():
    return {
        "customer_id": random.randint(0,10),
        "total": random.randint(2000, 5000),
        "lines": [{"product_id": random.randint(1,200), "quantity": random.randint(1,5)} for i in range (1, random.randint(1, 10))]
    }

In [24]:
for x in range(10):
    order = generate_order()
    db.orders.insert_one(order)

In [25]:
orders = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", "mongodb://mongodb:27017/ecommerce.orders")\
    .load()

In [26]:
orders.select('*').show()

+--------------------+-----------+--------------------+-----+
|                 _id|customer_id|               lines|total|
+--------------------+-----------+--------------------+-----+
|[6001ae6e8f657fd0...|          4|[[130, 4], [133, ...| 4375|
|[6001ae6e8f657fd0...|          8|          [[101, 4]]| 3113|
|[6001ae6e8f657fd0...|          2|            [[5, 1]]| 4508|
|[6001ae6e8f657fd0...|          0|[[81, 2], [177, 4...| 2171|
|[6001ae6e8f657fd0...|          0|[[41, 3], [115, 3...| 2297|
|[6001ae6e8f657fd0...|          1|[[186, 2], [19, 4...| 4654|
|[6001ae6e8f657fd0...|          2| [[59, 2], [147, 5]]| 3468|
|[6001ae6e8f657fd0...|         10|[[170, 2], [74, 5...| 3674|
|[6001ae6e8f657fd0...|          8|[[151, 4], [175, 5]]| 4780|
|[6001ae6e8f657fd0...|          4|[[172, 4], [60, 2...| 3271|
+--------------------+-----------+--------------------+-----+



### pytania
* Który customer zakupił za największą kwotę
* Czy kupujący posiadający subskrypcję do newslettera kupują więcej?

In [27]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [28]:
orders.registerTempTable("orders")
customers.registerTempTable("customers")

In [32]:
top_buyers = sqlContext.sql('''
    select c.id, sum(o.total) as total
        from orders o
        JOIN customers c on o.customer_id = c.id
        group by c.id
        order by total DESC
''').show()

+---+-----+
| id|total|
+---+-----+
|  2| 7976|
|  8| 7893|
|  4| 7646|
|  1| 4654|
| 10| 3674|
+---+-----+



In [40]:
toBeAwarded = sqlContext.sql('''
    select 
        rank() over (order by total desc) as place,
        first_name, 
        last_name,
        total 
    from (
        select c.id, sum(o.total) as total
        from orders o
        JOIN customers c on o.customer_id = c.id
        group by c.id
        order by total DESC
    ) as t1 join customers c on t1.id = c.id order by total desc
    
''')

In [41]:
toBeAwarded.show()

+-----+----------+-------------+-----+
|place|first_name|    last_name|total|
+-----+----------+-------------+-----+
|    1|Hillebrand|Van der Weele| 7976|
|    2|    Miriam|       Barros| 7893|
|    3|     Louka|         Noel| 7646|
|    4|      Enzo|         Jean| 4654|
|    5|    Maxime|      Garnier| 3674|
+-----+----------+-------------+-----+



In [42]:
customers = toBeAwarded.collect()

In [46]:
customers

[Row(place=1, first_name='Hillebrand', last_name='Van der Weele', total=7976),
 Row(place=2, first_name='Miriam', last_name='Barros', total=7893),
 Row(place=3, first_name='Louka', last_name='Noel', total=7646),
 Row(place=4, first_name='Enzo', last_name='Jean', total=4654),
 Row(place=5, first_name='Maxime', last_name='Garnier', total=3674)]

In [47]:
sc.stop()