# Using Spark to analyse retail data

The dataset used in this example is available on Kaggle (https://www.kaggle.com/ilkeryildiz/online-retail-listing). If you find it interesting, consider upvoting it.

## Loading the Dataset into a Spark DataFrame

We need to load the 'Price' column as a string, because its decimals are separated by commas. We will address this in the cleaning step below.
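
To see why the comma-separated decimals are a problem, here is a minimal plain-Python sketch (no Spark involved). The helper `parse_price` is a hypothetical name introduced only for this illustration; it mirrors the idea of swapping the comma for a point before converting.

```python
from decimal import Decimal

def parse_price(raw: str) -> Decimal:
    """Convert a comma-decimal string such as '6,95' to a Decimal."""
    return Decimal(raw.strip().replace(",", "."))

# float() rejects the comma-separated form outright
try:
    float("6,95")
    parsed_directly = True
except ValueError:
    parsed_directly = False

print(parsed_directly)       # False
print(parse_price("6,95"))   # 6.95
```

A numeric column type in the schema would hit the same kind of parsing failure, which is why the column is read as a string first.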

In [61]:
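from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

# `spark.read` and `spark.sql` are methods of a SparkSession, not of the pyspark module
spark = SparkSession.builder.getOrCreate()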

schema = StructType.fromJson({'fields': [{'metadata': {},   'name': 'Invoice',   'nullable': False,   'type': 'integer'},
{'metadata': {},   'name': 'StockCode',   'nullable': False,   'type': 'string'},
{'metadata': {},   'name': 'Description',   'nullable': False,   'type': 'string'},
{'metadata': {},   'name': 'Quantity',   'nullable': False,   'type': 'integer'},
{'metadata': {},   'name': 'InvoiceDate',   'nullable': False,   'type': 'string'},
{'metadata': {},   'name': 'Price',   'nullable': False,   'type': 'string'},
{'metadata': {},   'name': 'Customer_ID',   'nullable': True,   'type': 'integer'},
{'metadata': {},   'name': 'Country',   'nullable': False,   'type': 'string'}, ],'type': 'struct'})
df = (
    spark.read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .schema(schema)
    .load("online_retail_listing.csv")
)
df.show()

+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|    InvoiceDate|Price|Customer_ID|       Country|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|1.12.2009 07:45| 6,95|      13085|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|1.12.2009 07:45| 6,75|      13085|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|1.12.2009 07:45| 6,75|      13085|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|1.12.2009 07:45|  2,1|      13085|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|1.12.2009 07:45| 1,25|      13085|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|1.12.2009 07:45| 1,65|      13085|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|1.12.2009 07:45| 1,25|      13085|United Kingdom|
| 489434| 

## Now, let's clean the data
### Replacing the decimal commas with points and casting Price from string to float

In [62]:
from pyspark.sql.functions import regexp_replace

# Replace the decimal comma with a point, then cast the string to a number
df = df.withColumn('Price', regexp_replace('Price', ',', '.'))
df = df.withColumn('Price', df['Price'].cast("float"))
df.createOrReplaceTempView("retail")
df.show()

+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|    InvoiceDate|Price|Customer_ID|       Country|
+-------+---------+--------------------+--------+---------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|1.12.2009 07:45| 6.95|      13085|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|1.12.2009 07:45| 6.75|      13085|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|1.12.2009 07:45| 6.75|      13085|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|1.12.2009 07:45|  2.1|      13085|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|1.12.2009 07:45| 1.25|      13085|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|1.12.2009 07:45| 1.65|      13085|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|1.12.2009 07:45| 1.25|      13085|United Kingdom|
| 489434| 

## Now it is time to analyse the data
### What are the 10 best-selling products by total quantity?

In [59]:

spark.sql('SELECT StockCode,  SUM(Quantity) FROM retail GROUP BY StockCode ORDER BY SUM(Quantity) DESC LIMIT 10').show()

+---------+-------------+
|StockCode|sum(Quantity)|
+---------+-------------+
|    84077|       107472|
|   85123A|        95415|
|   85099B|        95290|
|    21212|        95136|
|    84879|        80007|
|    22197|        74828|
|    17003|        70423|
|    21977|        56366|
|    84991|        54271|
|    22492|        45164|
+---------+-------------+



### What are the 10 best-selling products by total amount spent?

In [45]:
spark.sql('SELECT StockCode,  SUM(Quantity * Price) AS TotalSpent FROM retail GROUP BY StockCode ORDER BY SUM(Quantity * Price) DESC LIMIT 10').show()

+---------+------------------+
|StockCode|        TotalSpent|
+---------+------------------+
|    22423| 322643.1459131241|
|      DOT| 304979.9305805862|
|   85123A| 251944.6897907257|
|   85099B|180281.33941817284|
|    47566|147361.55929517746|
|    84879|130242.39337158203|
|    22086|116439.35060596466|
|     POST|110219.20999413729|
|    79321| 83145.64911365509|
|    22197| 76239.60187804699|
+---------+------------------+
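
The long fractional tails in `TotalSpent` (for example `322643.1459131241`) are most likely a side effect of storing `Price` as a 32-bit float. The plain-Python sketch below illustrates the effect; `as_float32` is a hypothetical helper that round-trips a value through 32 bits, which is an approximation of what a Spark `float` column holds.

```python
import struct

def as_float32(x: float) -> float:
    """Round-trip x through a 32-bit float, as a 'float' column would store it."""
    return struct.unpack("f", struct.pack("f", x))[0]

price = 6.95
stored = as_float32(price)

# 6.95 has no exact 32-bit representation, so the stored value differs slightly
print(stored == price)      # False
# The per-row error is tiny, but it accumulates when summing many rows
print(abs(stored - price))
```

If cleaner totals matter, casting `Price` to `"double"` or to a decimal type such as `"decimal(10,2)"` in the cleaning step is one option to consider; the precision and scale given here are assumptions, not values taken from the dataset.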



### Who are the top 10 purchasers (by total spent), considering only the 10 best-selling products (by quantity)?

In [60]:
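# First, the StockCodes of the 10 most sold products.
# ORDER BY is needed here: without it, LIMIT 10 returns 10 arbitrary products
# (the output shown below was produced without this ordering).
most_sold = 'SELECT StockCode FROM retail GROUP BY StockCode ORDER BY SUM(Quantity) DESC LIMIT 10'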


spark.sql(f'SELECT Customer_ID,  SUM(Quantity * Price) AS TotalSpent \
            FROM retail WHERE StockCode IN ({most_sold}) GROUP BY Customer_ID ORDER BY SUM(Quantity * Price) DESC LIMIT 10').show()
# spark.sql(f'SELECT * \
#             FROM retail WHERE StockCode IN ({most_sold})').show()

+-----------+------------------+
|Customer_ID|        TotalSpent|
+-----------+------------------+
|       null| 6826.059998869896|
|      17450| 2617.199951171875|
|      17381|1623.7599487304688|
|      14911|1413.5499572753906|
|      14646|1045.0199890136719|
|      17404|             900.0|
|      18109| 713.9999675750732|
|      16656| 573.1999778747559|
|      13089|505.09997177124023|
|      13090| 454.6999797821045|
+-----------+------------------+
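
The `null` row at the top groups together every purchase that has no customer ID, so it is not a real customer. A minimal sketch of how such rows could be excluded follows. It uses Python's built-in `sqlite3` on a few made-up rows rather than Spark, so that it runs anywhere; the assumption is that the same `WHERE ... IS NOT NULL` clause carries over to the `spark.sql` query unchanged.

```python
import sqlite3

# Made-up rows, for illustration only: (StockCode, Quantity, Price, Customer_ID)
rows = [
    ("A", 100, 1.0, 1),
    ("A", 50, 1.0, None),   # purchase without a customer ID
    ("B", 80, 2.0, 2),
    ("C", 1, 9.0, 3),       # rarely sold product
]

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE retail (StockCode TEXT, Quantity INTEGER, Price REAL, Customer_ID INTEGER)"
)
con.executemany("INSERT INTO retail VALUES (?, ?, ?, ?)", rows)

# Top 2 products by quantity; the ORDER BY makes the LIMIT deterministic
most_sold = """
    SELECT StockCode FROM retail
    GROUP BY StockCode ORDER BY SUM(Quantity) DESC LIMIT 2
"""

top_customers = con.execute(f"""
    SELECT Customer_ID, SUM(Quantity * Price) AS TotalSpent
    FROM retail
    WHERE StockCode IN ({most_sold})
      AND Customer_ID IS NOT NULL
    GROUP BY Customer_ID
    ORDER BY TotalSpent DESC
    LIMIT 10
""").fetchall()

print(top_customers)  # [(2, 160.0), (1, 100.0)]
```

Product C and the anonymous purchase are both left out: the first by the subquery, the second by the null filter.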



### Which customers bought the most products (by quantity only)?

In [63]:
spark.sql(f'SELECT Customer_ID,  SUM(Quantity) AS TotalItemsPurchased \
            FROM retail GROUP BY Customer_ID ORDER BY SUM(Quantity) DESC LIMIT 10').show()


+-----------+-------------------+
|Customer_ID|TotalItemsPurchased|
+-----------+-------------------+
|      14646|             357262|
|       null|             355706|
|      13902|             218090|
|      18102|             184412|
|      13694|             183002|
|      14156|             163910|
|      14911|             140184|
|      17511|             110698|
|      14298|             100176|
|      16684|              96363|
+-----------+-------------------+



### Which customers made the most purchases?

In [66]:
spark.sql(f'SELECT Customer_ID,  COUNT(*) AS TotalPurchases \
            FROM retail GROUP BY Customer_ID ORDER BY COUNT(*) DESC LIMIT 10').show()

+-----------+--------------+
|Customer_ID|TotalPurchases|
+-----------+--------------+
|       null|        236682|
|      17841|         12780|
|      14911|         11328|
|      12748|          7100|
|      14606|          6608|
|      15311|          4664|
|      14096|          4598|
|      14156|          4130|
|      14646|          3804|
|      13089|          3365|
+-----------+--------------+
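
Note that `COUNT(*)` counts rows, and each row in this dataset appears to be a single invoice line (the first rows shown earlier all share Invoice 489434). If a "purchase" is meant to be one invoice, `COUNT(DISTINCT Invoice)` may be closer to the intent. The sketch below contrasts the two on made-up rows, again using Python's built-in `sqlite3` instead of Spark; the assumption is that the same aggregate works in `spark.sql`.

```python
import sqlite3

# Made-up rows, for illustration only: (Invoice, StockCode, Customer_ID)
rows = [
    (1001, "A", 1), (1001, "B", 1), (1001, "C", 1),  # one invoice, three line items
    (1002, "A", 2),
    (1003, "B", 2),                                   # two invoices, one item each
]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE retail (Invoice INTEGER, StockCode TEXT, Customer_ID INTEGER)")
con.executemany("INSERT INTO retail VALUES (?, ?, ?)", rows)

counts = con.execute("""
    SELECT Customer_ID,
           COUNT(*)                AS LineItems,
           COUNT(DISTINCT Invoice) AS Invoices
    FROM retail
    GROUP BY Customer_ID
    ORDER BY Customer_ID
""").fetchall()

print(counts)  # [(1, 3, 1), (2, 2, 2)]
```

Customer 1 leads by line items, while customer 2 leads by invoices, so the two definitions can rank customers differently.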

