In [2]:
import pandas as pd
import numpy as np
 
# identify name of xlsx file (which will change when uploaded)
xlsx_filename = "Online Retail.xlsx"
 
# schema of the excel spreadsheet data range
orders_schema = {
  'InvoiceNo':str,
  'StockCode':str,
  'Description':str,
  'Quantity':np.int64,
  'InvoiceDate':np.datetime64,
  'UnitPrice':np.float64,
  'CustomerID':str,
  'Country':str  
  }
 
# read spreadsheet to pandas dataframe
# the xlrd library must be installed for this step to work 
orders_pd = pd.read_excel(
  xlsx_filename, 
  sheet_name='Online Retail',
  header=0, # first row is header
  dtype=orders_schema
  )
 
# display first few rows from the dataset
orders_pd.head(10)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850,United Kingdom
5,536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850,United Kingdom
6,536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850,United Kingdom
7,536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850,United Kingdom
8,536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850,United Kingdom
9,536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047,United Kingdom


In [3]:
# calculate sales amount as quantity * unit price
orders_pd['SalesAmount'] = orders_pd['Quantity'] * orders_pd['UnitPrice']
 
orders_pd.head(10)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,SalesAmount
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850,United Kingdom,15.3
1,536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850,United Kingdom,20.34
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850,United Kingdom,22.0
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850,United Kingdom,20.34
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850,United Kingdom,20.34
5,536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850,United Kingdom,15.3
6,536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850,United Kingdom,25.5
7,536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850,United Kingdom,11.1
8,536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850,United Kingdom,11.1
9,536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047,United Kingdom,54.08


In [4]:
import lifetimes
 
# set the last transaction date as the end point for this historical dataset
current_date = orders_pd['InvoiceDate'].max()
 
# calculate the required customer metrics
metrics_pd = (
  lifetimes.utils.summary_data_from_transaction_data(
    orders_pd,
    customer_id_col='CustomerID',
    datetime_col='InvoiceDate',
    observation_period_end = current_date, 
    freq='D',
    monetary_value_col='SalesAmount'  # use sales amount to determine monetary value
    )
  )
 
# display first few rows
metrics_pd.head(10)

Unnamed: 0_level_0,frequency,recency,T,monetary_value
CustomerID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
12346,0.0,0.0,325.0,0.0
12347,6.0,365.0,367.0,599.701667
12348,3.0,283.0,358.0,301.48
12349,0.0,0.0,18.0,0.0
12350,0.0,0.0,310.0,0.0
12352,6.0,260.0,296.0,208.151667
12353,0.0,0.0,204.0,0.0
12354,0.0,0.0,232.0,0.0
12355,0.0,0.0,214.0,0.0
12356,2.0,303.0,325.0,269.905


In [5]:
# summary data from lifetimes
metrics_pd.describe()

Unnamed: 0,frequency,recency,T,monetary_value
count,4372.0,4372.0,4372.0,4372.0
mean,3.413541,133.72301,225.304209,213.254316
std,6.674343,133.000474,118.384168,372.810217
min,0.0,0.0,0.0,-3528.34
25%,0.0,0.0,115.0,0.0
50%,1.0,98.0,253.0,148.955
75%,4.0,256.0,331.0,304.9425
max,145.0,373.0,373.0,8866.081538


In [6]:
from datetime import timedelta
 
# set the last transaction date as the end point for this historical dataset
current_date = orders_pd['InvoiceDate'].max()
 
# define end of calibration period
holdout_days = 90
calibration_end_date = current_date - timedelta(days = holdout_days)
 
# calculate the required customer metrics
metrics_cal_pd = (
  lifetimes.utils.calibration_and_holdout_data(
    orders_pd,
    customer_id_col='CustomerID',
    datetime_col='InvoiceDate',
    observation_period_end = current_date,
    calibration_period_end=calibration_end_date,
    freq='D',
    monetary_value_col='SalesAmount'  # use sales amount to determine monetary value
    )
  )
 
# display first few rows
metrics_cal_pd.head(10)

Unnamed: 0_level_0,frequency_cal,recency_cal,T_cal,monetary_value_cal,frequency_holdout,monetary_value_holdout,duration_holdout
CustomerID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
12346,0.0,0.0,235.0,0.0,0.0,0.0,90.0
12347,4.0,238.0,277.0,519.7675,2.0,26.192069,90.0
12348,2.0,110.0,268.0,297.22,1.0,103.333333,90.0
12350,0.0,0.0,220.0,0.0,0.0,0.0,90.0
12352,3.0,34.0,206.0,101.56,3.0,20.09,90.0
12353,0.0,0.0,114.0,0.0,0.0,0.0,90.0
12354,0.0,0.0,142.0,0.0,0.0,0.0,90.0
12355,0.0,0.0,124.0,0.0,0.0,0.0,90.0
12356,1.0,80.0,235.0,481.46,1.0,29.175,90.0
12358,0.0,0.0,60.0,0.0,1.0,97.6,90.0


In [7]:
# summary data from lifetimes
metrics_cal_pd.describe()

Unnamed: 0,frequency_cal,recency_cal,T_cal,monetary_value_cal,frequency_holdout,monetary_value_holdout,duration_holdout
count,3412.0,3412.0,3412.0,3412.0,3412.0,3412.0,3412.0
mean,2.677608,90.587046,185.041618,190.242725,1.502345,17.999563,90.0
std,5.222838,96.077761,80.771943,362.064653,2.495318,77.381388,0.0
min,0.0,0.0,1.0,-1462.5,0.0,-114.0,90.0
25%,0.0,0.0,125.0,0.0,0.0,0.0,90.0
50%,1.0,59.5,197.0,111.0,1.0,6.286336,90.0
75%,3.0,175.0,268.0,276.768,2.0,19.131176,90.0
max,93.0,282.0,283.0,7860.345833,52.0,2685.0,90.0


In [8]:
# summary data from lifetimes
metrics_cal_pd.describe()

Unnamed: 0,frequency_cal,recency_cal,T_cal,monetary_value_cal,frequency_holdout,monetary_value_holdout,duration_holdout
count,3412.0,3412.0,3412.0,3412.0,3412.0,3412.0,3412.0
mean,2.677608,90.587046,185.041618,190.242725,1.502345,17.999563,90.0
std,5.222838,96.077761,80.771943,362.064653,2.495318,77.381388,0.0
min,0.0,0.0,1.0,-1462.5,0.0,-114.0,90.0
25%,0.0,0.0,125.0,0.0,0.0,0.0,90.0
50%,1.0,59.5,197.0,111.0,1.0,6.286336,90.0
75%,3.0,175.0,268.0,276.768,2.0,19.131176,90.0
max,93.0,282.0,283.0,7860.345833,52.0,2685.0,90.0


In [28]:
filtered.corr('frequency', 'monetary_value')

ValueError: method must be either 'pearson', 'spearman', 'kendall', or a callable, 'frequency' was supplied

In [29]:
# remove customers with no repeats (complete dataset)
filtered = metrics_pd[metrics_pd['frequency'] > 0]
 
## remove customers with no repeats in calibration period
filtered_cal = metrics_cal_pd[metrics_cal_pd['frequency_cal'] > 0]

In [30]:
# exclude dates with negative totals (see note above) 
filtered = filtered.where(filtered.monetary_value > 0)
filtered_cal = filtered_cal.where(filtered_cal.monetary_value_cal > 0)

In [31]:
from hyperopt import hp, fmin, tpe, rand, SparkTrials, STATUS_OK, space_eval
 
from lifetimes.fitters.gamma_gamma_fitter import GammaGammaFitter
 
# define search space
search_space = hp.uniform('l2', 0.0, 1.0)
 
# evaluation function
def score_model(actuals, predicted, metric='mse'):
  # make sure metric name is lower case
  metric = metric.lower()
  
  # Mean Squared Error and Root Mean Squared Error
  if metric=='mse' or metric=='rmse':
    val = np.sum(np.square(actuals-predicted))/actuals.shape[0]
    if metric=='rmse':
        val = np.sqrt(val)
  
  # Mean Absolute Error
  elif metric=='mae':
    np.sum(np.abs(actuals-predicted))/actuals.shape[0]
  
  else:
    val = None
  
  return val
 
# define function for model training and evaluation
def evaluate_model(param):
  
  # accesss replicated input_pd dataframe
  data = inputs.value
  
  # retrieve incoming parameters
  l2_reg = param
  
  # instantiate and configure the model
  model = GammaGammaFitter(penalizer_coef=l2_reg)
  
  # fit the model
  model.fit(data['frequency_cal'], data['monetary_value_cal'])
  
  # evaluate the model
  monetary_actual = data['monetary_value_holdout']
  monetary_predicted = model.conditional_expected_average_profit(data['frequency_holdout'], data['monetary_value_holdout'])
  mse = score_model(monetary_actual, monetary_predicted, 'mse')
  
  # return score and status
  return {'loss': mse, 'status': STATUS_OK}

In [45]:
import spark
import pyspark
import pyspark
# configure hyperopt settings to distribute to all executors on workers
spark_trials = SparkTrials(parallelism=2)
 
# select optimization algorithm
algo = tpe.suggest
 
# replicate input_pd dataframe to workers in Spark cluster
input_pd = filtered_cal.where(filtered_cal.monetary_value_cal > 0).toPandas()
inputs = sc.broadcast(input_pd)
 
# perform hyperparameter tuning (logging iterations to mlflow)
argmin = fmin(
  fn=evaluate_model,
  space=search_space,
  algo=algo,
  max_evals=100,
  trials=spark_trials
  )
 
# release the broadcast dataset
inputs.unpersist()

Exception: SparkTrials cannot import pyspark classes.  Make sure that PySpark is available in your environment.  E.g., try running 'import pyspark'

In [43]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
     -------------------------------------- 281.4/281.4 MB 2.4 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ------------------------------------- 199.7/199.7 kB 12.6 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=221f2ef60a7b3cc72ed0ce38212ec8b0e202cba26dfdb9baadaeb1a8869dcee8
  Stored in directory: c:\users\vivek ketha\appdata\local\pip\cache\wheels\51\c8\18\298a4ced8ebb3ab8a7d26a7198c0cc7035abb906bde94a4c4b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installa

In [36]:
# # get hyperparameter setting
l2_reg = space_eval(search_space, argmin)
 
# # instantiate and configure model
spend_model = GammaGammaFitter(penalizer_coef=l2_reg)
 
# fit the model
spend_model.fit(input_pd['frequency_cal'], input_pd['monetary_value_cal'])

TypeError: space_eval() missing 1 required positional argument: 'hp_assignment'

In [12]:
# evaluate the model

input_pd = filtered_cal

monetary_actual = input_pd['monetary_value_holdout']
monetary_predicted = spend_model.conditional_expected_average_profit(input_pd['frequency_holdout'], input_pd['monetary_value_holdout'])
mse = score_model(monetary_actual, monetary_predicted, 'mse')
 
print('MSE: {0}'.format(mse))

NameError: name 'spend_model' is not defined

In [41]:
# evaluate the model
monetary_actual = input_pd['monetary_value_holdout']
monetary_predicted = spend_model.conditional_expected_average_profit(input_pd['frequency_holdout'], input_pd['monetary_value_holdout'])
mse = score_model(monetary_actual, monetary_predicted, 'mse')

NameError: name 'spend_model' is not defined