## Instantiate a SparkSession

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.appName("MyApp").master("local[*]").getOrCreate()

# check if works
dftest = spark.range(100)
print(dftest.count())

100


## Load dataset

In [7]:
# load dataset
df = spark.read.option("delimiter", ",").option("header", True).csv("../data/data.csv")
df.printSchema()
print("Cardinality:", df.count())

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)

Cardinality: 541909


## EDA

In [8]:
print("Any null records?")
(df.select(
        [
            sf.round((sf.sum(sf.col(col).isNull().cast("int"))/sf.lit(df.count())*100), 2)
            .alias(f"{col} [%]")
            for col in df.columns
        ]
    )
    .show()
)

Any null records?
+-------------+-------------+---------------+------------+---------------+-------------+--------------+-----------+
|InvoiceNo [%]|StockCode [%]|Description [%]|Quantity [%]|InvoiceDate [%]|UnitPrice [%]|CustomerID [%]|Country [%]|
+-------------+-------------+---------------+------------+---------------+-------------+--------------+-----------+
|          0.0|          0.0|           0.27|         0.0|            0.0|          0.0|         24.93|        0.0|
+-------------+-------------+---------------+------------+---------------+-------------+--------------+-----------+



                                                                                

In [9]:
print("Distribution of the two continuous attributes")
df.select(["UnitPrice", "Quantity"]).summary().show()

# back_transactions are those with negative Quantity
df.select("*").filter(sf.col("Quantity") < 0).createOrReplaceTempView("back_transactions")

print("Distribution of the negative Quantity")
back_transactions = spark.table("back_transactions").select(sf.col("Quantity")).summary().show()

print("Distribution of the negative UnitPrice")
df.select("UnitPrice").filter(sf.col("UnitPrice") < 0).summary().show()

print("Any negative UnitPrice for back_transactions?")
spark.sql("SELECT COUNT(UnitPrice) FROM back_transactions WHERE UnitPrice < 0").show()

Distribution of the two continuous attributes


                                                                                

+-------+------------------+------------------+
|summary|         UnitPrice|          Quantity|
+-------+------------------+------------------+
|  count|            541909|            541909|
|   mean|4.6111136260897085|  9.55224954743324|
| stddev| 96.75985306117963|218.08115785023438|
|    min|         -11062.06|                -1|
|    25%|              1.25|               1.0|
|    50%|              2.08|               3.0|
|    75%|              4.13|              10.0|
|    max|             99.96|               992|
+-------+------------------+------------------+

Distribution of the negative Quantity
+-------+------------------+
|summary|          Quantity|
+-------+------------------+
|  count|             10624|
|   mean|-45.60721009036145|
| stddev|   1092.2142164236|
|    min|                -1|
|    25%|             -10.0|
|    50%|              -2.0|
|    75%|              -1.0|
|    max|              -990|
+-------+------------------+

Distribution of the negative UnitPri

In [5]:
print("Ghost customers (i.e. null CustomerID) distribution of Quantity and UnitPrice")
(df.select(["UnitPrice", "Quantity", "CustomerID"])
    .filter(sf.col("CustomerID").isNull() == True)
    .select(["UnitPrice", "Quantity"])
    .summary()
    .show()
)

Ghost customers (i.e. null CustomerID) distribution of Quantity and UnitPrice




+-------+-----------------+------------------+
|summary|        UnitPrice|          Quantity|
+-------+-----------------+------------------+
|  count|           135080|            135080|
|   mean|8.076576917382749|1.9955729937814628|
| stddev|151.9008162787955| 66.69615267858345|
|    min|        -11062.06|                -1|
|    25%|             1.63|               1.0|
|    50%|             3.29|               1.0|
|    75%|              5.4|               3.0|
|    max|            99.96|                99|
+-------+-----------------+------------------+



                                                                                

## Data engineering
* Drop rows with negative UnitPrice, no interpretation available.
* Create new attributes out of the original dataset
  1. Revenue = Quantity*UnitPrice

In [12]:
clean_df = df.filter(sf.col("UnitPrice") >= 0)
print("Cardinality:", clean_df.count())

Cardinality: 541907


In [13]:
clean_df = clean_df.withColumn("Revenue", df.UnitPrice * df.Quantity)
tot_revenue = clean_df.agg(sf.sum("Revenue")).collect()[0][0]
print("Global revenues, including back transactions:",tot_revenue)

Global revenues, including back transactions: 9769872.053999126


In [23]:
print("How much revenue do ghost customers bring?")
clean_df.createOrReplaceTempView("clean_df")
spark.sql("SELECT 100.*SUM(CASE WHEN CustomerID IS NULL THEN Revenue ELSE 0 END)/SUM(Revenue) AS ghost_customers_revenues_share FROM clean_df").show()

print("Distribution of revenues from ghost customers")
(clean_df
    .select(["CustomerID", "Revenue"])
    .filter(sf.col("CustomerID").isNull())
    .select(["Revenue"])
    .summary()
    .show()
)

no_ghosts = clean_df.filter(sf.col("CustomerID").isNull() == False)

How much revenue do ghost customers bring?
+------------------------------+
|ghost_customers_revenues_share|
+------------------------------+
|             15.04427316833096|
+------------------------------+

Distribution of revenues from ghost customers
+-------+------------------+
|summary|           Revenue|
+-------+------------------+
|  count|            135078|
|   mean|10.881166733295247|
| stddev|152.11456887118285|
|    min|         -17836.46|
|    25%|              2.46|
|    50%|              4.96|
|    75%|             10.79|
|    max|          13541.33|
+-------+------------------+



In [31]:
no_ghosts.select("InvoiceDate").show()

+--------------+
|   InvoiceDate|
+--------------+
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:28|
|12/1/2010 8:28|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
|12/1/2010 8:34|
+--------------+
only showing top 20 rows



In [67]:
print("Parse InvoiceDate attribute from string to datetime")

import re
from datetime import datetime
from pyspark.sql import types as st

def parse_datetime(_dt: str):
    try:
        _dt = re.sub(r'(\d+)/(\d+)/(\d+) (\d+):(\d+)', lambda m: f"{int(m.group(1)):02d}/{int(m.group(2)):02d}/{m.group(3)} {int(m.group(4)):02d}:{m.group(5)}", _dt)
        return datetime.strptime(_dt, "%m/%d/%Y %H:%M")
    except ValueError:
        return None
parse_datetime_udf = sf.udf(parse_datetime, st.TimestampType())

no_ghosts = no_ghosts.withColumn("parsedInvoiceDate", parse_datetime_udf(sf.col("InvoiceDate")))
assert no_ghosts.filter(sf.col("parsedInvoiceDate").isNull()).count() == 0

Parse InvoiceDate attribute from string to datetime


                                                                                

In [None]:
days = 90

print(f"Evaluate Recency by {days} days")
recency_by_customer = (no_ghosts
                        .filter(sf.col("parsedInvoiceDate") >= sf.sub_date(sf.current_date(), days))
                        .groupBy("customerID")
                        .agg()
                    )