In [None]:
import pandas as pd
import os
import sys
import pyspark
import findspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import to_date, col, substring
import re

In [None]:
findspark.init('/opt/spark/')
spark = SparkSession.builder.appName('SparkExample').config('spark.driver.python', '/snap/jupyter/6/bin/python3').getOrCreate()

In [2]:
dfs = spark.read.csv("hdfs://localhost:9000/transactions/*.csv", header=True, inferSchema=True)

                                                                                

In [3]:
dfs.show()

+---------+---------+------------+---------+--------+-----+----------+------------+------------+------------+------------+--------------+-----+-----+---------------+-----------+------------------------+-----------+-----------------------+----------+------------+------------+
|SHOP_WEEK|SHOP_DATE|SHOP_WEEKDAY|SHOP_HOUR|QUANTITY|SPEND| PROD_CODE|PROD_CODE_10|PROD_CODE_20|PROD_CODE_30|PROD_CODE_40|     CUST_CODE|seg_1|seg_2|      BASKET_ID|BASKET_SIZE|BASKET_PRICE_SENSITIVITY|BASKET_TYPE|BASKET_DOMINANT_MISSION|STORE_CODE|STORE_FORMAT|STORE_REGION|
+---------+---------+------------+---------+--------+-----+----------+------------+------------+------------+------------+--------------+-----+-----+---------------+-----------+------------------------+-----------+-----------------------+----------+------------+------------+
|   200643| 20061224|           1|       14|       1| 1.77|PRD0900008|     CL00042|    DEP00011|      G00004|      D00002|CUST0000096000|   AZ|   BU|994103700265943|       

In [4]:
dfs.printSchema()

root
 |-- SHOP_WEEK: string (nullable = true)
 |-- SHOP_DATE: string (nullable = true)
 |-- SHOP_WEEKDAY: string (nullable = true)
 |-- SHOP_HOUR: string (nullable = true)
 |-- QUANTITY: string (nullable = true)
 |-- SPEND: string (nullable = true)
 |-- PROD_CODE: string (nullable = true)
 |-- PROD_CODE_10: string (nullable = true)
 |-- PROD_CODE_20: string (nullable = true)
 |-- PROD_CODE_30: string (nullable = true)
 |-- PROD_CODE_40: string (nullable = true)
 |-- CUST_CODE: string (nullable = true)
 |-- seg_1: string (nullable = true)
 |-- seg_2: string (nullable = true)
 |-- BASKET_ID: string (nullable = true)
 |-- BASKET_SIZE: string (nullable = true)
 |-- BASKET_PRICE_SENSITIVITY: string (nullable = true)
 |-- BASKET_TYPE: string (nullable = true)
 |-- BASKET_DOMINANT_MISSION: string (nullable = true)
 |-- STORE_CODE: string (nullable = true)
 |-- STORE_FORMAT: string (nullable = true)
 |-- STORE_REGION: string (nullable = true)



### Perform groupby operations to aggregate data by date for:
##### total spend by store code
##### total spend by store format
##### total spend by prod code / prod_code_10 / prod_code_20 etc - depending on the highest level of granularity during EDA


In [5]:
dfs.createOrReplaceTempView("STORE_DATA")

In [6]:
store_df = spark.sql("SELECT SHOP_DATE, STORE_CODE, SUM(QUANTITY) AS TOTAL_QTY, ROUND(SUM(SPEND), 2) AS TOTAL_SPEND FROM STORE_DATA GROUP BY SHOP_DATE, STORE_CODE")

In [7]:
store_df.show()

[Stage 7:>                                                          (0 + 1) / 1]

+---------+----------+---------+-----------+
|SHOP_DATE|STORE_CODE|TOTAL_QTY|TOTAL_SPEND|
+---------+----------+---------+-----------+
| 20061223|STORE00003|    215.0|     316.65|
| 20061219|STORE00113|    235.0|     340.55|
| 20061221|STORE00120|     21.0|      70.53|
| 20061224|STORE00227|     73.0|     164.17|
| 20061224|STORE00432|     86.0|     118.75|
| 20061219|STORE00494|     13.0|      10.78|
| 20061221|STORE00633|    144.0|     228.09|
| 20061221|STORE00639|    155.0|      178.8|
| 20061222|STORE01091|      9.0|      12.73|
| 20061220|STORE01215|    125.0|     202.03|
| 20061218|STORE01289|     36.0|      37.31|
| 20061222|STORE01330|     24.0|      22.07|
| 20061222|STORE01603|     21.0|       27.4|
| 20061224|STORE01651|    113.0|     137.57|
| 20061222|STORE01672|     41.0|      53.55|
| 20061221|STORE01707|    264.0|     350.76|
| 20061223|STORE01793|     23.0|      18.85|
| 20061223|STORE02524|    147.0|     200.77|
| 20061219|STORE02575|      8.0|      28.24|
| 20061220

                                                                                

In [8]:
store_format_df = spark.sql("SELECT SHOP_DATE, STORE_CODE, STORE_FORMAT, SUM(QUANTITY) AS TOTAL_QTY, ROUND(SUM(SPEND), 2) AS TOTAL_SPEND FROM STORE_DATA GROUP BY SHOP_DATE, STORE_CODE, STORE_FORMAT")

In [9]:
store_format_df.show()



+---------+----------+------------+---------+-----------+
|SHOP_DATE|STORE_CODE|STORE_FORMAT|TOTAL_QTY|TOTAL_SPEND|
+---------+----------+------------+---------+-----------+
| 20061218|STORE00107|          LS|     98.0|     153.96|
| 20061218|STORE00120|          MS|      8.0|      22.61|
| 20061218|STORE00422|          MS|     78.0|     147.67|
| 20061221|STORE00425|          MS|     63.0|     118.09|
| 20061221|STORE00460|          LS|    217.0|     317.46|
| 20061221|STORE00807|          LS|     66.0|      79.44|
| 20061222|STORE00840|          LS|    325.0|     487.22|
| 20061221|STORE00877|          LS|    290.0|     487.31|
| 20061223|STORE00895|          SS|      6.0|       8.73|
| 20061222|STORE00966|          MS|    145.0|     194.21|
| 20061218|STORE01035|          LS|     16.0|      16.61|
| 20061219|STORE01160|          LS|    108.0|     136.29|
| 20061224|STORE01298|          SS|     26.0|      26.28|
| 20061221|STORE01316|          LS|    245.0|     429.21|
| 20061221|STO

                                                                                

In [10]:
prod_10_df = spark.sql("SELECT SHOP_DATE, STORE_CODE, PROD_CODE_10, SUM(QUANTITY) AS TOTAL_QTY, ROUND(SUM(SPEND), 2) AS TOTAL_SPEND FROM STORE_DATA GROUP BY SHOP_DATE, STORE_CODE, PROD_CODE_10")

In [11]:
prod_10_df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+---------+----------+------------+---------+-----------+
|SHOP_DATE|STORE_CODE|PROD_CODE_10|TOTAL_QTY|TOTAL_SPEND|
+---------+----------+------------+---------+-----------+
| 20060410|STORE00001|     CL00012|      1.0|       0.01|
| 20060410|STORE00001|     CL00020|      5.0|        2.2|
| 20060410|STORE00001|     CL00201|      1.0|       0.81|
| 20060410|STORE00002|     CL00001|      2.0|       1.72|
| 20060410|STORE00003|     CL00023|      1.0|       1.93|
| 20060410|STORE00003|     CL00065|      1.0|       1.16|
| 20060410|STORE00003|     CL00074|      2.0|       5.22|
| 20060410|STORE00003|     CL00128|      1.0|       1.02|
| 20060410|STORE00003|     CL00135|      1.0|       0.16|
| 20060410|STORE00003|     CL00157|      2.0|       1.58|
| 20060410|STORE00003|     CL00161|      3.0|        3.3|
| 20060410|STORE00004|     CL00017|      1.0|       3.35|
| 20060410|STORE00004|     CL00043|      1.0|       0.97|
| 20060410|STORE00004|     CL00063|      6.0|       6.64|
| 20060410|STO

                                                                                

In [12]:
prod_20_df = spark.sql("SELECT SHOP_DATE, STORE_CODE, PROD_CODE_20, SUM(QUANTITY) AS TOTAL_QTY, ROUND(SUM(SPEND), 2) AS TOTAL_SPEND FROM STORE_DATA GROUP BY SHOP_DATE, STORE_CODE, PROD_CODE_20")

In [13]:
prod_20_df.show()

[Stage 16:>                                                         (0 + 1) / 1]

+---------+----------+------------+---------+-----------+
|SHOP_DATE|STORE_CODE|PROD_CODE_20|TOTAL_QTY|TOTAL_SPEND|
+---------+----------+------------+---------+-----------+
| 20060410|STORE00001|    DEP00003|      1.0|       0.01|
| 20060410|STORE00001|    DEP00005|      5.0|        2.2|
| 20060410|STORE00001|    DEP00025|      3.0|       2.61|
| 20060410|STORE00001|    DEP00051|      3.0|       3.97|
| 20060410|STORE00001|    DEP00067|      1.0|       0.81|
| 20060410|STORE00001|    DEP00070|      1.0|       0.67|
| 20060410|STORE00002|    DEP00008|      2.0|       2.21|
| 20060410|STORE00002|    DEP00011|      1.0|        0.4|
| 20060410|STORE00003|    DEP00008|     13.0|      17.66|
| 20060410|STORE00003|    DEP00016|      1.0|       5.14|
| 20060410|STORE00003|    DEP00021|     11.0|      16.59|
| 20060410|STORE00003|    DEP00025|      4.0|       4.83|
| 20060410|STORE00003|    DEP00069|      1.0|       1.87|
| 20060410|STORE00003|    DEP00073|      3.0|       1.87|
| 20060410|STO

                                                                                

In [14]:
prod_30_df = spark.sql("SELECT SHOP_DATE, STORE_CODE, PROD_CODE_30, SUM(QUANTITY) AS TOTAL_QTY, ROUND(SUM(SPEND), 2) AS TOTAL_SPEND FROM STORE_DATA GROUP BY SHOP_DATE, STORE_CODE, PROD_CODE_30")

In [15]:
prod_30_df.show()

[Stage 19:>                                                         (0 + 1) / 1]

+---------+----------+------------+---------+-----------+
|SHOP_DATE|STORE_CODE|PROD_CODE_30|TOTAL_QTY|TOTAL_SPEND|
+---------+----------+------------+---------+-----------+
| 20060410|STORE00001|      G00002|      1.0|       0.01|
| 20060410|STORE00001|      G00003|      5.0|        2.2|
| 20060410|STORE00001|      G00006|      1.0|       2.58|
| 20060410|STORE00001|      G00010|      3.0|       1.02|
| 20060410|STORE00001|      G00015|      4.0|       5.26|
| 20060410|STORE00002|      G00003|      1.0|       2.05|
| 20060410|STORE00002|      G00011|      1.0|       0.41|
| 20060410|STORE00002|      G00016|      3.0|       3.64|
| 20060410|STORE00002|      G00021|      1.0|        1.4|
| 20060410|STORE00002|      G00023|      1.0|       0.31|
| 20060410|STORE00003|      G00002|      2.0|       2.11|
| 20060410|STORE00003|      G00003|      2.0|        2.2|
| 20060410|STORE00003|      G00004|     20.0|      23.56|
| 20060410|STORE00003|      G00007|     47.0|      67.46|
| 20060410|STO

                                                                                

In [16]:
prod_40_df = spark.sql("SELECT SHOP_DATE, STORE_CODE, PROD_CODE_40, SUM(QUANTITY) AS TOTAL_QTY, ROUND(SUM(SPEND), 2) AS TOTAL_SPEND FROM STORE_DATA GROUP BY SHOP_DATE, STORE_CODE, PROD_CODE_40")

In [17]:
prod_40_df.show()

[Stage 22:>                                                         (0 + 1) / 1]

+---------+----------+------------+---------+-----------+
|SHOP_DATE|STORE_CODE|PROD_CODE_40|TOTAL_QTY|TOTAL_SPEND|
+---------+----------+------------+---------+-----------+
| 20061219|STORE00027|      D00002|     40.0|      58.04|
| 20061218|STORE00030|      D00001|      7.0|       8.51|
| 20061221|STORE00065|      D00004|      4.0|       7.92|
| 20061218|STORE00327|      D00002|     39.0|      59.88|
| 20061222|STORE00327|      D00002|     52.0|      77.29|
| 20061219|STORE00337|      D00001|      4.0|       4.88|
| 20061223|STORE00352|      D00003|      7.0|       5.06|
| 20061218|STORE00377|      D00002|     84.0|     114.26|
| 20061224|STORE00377|      D00004|      2.0|       4.18|
| 20061219|STORE00468|      D00001|      2.0|       2.25|
| 20061218|STORE00500|      D00009|      1.0|       1.35|
| 20061223|STORE00508|      D00001|      5.0|       2.16|
| 20061224|STORE00560|      D00003|     16.0|       11.8|
| 20061219|STORE00574|      D00001|     23.0|      16.83|
| 20061223|STO

                                                                                

In [18]:
prod_40_df.printSchema()

root
 |-- SHOP_DATE: string (nullable = true)
 |-- STORE_CODE: string (nullable = true)
 |-- PROD_CODE_40: string (nullable = true)
 |-- TOTAL_QTY: double (nullable = true)
 |-- TOTAL_SPEND: double (nullable = true)



# Convert string fields to numeric by extracting the numeric values only

In [23]:
def extract_and_convert_to_numeric(code):
    numeric_part = re.search(r'\d+$', code).group()
    return int(numeric_part)

In [24]:
def convert_string_col_to_date(df, cols):
    for col_name in cols:
        df_date = df.withColumn(col_name, to_date(col(col_name), "yyyyMMdd"))
    return df_date

In [25]:
# Register the UDF for use with the dataframe
extract_and_convert_to_numeric_udf = udf(extract_and_convert_to_numeric, IntegerType())

In [26]:
def convert_string_col_to_int(df, cols):
    for col_name in cols:
        df = df.withColumn(col_name, extract_and_convert_to_numeric_udf(df[col_name]))
        # df_int.drop(col_name)
    return df

In [27]:
store_df_int = convert_string_col_to_int(store_df, ["STORE_CODE"])
store_df_final = convert_string_col_to_date(store_df_int, ["SHOP_DATE"])

In [28]:
store_df_int.printSchema()

root
 |-- SHOP_DATE: string (nullable = true)
 |-- STORE_CODE: integer (nullable = true)
 |-- TOTAL_QTY: double (nullable = true)
 |-- TOTAL_SPEND: double (nullable = true)



In [29]:
store_df_final.printSchema()

root
 |-- SHOP_DATE: date (nullable = true)
 |-- STORE_CODE: integer (nullable = true)
 |-- TOTAL_QTY: double (nullable = true)
 |-- TOTAL_SPEND: double (nullable = true)



In [30]:
store_df_final.show()

[Stage 31:>                                                         (0 + 1) / 1]

+----------+----------+---------+-----------+
| SHOP_DATE|STORE_CODE|TOTAL_QTY|TOTAL_SPEND|
+----------+----------+---------+-----------+
|2006-12-23|         3|    215.0|     316.65|
|2006-12-19|       113|    235.0|     340.55|
|2006-12-21|       120|     21.0|      70.53|
|2006-12-24|       227|     73.0|     164.17|
|2006-12-24|       432|     86.0|     118.75|
|2006-12-19|       494|     13.0|      10.78|
|2006-12-21|       633|    144.0|     228.09|
|2006-12-21|       639|    155.0|      178.8|
|2006-12-22|      1091|      9.0|      12.73|
|2006-12-20|      1215|    125.0|     202.03|
|2006-12-18|      1289|     36.0|      37.31|
|2006-12-22|      1330|     24.0|      22.07|
|2006-12-22|      1603|     21.0|       27.4|
|2006-12-24|      1651|    113.0|     137.57|
|2006-12-22|      1672|     41.0|      53.55|
|2006-12-21|      1707|    264.0|     350.76|
|2006-12-23|      1793|     23.0|      18.85|
|2006-12-23|      2524|    147.0|     200.77|
|2006-12-19|      2575|      8.0| 

                                                                                

In [31]:
store_format_df_int = convert_string_col_to_int(store_format_df, ["STORE_CODE"])
store_format_df_final = convert_string_col_to_date(store_format_df_int, ["SHOP_DATE"])

In [32]:
store_format_df_final.printSchema()

root
 |-- SHOP_DATE: date (nullable = true)
 |-- STORE_CODE: integer (nullable = true)
 |-- STORE_FORMAT: string (nullable = true)
 |-- TOTAL_QTY: double (nullable = true)
 |-- TOTAL_SPEND: double (nullable = true)



In [44]:
store_format_df_final.select("STORE_FORMAT").distinct().count()

                                                                                

4

In [33]:
prod_10_df.printSchema()

root
 |-- SHOP_DATE: string (nullable = true)
 |-- STORE_CODE: string (nullable = true)
 |-- PROD_CODE_10: string (nullable = true)
 |-- TOTAL_QTY: double (nullable = true)
 |-- TOTAL_SPEND: double (nullable = true)



In [34]:
# Extract the prefixes from PROD_CODE_10
prod_10_df = prod_10_df.withColumn("Prefix", substring(col("PROD_CODE_10"), 1, 1))
prod_10_df.select("Prefix").distinct().show()

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)

+------+
|Prefix|
+------+
|     C|
+------+



                                                                                

There is a single prefix for prod_code_10 so we can remove this and store as int without
the risk of aggregating on common numeric portions of prod_code_10

In [35]:
prod_10_df_int = convert_string_col_to_int(prod_10_df, ["STORE_CODE", "PROD_CODE_10"])
prod_10_df_final = convert_string_col_to_date(prod_10_df_int, ["SHOP_DATE"])
prod_10_df_final = prod_10_df_final.drop("Prefix")
prod_10_df_final.show()
prod_10_df_final.printSchema()

[Stage 37:>                                                         (0 + 1) / 1]

+----------+----------+------------+---------+-----------+
| SHOP_DATE|STORE_CODE|PROD_CODE_10|TOTAL_QTY|TOTAL_SPEND|
+----------+----------+------------+---------+-----------+
|2006-04-10|         1|          12|      1.0|       0.01|
|2006-04-10|         1|          20|      5.0|        2.2|
|2006-04-10|         1|         201|      1.0|       0.81|
|2006-04-10|         2|           1|      2.0|       1.72|
|2006-04-10|         3|          23|      1.0|       1.93|
|2006-04-10|         3|          65|      1.0|       1.16|
|2006-04-10|         3|          74|      2.0|       5.22|
|2006-04-10|         3|         128|      1.0|       1.02|
|2006-04-10|         3|         135|      1.0|       0.16|
|2006-04-10|         3|         157|      2.0|       1.58|
|2006-04-10|         3|         161|      3.0|        3.3|
|2006-04-10|         4|          17|      1.0|       3.35|
|2006-04-10|         4|          43|      1.0|       0.97|
|2006-04-10|         4|          63|      6.0|       6.6

                                                                                

In [36]:
# Extract the prefixes from PROD_CODE_20
prod_20_df = prod_20_df.withColumn("Prefix", substring(col("PROD_CODE_20"), 1, 3))
prod_20_df.select("Prefix").distinct().show()



+------+
|Prefix|
+------+
|   DEP|
+------+



                                                                                

In [37]:
prod_20_df_int = convert_string_col_to_int(prod_20_df, ["STORE_CODE", "PROD_CODE_20"])
prod_20_df_final = convert_string_col_to_date(prod_20_df_int, ["SHOP_DATE"])
prod_20_df_final = prod_20_df_final.drop("Prefix")
prod_20_df_final.show()
prod_20_df_final.printSchema()

[Stage 43:>                                                         (0 + 1) / 1]

+----------+----------+------------+---------+-----------+
| SHOP_DATE|STORE_CODE|PROD_CODE_20|TOTAL_QTY|TOTAL_SPEND|
+----------+----------+------------+---------+-----------+
|2006-04-10|         1|           3|      1.0|       0.01|
|2006-04-10|         1|           5|      5.0|        2.2|
|2006-04-10|         1|          25|      3.0|       2.61|
|2006-04-10|         1|          51|      3.0|       3.97|
|2006-04-10|         1|          67|      1.0|       0.81|
|2006-04-10|         1|          70|      1.0|       0.67|
|2006-04-10|         2|           8|      2.0|       2.21|
|2006-04-10|         2|          11|      1.0|        0.4|
|2006-04-10|         3|           8|     13.0|      17.66|
|2006-04-10|         3|          16|      1.0|       5.14|
|2006-04-10|         3|          21|     11.0|      16.59|
|2006-04-10|         3|          25|      4.0|       4.83|
|2006-04-10|         3|          69|      1.0|       1.87|
|2006-04-10|         3|          73|      3.0|       1.8

                                                                                

In [38]:
# Extract the prefixes from PROD_CODE_30
prod_30_df = prod_30_df.withColumn("Prefix", substring(col("PROD_CODE_30"), 1, 1))
prod_30_df.select("Prefix").distinct().show()



+------+
|Prefix|
+------+
|     G|
+------+



                                                                                

In [39]:
prod_30_df_int = convert_string_col_to_int(prod_30_df, ["STORE_CODE", "PROD_CODE_30"])
prod_30_df_final = convert_string_col_to_date(prod_30_df_int, ["SHOP_DATE"])
prod_30_df_final = prod_30_df_final.drop("Prefix")
prod_30_df_final.show()
prod_30_df_final.printSchema()

[Stage 49:>                                                         (0 + 1) / 1]

+----------+----------+------------+---------+-----------+
| SHOP_DATE|STORE_CODE|PROD_CODE_30|TOTAL_QTY|TOTAL_SPEND|
+----------+----------+------------+---------+-----------+
|2006-04-10|         1|           2|      1.0|       0.01|
|2006-04-10|         1|           3|      5.0|        2.2|
|2006-04-10|         1|           6|      1.0|       2.58|
|2006-04-10|         1|          10|      3.0|       1.02|
|2006-04-10|         1|          15|      4.0|       5.26|
|2006-04-10|         2|           3|      1.0|       2.05|
|2006-04-10|         2|          11|      1.0|       0.41|
|2006-04-10|         2|          16|      3.0|       3.64|
|2006-04-10|         2|          21|      1.0|        1.4|
|2006-04-10|         2|          23|      1.0|       0.31|
|2006-04-10|         3|           2|      2.0|       2.11|
|2006-04-10|         3|           3|      2.0|        2.2|
|2006-04-10|         3|           4|     20.0|      23.56|
|2006-04-10|         3|           7|     47.0|      67.4

                                                                                

In [40]:
# Extract the prefixes from PROD_CODE_30
prod_40_df = prod_40_df.withColumn("Prefix", substring(col("PROD_CODE_40"), 1, 1))
prod_40_df.select("Prefix").distinct().show()



+------+
|Prefix|
+------+
|     D|
+------+



                                                                                

In [41]:
prod_40_df_int = convert_string_col_to_int(prod_40_df, ["STORE_CODE", "PROD_CODE_40"])
prod_40_df_final = convert_string_col_to_date(prod_40_df_int, ["SHOP_DATE"])
prod_40_df_final = prod_40_df_final.drop("Prefix")
prod_40_df_final.show()
prod_40_df_final.printSchema()

[Stage 55:>                                                         (0 + 1) / 1]

+----------+----------+------------+---------+-----------+
| SHOP_DATE|STORE_CODE|PROD_CODE_40|TOTAL_QTY|TOTAL_SPEND|
+----------+----------+------------+---------+-----------+
|2006-12-19|        27|           2|     40.0|      58.04|
|2006-12-18|        30|           1|      7.0|       8.51|
|2006-12-21|        65|           4|      4.0|       7.92|
|2006-12-18|       327|           2|     39.0|      59.88|
|2006-12-22|       327|           2|     52.0|      77.29|
|2006-12-19|       337|           1|      4.0|       4.88|
|2006-12-23|       352|           3|      7.0|       5.06|
|2006-12-18|       377|           2|     84.0|     114.26|
|2006-12-24|       377|           4|      2.0|       4.18|
|2006-12-19|       468|           1|      2.0|       2.25|
|2006-12-18|       500|           9|      1.0|       1.35|
|2006-12-23|       508|           1|      5.0|       2.16|
|2006-12-24|       560|           3|     16.0|       11.8|
|2006-12-19|       574|           1|     23.0|      16.8

                                                                                

In [43]:
prod_40_df_final.select("PROD_CODE_40").distinct().count()b

                                                                                

9

In [46]:
store_format_df_final.createOrReplaceTempView("STORE_FORMAT_DATA")

In [48]:
store_format_df_agg = spark.sql("SELECT SHOP_DATE, STORE_FORMAT, SUM(TOTAL_QTY) AS TOTAL_QTY, ROUND(SUM(TOTAL_SPEND), 2) AS TOTAL_SPEND FROM STORE_FORMAT_DATA GROUP BY SHOP_DATE, STORE_FORMAT")

In [49]:
store_format_df_agg.printSchema()

root
 |-- SHOP_DATE: date (nullable = true)
 |-- STORE_FORMAT: string (nullable = true)
 |-- TOTAL_QTY: double (nullable = true)
 |-- TOTAL_SPEND: double (nullable = true)



In [50]:
store_format_df_agg.show()



+----------+------------+---------+-----------+
| SHOP_DATE|STORE_FORMAT|TOTAL_QTY|TOTAL_SPEND|
+----------+------------+---------+-----------+
|2007-08-31|          MS|  13711.0|   16807.61|
|2007-08-29|          SS|   5023.0|    5868.81|
|2007-03-02|          LS|  39974.0|   49780.98|
|2007-04-22|          MS|  13154.0|   15747.25|
|2007-06-09|          MS|  12476.0|   15635.42|
|2008-07-05|         XLS|   5499.0|    6929.79|
|2008-06-09|          SS|   4272.0|    5131.97|
|2007-04-05|          LS|  36783.0|   47767.08|
|2007-04-05|          SS|   3711.0|    4599.49|
|2007-02-24|         XLS|   5288.0|    6753.42|
|2006-06-17|         XLS|   4660.0|    6294.41|
|2007-08-14|          MS|  12126.0|   14373.81|
|2007-09-21|          SS|   4289.0|    5050.13|
|2006-10-20|          MS|  11456.0|   14161.85|
|2006-10-10|          SS|   3039.0|    3426.32|
|2006-08-15|          SS|   3579.0|     4264.1|
|2006-08-11|          MS|  10874.0|   13729.17|
|2007-10-14|          SS|   3995.0|    4

                                                                                

In [53]:
store_format_df_agg.write.parquet("hdfs://localhost:9000/transactions/store_format_agg.parquet")

                                                                                