# Retail Data Wrangling and Analytics

# Load CSV into Dataframe
Alternatively, the LGS IT team also dumped the transactional data into a [CSV file](https://raw.githubusercontent.com/jarviscanada/jarvis_data_eng_demo/feature/data/python_data_wrangling/data/online_retail_II.csv). However, the CSV header (column names) doesn't follow the snakecase or camelcase naming convention (e.g. `Customer ID` instead of `customer_id` or `CustomerID`). As a result, you will need to use Pandas to clean up the data before doing any analytics. In addition, unlike the PSQL scheme, CSV files do not have data types associated. Therefore, you will need to cast/convert certain columns into correct data types (e.g. DateTime, numbers, etc..)

**Data Preperation**

- Read the `data/online_retail_II.csv` file into a DataFrame
- Rename all columns to upper camelcase or snakecase
- Convert/cast all columns to the appropriate data types (e.g. datetime)

In [0]:
retail_df = spark.sql("SELECT * FROM retail")
retail_df = retail_df.na.drop()
retail_df.cache()
display(retail_df)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085,United Kingdom


# Total Invoice Amount Distribution

In [0]:
# Write your code in one or more cells (please remove this line from your notebook)
from pyspark.sql.functions import *

retail_filtered_df = (retail_df
  .filter(col("quantity") > 0)
  .cache())

invoice_amount_df = (retail_filtered_df
  .groupBy("invoice_no")
  .sum("quantity")
  .withColumnRenamed("sum(quantity)", "invoice_amount")
)

partial_df = (retail_filtered_df
  .limit(int(retail_filtered_df.count() * 0.85))
  .cache())
 
partial_amount_df = (partial_df
  .groupBy("invoice_no")
  .sum("quantity")
  .withColumnRenamed("sum(quantity)", "invoice_amount")
  .cache()
)
# display(retail_filtered_df.where("quantity < 0"))

In [0]:
invoice_amount_df.select("invoice_amount").summary("min", "mean", "max").show()
partial_amount_df.select("invoice_amount").summary("min", "mean", "max").show()

# Monthly Placed and Canceled Orders

In [0]:
retail_df = (retail_df
  .withColumn("year", year(col("invoice_date")))
  .withColumn("month", month(col("invoice_date")))
  )

retail_filtered_df = (retail_df
  .filter(col("quantity") > 0)
  .cache())

In [0]:
cancelled_df = (retail_df
  .filter(col("quantity") < 0))

total_invoice_df = (retail_df
  .groupBy("year", "month")
  .count()
  .withColumnRenamed("count", "total_count")
  .orderBy("year", "month"))

cancelled_invoice_df = (cancelled_df
  .groupBy("year", "month")
  .count()
  .withColumnRenamed("count", "cancelled_count")
  .orderBy("year", "month"))

placed_invoice_df = (total_invoice_df
  .join(cancelled_invoice_df, ["year", "month"])
  .orderBy("year", "month"))

display(placed_invoice_df
  .withColumn("placed_count", col("total_count") - (col("cancelled_count") * 2)))

year,month,total_count,cancelled_count,placed_count
2009,12,31760,999,29762
2010,1,22439,661,21117
2010,2,23906,537,22832
2010,3,33114,812,31490
2010,4,27833,595,26643
2010,5,29604,960,27684
2010,6,31950,759,30432
2010,7,27746,713,26320
2010,8,26942,549,25844
2010,9,35386,784,33818


# Monthly Sales

In [0]:
monthly_sales = (retail_filtered_df
  .withColumn("sales", col("unit_price") * col("quantity"))
  .groupBy("year", "month")
  .sum("sales")
  .withColumnRenamed("sum(sales)", "sales")
  .orderBy("year", "month"))

display(monthly_sales)

year,month,sales
2009,12,686654.1599999949
2010,1,557319.0620000134
2010,2,506371.06600001536
2010,3,699608.9909999889
2010,4,594609.1919999977
2010,5,599985.7900000075
2010,6,639066.5800000058
2010,7,591636.7400000112
2010,8,604242.6499999989
2010,9,831615.0009999905


# Monthly Sales Growth

In [0]:
sales_list = (monthly_sales
  .select("sales")
  .collect())

In [0]:
growth = [0]
i = 1
while i <= 24:
  growth.append((sales_list[i]["sales"] - sales_list[i-1]["sales"]) / sales_list[i-1]["sales"])
  i += 1
print(growth)

# Monthly Active Users

In [0]:
display(retail_filtered_df
  .select(col("year"), col("month"), col("customer_id"))
  .dropDuplicates()
  .groupBy("year", "month")
  .count()
  .orderBy("year", "month"))

year,month,count
2009,12,955
2010,1,720
2010,2,774
2010,3,1057
2010,4,942
2010,5,966
2010,6,1041
2010,7,928
2010,8,911
2010,9,1145


# New and Existing Users

In [0]:
first_df = (retail_filtered_df
  .groupBy("customer_id")
  .agg(min("invoice_date"))
  .withColumnRenamed("min(invoice_date)", "first_ym")
  .withColumn("first_year", year(col("first_ym")))
  .withColumn("first_month", month(col("first_ym")))
  .drop("first_ym"))

retail_filtered_df = (retail_filtered_df
  .join(first_df, "customer_id"))

In [0]:
new_user_df = (retail_filtered_df
  .filter((col("year") == col("first_year")) & (col("month") == col("first_month")))
  .select("customer_id", "year", "month")
  .distinct()
  .groupBy("year", "month")
  .count()
  .orderBy("year", "month")
  .withColumnRenamed("count", "new_count"))

ex_user_df = (retail_filtered_df
  .filter((col("year") > col("first_year")) | (col("month") > col("first_month")))
  .select("customer_id", "year", "month")
  .distinct()
  .groupBy("year", "month")
  .count()
  .orderBy("year", "month")
  .withColumnRenamed("count", "ex_count"))

year,month,ex_count
2010,1,337
2010,2,398
2010,3,614
2010,4,648
2010,5,712
2010,6,771
2010,7,742
2010,8,749
2010,9,902
2010,10,1120


## Finding RFM

RFM is a method used for analyzing customer value. It is commonly used in database marketing and direct marketing and has received particular attention in the retail and professional services industries. ([wikipedia](https://en.wikipedia.org/wiki/RFM_(market_research)))

Optional Reading: [Making Your Database Pay Off Using Recency Frequency and Monetary Analysis](http://www.dbmarketing.com/2010/03/making-your-database-pay-off-using-recency-frequency-and-monetary-analysis/)


RFM stands for three dimensions:

- Recency – How recently did the customer purchase?

- Frequency – How often do they purchase?

- Monetary Value – How much do they spend?

Note: To simplify the problem, let's keep all placed and canceled orders.


**Sample RFM table**

![](https://i.imgur.com/sXFIg6u.jpg)

In [0]:
from pyspark.sql.types import *

In [0]:
display(retail_df
  .select("customer_id", "invoice_no")
  .distinct()
  .groupBy("customer_id")
  .count()
  .withColumnRenamed("count", "invoice")
  .orderBy("customer_id"))

customer_id,invoice
12346,17
12347,8
12348,5
12349,5
12350,1
12351,1
12352,13
12353,2
12354,1
12355,2


In [0]:
display(retail_df
  .withColumn("monetary", col("unit_price") * col("quantity"))
  .groupBy("customer_id")
  .sum("monetary")
  .withColumnRenamed("sum(monetary)", "monetary")
  .orderBy("customer_id"))

customer_id,monetary
12346,-64.67999999999981
12347,5633.320000000001
12348,2019.4
12349,4404.54
12350,334.40000000000003
12351,300.93
12352,1889.21
12353,406.75999999999993
12354,1079.4
12355,947.61


In [0]:
display(retail_df
  .withColumn("invoice_date", col("invoice_date").cast("long"))
  .groupBy("customer_id")
  .max("invoice_date"))

customer_id,max(invoice_date)
13623,1320840000
17679,1318923780
17389,1323423480
18051,1268661480
13289,1260964380
17753,1283356140
15727,1322051760
16574,1317303540
14832,1269011700
15447,1294917960


In [0]:
display(retail_df
  .withColumn("invoice_date", col("invoice_date").cast("long"))
  .groupBy("customer_id")
  .max("invoice_date")
  .withColumnRenamed("max(invoice_date)", "invoice_date")
  .withColumn("current_date", current_timestamp().cast("long"))
  .withColumn("recency", col("current_date") - col("invoice_date"))
  .withColumn("recency", round(col("recency")/(24*3600)))
  .orderBy("customer_id"))

customer_id,invoice_date,current_date,recency
12346,1295345820,1624464305,3809.0
12347,1323273120,1624464305,3486.0
12348,1316956380,1624464305,3559.0
12349,1321869060,1624464305,3502.0
12350,1296662460,1624464305,3794.0
12351,1291044180,1624464305,3859.0
12352,1320331020,1624464305,3520.0
12353,1305827220,1624464305,3688.0
12354,1303391460,1624464305,3716.0
12355,1304948940,1624464305,3698.0
