# Retail Data Wrangling and Analytics

In [0]:
# Import modules 
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

# Load CSV into Dataframe
Alternatively, the LGS IT team also dumped the transactional data into a [CSV file](https://raw.githubusercontent.com/jarviscanada/jarvis_data_eng_demo/feature/data/python_data_wrangling/data/online_retail_II.csv). However, the CSV header (column names) doesn't follow the snakecase or camelcase naming convention (e.g. `Customer ID` instead of `customer_id` or `CustomerID`). As a result, you will need to use Pandas to clean up the data before doing any analytics. In addition, unlike the PSQL scheme, CSV files do not have data types associated. Therefore, you will need to cast/convert certain columns into correct data types (e.g. DateTime, numbers, etc..)

**Data Preperation**

- Read the `data/online_retail_II.csv` file into a DataFrame
- Rename all columns to upper camelcase or snakecase
- Convert/cast all columns to the appropriate data types (e.g. datetime)

In [0]:
retail_df = spark.read.table("workspace.default.online_retail_ii")
retail_df.show()


+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 557112|    23243|SET OF TEA COFFEE...|       1|2011-06-16 16:31:00| 4.95|       NULL|United Kingdom|
| 557112|    23245|SET OF 3 REGENCY ...|       4|2011-06-16 16:31:00|10.79|       NULL|United Kingdom|
| 557112|    23251|VINTAGE RED ENAME...|       2|2011-06-16 16:31:00| 2.46|       NULL|United Kingdom|
| 557112|    23256|CHILDRENS CUTLERY...|       1|2011-06-16 16:31:00| 8.29|       NULL|United Kingdom|
| 557112|    23298|      SPOTTY BUNTING|       2|2011-06-16 16:31:00|10.79|       NULL|United Kingdom|
| 557112|    23300|GARDENERS KNEELIN...|       2|2011-06-16 16:31:00| 3.29|       NULL|United Kingdom|
| 557112|    23301|GARDENERS KNEELIN...|       5|2011-06-16 16:31:00| 3.2

# Total Invoice Amount Distribution

In [0]:
from pyspark.sql.functions import col

total_invoice_df = retail_df.groupBy("Invoice").count()
display(total_invoice_df)

print("Dataframe statistics")
print("Min: " + str(total_invoice_df.agg({'count': 'min'}).collect()[0][0]))
print("Max: " + str(total_invoice_df.agg({'count': 'max'}).collect()[0][0]))
print("Median: " + str(total_invoice_df.agg({'count': 'median'}).collect()[0][0]))
print("Mode: " + str(total_invoice_df.agg({'count': 'mode'}).collect()[0][0]))
print("Mean: " + str(total_invoice_df.agg({'count': 'mean'}).collect()[0][0]))

eighty_fifth_quantile_val = total_invoice_df.approxQuantile("count", [0.85], 0.01)
quantile_filtered_total_invoice_df = total_invoice_df.filter((col("count") <= eighty_fifth_quantile_val[0]))

print("Dataframe statistics filtered by removing high outliers (85th percentile and above)")
print("Min: " + str(quantile_filtered_total_invoice_df.agg({'count': 'min'}).collect()[0][0]))
print("Max: " + str(quantile_filtered_total_invoice_df.agg({'count': 'max'}).collect()[0][0]))
print("Median: " + str(quantile_filtered_total_invoice_df.agg({'count': 'median'}).collect()[0][0]))
print("Mode: " + str(quantile_filtered_total_invoice_df.agg({'count': 'mode'}).collect()[0][0]))
print("Mean: " + str(quantile_filtered_total_invoice_df.agg({'count': 'mean'}).collect()[0][0]))


Invoice,count
490298,40
491055,9
491969,548
494345,21
495102,83
C495103,1
497027,53
500571,1
502228,34
509340,6


Dataframe statistics
Min: 1
Max: 1350
Median: 9.0
Mode: 1
Mean: 19.90324084433505
Dataframe statistics filtered by removing high outliers (85th percentile and above)
Min: 1
Max: 33
Median: 6.0
Mode: 1
Mean: 9.406085766020404


# Monthly Placed and Canceled Orders

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import date_format

yyyymm_df = retail_df.withColumn("yyyymm", date_format(col("InvoiceDate"), "yyyyMM"))

monthly_cancelled_df = yyyymm_df.filter(col("invoice").contains("C")) \
    .groupBy(col("yyyymm")) \
    .count() \
    .orderBy("yyyymm") \
    .withColumnRenamed("count", "cancelled")

monthly_placed_df = yyyymm_df.filter(~col("invoice").contains("C")) \
    .groupBy(col("yyyymm")) \
    .count() \
    .orderBy("yyyymm") \
    .withColumnRenamed("count", "placed")

monthly_placed_cancelled_df = monthly_placed_df.join(monthly_cancelled_df, "yyyymm", "left") \
    .withColumn("total_placed", col("placed") - 2*col("cancelled"))

monthly_placed_cancelled_df.show()

+------+------+---------+------------+
|yyyymm|placed|cancelled|total_placed|
+------+------+---------+------------+
|201004| 33431|      626|       32179|
|201002| 28812|      576|       27660|
|201008| 32733|      573|       31587|
|201105| 36409|      621|       35167|
|201001| 30869|      686|       29497|
|200912| 44213|     1015|       42183|
|201106| 36163|      711|       34741|
|201010| 58057|     1041|       55975|
|201003| 40667|      844|       38979|
|201103| 36049|      699|       34651|
|201011| 76821|     1194|       74433|
|201101| 34446|      701|       33044|
|201006| 39190|      793|       37604|
|201104| 29357|      559|       28239|
|201005| 34340|      983|       32374|
|201102| 27232|      475|       26282|
|201012| 63947|     1057|       61833|
|201007| 32649|      734|       31181|
|201009| 41279|      812|       39655|
|201107| 38833|      685|       37463|
+------+------+---------+------------+
only showing top 20 rows


# Monthly Sales

In [0]:
yyyymm_df = yyyymm_df.withColumn("total_sales", col("Quantity") * col("Price"))

monthly_sales_df = yyyymm_df.groupBy("yyyymm").agg({"total_sales": "sum"}).orderBy("yyyymm").select(col("yyyymm"), col("sum(total_sales)").alias("sales"))

monthly_sales_df.show()

+------+------------------+
|yyyymm|             sales|
+------+------------------+
|200912| 799847.1100000143|
|201001| 624032.8919999956|
|201002| 533091.4260000042|
|201003| 765848.7609999765|
|201004| 590580.4319999823|
|201005| 615322.8300000005|
|201006| 679786.6099999842|
|201007|  575236.359999999|
|201008| 656776.3399999854|
|201009| 853650.4309999745|
|201010|1045168.3499998983|
|201011|1422654.6419998251|
|201012|1126445.4699999166|
|201101| 560000.2600000234|
|201102| 498062.6500000268|
|201103| 683267.0800000189|
|201104| 493207.1210000249|
|201105|   723333.51000001|
|201106| 691123.1200000157|
|201107| 681300.1110000301|
+------+------------------+
only showing top 20 rows


# Monthly Sales Growth


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

window = Window.orderBy("yyyymm")

monthly_sales_df = monthly_sales_df.withColumn("percent_sales_growth",
                                               ((col("sales") - lag("sales").over(window))/(col("sales")))*100)
monthly_sales_df.show()



+------+------------------+--------------------+
|yyyymm|             sales|percent_sales_growth|
+------+------------------+--------------------+
|200912| 799847.1100000143|                NULL|
|201001| 624032.8919999956| -28.173870360670023|
|201002| 533091.4260000042| -17.059262551334054|
|201003| 765848.7609999765|   30.39207567510572|
|201004| 590580.4319999823| -29.677300415534162|
|201005| 615322.8300000005|   4.021043392785895|
|201006| 679786.6099999842|   9.482943478393185|
|201007|  575236.359999999|  -18.17518106817609|
|201008| 656776.3399999854|  12.415182313051682|
|201009| 853650.4309999745|  23.062612499283674|
|201010|1045168.3499998983|    18.3241215637598|
|201011|1422654.6419998251|  26.533937391108108|
|201012|1126445.4699999166| -26.295917546717146|
|201101| 560000.2600000234| -101.15088339420942|
|201102| 498062.6500000268| -12.435706632487555|
|201103| 683267.0800000189|  27.105715381456253|
|201104| 493207.1210000249| -38.535526132434825|
|201105|   723333.51

# Monthly Active Users

In [0]:
unique_customer_df = yyyymm_df.dropDuplicates(["yyyymm", "Customer ID"])

monthly_active_df = unique_customer_df.groupBy("yyyymm").agg({"*": "count"}).orderBy("yyyymm").show()

+------+--------+
|yyyymm|count(1)|
+------+--------+
|200912|    1046|
|201001|     787|
|201002|     808|
|201003|    1112|
|201004|     999|
|201005|    1063|
|201006|    1096|
|201007|     989|
|201008|     965|
|201009|    1203|
|201010|    1578|
|201011|    1684|
|201012|     949|
|201101|     784|
|201102|     799|
|201103|    1021|
|201104|     900|
|201105|    1080|
|201106|    1052|
|201107|     994|
+------+--------+
only showing top 20 rows


# New and Existing Users



In [0]:
first_purchase_month_df = yyyymm_df.groupBy("Customer ID").agg({"YYYYMM": "min"}).withColumnRenamed("min(YYYYMM)", "first_purchase_month").orderBy("Customer ID")

first_purchase_month_df = yyyymm_df.join(first_purchase_month_df, "Customer ID", "inner")

new_users_df = first_purchase_month_df.filter(col("YYYYMM") == col("first_purchase_month")).dropDuplicates(["Customer ID", "YYYYMM"]).groupBy("YYYYMM").agg({"*": "count"}).withColumnRenamed("count(1)", "new_users").orderBy("YYYYMM")
ex_users_df = first_purchase_month_df.filter(col("YYYYMM") != col("first_purchase_month")).dropDuplicates(["Customer ID", "YYYYMM"]).groupBy("YYYYMM").agg({"*": "count"}).withColumnRenamed("count(1)", "ex_users").orderBy("YYYYMM")

new_ex_users_df = new_users_df.join(ex_users_df, "YYYYMM", "left").orderBy("YYYYMM")
new_ex_users_df.show()

+------+---------+--------+
|YYYYMM|new_users|ex_users|
+------+---------+--------+
|200912|     1045|    NULL|
|201001|      394|     392|
|201002|      363|     444|
|201003|      436|     675|
|201004|      291|     707|
|201005|      254|     808|
|201006|      269|     826|
|201007|      183|     805|
|201008|      158|     806|
|201009|      242|     960|
|201010|      379|    1198|
|201011|      322|    1361|
|201012|       77|     871|
|201101|       71|     712|
|201102|      123|     675|
|201103|      178|     842|
|201104|      105|     794|
|201105|      108|     971|
|201106|      108|     943|
|201107|      102|     891|
+------+---------+--------+
only showing top 20 rows


## Finding RFM

RFM is a method used for analyzing customer value. It is commonly used in database marketing and direct marketing and has received particular attention in the retail and professional services industries. ([wikipedia](https://en.wikipedia.org/wiki/RFM_(market_research)))

RFM stands for three dimensions:

- Recency – How recently did the customer purchase?

- Frequency – How often do they purchase?

- Monetary Value – How much do they spend?

In [0]:
from pyspark.sql.functions import max, datediff, lit, expr, ntile, col
from pyspark.sql.window import Window

date_end = retail_df.select(col("InvoiceDate")).agg({"InvoiceDate": "max"}).collect()[0][0]

df_x = yyyymm_df.groupBy("Customer ID").agg(
    expr("sum(total_sales) as total_sales"),
    expr(f"datediff('{date_end}', max(InvoiceDate)) as days_since_last_purchase")
)
df_x.show()

df_y = yyyymm_df.groupBy("Customer ID", "Invoice").agg(
    {"total_sales": "sum"}
)
df_y.show()

df_z = df_y.groupBy("Customer ID").agg(
    {"sum(total_sales)": "count"}
)
df_z.show()

rfm_table = df_x.join(df_z, "Customer ID", "inner")
rfm_table = rfm_table.withColumnsRenamed({"days_since_last_purchase": "Recency",
                              "count(sum(total_sales))": "Frequency",
                              "total_sales": "Monetary"})
rfm_table.show()

window_recency = Window.orderBy('Recency')
df_recency = rfm_table.withColumn(
    'RecencyScore', 
    ntile(5).over(window_recency)
)

window_frequency = Window.orderBy('Frequency')
df_frequency = df_recency.withColumn(
    'FrequencyScore',
    ntile(5).over(window_frequency)
)

window_monetary = Window.orderBy('Monetary')
df_monetary = df_frequency.withColumn(
    'MonetaryScore',
    ntile(5).over(window_monetary)
)

df_rfm = df_monetary.withColumn(
    'RecencyScore', 
    6 - col('RecencyScore')
)

df_rfm.select("Customer ID", "RecencyScore", "FrequencyScore", "MonetaryScore").show()

+-----------+------------------+------------------------+
|Customer ID|       total_sales|days_since_last_purchase|
+-----------+------------------+------------------------+
|    18071.0|1428.6299999999999|                     257|
|    13641.0| 921.8200000000002|                     640|
|    16575.0|            -10.49|                     682|
|    15512.0|331.58000000000004|                     155|
|    17838.0|6239.5499999999965|                      26|
|    12782.0| 4526.039999999998|                       4|
|    12408.0| 4279.639999999999|                      32|
|    14364.0|3717.3500000000004|                     108|
|    13112.0|             -5.44|                     541|
|    17328.0|             104.7|                     673|
|    16547.0|111.71000000000001|                     546|
|    13617.0|1263.8000000000002|                      40|
|    13172.0|              69.6|                     400|
|    16471.0|1186.1800000000007|                     274|
|    14835.0| 

# RFM Segmentation

In [0]:
from pyspark.sql.functions import col, when, concat, mean, count

df_rfm = df_rfm.withColumn(
    'Segment', 
    concat(col('RecencyScore').cast('string'), col('FrequencyScore').cast('string'))
)

df_rfm = df_rfm.withColumn(
    'Segment', 
    when(col('Segment').rlike(r'[1-2][1-2]'), 'Hibernating')
    .when(col('Segment').rlike(r'[1-2][3-4]'), 'At Risk')
    .when(col('Segment').rlike(r'[1-2]5'), 'Can\'t Lose')
    .when(col('Segment').rlike(r'3[1-2]'), 'About to Sleep')
    .when(col('Segment').rlike(r'33'), 'Need Attention')
    .when(col('Segment').rlike(r'[3-4][4-5]'), 'Loyal Customers')
    .when(col('Segment').rlike(r'41'), 'Promising')
    .when(col('Segment').rlike(r'51'), 'New Customers')
    .when(col('Segment').rlike(r'[4-5][2-3]'), 'Potential Loyalists')
    .when(col('Segment').rlike(r'5[4-5]'), 'Champions')
    .otherwise('Unknown')
)

agg_df = df_rfm.groupBy("Segment").agg(
    mean("RecencyScore").alias("Mean_Recency"),
    mean("FrequencyScore").alias("Mean_Frequency"),
    mean("MonetaryScore").alias("Mean_Monetary"),
    count("Customer ID").alias("Customer_Count")
)

agg_df.show()



+-------------------+------------------+------------------+------------------+--------------+
|            Segment|      Mean_Recency|    Mean_Frequency|     Mean_Monetary|Customer_Count|
+-------------------+------------------+------------------+------------------+--------------+
|        Hibernating|1.4085526315789474| 1.499342105263158|1.6736842105263159|          1520|
|            At Risk|1.6443569553805775|3.4173228346456694|  3.02755905511811|           762|
|         Can't Lose|1.8085106382978724|               5.0| 4.361702127659575|            94|
|     Need Attention|               3.0|               3.0| 2.937984496124031|           258|
|    Loyal Customers|3.5281569965870307| 4.454778156996587| 4.238907849829351|          1172|
|Potential Loyalists| 4.441798941798941| 2.642857142857143|2.8214285714285716|           756|
|      New Customers|               5.0|               1.0| 1.619047619047619|            63|
|     About to Sleep|               3.0|1.4244031830238726|1