In [157]:
from functools import reduce 
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [158]:
#Connecting to a spark cluster
spark = SparkSession \
        .builder \
        .appName('Ranking-Football') \
        .getOrCreate()

In [159]:
#Read data source from file "online_retail.csv" to a dataframe
online_retail_df = spark.read.format('csv').options(header='true').load('./Data/order/online_retail.csv')

#Select all columns with limit 10 rows of online_retail_df
online_retail_df.limit(10).toPandas()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom
5,536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850,United Kingdom
6,536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850,United Kingdom
7,536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850,United Kingdom
8,536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850,United Kingdom
9,536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047,United Kingdom


In [160]:
# Select column InvoiceDate and CustomerID from online_retail_df
invoiceDate_customerID_df = online_retail_df.withColumn('RelationDate', to_date(split(col('InvoiceDate'), ' ').getItem(0), 'M/d/yyyy')) \
    .where(col('CustomerID').isNotNull()) \
    .select('RelationDate', 'CustomerID') 
invoiceDate_customerID_df.limit(10).toPandas()

Unnamed: 0,RelationDate,CustomerID
0,2010-12-01,17850
1,2010-12-01,17850
2,2010-12-01,17850
3,2010-12-01,17850
4,2010-12-01,17850
5,2010-12-01,17850
6,2010-12-01,17850
7,2010-12-01,17850
8,2010-12-01,17850
9,2010-12-01,13047


In [161]:
#Find first payment date of each customer
first_payment_date = invoiceDate_customerID_df.groupby('CustomerID') \
    .agg(min(col('RelationDate')).alias('FirstPaymentDate')) \
    .withColumnRenamed('CustomerID', 'CusID') 
first_payment_date.limit(10).toPandas()

Unnamed: 0,CusID,FirstPaymentDate
0,16250,2010-12-01
1,15574,2010-12-02
2,15555,2010-12-05
3,15271,2010-12-09
4,17714,2011-01-23
5,17757,2010-12-02
6,17551,2010-12-15
7,13187,2011-01-06
8,16549,2011-01-09
9,12637,2011-01-20


In [162]:
#Calculating total new customers daily
total_new_customer_daily = first_payment_date.groupby('FirstPaymentDate') \
    .agg(count(col('CusID')).alias('TotalNewCustomerDaily')) \
    .withColumnRenamed('FirstPaymentDate', 'Date')
total_new_customer_daily.select('*').orderBy(col('Date').asc()).limit(10).toPandas()

Unnamed: 0,Date,TotalNewCustomerDaily
0,2010-12-01,98
1,2010-12-02,108
2,2010-12-03,49
3,2010-12-05,68
4,2010-12-06,76
5,2010-12-07,53
6,2010-12-08,87
7,2010-12-09,83
8,2010-12-10,38
9,2010-12-12,31


In [163]:
#Calculating retained customer following the day after the first payment date
retained_customer = invoiceDate_customerID_df.join(first_payment_date, col('CustomerID') == col('CusID'), 'left') \
    .groupby('RelationDate', 'FirstPaymentDate') \
    .agg(count(col('CustomerID')).alias('TotalRetainedCustomerDaily')) \
    .orderBy('RelationDate', 'FirstPaymentDate')
retained_customer.limit(10).toPandas()

Unnamed: 0,RelationDate,FirstPaymentDate,TotalRetainedCustomerDaily
0,2010-12-01,2010-12-01,1968
1,2010-12-02,2010-12-01,229
2,2010-12-02,2010-12-02,1815
3,2010-12-03,2010-12-01,119
4,2010-12-03,2010-12-02,4
5,2010-12-03,2010-12-03,994
6,2010-12-05,2010-12-01,388
7,2010-12-05,2010-12-03,23
8,2010-12-05,2010-12-05,2313
9,2010-12-06,2010-12-01,181


In [164]:
# Join retained_customer and total_new_customer_daily dataframe to display TotalNewCustomerDaily and TotalRetainedCustomerDaily 
# following the first payment and the following days
cohort_table = retained_customer.join(total_new_customer_daily, col('FirstPaymentDate') == col('Date'), 'left') \
    .select('Date', 'RelationDate', 'TotalNewCustomerDaily','TotalRetainedCustomerDaily') \
    .orderBy('Date', 'RelationDate')
cohort_table.limit(20).toPandas()

Unnamed: 0,Date,RelationDate,TotalNewCustomerDaily,TotalRetainedCustomerDaily
0,2010-12-01,2010-12-01,98,1968
1,2010-12-01,2010-12-02,98,229
2,2010-12-01,2010-12-03,98,119
3,2010-12-01,2010-12-05,98,388
4,2010-12-01,2010-12-06,98,181
5,2010-12-01,2010-12-07,98,130
6,2010-12-01,2010-12-08,98,223
7,2010-12-01,2010-12-09,98,187
8,2010-12-01,2010-12-10,98,258
9,2010-12-01,2010-12-13,98,44


In [165]:
cohort_anlysis = cohort_table.groupby('Date') \
    .agg(collect_list('TotalRetainedCustomerDaily')) \
    .orderBy('Date')
cohort_anlysis.limit(10).toPandas()

Unnamed: 0,Date,collect_list(TotalRetainedCustomerDaily)
0,2010-12-01,"[1968, 229, 119, 388, 181, 130, 223, 187, 258,..."
1,2010-12-02,"[1815, 4, 44, 81, 35, 75, 169, 31, 99, 63, 87,..."
2,2010-12-03,"[994, 23, 36, 2, 6, 72, 2, 11, 22, 56, 54, 27,..."
3,2010-12-05,"[2313, 58, 65, 6, 67, 33, 145, 169, 22, 40, 53..."
4,2010-12-06,"[1655, 6, 36, 103, 58, 37, 68, 240, 85, 70, 9,..."
5,2010-12-07,"[849, 84, 33, 4, 33, 31, 1, 5, 15, 97, 2, 41, ..."
6,2010-12-08,"[1631, 8, 9, 46, 38, 128, 81, 38, 22, 56, 130,..."
7,2010-12-09,"[1277, 10, 10, 30, 2, 31, 53, 57, 4, 16, 34, 4..."
8,2010-12-10,"[908, 11, 25, 8, 29, 10, 10, 6, 89, 4, 3, 17, ..."
9,2010-12-12,"[1187, 5, 29, 13, 29, 30, 33, 10, 49, 1, 51, 1..."
