In [73]:
from google.colab import drive
drive.mount('/content/drive')
%cd 'drive/My Drive/CLV-Prediction'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[Errno 2] No such file or directory: 'drive/My Drive/CLV-Prediction'
/content/drive/My Drive/CLV-Prediction


In [74]:
!pip install -q pyspark

In [75]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, Tokenizer, StopWordsRemover, Normalizer, Word2Vec
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import IndexToString
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import FloatType
import pyspark.sql.functions as f

In [4]:
import pandas as pd

In [5]:
original_data = pd.read_csv("company.csv", encoding='unicode_escape')
original_data = original_data.dropna()
original_data = original_data[original_data.Quantity > 0]
original_data = original_data.drop_duplicates()

original_data

Unnamed: 0,CustomerID,InvoiceNo,InvoiceDate,StockCode,Description,Quantity,UnitPrice
0,17850.0,536365,25/09/2020,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,8.415
1,17850.0,536365,25/09/2020,71053,WHITE METAL LANTERN,6,11.187
2,17850.0,536365,25/09/2020,84406B,CREAM CUPID HEARTS COAT HANGER,8,9.075
3,17850.0,536365,25/09/2020,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,11.187
4,17850.0,536365,25/09/2020,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,11.187
...,...,...,...,...,...,...,...
495473,15804.0,581585,03/10/2021,22466,FAIRY TALE COTTAGE NIGHT LIGHT,12,6.435
495474,13113.0,581586,04/10/2021,22061,LARGE CAKE STAND HANGING STRAWBERY,8,9.735
495475,13113.0,581586,04/10/2021,23275,SET OF 3 HANGING OWLS OLLIE BEAK,24,4.125
495476,13113.0,581586,04/10/2021,21217,RED RETROSPOT ROUND CAKE TINS,24,29.535


In [6]:
session = SparkSession.builder.getOrCreate()

In [7]:
df = session \
    .read \
    .option('header', True) \
    .option('delimiter', ',') \
    .option('mode', 'FAILFAST') \
    .option('encoding', 'UTF-8') \
    .csv("company.csv")

In [None]:
df.show()

+----------+---------+-----------+---------+--------------------+--------+---------+
|CustomerID|InvoiceNo|InvoiceDate|StockCode|         Description|Quantity|UnitPrice|
+----------+---------+-----------+---------+--------------------+--------+---------+
|     17850|   536365| 25/09/2020|   85123A|WHITE HANGING HEA...|       6|    8.415|
|     17850|   536365| 25/09/2020|    71053| WHITE METAL LANTERN|       6|   11.187|
|     17850|   536365| 25/09/2020|   84406B|CREAM CUPID HEART...|       8|    9.075|
|     17850|   536365| 25/09/2020|   84029G|KNITTED UNION FLA...|       6|   11.187|
|     17850|   536365| 25/09/2020|   84029E|RED WOOLLY HOTTIE...|       6|   11.187|
|     17850|   536365| 25/09/2020|    22752|SET 7 BABUSHKA NE...|       2|   25.245|
|     17850|   536365| 25/09/2020|    21730|GLASS STAR FROSTE...|       6|   14.025|
|     17850|   536366| 25/09/2020|    22633|HAND WARMER UNION...|       6|    6.105|
|     17850|   536366| 25/09/2020|    22632|HAND WARMER RED P...|

In [None]:
df_product_description = df.select(['StockCode', 'Description']).distinct()
df_product_description = df_product_description.dropna()
df_product_description.show()

+---------+--------------------+
|StockCode|         Description|
+---------+--------------------+
|   84279P|CHERRY BLOSSOM  D...|
|    85015|SET OF 12  VINTAG...|
|    21249|WOODLAND  HEIGHT ...|
|    21002|ROSE DU SUD DRAWS...|
|    84987|SET OF 36 TEATIME...|
|    20671|BLUE TEATIME PRIN...|
|    22690|DOORMAT HOME SWEE...|
|    22708|     WRAP DOLLY GIRL|
|   90184A|AMBER CHUNKY BEAD...|
|    21285|RETROSPOT CANDLE ...|
|    22236|CAKE STAND 3 TIER...|
|   47593A|CAROUSEL PONIES B...|
|   35637C|PINK STRING CURTA...|
|    37423|WHITE WITH BLACK ...|
|   37444A|YELLOW BREAKFAST ...|
|    10002|INFLATABLE POLITI...|
|    22197|SMALL POPCORN HOLDER|
|    22423|REGENCY CAKESTAND...|
|    20973|12 PENCIL SMALL T...|
|   47559B| TEA TIME OVEN GLOVE|
+---------+--------------------+
only showing top 20 rows



In [None]:
tokenizer = Tokenizer(inputCol="Description", outputCol="DescriptionWords")
df_tmp_1 = tokenizer.transform(df_product_description)

In [None]:
df_tmp_1.show(5)

+---------+--------------------+--------------------+
|StockCode|         Description|    DescriptionWords|
+---------+--------------------+--------------------+
|   84279P|CHERRY BLOSSOM  D...|[cherry, blossom,...|
|    85015|SET OF 12  VINTAG...|[set, of, 12, , v...|
|    21249|WOODLAND  HEIGHT ...|[woodland, , heig...|
|    21002|ROSE DU SUD DRAWS...|[rose, du, sud, d...|
|    84987|SET OF 36 TEATIME...|[set, of, 36, tea...|
+---------+--------------------+--------------------+
only showing top 5 rows



In [None]:
remover = StopWordsRemover(inputCol="DescriptionWords", outputCol="DescriptionFilteredWords")
df_tmp_2 = remover.transform(df_tmp_1)
df_tmp_2.show(5)

+---------+--------------------+--------------------+------------------------+
|StockCode|         Description|    DescriptionWords|DescriptionFilteredWords|
+---------+--------------------+--------------------+------------------------+
|   84279P|CHERRY BLOSSOM  D...|[cherry, blossom,...|    [cherry, blossom,...|
|    85015|SET OF 12  VINTAG...|[set, of, 12, , v...|    [set, 12, , vinta...|
|    21249|WOODLAND  HEIGHT ...|[woodland, , heig...|    [woodland, , heig...|
|    21002|ROSE DU SUD DRAWS...|[rose, du, sud, d...|    [rose, du, sud, d...|
|    84987|SET OF 36 TEATIME...|[set, of, 36, tea...|    [set, 36, teatime...|
+---------+--------------------+--------------------+------------------------+
only showing top 5 rows



In [None]:
word_2_vec = Word2Vec(vectorSize=20, inputCol="DescriptionFilteredWords", outputCol="DescriptionFeatures")
word_2_vec_model = word_2_vec.fit(df_tmp_2)
df_tmp_3 = word_2_vec_model.transform(df_tmp_2)
df_tmp_3.show(5)

+---------+--------------------+--------------------+------------------------+--------------------+
|StockCode|         Description|    DescriptionWords|DescriptionFilteredWords| DescriptionFeatures|
+---------+--------------------+--------------------+------------------------+--------------------+
|   84279P|CHERRY BLOSSOM  D...|[cherry, blossom,...|    [cherry, blossom,...|[0.00443206522613...|
|    85015|SET OF 12  VINTAG...|[set, of, 12, , v...|    [set, 12, , vinta...|[0.03355824916313...|
|    21249|WOODLAND  HEIGHT ...|[woodland, , heig...|    [woodland, , heig...|[0.00716924406588...|
|    21002|ROSE DU SUD DRAWS...|[rose, du, sud, d...|    [rose, du, sud, d...|[0.01107154209166...|
|    84987|SET OF 36 TEATIME...|[set, of, 36, tea...|    [set, 36, teatime...|[0.02180505786091...|
+---------+--------------------+--------------------+------------------------+--------------------+
only showing top 5 rows



In [None]:
normalizer = Normalizer(inputCol="DescriptionFeatures", outputCol="DescriptionNormalizedFeatures")
df_tmp_4 = normalizer.transform(df_tmp_3)
df_tmp_4.show(5)

+---------+--------------------+--------------------+------------------------+--------------------+-----------------------------+
|StockCode|         Description|    DescriptionWords|DescriptionFilteredWords| DescriptionFeatures|DescriptionNormalizedFeatures|
+---------+--------------------+--------------------+------------------------+--------------------+-----------------------------+
|   84279P|CHERRY BLOSSOM  D...|[cherry, blossom,...|    [cherry, blossom,...|[0.00443206522613...|         [0.05115685297140...|
|    85015|SET OF 12  VINTAG...|[set, of, 12, , v...|    [set, 12, , vinta...|[0.03355824916313...|         [0.15329207176664...|
|    21249|WOODLAND  HEIGHT ...|[woodland, , heig...|    [woodland, , heig...|[0.00716924406588...|         [0.13115493613531...|
|    21002|ROSE DU SUD DRAWS...|[rose, du, sud, d...|    [rose, du, sud, d...|[0.01107154209166...|         [0.11318377266230...|
|    84987|SET OF 36 TEATIME...|[set, of, 36, tea...|    [set, 36, teatime...|[0.021805057

In [None]:
df_product_description = df_tmp_4.select(['StockCode', 'DescriptionNormalizedFeatures'])
df_product_description = df_product_description.withColumnRenamed('DescriptionNormalizedFeatures', 'Description')
df_product_description.show(5)

+---------+--------------------+
|StockCode|         Description|
+---------+--------------------+
|   84279P|[0.05115685297140...|
|    85015|[0.15329207176664...|
|    21249|[0.13115493613531...|
|    21002|[0.11318377266230...|
|    84987|[0.16246270467030...|
+---------+--------------------+
only showing top 5 rows



In [None]:
dot_udf = f.udf(lambda x, y: float(x.dot(y)), FloatType())

In [None]:
df_test_3 = df_product_description.sample(fraction=0.01, seed=42).limit(3)
column_names = ['P', 'D']
df_test_3 = df_test_3.toDF(*column_names)

df_test_all = df_product_description.crossJoin(df_test_3)

df_cosine_similarity = df_test_all.withColumn(
    "CosineSimilarity",
    dot_udf(f.col("Description"), f.col("D"))
)

df_cosine_similarity = df_cosine_similarity.select(['StockCode', 'P', 'CosineSimilarity'])
df_cosine_similarity.show(5)

+---------+------+----------------+
|StockCode|     P|CosineSimilarity|
+---------+------+----------------+
|   84279P|35833P|      0.77215964|
|   84279P| 22048|       0.7845501|
|   84279P|35598B|       0.3771917|
|    85015|35833P|      0.38150942|
|    85015| 22048|       0.5128573|
+---------+------+----------------+
only showing top 5 rows



In [None]:
df_customer_products = df.select(['CustomerId', 'InvoiceNo', 'StockCode']).distinct()
df_customer_products = df_customer_products.dropna()
df_customer_products = df_customer_products.groupBy(['CustomerId', 'StockCode']).count().withColumnRenamed('count', 'Rating')
df_customer_products.show(5)

+----------+---------+------+
|CustomerId|StockCode|Rating|
+----------+---------+------+
|     15039|   47566B|     8|
|     14709|    23283|     1|
|     16553|   15044B|     1|
|     14918|    21212|     1|
|     14587|    21754|     5|
+----------+---------+------+
only showing top 5 rows



In [None]:
customer_indexer = StringIndexer(inputCol="CustomerId", outputCol="CustomerIdIndex").fit(df_customer_products)
customer_indexer_labels = customer_indexer.labels
product_indexer = StringIndexer(inputCol="StockCode", outputCol="StockCodeIndex").fit(df_customer_products)
product_indexer_labels = product_indexer.labels

df_customer_products = customer_indexer.transform(df_customer_products)
df_customer_products = product_indexer.transform(df_customer_products)

In [None]:
df_customer_products.show(5)

+----------+---------+------+---------------+--------------+
|CustomerId|StockCode|Rating|CustomerIdIndex|StockCodeIndex|
+----------+---------+------+---------------+--------------+
|     15039|   47566B|     8|           22.0|         311.0|
|     14709|    23283|     1|         1072.0|         532.0|
|     16553|   15044B|     1|         1227.0|        1834.0|
|     14918|    21212|     1|         1830.0|           9.0|
|     14587|    21754|     5|          266.0|          62.0|
+----------+---------+------+---------------+--------------+
only showing top 5 rows



In [None]:
(training_data, testing_data) = df_customer_products.randomSplit([0.8, 0.2], seed=2023)

In [None]:
als = ALS(userCol="CustomerIdIndex", itemCol="StockCodeIndex", ratingCol="Rating", coldStartStrategy="drop")
model = als.fit(training_data)

In [None]:
predictions = model.transform(testing_data)
predictions.show(5)

+----------+---------+------+---------------+--------------+----------+
|CustomerId|StockCode|Rating|CustomerIdIndex|StockCodeIndex|prediction|
+----------+---------+------+---------------+--------------+----------+
|     12747|    23201|     1|         1713.0|          46.0| 2.3240733|
|     12747|   85123A|     4|         1713.0|           0.0| 4.8551207|
|     12748|    16008|     1|            0.0|        1567.0| 1.0283847|
|     12748|    20658|     2|            0.0|        1771.0| 2.6899908|
|     12748|    20675|     2|            0.0|         665.0| 1.9968646|
+----------+---------+------+---------------+--------------+----------+
only showing top 5 rows



In [None]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Squared Error: {rmse}')

Root Mean Squared Error: 1.080778412641624


In [None]:
customer_ids = ['17850', '13047', '15520', '13298', '14569']
df_input = customer_indexer.transform(session.createDataFrame([(i,) for i in customer_ids], ["CustomerId"]))

In [None]:
df_input.show(5)

+----------+---------------+
|CustomerId|CustomerIdIndex|
+----------+---------------+
|     17850|         2489.0|
|     13047|          632.0|
|     15520|         2779.0|
|     13298|         3797.0|
|     14569|         3278.0|
+----------+---------------+



In [None]:
df_output = model.recommendForUserSubset(df_input, 5)
df_output.show(5)

+---------------+--------------------+
|CustomerIdIndex|     recommendations|
+---------------+--------------------+
|           2489|[{2279, 46.846813...|
|           2779|[{244, 2.1073525}...|
|           3797|[{2279, 2.1836727...|
|           3278|[{244, 1.6060632}...|
|            632|[{2279, 4.6991425...|
+---------------+--------------------+



In [None]:
df_tmp_1 = df_output.select(df_output.CustomerIdIndex, f.explode(df_output.recommendations))
df_tmp_1.show(5)

+---------------+-----------------+
|CustomerIdIndex|              col|
+---------------+-----------------+
|           2489|{2279, 46.846813}|
|           2489|   {125, 25.1332}|
|           2489|   {41, 20.98842}|
|           2489| {481, 20.367277}|
|           2489|  {72, 19.462841}|
+---------------+-----------------+
only showing top 5 rows



In [None]:
df_tmp_2 = df_tmp_1.select('CustomerIdIndex', 'col.StockCodeIndex')
df_tmp_2.show(5)

+---------------+--------------+
|CustomerIdIndex|StockCodeIndex|
+---------------+--------------+
|           2489|          2279|
|           2489|           125|
|           2489|            41|
|           2489|           481|
|           2489|            72|
+---------------+--------------+
only showing top 5 rows



In [None]:
customer_converter = IndexToString(inputCol="CustomerIdIndex", outputCol="CustomerId", labels=customer_indexer_labels)
product_converter = IndexToString(inputCol="StockCodeIndex", outputCol="StockCode", labels=product_indexer_labels)
df_output = customer_converter.transform(df_tmp_2)
df_output = product_converter.transform(df_output)
df_output = df_output.select(['CustomerId', 'StockCode'])
df_output.show(5)

+----------+---------+
|CustomerId|StockCode|
+----------+---------+
|     17850|        D|
|     17850|    82484|
|     17850|    22197|
|     17850|    22328|
|     17850|    20914|
+----------+---------+
only showing top 5 rows



In [None]:
df_output_products = df_output.select('StockCode').distinct()
df_output_products = df_output_products.join(df_product_description, 'StockCode', 'inner')
df_output_products.show(5)

+---------+--------------------+
|StockCode|         Description|
+---------+--------------------+
|    82486|[0.20468775687811...|
|    82486|[0.24597075130717...|
|    20914|[0.18436166769614...|
|    22726|[-0.2623175862259...|
|    79321|[0.06863859365194...|
+---------+--------------------+
only showing top 5 rows



In [None]:
column_names = ['P', 'D']
df_output_products = df_output_products.toDF(*column_names)

df_test_all = df_product_description.crossJoin(df_output_products)
df_test_all = df_test_all.where(df_test_all.StockCode != df_test_all.P)

df_cosine_similarity = df_test_all.withColumn(
    "CosineSimilarity",
    dot_udf(f.col("Description"), f.col("D"))
)
df_cosine_similarity.printSchema()

df_cosine_similarity = df_cosine_similarity.select(['StockCode', 'P', 'CosineSimilarity'])
df_cosine_similarity = df_cosine_similarity.orderBy(f.col("CosineSimilarity").desc())

df_cosine_similarity.show(10)

root
 |-- StockCode: string (nullable = true)
 |-- Description: vector (nullable = true)
 |-- P: string (nullable = true)
 |-- D: vector (nullable = true)
 |-- CosineSimilarity: float (nullable = true)

+---------+------+----------------+
|StockCode|     P|CosineSimilarity|
+---------+------+----------------+
|   51020A|84507C|             1.0|
|    37327|84507C|             1.0|
|   85017C|84507C|             1.0|
|    21319|84507C|             1.0|
|    20665|84507C|             1.0|
|    21829|84507C|             1.0|
|   35818B|84507C|             1.0|
|    21794|84507C|             1.0|
|   47566B|84507C|             1.0|
|    22687|84507C|             1.0|
+---------+------+----------------+
only showing top 10 rows



# Inference

In [None]:
model

ALSModel: uid=ALS_713f857dd6a0, rank=10

In [None]:
model.save('recommendation_model')

In [None]:
customer_indexer.save('customer_indexer')

In [None]:
product_indexer.save('product_indexer')

In [None]:
from pyspark.ml.recommendation import ALSModel
model_saved = ALSModel.load('recommendation_model')
model_saved

ALSModel: uid=ALS_713f857dd6a0, rank=10

In [None]:
from pyspark.ml.feature import StringIndexerModel

In [None]:
customer_indexer_saved = StringIndexerModel.load('customer_indexer')
customer_indexer_saved

StringIndexerModel: uid=StringIndexer_5429c40b6a99, handleInvalid=error

In [None]:
product_indexer_saved = StringIndexerModel.load('product_indexer')
product_indexer_saved

StringIndexerModel: uid=StringIndexer_e4adec5fd9cd, handleInvalid=error

In [8]:
session.stop()

In [76]:
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.feature import StringIndexerModel

def make_recommendation(original_data, n=5):
  customer_ids = original_data.CustomerID.astype(int).astype(str).unique()
  session = SparkSession.builder.getOrCreate()

  model = ALSModel.load('recommendation_model')
  product_indexer = StringIndexerModel.load('product_indexer')
  customer_indexer = StringIndexerModel.load('customer_indexer')

  df_input = customer_indexer.transform(session.createDataFrame([(i,) for i in customer_ids], ["CustomerId"]))
  df_output = model.recommendForUserSubset(df_input, n)

  df_tmp_1 = df_output.select(df_output.CustomerIdIndex, f.explode(df_output.recommendations))
  df_tmp_2 = df_tmp_1.select('CustomerIdIndex', 'col.StockCodeIndex')
  customer_converter = IndexToString(inputCol="CustomerIdIndex", outputCol="CustomerId", labels=customer_indexer.labels)
  product_converter = IndexToString(inputCol="StockCodeIndex", outputCol="StockCode", labels=product_indexer.labels)
  df_output = customer_converter.transform(df_tmp_2)
  df_output = product_converter.transform(df_output)
  df_output = df_output.select(['CustomerId', 'StockCode'])
  df = df_output.toPandas()
  session.stop()

  product_infos = original_data[["StockCode", "Description"]]
  product_infos.Description = product_infos.Description.str.strip()
  product_infos = product_infos.drop_duplicates()
  product_infos.StockCode = product_infos.StockCode.astype(str)

  df = df.merge(product_infos, on="StockCode", how="inner").drop_duplicates()
  return df

In [77]:
data_recommendation = make_recommendation(original_data)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  product_infos.Description = product_infos.Description.str.strip()


In [79]:
data_recommendation

Unnamed: 0,CustomerId,StockCode,Description
0,12748,22197,SMALL POPCORN HOLDER
1,12748,22197,POPCORN HOLDER
2,14298,22197,SMALL POPCORN HOLDER
3,14298,22197,POPCORN HOLDER
4,14456,22197,SMALL POPCORN HOLDER
...,...,...,...
19709,13672,85232B,SET OF 3 BABUSHKA STACKING TINS
19710,14589,22988,SOLDIERS EGG CUP
19711,12823,22988,SOLDIERS EGG CUP
19712,13302,22988,SOLDIERS EGG CUP


In [51]:
data_recommendation.Description = data_recommendation.Description.str.strip()

In [83]:
data_recommendation[data_recommendation.CustomerId	== "12749"]

Unnamed: 0,CustomerId,StockCode,Description
3087,12749,85099B,JUMBO BAG RED RETROSPOT
6485,12749,22423,REGENCY CAKESTAND 3 TIER
9511,12749,85123A,WHITE HANGING HEART T-LIGHT HOLDER
9512,12749,85123A,CREAM HANGING HEART T-LIGHT HOLDER
12268,12749,82486,WOOD S/3 CABINET ANT WHITE FINISH
12269,12749,82486,3 DRAWER ANTIQUE WHITE WOOD CABINET
14613,12749,84507C,BLUE CIRCLES DESIGN MONKEY DOLL


In [81]:
original_data[original_data.CustomerID == 14298.0]

Unnamed: 0,CustomerID,InvoiceNo,InvoiceDate,StockCode,Description,Quantity,UnitPrice
28350,14298.0,538827,09/10/2020,84660A,WHITE STITCHED WALL CLOCK,60,4.125
28351,14298.0,538827,09/10/2020,84509E,SET OF 4 CAROUSEL PLACEMATS,24,4.125
28352,14298.0,538827,09/10/2020,84356,POMPOM CURTAIN,24,6.435
28353,14298.0,538827,09/10/2020,82613C,"METAL SIGN,CUPCAKE SINGLE HOOK",20,1.386
28354,14298.0,538827,09/10/2020,82613B,"METAL SIGN,CUPCAKE SINGLE HOOK",20,1.386
...,...,...,...,...,...,...,...
472554,14298.0,580055,26/09/2021,23581,JUMBO BAG PAISLEY PARK,30,5.907
472555,14298.0,580055,26/09/2021,85099C,JUMBO BAG BAROQUE BLACK WHITE,30,5.907
472556,14298.0,580055,26/09/2021,23343,JUMBO BAG VINTAGE CHRISTMAS,40,5.907
472557,14298.0,580055,26/09/2021,85099F,JUMBO BAG STRAWBERRY,10,5.907


In [82]:
data_recommendation.to_csv("recommendation_data.csv", index=False)