# Retail Data Wrangling and Analytics

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

fileName = "dbfs:/FileStore/tables/online_retail_II.csv"

retail_schema = StructType([
    StructField("invoice_no", StringType(), True),
    StructField("stock_code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("invoice_date", TimestampType(), True),
    StructField("unit_price", FloatType(), True),
    StructField("customer_id", FloatType(), True),
    StructField("country", StringType(), True)
])

retail_df = (spark.read
             .option("sep", ",")
             .option("header", "true")
             .schema(retail_schema)
             .csv(fileName)
            )

retail_df.cache()

In [0]:
retail_df.show(100, False)

# Load CSV into Dataframe
Alternatively, the LGS IT team also dumped the transactional data into a [CSV file](https://raw.githubusercontent.com/jarviscanada/jarvis_data_eng_demo/feature/data/python_data_wrangling/data/online_retail_II.csv). However, the CSV header (column names) doesn't follow the snakecase or camelcase naming convention (e.g. `Customer ID` instead of `customer_id` or `CustomerID`). As a result, you will need to use Pandas to clean up the data before doing any analytics. In addition, unlike the PSQL scheme, CSV files do not have data types associated. Therefore, you will need to cast/convert certain columns into correct data types (e.g. DateTime, numbers, etc..)

**Data Preperation**

- Read the `data/online_retail_II.csv` file into a DataFrame
- Rename all columns to upper camelcase or snakecase
- Convert/cast all columns to the appropriate data types (e.g. datetime)

# Total Invoice Amount Distribution

In [0]:
retail_df = retail_df.withColumn("invoice_amount", col("quantity") * col("unit_price"))

invoice_df = (retail_df.select(col("invoice_no"), col("invoice_amount"))
              .groupBy("invoice_no")
              .agg(sum("invoice_amount").alias("invoice_amount"))
              .where(col("invoice_amount") > 0)
             )

display(invoice_df)

invoice_no,invoice_amount
489677,192.0
491045,303.20000076293945
491658,155.05999848246574
493542,118.75
493977,275.9499988555908
494244,6711.0
494277,1335.9200439453125
495185,2507.059980869293
495783,48.959999084472656
496171,199.29999351501465


In [0]:
#Minimum, maximum, and mean
invoice_dist = (invoice_df.select(min("invoice_amount").alias("min"),
                      max("invoice_amount").alias("max"),
                      avg("invoice_amount").alias("mean")
                      )
               )

#Median
middle_index = int((invoice_df.count() - 1) / 2)

median = (invoice_df.orderBy("invoice_amount")
          .rdd.zipWithIndex().toDF()
          .select(col("_1").invoice_amount.alias("median"))
          .where(col("_2") == middle_index)
         )

#Mode
mode = (invoice_df.groupBy("invoice_amount")
        .agg(count("invoice_amount").alias("count"))
       )
max_count = mode.select(max("count")).collect()[0][0]
mode = mode.select("invoice_amount").where(col("count") == max_count)

#Add median and mode values into the invoice_dist DF
invoice_dist = (invoice_dist.withColumn("median", lit(median.collect()[0][0]))
                .withColumn("mode", lit(mode.collect()[0][0]))
               )

display(invoice_dist)

min,max,mean,median,mode
0.1899999976158142,168469.59375,523.3037593975993,304.3099985718727,15.0


# Monthly Placed and Canceled Orders

In [0]:
from datetime import date, datetime

#Add YYYYMM column to simplify groupBy
udf_yyyymm = udf(lambda date: date.strftime('%Y%m'), StringType())
retail_df = retail_df.withColumn("yyyymm", udf_yyyymm(col("invoice_date")))

#Compute monthly cancelled orders
monthly_cancelled = retail_df.selectExpr("invoice_no", "yyyymm AS month").where(col("invoice_no").startswith('C')).groupBy("month").agg(countDistinct("invoice_no").alias("cancellations"))

#Monthly placements join expression and base table
monthly_placements = retail_df.selectExpr("invoice_no", "yyyymm AS month").groupBy("month").agg(countDistinct("invoice_no").alias("placements"))
join_expr = monthly_cancelled["month"] == monthly_placements["month"]

#Join and display results
display(monthly_placements.join(monthly_cancelled, join_expr).select(monthly_placements["month"], expr("(placements - 2*cancellations) AS placements"), col("cancellations")).orderBy("month"))

month,placements,cancellations
200912,1528,401
201001,1033,300
201002,1489,240
201003,1553,407
201004,1284,304
201005,1604,407
201006,1502,357
201007,1329,344
201008,1331,273
201009,1633,371


# Monthly Sales

In [0]:
monthly_sales_df = retail_df.groupBy(col("yyyymm").alias("month")).agg(sum("invoice_amount").alias("sales")).orderBy("yyyymm")
display(monthly_sales_df)

month,sales
200912,799847.1075055711
201001,624032.8896515106
201002,533091.424455221
201003,765848.757825097
201004,590580.4300854017
201005,615322.8286303803
201006,679786.6077747606
201007,575236.3596651554
201008,656776.3393343091
201009,853650.4293782803


# Monthly Sales Growth

In [0]:
from pyspark.sql import Row

#Can collect to driver because there are only 24 rows (months) of data to process in this case
sales_list = monthly_sales_df.collect()
diff_sales = []

for i in range(1, len(sales_list)):
    year = sales_list[i][0]
    diff = (sales_list[i][1] - sales_list[i-1][1]) / sales_list[i-1][1]
    diff_sales.append(Row(year, diff))

sales_schema = StructType([
    StructField("month", StringType(), True),
    StructField("sales_growth", FloatType(), True)
])

display(spark.createDataFrame(diff_sales, sales_schema))

month,sales_growth
201001,-0.21980979
201002,-0.14573185
201003,0.43661803
201004,-0.22885501
201005,0.041895054
201006,0.10476416
201007,-0.15379862
201008,0.1417504
201009,0.2997582
201010,0.22435169


# Monthly Active Users

In [0]:
display(retail_df.groupBy(col("yyyymm").alias("month")).agg(countDistinct("customer_id").alias("active_users")).orderBy("month"))

month,active_users
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


# Monthly New and Existing Users

In [0]:
customer_start = (retail_df.groupBy("customer_id")
                  .agg(min("yyyymm").alias("month"))
                  .groupBy("month")
                  .agg(count("customer_id").alias("new_customers"))
                 )

total_customers = (retail_df.groupBy("yyyymm")
                     .agg(countDistinct("customer_id").alias("total_customers")))

join_expr = customer_start["month"] == total_customers["yyyymm"]

#Existing customers are total distinct customers - new ones
display(customer_start.join(total_customers, join_expr)
        .selectExpr("month", "new_customers", "(total_customers - new_customers) AS existing_customers")
        .orderBy("month"))

month,new_customers,existing_customers
200912,1045,0
201001,394,392
201002,363,444
201003,436,675
201004,291,707
201005,254,808
201006,269,826
201007,183,805
201008,158,806
201009,242,960


# Recency, Frequency, & Monetary Customer Analysis (RFM)

In [0]:
#Compute recency metric
r_df = (retail_df.groupBy("customer_id")
             .agg(max("invoice_date").alias("most_recent"))
             .select(col("customer_id"), datediff(current_date(), col("most_recent")).alias("recency"))
            )

#Compute frequency and monetary metrics
fm_df = retail_df.groupBy("customer_id").agg(countDistinct("invoice_no").alias("frequency"), sum("invoice_amount").alias("monetary"))

#Join recency and frequency/monetary tables and display result
rfm_df = r_df.join(fm_df, r_df["customer_id"] == fm_df["customer_id"]).select(r_df["customer_id"], col("recency"), col("frequency"), col("monetary"))
display(rfm_df)

customer_id,recency,frequency,monetary
12467.0,4062,2,0.0
12636.0,4414,1,141.0
12707.0,3967,1,603.4199981689453
12714.0,3685,12,14093.709978342056
12805.0,4057,4,1434.9500012397766
12998.0,4168,4,897.3799979686737
13159.0,3684,5,1379.7500064373016
13268.0,3693,15,3167.639995932579
13796.0,4175,1,289.7000045776367
13836.0,4075,3,602.5199961662292


# RFM Segmentation

In [0]:
def segment(num_records, id):
    if id < (num_records*0.2):
        return "1"
    elif id < (num_records*0.4):
        return "2"
    elif id < (num_records*0.6):
        return "3"
    elif id < (num_records*0.8):
        return "4"
    return "5"

def label(r_val, f_val):
    if r_val in ["1", "2"]:
        if f_val in ["1", "2"]:
            return "Hibernating"
        elif f_val in ["3", "4"]:
            return "At Risk"
        else:
            return "Can\'t Lose"
    elif r_val == "3":
        if f_val in ["1", "2"]:
            return "About to Sleep"
        if f_val == "3":
            return "Need Attention"
        else:
            return "Loyal Customers"
    elif r_val == "4":
        if f_val == "1":
            return "Promising"
        elif f_val in ["2", "3"]:
            return "Potential Loyalists"
        else:
            return "Loyal Customers"
    else:
        if f_val == "1":
            return "New Customers"
        elif f_val in ["2", "3"]:
            return "Potential Loyalists"
        else:
            return "Champions"
        
num_records = rfm_df.count()

udf_segment = udf(lambda id: segment(num_records, id), StringType())
udf_label = udf(lambda r_val, f_val: label(r_val, f_val), StringType())

In [0]:
recency_scores = (rfm_df.selectExpr("customer_id AS customer_id_r", "recency")
                   .orderBy(col("recency").desc())
                   .rdd.zipWithIndex().toDF()
                   .select(col("_1.*"), col("_2").alias("id"))
                   .withColumn("recency_score", udf_segment(col("id")))
                   .select("customer_id_r", "recency_score")
                  )

frequency_scores = (rfm_df.selectExpr("customer_id AS customer_id_f", "frequency")
                   .orderBy("frequency")
                   .rdd.zipWithIndex().toDF()
                   .select(col("_1.*"), col("_2").alias("id"))
                   .withColumn("frequency_score", udf_segment(col("id")))
                   .select("customer_id_f", "frequency_score")
                  )

rfm_labelled_df = (rfm_df.join(recency_scores, rfm_df["customer_id"] == recency_scores["customer_id_r"])
         .join(frequency_scores, rfm_df["customer_id"] == frequency_scores["customer_id_f"])
         .select(col("customer_id"), col("recency"), col("frequency"), col("monetary"), col("recency_score"), col("frequency_score"))
         .withColumn("segment", udf_label(col("recency_score"), col("frequency_score")))
         .groupBy("segment").agg(count("*").alias("count"), avg("recency").alias("mean_recency"),
          avg("frequency").alias("mean_frequency"), avg("monetary").alias("mean_monetary"))
         )

display(rfm_labelled_df)

segment,count,mean_recency,mean_frequency,mean_monetary
Champions,844,3683.3376777251183,23.62085308056872,10597.26181989184
Promising,115,3713.3217391304343,1.0,318.2868691910868
At Risk,755,4049.8225165562912,4.703311258278146,1177.8356536732506
About to Sleep,385,3783.231168831169,1.4597402597402598,484.3731994204217
Hibernating,1539,4141.365172189734,1.3398310591293048,337.2794683281067
Potential Loyalists,723,3700.298755186722,2.947441217150761,877.1983384129644
Loyal Customers,1164,3741.810996563574,11.974226804123711,3990.282684775956
Need Attention,281,3787.430604982206,3.697508896797153,1069.5375823861057
New Customers,53,3685.867924528302,1.0,339.50396216562336
Can't Lose,83,3998.78313253012,17.93975903614458,5726.993476494919
