In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
from pyspark import SparkConf

In [2]:
import pandas as pd
import glob
import os

In [3]:
# Build (or reuse an existing) Spark session for this notebook. Hive support is
# enabled so that metastore tables such as default.supermarket can be queried.
spark = (
    SparkSession.builder
    .appName("SuperMarket Pre-processing")
    .enableHiveSupport()
    .getOrCreate()
)

# Handle on the underlying SparkContext.
sc = spark.sparkContext

In [4]:
# Raw supermarket transactions from the Hive metastore (all columns), cached so
# later actions on this DataFrame reuse the materialised data.
df = spark.table("default.supermarket").cache()

# Preview a handful of rows.
df.show(5)

+---------+---------+---------------+--------------+----------+----------+--------+-----+
|SHOP_DATE|SHOP_HOUR|      BASKET_ID|     CUST_CODE|STORE_CODE| PROD_CODE|QUANTITY|SPEND|
+---------+---------+---------------+--------------+----------+----------+--------+-----+
| 20071006|       21|994107800268406|CUST0000153531|STORE00001|PRD0901391|       1| 0.37|
| 20070201|       15|994104300305853|CUST0000219191|STORE00002|PRD0901915|       1| 5.08|
| 20071103|       13|994108200514137|CUST0000526979|STORE00003|PRD0903379|       1| 2.36|
| 20070206|       18|994104400743650|CUST0000913709|STORE00004|PRD0903305|       1|  0.2|
| 20071015|       19|994108000780959|CUST0000961285|STORE00001|PRD0903387|       1| 1.65|
+---------+---------+---------------+--------------+----------+----------+--------+-----+
only showing top 5 rows



In [5]:
# Total QUANTITY per (CUST_CODE, PROD_CODE) pair. GroupedData.sum names the
# aggregate column "sum(QUANTITY)", which the pivot step below relies on.
Cust_Prod_Sum = df.groupBy("CUST_CODE", "PROD_CODE").sum("QUANTITY")

Cust_Prod_Sum.show(5)

+--------------+----------+-------------+
|     CUST_CODE| PROD_CODE|sum(QUANTITY)|
+--------------+----------+-------------+
|CUST0000402925|PRD0902707|            1|
|CUST0000590958|PRD0903596|           22|
|CUST0000543607|PRD0904399|            2|
|CUST0000140715|PRD0901150|            1|
|CUST0000953469|PRD0900351|            1|
+--------------+----------+-------------+
only showing top 5 rows



In [None]:
# Pivot Customer Product Sum: one row per CUST_CODE, one column per PROD_CODE,
# holding the total quantity that customer bought of that product. Pairs that
# never occur come out of the pivot as null and are filled with 0.
# Without an explicit list of values, pivot() first computes the distinct
# PROD_CODE values, which costs an extra pass over the data.
Pivot_Cust_Prod_Sum = Cust_Prod_Sum.groupBy("CUST_CODE") \
                                .pivot("PROD_CODE") \
                                .sum("sum(QUANTITY)") \
                                .na.fill(0)

# Create View so the pivot can be queried from SQL.
# NOTE(review): the view name differs from the permanent table below only by case;
# Spark identifiers are case-insensitive by default, and the unqualified name
# resolves to this temp view while it exists.
Pivot_Cust_Prod_Sum.createOrReplaceTempView('Pivot_Cust_Prod_Sum')

# Create Table from View. A plain CREATE TABLE fails when the table already
# exists, which made this cell impossible to re-run; drop any previous copy
# first so the table always reflects the current pivot.
spark.sql("DROP TABLE IF EXISTS default.pivot_cust_prod_sum")
spark.sql("""
CREATE TABLE default.pivot_cust_prod_sum STORED AS PARQUET AS
SELECT *
FROM Pivot_Cust_Prod_Sum
""")

DataFrame[]

In [9]:
# Export the pivot as header-prefixed CSV part files under /tmp/Pivot_Customer_Product
# (the HDFS path copied to local disk in the next cell).
# mode("overwrite") replaces any previous export: the default save mode raises an
# error when the target directory already exists, so re-running this cell crashed.
spark.sql("""
SELECT *
FROM Pivot_Cust_Prod_Sum
""").write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/tmp/Pivot_Customer_Product")

In [10]:
!hdfs dfs -copyToLocal /tmp/Pivot_Customer_Product /data/

In [11]:
# Merge the Spark CSV part files copied down from HDFS into one local CSV.
# NOTE: os.chdir changes the kernel's working directory for every later cell.
os.chdir("/data/Pivot_Customer_Product")

extension = 'csv'
# Match only Spark's part-*.csv outputs. The merged file written below lands in
# this same directory with a .csv extension, so a bare '*.csv' glob would pick it
# up on a re-run and duplicate every row. glob() order is arbitrary; sorting makes
# the concatenated row order deterministic.
all_filenames = sorted(glob.glob('part-*.{}'.format(extension)))
if not all_filenames:
    raise FileNotFoundError(
        "No part-*.{} files found in {}".format(extension, os.getcwd())
    )

# merge all files: each part file has its own header row (header=true on export),
# so read them individually and stack them with a fresh index.
merged_csv = pd.concat([pd.read_csv(f) for f in all_filenames], ignore_index=True)
# Save file to csv (without the pandas index column)
merged_csv.to_csv("Pivot_Customer_Product.csv", index=False, encoding='utf-8')

In [12]:
# Reload the merged pivot CSV with pandas and preview its first five rows.
Pivot_df = pd.read_csv(
    '/data/Pivot_Customer_Product/Pivot_Customer_Product.csv'
)

Pivot_df.head(5)

Unnamed: 0,CUST_CODE,PRD0900001,PRD0900002,PRD0900003,PRD0900004,PRD0900005,PRD0900006,PRD0900007,PRD0900008,PRD0900009,...,PRD0904988,PRD0904989,PRD0904990,PRD0904991,PRD0904992,PRD0904993,PRD0904994,PRD0904995,PRD0904996,PRD0904997
0,CUST0000515317,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,CUST0000370270,0,0,0,0,0,0,0,20,0,...,0,0,0,0,0,0,0,0,0,0
2,CUST0000409759,0,0,0,0,0,0,0,2,0,...,4,0,0,0,0,0,0,0,0,0
3,CUST0000623317,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,CUST0000212278,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
