In [16]:
from pyspark import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [3]:
# Start (or reuse) a single-threaded local Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("SparkByExamples.com")
    .master("local[1]")
    .getOrCreate()
)

In [4]:
# Spark configuration: application name "app1", running locally on all cores.
# setAppName / setMaster write the "spark.app.name" / "spark.master" keys.
my_conf = SparkConf().setAppName("app1").setMaster("local[*]")
my_conf

<pyspark.conf.SparkConf at 0x23b6e814670>

In [5]:
# Rebuild the session from `my_conf`.
# getOrCreate() hands back an already-running session unchanged, and the
# master of a live SparkContext cannot be altered, so any active session is
# stopped first; otherwise the "local[*]" master in `my_conf` is ignored.
_active_session = SparkSession.getActiveSession()
if _active_session is not None:
    _active_session.stop()
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

In [6]:
# Load the customers file; the first CSV line supplies the column names.
customersDf = (
    spark.read
    .option("header", True)
    .csv("../input_data/starter/customers.csv")
)
customersDf.show()

+-----------+-------------+
|customer_id|loyalty_score|
+-----------+-------------+
|         C1|            7|
|         C2|            4|
|         C3|            8|
|         C4|            5|
|         C5|            7|
|         C6|           10|
|         C7|            3|
|         C8|            7|
|         C9|            8|
|        C10|            5|
|        C11|            4|
|        C12|            8|
|        C13|            8|
|        C14|            3|
|        C15|            6|
|        C16|            5|
|        C17|            2|
|        C18|            8|
|        C19|            6|
|        C20|            2|
+-----------+-------------+
only showing top 20 rows



In [7]:
# Load the products file; the first CSV line supplies the column names.
productsDf = (
    spark.read
    .option("header", True)
    .csv("../input_data/starter/products.csv")
)
productsDf.show()

+----------+--------------------+----------------+
|product_id| product_description|product_category|
+----------+--------------------+----------------+
|       P01|           detergent|           house|
|       P02|        kitchen roll|           house|
|       P03|          bin liners|           house|
|       P04|          shower gel|           house|
|       P05|     scented candles|           house|
|       P06|     fabric softener|           house|
|       P07|          cling film|           house|
|       P08|      aluminium foil|           house|
|       P09|        toilet paper|           house|
|       P10|       kitchen knife|           house|
|       P11|  dishwasher tablets|           house|
|       P12|            ice pack|           house|
|       P13|men's dark green ...|         clothes|
|       P14|       women's shoes|         clothes|
|       P15|              jumper|         clothes|
|       P16|          men's belt|         clothes|
|       P17| women's black sock

In [9]:
# Read a single day's transactions file only: reading the entire transactions
# directory recursively was taking too long.
# Forward slashes keep the path portable and consistent with the CSV loads.
ordersDf = spark.read.json("../input_data/starter/transactions/d=2018-12-01/transactions.json")

In [14]:
# Preview the loaded transactions (first 20 rows, long cells truncated).
ordersDf.show(n=20, truncate=True)

+--------------------+-----------+-------------------+
|              basket|customer_id|   date_of_purchase|
+--------------------+-----------+-------------------+
|         [{15, P27}]|         C3|2018-12-01 23:36:00|
|[{718, P37}, {567...|         C4|2018-12-01 17:19:00|
|[{302, P27}, {572...|         C9|2018-12-01 18:20:00|
|[{1999, P36}, {16...|        C10|2018-12-01 19:56:00|
|[{720, P15}, {179...|        C16|2018-12-01 03:01:00|
|       [{1595, P52}]|        C18|2018-12-01 18:47:00|
|[{1642, P36}, {12...|        C23|2018-12-01 23:14:00|
|[{183, P61}, {206...|        C27|2018-12-01 10:08:00|
|[{853, P47}, {158...|        C42|2018-12-01 05:02:00|
|       [{1094, P64}]|        C43|2018-12-01 13:43:00|
|[{495, P33}, {147...|        C55|2018-12-01 12:46:00|
|[{1543, P01}, {11...|        C56|2018-12-01 09:35:00|
|        [{382, P12}]|        C67|2018-12-01 21:32:00|
|        [{970, P54}]|        C79|2018-12-01 20:06:00|
|[{1850, P60}, {19...|        C81|2018-12-01 07:38:00|
|[{1220, P

In [17]:
# Flatten each basket: `basket.product_id` yields the product ids of a
# transaction's basket items, and explode() emits one row per id.
explodedOrders = ordersDf.withColumn(
    "product_id",
    F.explode(F.col("basket.product_id")),
)
explodedOrders.show()

+--------------------+-----------+-------------------+----------+
|              basket|customer_id|   date_of_purchase|product_id|
+--------------------+-----------+-------------------+----------+
|         [{15, P27}]|         C3|2018-12-01 23:36:00|       P27|
|[{718, P37}, {567...|         C4|2018-12-01 17:19:00|       P37|
|[{718, P37}, {567...|         C4|2018-12-01 17:19:00|       P40|
|[{718, P37}, {567...|         C4|2018-12-01 17:19:00|       P39|
|[{302, P27}, {572...|         C9|2018-12-01 18:20:00|       P27|
|[{302, P27}, {572...|         C9|2018-12-01 18:20:00|       P22|
|[{302, P27}, {572...|         C9|2018-12-01 18:20:00|       P29|
|[{1999, P36}, {16...|        C10|2018-12-01 19:56:00|       P36|
|[{1999, P36}, {16...|        C10|2018-12-01 19:56:00|       P38|
|[{720, P15}, {179...|        C16|2018-12-01 03:01:00|       P15|
|[{720, P15}, {179...|        C16|2018-12-01 03:01:00|       P20|
|       [{1595, P52}]|        C18|2018-12-01 18:47:00|       P52|
|[{1642, P

In [18]:
# Enrich every purchased product with its category and the buyer's loyalty
# score (inner joins), keeping only the columns needed downstream.
df = (
    explodedOrders
    .join(productsDf, on="product_id", how="inner")
    .join(customersDf, on="customer_id", how="inner")
    .select("customer_id", "loyalty_score", "product_id", "product_category")
)
df.show()

+-----------+-------------+----------+----------------+
|customer_id|loyalty_score|product_id|product_category|
+-----------+-------------+----------+----------------+
|         C3|            8|       P27|       fruit_veg|
|         C4|            5|       P37|          sweets|
|         C4|            5|       P40|          sweets|
|         C4|            5|       P39|          sweets|
|         C9|            8|       P27|       fruit_veg|
|         C9|            8|       P22|       fruit_veg|
|         C9|            8|       P29|       fruit_veg|
|        C10|            5|       P36|          sweets|
|        C10|            5|       P38|          sweets|
|        C16|            5|       P15|         clothes|
|        C16|            5|       P20|         clothes|
|        C18|            8|       P52|            food|
|        C23|           10|       P36|          sweets|
|        C23|           10|       P39|          sweets|
|        C27|           10|       P61|          

In [21]:
# Per-customer aggregates, each computed independently from `df`:
#   productMerged  - list of purchased product ids
#   categoryMerged - list of purchased product categories
#   purchaseCoun   - number of purchased products
productMerged = df.groupBy("customer_id").agg(
    F.collect_list("product_id").alias("Products_id")
)
categoryMerged = df.groupBy("customer_id").agg(
    F.collect_list("product_category").alias("Products_category")
)
purchaseCoun = df.groupBy("customer_id").agg(
    F.count("customer_id").alias("Purchase_count")
)

In [23]:
# Build one summary row per customer in a single aggregation over `df`.
# Products are collected as (product_id, product_category) structs so the two
# output lists stay index-aligned; collect_list results computed in separate
# aggregations have no guaranteed relative order after a shuffle.
# Aggregating directly also avoids joining per-customer aggregates back onto
# the row-level frame and de-duplicating the result with distinct().
df1 = (
    df.groupBy("customer_id", "loyalty_score")
    .agg(
        F.collect_list(F.struct("product_id", "product_category")).alias("purchases"),
        F.count("customer_id").alias("Purchase_count"),
    )
    .select(
        "customer_id",
        "loyalty_score",
        F.col("purchases.product_id").alias("Products_id"),
        F.col("purchases.product_category").alias("Products_category"),
        "Purchase_count",
    )
)
df1.show()

+-----------+-------------+---------------+--------------------+--------------+
|customer_id|loyalty_score|    Products_id|   Products_category|Purchase_count|
+-----------+-------------+---------------+--------------------+--------------+
|        C98|            7|[P09, P12, P03]|[house, house, ho...|             3|
|       C104|            7|[P46, P54, P55]|  [food, food, food]|             3|
|       C107|            3|     [P62, P61]|          [bws, bws]|             2|
|       C113|            5|[P01, P12, P05]|[house, house, ho...|             3|
|        C42|            2|     [P47, P45]|        [food, food]|             2|
|       C130|            7|[P09, P08, P10]|[house, house, ho...|             3|
|       C102|            1|     [P24, P27]|[fruit_veg, fruit...|             2|
|        C87|            3|     [P42, P47]|        [food, food]|             2|
|       C103|            7|          [P63]|               [bws]|             1|
|        C18|            8|          [P5

In [24]:
# Collect the per-customer summary to the driver and save it as a JSON array
# of row records. No set_index() call is needed: its result would have to be
# reassigned to take effect, and orient="records" does not write the index,
# so customer_id must remain a regular column to appear in the output.
pandas_df = df1.toPandas()
pandas_df.to_json("../test.JSON", orient="records")