In [1]:
!kaggle datasets download -d mashlyn/online-retail-ii-uci -p data/online_retail --unzip

Dataset URL: https://www.kaggle.com/datasets/mashlyn/online-retail-ii-uci
License(s): CC0-1.0


In [4]:
import pandas as pd

df = pd.read_csv("data/online_retail/online_retail_II.csv")
df

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
3,489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01 07:45:00,2.10,13085.0,United Kingdom
4,489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01 07:45:00,1.25,13085.0,United Kingdom
...,...,...,...,...,...,...,...,...
1067366,581587,22899,CHILDREN'S APRON DOLLY GIRL,6,2011-12-09 12:50:00,2.10,12680.0,France
1067367,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4,2011-12-09 12:50:00,4.15,12680.0,France
1067368,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4,2011-12-09 12:50:00,4.15,12680.0,France
1067369,581587,22138,BAKING SET 9 PIECE RETROSPOT,3,2011-12-09 12:50:00,4.95,12680.0,France


### Historical CLV definition for `online_retail_II`

In this notebook we compute **historical CLV**, not predictive CLV.

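We use the following columns from the line-item data (one row per product line on an invoice):

- `Quantity`
- `Price`
- `InvoiceDate`
- `Invoice`
- `CustomerID` (named `Customer ID` in the raw data and renamed during cleaning)
- `Country`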

and define:

1. **Order revenue** (per invoice)

For each invoice $j$ of customer $i$:
$$
\text{order\_revenue}_{ij}
= \sum_{\text{lines } \ell \in j} \text{Quantity}_{\ell} \times \text{Price}_{\ell}
$$

2. **Historical CLV** (total revenue over the observation window 2009–2011)

$$
\text{clv\_hist}_i
= \sum_{\text{orders } j \text{ of customer } i}
\text{order\_revenue}_{ij}
$$

3. **Customer lifetime**

Let:

- $\text{first\_order\_date}_i$ = first purchase date of customer $i$
- $\text{last\_order\_date}_i$  = last purchase date of customer $i$

Then:

$$
\text{lifetime\_days}_i
= \text{last\_order\_date}_i - \text{first\_order\_date}_i + 1
$$

Approximate months:

$$
\text{lifetime\_months}_i
= \max\left(\frac{\text{lifetime\_days}_i}{30},\; 1\right)
$$

4. **Average monthly value**

$$
\text{avg\_monthly\_value}_i
= \frac{\text{clv\_hist}_i}{\text{lifetime\_months}_i}
$$

So in this section, **CLV = historical value** based on all observed orders.  
Later, we can combine this with churn / survival models to move towards **predictive CLV**.

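To make the definitions above concrete, here is a minimal plain-Python sketch for one hypothetical customer. The invoice numbers, dates, quantities, and prices are made-up illustrative values, not rows from the dataset.

```python
from datetime import date

# Hypothetical line items for one customer: (invoice, invoice_date, quantity, price).
lines = [
    ("A001", date(2010, 1, 15), 12, 2.50),
    ("A001", date(2010, 1, 15), 4, 5.00),
    ("A002", date(2010, 3, 1), 10, 3.00),
    ("A003", date(2010, 4, 14), 7, 10.00),
]

# 1. Order revenue: sum of Quantity * Price over the lines of each invoice.
order_revenue = {}
order_date = {}
for invoice, invoice_date, quantity, price in lines:
    order_revenue[invoice] = order_revenue.get(invoice, 0.0) + quantity * price
    order_date[invoice] = invoice_date

# 2. Historical CLV: total revenue over all observed orders.
clv_hist = sum(order_revenue.values())

# 3. Customer lifetime: inclusive day count, then months floored at 1.
first_order_date = min(order_date.values())
last_order_date = max(order_date.values())
lifetime_days = (last_order_date - first_order_date).days + 1
lifetime_months = max(lifetime_days / 30, 1)

# 4. Average monthly value.
avg_monthly_value = clv_hist / lifetime_months
```

With these numbers the customer has three orders totalling 150.0, a lifetime of 90 days (3 months), and an average monthly value of 50.0.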

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CLVOnlineRetail")
    .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.37.0"
    )
    .getOrCreate()
)



In [2]:
spark.conf.set("spark.bigquery.project", "able-balm-454718-n8")
spark.conf.set("spark.bigquery.dataset", "ryoji_demos")
# Optional: temporary GCS bucket for large reads/writes
# spark.conf.set("temporaryGcsBucket", "your-temp-bucket")


In [3]:
table_fqn = "able-balm-454718-n8.ryoji_demos.online_retail_ii"

df = (
    spark.read.format("bigquery")
    .option("table", table_fqn)
    .load()
)

df.printSchema()
df.show(5, truncate=False)


root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: double (nullable = true)
 |-- Country: string (nullable = true)


+-------+---------+-----------------------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|Description                        |Quantity|InvoiceDate        |Price|Customer ID|Country       |
+-------+---------+-----------------------------------+--------+-------------------+-----+-----------+--------------+
|489434 |85048    |15CM CHRISTMAS GLASS BALL 20 LIGHTS|12      |2009-12-01 08:45:00|6.95 |13085.0    |United Kingdom|
|489434 |79323P   |PINK CHERRY LIGHTS                 |12      |2009-12-01 08:45:00|6.75 |13085.0    |United Kingdom|
|489434 |79323W   | WHITE CHERRY LIGHTS               |12      |2009-12-01 08:45:00|6.75 |13085.0    |United Kingdom|
|489434 |22041    |RECORD FRAME 7" SINGLE SIZE        |48      |2009-12-01 08:45:00|2.1  |13085.0    |United Kingdom|
|489434 |21232    |STRAWBERRY CERAMIC TRINKET BOX     |24      |2009-12-01 08:45:00|1.25 |13085.0    |United Kingdom|
+-------+---------+-----------------------------------+--------+-------------------+-----+-----------+--------------+

In [4]:
# Clean + compute line_amount in PySpark

from pyspark.sql import functions as F

# Rename Customer ID -> CustomerID, and ensure types
df_clean = (
    df
    .withColumnRenamed("Customer ID", "CustomerID")
    .withColumn("InvoiceDate", F.to_timestamp("InvoiceDate"))
    .withColumn("Quantity", F.col("Quantity").cast("double"))
    .withColumn("Price", F.col("Price").cast("double"))
    .withColumn("CustomerID", F.col("CustomerID").cast("double"))  # may be float in source
)

# Filter: non-null customer, positive quantity/price, drop cancellations (Invoice starts with 'C')
df_clean = (
    df_clean
    .filter(F.col("CustomerID").isNotNull())
    .filter(F.col("Quantity") > 0)
    .filter(F.col("Price") > 0)
    .filter(~F.col("Invoice").cast("string").startswith("C"))
)

# Line amount
df_clean = df_clean.withColumn("line_amount", F.col("Quantity") * F.col("Price"))

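The cleaning rules in the cell above can be read as a single per-row predicate. The following plain-Python sketch restates them on a few made-up rows. The helper name `keep_line` and the sample values are hypothetical, and the sketch assumes the intended rule is a strictly positive quantity, as the comment in the cell states.

```python
def keep_line(invoice, quantity, price, customer_id):
    """Return True if a line item survives the cleaning rules (hypothetical helper)."""
    return (
        customer_id is not None               # drop rows without a customer
        and quantity > 0                      # drop returns and zero-quantity lines
        and price > 0                         # drop free or zero-priced lines
        and not str(invoice).startswith("C")  # drop cancellation invoices
    )

# Made-up rows: (invoice, quantity, price, customer_id).
rows = [
    ("489434", 12, 6.95, 13085.0),   # regular sale
    ("C489449", -1, 4.25, 16321.0),  # cancellation
    ("489464", 5, 0.0, 17850.0),     # zero price
    ("489500", 3, 2.10, None),       # missing customer
]

kept = [row for row in rows if keep_line(*row)]
```

Only the first row passes all four checks; each of the other rows fails exactly one rule.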

In [5]:
# Aggregate to invoice (order) level in PySpark

orders = (
    df_clean
    .groupBy("CustomerID", "Country", "Invoice")
    .agg(
        F.min("InvoiceDate").alias("invoice_date"),
        F.sum("line_amount").alias("order_revenue")
    )
)

# Use date only for consistency
orders = orders.withColumn("invoice_date", F.to_date("invoice_date"))

orders.show(5, truncate=False)


+----------+--------------+-------+------------+------------------+
|CustomerID|Country       |Invoice|invoice_date|order_revenue     |
+----------+--------------+-------+------------+------------------+
|17865.0   |United Kingdom|490689 |2009-12-07  |307.91            |
|15912.0   |United Kingdom|491196 |2009-12-10  |165.5             |
|14527.0   |United Kingdom|491750 |2009-12-14  |572.05            |
|12613.0   |Germany       |491805 |2009-12-14  |1350.8100000000002|
|15499.0   |United Kingdom|492319 |2009-12-16  |161.6             |
+----------+--------------+-------+------------+------------------+
only showing top 5 rows


In [6]:
# Customer-level CLV & cohorts in PySpark

customer_summary = (
    orders
    .groupBy("CustomerID", "Country")
    .agg(
        F.count("*").alias("n_orders"),
        F.sum("order_revenue").alias("total_revenue"),
        F.min("invoice_date").alias("first_order_date"),
        F.max("invoice_date").alias("last_order_date"),
    )
)

# lifetime_days = last - first + 1
customer_summary = customer_summary.withColumn(
    "lifetime_days",
    F.datediff("last_order_date", "first_order_date") + F.lit(1)
)

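# lifetime_months ~ days / 30, floored at 1 month as in the definition.
# lifetime_days is always >= 1, so a `<= 0` check would never fire and
# short-lived customers would get lifetime_months < 1.
customer_summary = customer_summary.withColumn(
    "lifetime_months",
    F.greatest(F.col("lifetime_days") / F.lit(30.0), F.lit(1.0))
)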

# Historical CLV = total_revenue, and avg monthly value
customer_summary = (
    customer_summary
    .withColumn("clv_hist", F.col("total_revenue"))
    .withColumn("avg_monthly_value", F.col("total_revenue") / F.col("lifetime_months"))
)

customer_summary.show(10, truncate=False)


+----------+--------------+--------+------------------+----------------+---------------+-------------+------------------+------------------+------------------+
|CustomerID|Country       |n_orders|total_revenue     |first_order_date|last_order_date|lifetime_days|lifetime_months   |clv_hist          |avg_monthly_value |
+----------+--------------+--------+------------------+----------------+---------------+-------------+------------------+------------------+------------------+
|15224.0   |United Kingdom|5       |871.3599999999999 |2010-01-15      |2010-12-16     |336          |11.2              |871.3599999999999 |77.8              |
|12689.0   |France        |2       |1247.6100000000004|2011-08-04      |2011-11-14     |103          |3.433333333333333 |1247.6100000000004|363.3815533980584 |
|17428.0   |United Kingdom|45      |31819.76000000001 |2009-12-01      |2011-12-09     |739          |24.633333333333333|31819.76000000001 |1291.7358592692833|
|12684.0   |France        |7       |2283

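The one-month floor in the `lifetime_months` definition matters most for customers whose orders all fall within a few days. A small sketch with made-up numbers shows the difference between dividing by the floored value and by the raw `lifetime_days / 30`. The function name and its `floor` flag are illustrative only.

```python
def lifetime_months(lifetime_days, floor=True):
    """Approximate months as days / 30, optionally floored at one month."""
    months = lifetime_days / 30
    return max(months, 1) if floor else months

total_revenue = 300.0  # hypothetical customer with a single order
lifetime_days = 1      # first and last order fall on the same day

# With the floor, the monthly value equals the total revenue.
floored = total_revenue / lifetime_months(lifetime_days)

# Without the floor, the same customer looks about 30 times more valuable per month.
unfloored = total_revenue / lifetime_months(lifetime_days, floor=False)
```

Without the floor, one-time buyers would dominate any ranking or cohort average based on `avg_monthly_value`.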
In [7]:
# Cohort month

customer_summary = customer_summary.withColumn(
    "cohort_month",
    F.date_trunc("month", "first_order_date")
)

customer_summary.select(
    "CustomerID",
    "Country",
    "cohort_month",
    "clv_hist",
    "avg_monthly_value",
    "n_orders",
    "first_order_date",
    "last_order_date",
    "lifetime_days",
    "lifetime_months"
).show(10, truncate=False)


+----------+--------------+-------------------+------------------+------------------+--------+----------------+---------------+-------------+------------------+
|CustomerID|Country       |cohort_month       |clv_hist          |avg_monthly_value |n_orders|first_order_date|last_order_date|lifetime_days|lifetime_months   |
+----------+--------------+-------------------+------------------+------------------+--------+----------------+---------------+-------------+------------------+
|15224.0   |United Kingdom|2010-01-01 00:00:00|871.3599999999999 |77.8              |5       |2010-01-15      |2010-12-16     |336          |11.2              |
|12689.0   |France        |2011-08-01 00:00:00|1247.6100000000004|363.3815533980584 |2       |2011-08-04      |2011-11-14     |103          |3.433333333333333 |
|17428.0   |United Kingdom|2009-12-01 00:00:00|31819.76000000001 |1291.7358592692833|45      |2009-12-01      |2011-12-09     |739          |24.633333333333333|
|12684.0   |France        |2011-05

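The cohort month is the first day of the calendar month in which a customer placed their first order. As a plain-Python illustration (the helper name `cohort_month` is hypothetical), applied to the first-order dates visible in the output above:

```python
from datetime import date

def cohort_month(first_order_date):
    """Truncate a date to the first day of its month."""
    return first_order_date.replace(day=1)

# First-order dates taken from the customer rows shown above.
first_orders = [date(2010, 1, 15), date(2011, 8, 4), date(2009, 12, 1)]
cohorts = [cohort_month(d) for d in first_orders]
```

Spark's `date_trunc` returns a timestamp, which is why the `cohort_month` column prints with a `00:00:00` time component. The sketch keeps a plain date instead.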
In [8]:
# Cohort-level CLV summaries in PySpark

cohort_stats = (
    customer_summary
    .groupBy("cohort_month", "Country")
    .agg(
        F.countDistinct("CustomerID").alias("n_customers"),
        F.avg("clv_hist").alias("avg_clv_hist"),
        F.avg("avg_monthly_value").alias("avg_monthly_value")
    )
    .orderBy("cohort_month", "Country")
)

cohort_stats.show(20, truncate=False)


+-------------------+---------------+-----------+------------------+------------------+
|cohort_month       |Country        |n_customers|avg_clv_hist      |avg_monthly_value |
+-------------------+---------------+-----------+------------------+------------------+
|2009-12-01 00:00:00|Australia      |2          |1706.3250000000003|80.7750030487149  |
|2009-12-01 00:00:00|Austria        |2          |2373.135          |130.1057883036936 |
|2009-12-01 00:00:00|Belgium        |2          |5767.389999999999 |235.94512793299828|
|2009-12-01 00:00:00|Channel Islands|1          |2605.62           |258.83642384105957|
|2009-12-01 00:00:00|Cyprus         |3          |5068.703333333334 |514.7399877094712 |
|2009-12-01 00:00:00|Denmark        |2          |24010.595         |5816.214163688874 |
|2009-12-01 00:00:00|EIRE           |2          |304959.5          |12466.656670007795|
|2009-12-01 00:00:00|Finland        |1          |6070.040000000001 |253.27009735744093|
|2009-12-01 00:00:00|France     

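The cohort summary is a group-by over `(cohort_month, Country)` that counts customers and averages their historical CLV. A minimal plain-Python sketch of the same aggregation, using made-up customers and values rather than dataset rows, might look like this:

```python
from collections import defaultdict
from statistics import mean

# Made-up customer rows: (customer_id, country, cohort_month, clv_hist).
customers = [
    (1, "France", "2010-01", 100.0),
    (2, "France", "2010-01", 300.0),
    (3, "Germany", "2010-01", 50.0),
    (4, "France", "2010-02", 80.0),
]

# Collect each customer's CLV under its (cohort, country) key.
groups = defaultdict(list)
for customer_id, country, cohort, clv_hist in customers:
    groups[(cohort, country)].append(clv_hist)

# One row per customer in each group, so len() is the customer count.
cohort_stats = {
    key: {"n_customers": len(values), "avg_clv_hist": mean(values)}
    for key, values in sorted(groups.items())
}
```

Here the January 2010 French cohort has two customers with an average historical CLV of 200.0. Note that cohorts with only one or two customers, as in several rows of the output above, give averages that are very sensitive to individual customers.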
In [9]:
# Top CLV customers:

top_clv = (
    customer_summary
    .orderBy(F.col("clv_hist").desc())
    .select("CustomerID", "Country", "clv_hist", "avg_monthly_value",
            "n_orders", "first_order_date", "last_order_date", "lifetime_months")
    .limit(20)
)

top_clv.show(truncate=False)


+----------+--------------+------------------+------------------+--------+----------------+---------------+------------------+
|CustomerID|Country       |clv_hist          |avg_monthly_value |n_orders|first_order_date|last_order_date|lifetime_months   |
+----------+--------------+------------------+------------------+--------+----------------+---------------+------------------+
|18102.0   |United Kingdom|608821.65         |24715.357916102843|145     |2009-12-01      |2011-12-09     |24.633333333333333|
|14646.0   |Netherlands   |528602.5199999999 |21517.063229308   |151     |2009-12-02      |2011-12-08     |24.566666666666666|
|14156.0   |EIRE          |313946.37000000005|12901.905616438358|156     |2009-12-01      |2011-11-30     |24.333333333333332|
|14911.0   |EIRE          |295972.6299999999 |12031.407723577231|398     |2009-12-01      |2011-12-08     |24.6              |
|17450.0   |United Kingdom|246973.08999999994|17190.70232018561 |51      |2010-09-27      |2011-12-01     |14.3
