In [None]:
# 
# Build pLTV using GAP bigquery and Lifetimes ML library 
# Used calibration period for training & holdout period for validation without considering cohort transactions
# Script can be run on Compute Engine of GCP
#
# THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND
#
# By JeeWook Kim
#

In [1]:
%%bash
pip install --upgrade pip
pip install --upgrade google-api-python-client
pip install --upgrade gcloud
pip install lifetimes

Collecting pip
  Using cached https://files.pythonhosted.org/packages/0f/74/ecd13431bcc456ed390b44c8a6e917c1820365cbebcb6a8974d1cd045ab4/pip-10.0.1-py2.py3-none-any.whl
Installing collected packages: pip
  Found existing installation: pip 9.0.1
    Uninstalling pip-9.0.1:
      Successfully uninstalled pip-9.0.1
Successfully installed pip-10.0.1
Collecting google-api-python-client
  Downloading https://files.pythonhosted.org/packages/bd/40/bc3b4e7c7c65f9548024dde5c7bad60e0e078b2d2a0ee8c426a5639c2cc9/google_api_python_client-1.7.3-py2.py3-none-any.whl (55kB)
Collecting google-auth>=1.4.1 (from google-api-python-client)
  Downloading https://files.pythonhosted.org/packages/53/06/6e6d5bfa4d23ee40efd772d6b681a7afecd859a9176e564b8c329382370f/google_auth-1.5.0-py2.py3-none-any.whl (65kB)
Collecting google-auth-httplib2>=0.0.3 (from google-api-python-client)
  Downloading https://files.pythonhosted.org/packages/33/49/c814d6d438b823441552198f096fcd0377fd6c88714dbed34f1d3c8c4389/google_auth_htt

google-cloud-resource-manager 0.26.0 has requirement google-cloud-core<0.27dev,>=0.26.0, but you'll have google-cloud-core 0.27.1 which is incompatible.
grpcio 1.6.3 has requirement protobuf>=3.3.0, but you'll have protobuf 3.2.0 which is incompatible.
google-cloud-error-reporting 0.26.0 has requirement google-cloud-core<0.27dev,>=0.26.0, but you'll have google-cloud-core 0.27.1 which is incompatible.
google-cloud-monitoring 0.26.0 has requirement google-cloud-core<0.27dev,>=0.26.0, but you'll have google-cloud-core 0.27.1 which is incompatible.
google-cloud 0.27.0 has requirement google-cloud-core<0.27dev,>=0.26.0, but you'll have google-cloud-core 0.27.1 which is incompatible.
google-cloud 0.27.0 has requirement google-cloud-storage<1.4dev,>=1.3.0, but you'll have google-cloud-storage 1.4.0 which is incompatible.
google-cloud-vision 0.26.0 has requirement google-cloud-core<0.27dev,>=0.26.0, but you'll have google-cloud-core 0.27.1 which is incompatible.
google-cloud-logging 1.2.0 has

In [25]:


import datetime
from dateutil.relativedelta import relativedelta

today = datetime.date.today().strftime("%Y%m%d")
# calibration begin date => 12 months ago
# begin_date = (datetime.date.today() + relativedelta(months=-12)).strftime("%Y%m%d")
begin_date = '20160801'
# obserbation end date => 3 days ago
# end_date = (datetime.date.today() + relativedelta(days=-3)).strftime("%Y%m%d")
end_date = '20170801'
# calibration end date => 6 months ago
# calibration_end_date = (datetime.date.today() + relativedelta(months=-6)).strftime("%Y%m%d")
calibration_end_date = '20170201'
# Animals In Space table
# gap_table = 'google.com:bigquery-150208.90624960.ga_sessions_*'
# Googel Store demo table
gap_table = 'bigquery-public-data.google_analytics_sample.ga_sessions_*'

print('# today: {}'.format(today))
print('# begin_date: {}'.format(begin_date))
print('# end_date: {}'.format(end_date))
print('# calibration_end_date: {}'.format(calibration_end_date))

# query to retrieve GAP exported BigQuery ecommerce tranactions (users with purchases in the calibration period)

sql_train = """ 
  SELECT
  date,
  obs_data.fullVisitorId as fullVisitorId,
  sum(totals.transactionRevenue) as transactionRevenue,
  sum(totals.transactions) as transactions
  FROM
  `"""+gap_table+"""` obs_data
  
  INNER JOIN 
     ( SELECT 
         fullVisitorId
       FROM `"""+gap_table+"""`
       WHERE
       (_TABLE_SUFFIX >= '"""+begin_date+"""' AND _TABLE_SUFFIX <= '"""+calibration_end_date+"""') AND
       totals.transactions IS NOT NULL
     ) cal_data ON cal_data.fullVisitorId = obs_data.fullVisitorId 
  WHERE
  (_TABLE_SUFFIX >= '"""+begin_date+"""' AND _TABLE_SUFFIX <= '"""+end_date+"""') AND
  totals.transactions IS NOT NULL
  GROUP BY 1,2
  ORDER BY date
"""

print ('# BigQuery SQL - training data')  
print (sql_train)

# execute the query using datalab lib
import google.datalab.bigquery as bq
# Pandas lib to handle table data
import pandas as pd
transaction_query = bq.Query(sql_train)
query_result = transaction_query.execute()

# copy the query result to pandas dataframe
transaction_query_data = query_result.result().to_dataframe()
transaction_query_data['transactionRevenue'] = transaction_query_data['transactionRevenue'] / 1000
transaction_query_data.head(50)

 
# datalab kernel needs to be python2 to be compatible with Lifetimes lib
from lifetimes.utils import calibration_and_holdout_data
# transform the query result dataframe into calibraion and holdout RFM data to be used lifetimes model
summary_cal_holdout = calibration_and_holdout_data(transaction_query_data, 'fullVisitorId', 'date',
                                        calibration_period_end=calibration_end_date,
                                        observation_period_end=end_date, monetary_value_col='transactionRevenue')   
print('# summary_cal_holdout')
print(summary_cal_holdout.head(50))

# train the lifetime BGF modle using the calibration data
from lifetimes.plotting import plot_calibration_purchases_vs_holdout_purchases
from lifetimes import BetaGeoFitter

# similar API to scikit-learn and lifelines.
bgf = BetaGeoFitter(penalizer_coef=0.0)
# adjust wheather to use returning users or all the users to train the model 
returned_summary_cal_holdout = summary_cal_holdout[summary_cal_holdout['frequency_cal']>=0]
bgf.fit(returned_summary_cal_holdout['frequency_cal'], returned_summary_cal_holdout['recency_cal'], returned_summary_cal_holdout['T_cal'])

print ('# BetaGeoFitter')
print(bgf)
# Save the BGF model to be used later
bgf.save_model('bgf_'+today+'.pkl')

# Validate the model by comparing the predicted frequecy with real frequecy data of holdout period
import numpy as np
vaildate_summary = returned_summary_cal_holdout.copy()
duration_holdout = vaildate_summary.iloc[0]['duration_holdout']
print("# duration_holdout: {}".format(duration_holdout))
# predict expected number of transactions
vaildate_summary['p_frequency'] = \
          vaildate_summary.apply(lambda r: bgf.conditional_expected_number_of_purchases_up_to_time(duration_holdout, \
          r['frequency_cal'], r['recency_cal'], r['T_cal']), axis=1)

print ('# vaildate_summary - model_predictions - expected_number_of_purchases')
print vaildate_summary['p_frequency'].head(50)
print ('# vaildate_summary - frequency_holdout - real data')
print vaildate_summary['frequency_holdout'].head(50)
from sklearn.metrics import mean_absolute_error
mae = mean_absolute_error(vaildate_summary['frequency_holdout'], vaildate_summary['p_frequency'])
print ("# mean absolute error regression loss: {}".format(mae))
from sklearn.metrics import mean_squared_error
mse = mean_squared_error(vaildate_summary['frequency_holdout'], vaildate_summary['p_frequency'])
print ("# mean squared error regression loss: {}".format(mse))

# Validate the model by calculating recall rate and precision rate
# predict p alive
vaildate_summary['p_alive'] = \
          vaildate_summary.apply(lambda r: bgf.conditional_probability_alive(duration_holdout, \
          r['frequency_cal'], r['recency_cal'], r['T_cal']), axis=1)

print ('# vaildate_summary - p_alive')
print vaildate_summary.head(100)
 
# Recall rate
a_purchasers = 0
p_purchasers = 0
for index, row in vaildate_summary.iterrows():
  if row['frequency_cal'] > 1 : # calculate recall rate only using the users with frequency > 1
    if row['frequency_holdout'] > 0 :  # actual purchaser
        a_purchasers = a_purchasers + 1
        if row['p_frequency'] >= 0.8 :  # predicted purchaser
          p_purchasers = p_purchasers + 1
recall_rate = float(p_purchasers)/a_purchasers
print ("# recall rate: {:.2f}, toal actual purchasers: {}, predicted purchasers out of actual purchasers: {}".
       format(recall_rate, a_purchasers, p_purchasers))

# Precision rate
a_purchasers = 0
p_purchasers = 0
for index, row in vaildate_summary.iterrows():
  if row['frequency_cal'] > 1 : # calculate recall rate only using the users with frequency > 1
    if row['p_frequency'] >= 0.8: # predicted purchaser
        p_purchasers = p_purchasers + 1
        if row['frequency_holdout'] > 0 :  # actual purchaser
          a_purchasers = a_purchasers + 1
precision_rate = float(a_purchasers)/p_purchasers
print ("# precision_rate: {:.2f}, total predicted purchasers: {}, actual purchasers out of predicted purchasers: {}".
       format(precision_rate, p_purchasers, a_purchasers))

import matplotlib.pyplot as plt
# Validate the model using lifetimes provided plot
#ax = plot_calibration_purchases_vs_holdout_purchases(bgf, summary_cal_holdout)
ax = plot_calibration_purchases_vs_holdout_purchases(bgf, returned_summary_cal_holdout)
ax.figure.savefig('calibration_purchases_vs_holdout_purchases_'+today+'.png')
plt.close(ax.figure)
# Plot predicted purchases vs. actual purchases in calibration period
from lifetimes.plotting import plot_period_transactions
ax2 = plot_period_transactions(bgf)
ax2.figure.savefig('plot_period_transactions_'+today+'.png')
plt.close(ax2.figure)
from lifetimes.plotting import plot_frequency_recency_matrix
ax3 = plot_frequency_recency_matrix(bgf)
ax3.figure.savefig('plot_frequency_recency_matrix_'+today+'.png')
plt.close(ax3.figure)
from lifetimes.plotting import plot_probability_alive_matrix
ax4 = plot_probability_alive_matrix(bgf)
ax4.figure.savefig('plot_probability_alive_matrix_'+today+'.png')
plt.close(ax4.figure)

# query to retrieve all the customers who purchases at least once
sql_predict = """ 
  SELECT
  date,
  fullVisitorId,
  sum(totals.transactionRevenue) as transactionRevenue,
  sum(totals.transactions) as transactions
  FROM
  `"""+gap_table+"""`
  WHERE
  (_TABLE_SUFFIX >= '"""+begin_date+"""' AND _TABLE_SUFFIX <= '"""+end_date+"""') AND
  totals.transactions IS NOT NULL
  GROUP BY 1,2
  ORDER BY date
  
"""
print ('# BigQuery SQL - predict')  
print (sql_predict)
transaction_query = bq.Query(sql_predict)
query_result = transaction_query.execute()

# copy the query result to pandas dataframe
transaction_query_data = query_result.result().to_dataframe()
transaction_query_data['transactionRevenue'] = transaction_query_data['transactionRevenue'] / 1000
transaction_query_data.head(50)

# Transform the query result dataframe into RFM (use whole period of data)
from lifetimes.utils import summary_data_from_transaction_data
summary = \
  summary_data_from_transaction_data(transaction_query_data, 'fullVisitorId', 'date', \
  observation_period_end=end_date, monetary_value_col='transactionRevenue')
print ('# summary_data_from_transaction_data')
print(summary.head(50))

# Prepare returning customers data to be used in GGF model, GGF training works with customers with monetary value
returning_customers_summary = summary[(summary['frequency']>0) & (summary['monetary_value']>0)]
print ('# returning_customers_summary')
print(returning_customers_summary.head())

# Predict predicted_purchases using the trained BGF model
t = 181 # 181 time units
returning_customers_summary['predicted_purchases'] = \
  bgf.conditional_expected_number_of_purchases_up_to_time(
    t,
    returning_customers_summary['frequency'],
    returning_customers_summary['recency'], 
    returning_customers_summary['T']
  )
returning_customers_summary['p_alive'] = \
  bgf.conditional_probability_alive(
    t, 
    returning_customers_summary['frequency'], 
    returning_customers_summary['recency'], 
    returning_customers_summary['T']
  )
returning_customers_summary.sort_values(by='predicted_purchases',ascending=False).head(50)

# Train GGF model using the returning customers data
from lifetimes import GammaGammaFitter

ggf = GammaGammaFitter(penalizer_coef = 0)
ggf.fit(returning_customers_summary['frequency'],
        returning_customers_summary['monetary_value'])
print ('# GammaGammaFitter')
print(ggf)

# Save the GGF model to be used later
ggf.save_model('ggf_'+today+'.pkl')

# Predict expected_average_profit using the trained GGF model
returning_customers_summary['expected_average_profit'] =  ggf.conditional_expected_average_profit(
        returning_customers_summary['frequency'],
        returning_customers_summary['monetary_value']
    )
returning_customers_summary.sort_values(by='expected_average_profit',ascending=False).head(50)

# Predict customer_lifetime_value using the trained GGF model and BGF model

returning_customers_summary['customer_lifetime_value'] = ggf.customer_lifetime_value(
    bgf, # the trained model to use to predict the number of future transactions
    returning_customers_summary['frequency'],
    returning_customers_summary['recency'],
    returning_customers_summary['T'],
    returning_customers_summary['monetary_value'],
    time=12, # months
    discount_rate=0.01)
returning_customers_summary.sort_values(by='customer_lifetime_value',ascending=False).head(50)
print ('# returning_customers_summary')
print returning_customers_summary
# fill na with 0
returning_customers_summary.fillna(value=0, inplace=True)
returning_customers_summary.dtypes

# Store output to cloud storage
import datalab.storage as gcs
gcs.Bucket('bq_jeewook').item('pltv_demo_output_'+today+'.csv').write_to(returning_customers_summary.to_csv(),'text/csv')

# Store output to BQ table
summary_with_index_column = returning_customers_summary.reset_index()
print ('# summary_with_index_column')
print summary_with_index_column
schema = bq.Schema.from_data(summary_with_index_column)
print ('# BQ schema')
print schema

output_table = bq.Table('gap_bq_r.pltv_demo_output_'+today)
output_table.create(schema,overwrite=True)

output_table.insert(summary_with_index_column)




# today: 20180611
# begin_date: 20160801
# end_date: 20170801
# calibration_end_date: 20170201
# BigQuery SQL - train
 
  SELECT
  date,
  obs_data.fullVisitorId as fullVisitorId,
  sum(totals.transactionRevenue) as transactionRevenue,
  sum(totals.transactions) as transactions
  FROM
  `bigquery-public-data.google_analytics_sample.ga_sessions_*` obs_data
  
  INNER JOIN 
   ( SELECT 
       fullVisitorId
     FROM `bigquery-public-data.google_analytics_sample.ga_sessions_*`
     WHERE
     (_TABLE_SUFFIX >= '20160801' AND _TABLE_SUFFIX <= '20170201') AND
     totals.transactions IS NOT NULL
   ) cal_data ON cal_data.fullVisitorId = obs_data.fullVisitorId 
   WHERE
  (_TABLE_SUFFIX >= '20160801' AND _TABLE_SUFFIX <= '20170801') AND
  totals.transactions IS NOT NULL
  GROUP BY 1,2
  ORDER BY date

# summary_cal_holdout
                     frequency_cal  recency_cal  T_cal  monetary_value_cal  \
fullVisitorId                                                                
00006776957789

fullVisitorId,frequency,recency,T,monetary_value,predicted_purchases,p_alive,expected_average_profit,customer_lifetime_value
9304032725324091683,1.0,9.0,28.0,796000.0,0.37265789726,0.0,415771.904667,182103.531932
7314823991392094389,2.0,43.0,253.0,134000.0,0.035630918166,1.82950167216e-105,168023.492227,8683.51931482
7100576777601570735,1.0,13.0,240.0,119000.0,0.0283708839211,1.29459406478e-49,174794.872864,7472.58970493
9791048489281287821,1.0,62.0,168.0,55990.0,0.139762072964,0.0,152366.567438,30650.536603
9446116434630123396,1.0,1.0,14.0,347400.0,0.410202978004,0.984624151427,256093.475617,118149.168791
4370268639929292773,1.0,24.0,243.0,18990.0,0.0400421473563,4.78543578957e-78,139196.47855,8411.79155788
4305270415377533885,1.0,76.0,263.0,80760.0,0.0825522259378,0.0,161183.408025,20278.5023642
512793101313849434,1.0,59.0,265.0,158000.0,0.0669296278478,0.0,188676.858448,19263.1002089
6191832003278290813,1.0,46.0,226.0,119000.0,0.0714974028147,0.0,174794.872864,18689.9051621
281795238965680893,1.0,12.0,356.0,120300.0,0.0137455780173,1.94649929038e-46,175257.605717,3804.08516346
