In [1]:
import findspark

In [None]:
# findspark.find() only *returns* the Spark home path; findspark.init() is the call
# that sets SPARK_HOME and puts pyspark/py4j on sys.path so that the pyspark imports
# in the next cell resolve. init() locates Spark via find() when no path is given.
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [4]:
# Create (or reuse) a SparkSession named 'DataTest' on a local master.
# Note: despite the name, `sc` is a SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .appName('DataTest')
    .master('local')
    .getOrCreate()
)

In [57]:
# Transactions: JSON files read in multiline mode (one record may span several lines).
trans_df = (
    sc.read
    .option('multiline', True)
    .json('./input_data/starter/transactions')
)

# Customers and products: CSV files whose first row is the header.
# No schema inference is requested, so every CSV column is read as a string.
cust_df = (
    sc.read
    .option('header', True)
    .csv('./input_data/starter/customers.csv')
)
prod_df = (
    sc.read
    .option('header', True)
    .csv('./input_data/starter/products.csv')
)

In [58]:
# Print the schema of each input frame: transactions, customers, then products.
for frame in (trans_df, cust_df, prod_df):
    frame.printSchema()

root
 |-- basket: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- price: long (nullable = true)
 |    |    |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- date_of_purchase: string (nullable = true)
 |-- d: date (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- loyalty_score: string (nullable = true)

root
 |-- product_id: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_category: string (nullable = true)



In [60]:
# Attach each customer's loyalty score to their transactions.
# Inner join: rows without a matching customer_id on both sides are dropped.
join_cust_json = cust_df.join(
    trans_df,
    on=['customer_id'],
    how="inner",
)
join_cust_json.show(3)

+-----------+-------------+--------------------+-------------------+----------+
|customer_id|loyalty_score|              basket|   date_of_purchase|         d|
+-----------+-------------+--------------------+-------------------+----------+
|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|
|         C8|            7|[{819, P03}, {171...|2019-02-20 15:45:00|2019-02-20|
|         C2|            4|[{102, P11}, {565...|2019-01-17 18:25:00|2019-01-17|
+-----------+-------------+--------------------+-------------------+----------+
only showing top 3 rows



In [61]:
# Flatten the basket array: one output row per basket element, kept in the
# struct column 'basket_explode'.
exploded_items = join_cust_json.withColumn('basket_explode', explode('basket'))

# Lift the struct fields into top-level columns for joining and aggregation.
add_col = (
    exploded_items
    .withColumn('product_id', col('basket_explode.product_id'))
    .withColumn('product_price', col('basket_explode.price'))
)
add_col.show(3)

+-----------+-------------+--------------------+-------------------+----------+--------------+----------+-------------+
|customer_id|loyalty_score|              basket|   date_of_purchase|         d|basket_explode|product_id|product_price|
+-----------+-------------+--------------------+-------------------+----------+--------------+----------+-------------+
|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|    {742, P50}|       P50|          742|
|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|   {1838, P51}|       P51|         1838|
|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|    {678, P47}|       P47|          678|
+-----------+-------------+--------------------+-------------------+----------+--------------+----------+-------------+
only showing top 3 rows



In [62]:
# Enrich every purchased item with its product description and category.
# Inner join: items whose product_id is not in prod_df are dropped.
join_all = add_col.join(
    prod_df,
    on=['product_id'],
    how="inner",
)
join_all.show(3)

+----------+-----------+-------------+--------------------+-------------------+----------+--------------+-------------+-------------------+----------------+
|product_id|customer_id|loyalty_score|              basket|   date_of_purchase|         d|basket_explode|product_price|product_description|product_category|
+----------+-----------+-------------+--------------------+-------------------+----------+--------------+-------------+-------------------+----------------+
|       P50|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|    {742, P50}|          742|          camembert|            food|
|       P51|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|   {1838, P51}|         1838|              pizza|            food|
|       P47|         C1|            7|[{742, P50}, {183...|2018-12-25 10:48:00|2018-12-25|    {678, P47}|          678|      whole chicken|            food|
+----------+-----------+-------------+--------------------

In [63]:
# Columns that are not needed downstream.
not_req = ['basket', 'd', 'product_description']

# Drop the unused columns and derive the week number from the purchase timestamp.
# NOTE(review): weekofyear() carries no year, so purchases from the same week number
# in different years would share one value — confirm this is acceptable for the data range.
final_df = (
    join_all
    .drop(*not_req)
    .withColumn("weekoftheyear", weekofyear(col("date_of_purchase")))
)
final_df.show(3)

+----------+-----------+-------------+-------------------+--------------+-------------+----------------+-------------+
|product_id|customer_id|loyalty_score|   date_of_purchase|basket_explode|product_price|product_category|weekoftheyear|
+----------+-----------+-------------+-------------------+--------------+-------------+----------------+-------------+
|       P50|         C1|            7|2018-12-25 10:48:00|    {742, P50}|          742|            food|           52|
|       P51|         C1|            7|2018-12-25 10:48:00|   {1838, P51}|         1838|            food|           52|
|       P47|         C1|            7|2018-12-25 10:48:00|    {678, P47}|          678|            food|           52|
+----------+-----------+-------------+-------------------+--------------+-------------+----------------+-------------+
only showing top 3 rows



In [65]:
# Count purchased items per (week, customer, loyalty score, product, category).
#
# Guard against re-execution: this cell rebinds final_df to its own aggregate, so
# running it a second time would group rows that are already unique per key and
# silently report a count of 1 for every row. Fail loudly instead.
already_aggregated = {'count', 'purchase_count'}.intersection(final_df.columns)
if already_aggregated:
    raise RuntimeError(
        "final_df is already aggregated (found column(s): "
        + ", ".join(sorted(already_aggregated))
        + "); re-run the cell that builds final_df from join_all before this one."
    )

final_df=final_df.groupBy('weekoftheyear','customer_id','loyalty_score','product_id','product_category').count()
final_df.show(3)

+-------------+-----------+-------------+----------+----------------+-----+
|weekoftheyear|customer_id|loyalty_score|product_id|product_category|count|
+-------------+-----------+-------------+----------+----------------+-----+
|           52|         C2|            4|       P08|           house|    1|
|           49|        C14|            3|       P33|       fruit_veg|    1|
|           50|         C6|           10|       P35|          sweets|    1|
+-------------+-----------+-------------+----------+----------------+-----+
only showing top 3 rows



In [66]:
# Final column order for the output; the aggregate column is appended last.
select_col=['customer_id','loyalty_score','product_id','product_category','weekoftheyear']

# Rename "count" to "purchase_count" and put the columns in their final order.
# withColumnRenamed is a no-op when "count" is absent, so re-running this cell after
# the rename has already happened still succeeds instead of failing to resolve "count".
final_df = (
    final_df
    .withColumnRenamed("count", "purchase_count")
    .select(*select_col, "purchase_count")
)
final_df.show(3)

+-----------+-------------+----------+----------------+-------------+--------------+
|customer_id|loyalty_score|product_id|product_category|weekoftheyear|purchase_count|
+-----------+-------------+----------+----------------+-------------+--------------+
|         C2|            4|       P08|           house|           52|             1|
|        C14|            3|       P33|       fruit_veg|           49|             1|
|         C6|           10|       P35|          sweets|           50|             1|
+-----------+-------------+----------+----------------+-------------+--------------+
only showing top 3 rows



In [54]:
# Destination directory for the aggregated result.
output_dir = './output_data/outputs/'

# Write the result as JSON, one sub-directory per weekoftheyear value.
# "overwrite" replaces whatever already exists at output_dir.
json_writer = (
    final_df.write
    .format("json")
    .mode("overwrite")
    .partitionBy("weekoftheyear")
)
json_writer.save(output_dir)