# HBC Prediction & Recommendation Model

 ### This Model is developed using PySpark and the Motivation behind this Model is to predict the probablity that the Customer will buy the product or buy a specific's brand product that they have already purchased before.

## Importing the Necessary Libraries

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [2]:
file_path = 'C:/Users/uttas/Desktop/Resume/HBC/Customer_Tran_Sample.csv'

In [3]:
sdf = spark.read.csv(file_path, header=True, inferSchema=True)

In [4]:
from datetime import timedelta
import pandas as pd

## EXPLORATORY DATA ANALYSIS

In [5]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [6]:
sdf.show()

+--------------------+-------------------+---------+--------+----------+------+---------+
|         CUSTOMER_ID|            TRAN_DT|   UPC_ID|ITEM_QTY|DOLLAR_AMT|DIV_CD| BRAND_ID|
+--------------------+-------------------+---------+--------+----------+------+---------+
|723d45afb1ae93e9e...|2018-11-27 00:00:00|UPC_10165|       2|    407.06|     8| BRAND_77|
|33b9301f1d4a6df09...|2017-12-15 00:00:00|UPC_19768|       1|      18.1|     8|BRAND_132|
|2aa43189e547e4f8c...|2017-12-23 00:00:00|UPC_11067|       1|     31.67|     8| BRAND_85|
|92d970aefc6bfead8...|2018-12-06 00:00:00|UPC_15119|       3|    115.77|     8| BRAND_99|
|8db5209a423f272bc...|2018-04-24 00:00:00| UPC_8131|       1|     39.81|     8|BRAND_124|
|a1b46827742ff2e66...|2019-11-02 00:00:00|UPC_10203|       1|     91.74|     8| BRAND_77|
|a339330079c7b6f47...|2019-08-12 00:00:00| UPC_7200|       1|     42.91|     8| BRAND_22|
|63503d2c4307ace47...|2020-01-23 00:00:00| UPC_8294|       1|     37.48|     8|BRAND_124|
|6d8feb66e

In [5]:
sdf.printSchema()

root
 |-- CUSTOMER_ID: string (nullable = true)
 |-- TRAN_DT: timestamp (nullable = true)
 |-- UPC_ID: string (nullable = true)
 |-- ITEM_QTY: integer (nullable = true)
 |-- DOLLAR_AMT: double (nullable = true)
 |-- DIV_CD: integer (nullable = true)
 |-- BRAND_ID: string (nullable = true)



In [8]:
sdf.describe().show()

+-------+--------------------+--------+------------------+-----------------+-------+--------+
|summary|         CUSTOMER_ID|  UPC_ID|          ITEM_QTY|       DOLLAR_AMT| DIV_CD|BRAND_ID|
+-------+--------------------+--------+------------------+-----------------+-------+--------+
|  count|             9961346| 9961346|           9961346|          9961346|9961346| 9961346|
|   mean|                null|    null|1.2708712256355716|96.00257169253813|    8.0|    null|
| stddev|                null|    null|0.9931223994824471|118.5588148844263|    0.0|    null|
|    min|0000036fb27b87150...|   UPC_1|                 0|         -1605.93|      8| BRAND_1|
|    max|fffffe4d5084d9ed6...|UPC_9999|                99|         14514.22|      8|BRAND_99|
+-------+--------------------+--------+------------------+-----------------+-------+--------+



In [6]:
sdf = sdf.withColumnRenamed('CUSTOMER_ID', 'customer_id')
sdf = sdf.withColumnRenamed('UPC_ID', 'upc_id')
sdf = sdf.withColumnRenamed('ITEM_QTY', 'quantity')
sdf = sdf.withColumnRenamed('DOLLAR_AMT', 'dollar_amt')
sdf = sdf.withColumnRenamed('DIV_CD', 'div_cd')
sdf = sdf.withColumnRenamed('TRAN_DT', 'date')
sdf = sdf.withColumnRenamed('BRAND_ID', 'brand_id')

In [7]:
sdf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- upc_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- dollar_amt: double (nullable = true)
 |-- div_cd: integer (nullable = true)
 |-- brand_id: string (nullable = true)



In [8]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
sdf2 = sdf.withColumn("unique_customer_id", F.dense_rank().over(Window.orderBy(sdf.customer_id)))

In [9]:
sdf3 = sdf2.withColumn("unique_product_id", F.dense_rank().over(Window.orderBy(sdf.upc_id)))

In [10]:
sdf2.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- upc_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- dollar_amt: double (nullable = true)
 |-- div_cd: integer (nullable = true)
 |-- brand_id: string (nullable = true)
 |-- unique_customer_id: integer (nullable = true)



In [11]:
sdf3.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- upc_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- dollar_amt: double (nullable = true)
 |-- div_cd: integer (nullable = true)
 |-- brand_id: string (nullable = true)
 |-- unique_customer_id: integer (nullable = true)
 |-- unique_product_id: integer (nullable = true)



In [12]:
sdf4 = sdf3.withColumn("brand_id", F.dense_rank().over(Window.orderBy(sdf.brand_id)))

## Columns considered for Model Building

In [13]:
cols = ['unique_customer_id','unique_product_id','quantity','brand_id']
sdf5 = sdf4.select(*cols)

In [14]:
sdf5.printSchema()

root
 |-- unique_customer_id: integer (nullable = true)
 |-- unique_product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- brand_id: integer (nullable = true)



In [11]:
single_user1 = sdf5.filter(sdf5['unique_customer_id']==393020).select(['unique_product_id','quantity', 'brand_id'])

In [21]:
single_user1.show()

+-----------------+--------+--------+
|unique_product_id|quantity|brand_id|
+-----------------+--------+--------+
|            30135|       1|      80|
|               59|       1|      80|
|              103|       1|      80|
|              115|       1|      80|
|              148|       1|      80|
|              481|       1|      80|
|              936|       1|      80|
|             6947|       1|      80|
|             1625|       1|      80|
|             7469|       1|      80|
|             2359|       1|      80|
|            34723|       1|      80|
|             8068|       1|      96|
|             8073|       1|      96|
|            14435|       1|      96|
|             8229|       1|      96|
|            11259|       1|      96|
|            11259|       1|      96|
|            11314|       1|      96|
|            11314|       1|      96|
+-----------------+--------+--------+



## Calculating Sparsity in the Dataset

In [18]:
numerator = sdf5.select("quantity").count()

## Total Count of Number of Unique Customers

In [19]:
num_users = sdf5.select("unique_customer_id").distinct().count()
num_users

1810904

## Total Count of Number of Unique Products

In [20]:
num_items = sdf5.select("unique_product_id").distinct().count()
num_items

35288

## Total Count of Number of Unique Brands

In [66]:
num_brands = sdf5.select("brand_id").distinct().count()
num_brands

147

In [74]:
denominator1 = num_users * num_items

In [75]:
sparsity = (1.0 - (numerator *1.0)/denominator1)*100

## Sparsity based on distinct Number of Products

In [76]:
print("The Customer Data Set is ", "%.2f" % sparsity + "% Sparse.")

The Customer Data Set is  99.98% Sparse.


In [77]:
denominator2 = num_users * num_brands

In [78]:
sparsity = (1.0 - (numerator *1.0)/denominator2)*100

## Sparsity based on distinct Number of Brands

In [79]:
print("The Customer Data Set is ", "%.2f" % sparsity + "% Sparse.")

The Customer Data Set is  96.26% Sparse.


In [24]:
userId_ratings = sdf5.groupBy("unique_customer_id").count().orderBy('count', ascending=False)
userId_ratings.show()

+------------------+-----+
|unique_customer_id|count|
+------------------+-----+
|            469001|99267|
|            655913|91997|
|            834345|28390|
|           1533671|20940|
|           1786597|18184|
|            674614|11987|
|           1012862| 6567|
|           1595349| 4686|
|              2902| 4472|
|            460479| 4364|
|           1181220| 4362|
|           1642696| 4282|
|           1399599| 4263|
|            876688| 3463|
|            828731| 3361|
|            820853| 3297|
|            813335| 3248|
|            753233| 3110|
|            417310| 2915|
|           1698199| 2734|
+------------------+-----+
only showing top 20 rows



In [25]:
product_id = sdf5.groupBy("unique_product_id").count().orderBy('count', ascending=False)

In [26]:
brand_id = sdf5.groupBy("brand_id").count().orderBy('count', ascending=False)

### Building a Recommendation System Model using Alternating Least Squares Matrix Factorisation

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [16]:
(train, test) = sdf5.randomSplit([0.7, 0.3])

In [17]:
als = ALS(
         userCol="unique_customer_id", 
         itemCol="unique_product_id",
         ratingCol="quantity", 
         nonnegative = True, 
         implicitPrefs = True,
         coldStartStrategy="drop"
)

In [18]:
type(als)

pyspark.ml.recommendation.ALS

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10]) \
            .addGrid(als.regParam, [.01]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \

           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="quantity", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  1


In [20]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=2)

In [21]:
model = cv.fit(train)

In [24]:
best_model.save("C:/Users/uttas/Desktop/Resume/HBCALSModel4")

In [22]:
best_model = model.bestModel

In [25]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 10
  MaxIter: 10
  RegParam: 0.01


In [26]:
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.5060928088855827


In [34]:
test_predictions.show()

+------------------+-----------------+--------+--------+----------+
|unique_customer_id|unique_product_id|quantity|brand_id|prediction|
+------------------+-----------------+--------+--------+----------+
|            393020|              148|       1|      80| 0.8266336|
|            393060|              148|       1|      80|0.73733413|
|           1610972|              148|       1|      80|0.27114624|
|           1372880|              148|       1|      80| 0.6400181|
|           1719253|              148|       1|      80|0.93658596|
|           1685043|              148|       1|      80| 0.9255763|
|           1630673|              148|       1|      80|0.12159253|
|           1712577|              148|       1|      80| 0.9462562|
|            254825|              148|       3|      80| 3.1893854|
|            972175|              148|       1|      80| 0.9345626|
|            210582|              148|       1|      80|0.25051877|
|           1697454|              148|       1| 

In [37]:
single_user = test_predictions.filter(test_predictions['unique_customer_id']==393020).select(['unique_product_id','quantity', 'prediction'])

In [36]:
single_user.show()

+-----------------+--------+----------+
|unique_product_id|quantity|prediction|
+-----------------+--------+----------+
|              148|       1| 0.8266336|
|              103|       1| 0.7863437|
|            11314|       1| 0.9570022|
|               59|       1| 0.8558659|
|             8073|       1|0.78083205|
|             2359|       1| 0.8254836|
|            34723|       1| 1.0496634|
+-----------------+--------+----------+



In [38]:
single_user2 = test.filter(test['unique_customer_id']==393020).select(['unique_product_id','quantity'])

In [25]:
nrecommendations = best_model.recommendForAllUsers(10)

In [None]:
from pyspark.sql.functions import col, explode
nrecommendations = nrecommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('unique_customer_id', col("rec_exp.unique_product_id"), col("rec_exp.rating"))
    
nrecommendations.limit(10).show()

In [None]:
type(nrecommendations)

In [None]:
nrecommendations_unique.show()

In [41]:
nrecommendations_393020 = nrecommendations.filter(nrecommendations['unique_customer_id']==393020).select(['unique_product_id','rating'])

In [42]:
nrecommendations_393020.show()

+-----------------+---------+
|unique_product_id|   rating|
+-----------------+---------+
|             4082|11.473021|
|             4589|11.318115|
|             4024|10.535624|
|              584|7.7321053|
|            14572| 6.967832|
|            14004| 6.778577|
|            11725|5.9410577|
|             5538| 5.891152|
|            19023|5.7766814|
|             7187| 5.708955|
+-----------------+---------+



In [44]:
trained_393020 = train.filter(train['unique_customer_id']==393020).select(['unique_product_id','quantity'])

In [45]:
trained_393020.show()

+-----------------+--------+
|unique_product_id|quantity|
+-----------------+--------+
|               59|       1|
|              103|       1|
|              115|       1|
|              481|       1|
|              936|       1|
|             1625|       1|
|             2359|       1|
|             8073|       1|
|             8229|       1|
|            11259|       1|
|            11314|       1|
|            11314|       1|
|            14435|       1|
|            30135|       1|
|            34723|       1|
+-----------------+--------+



In [49]:
test_393020 = test.filter(test['unique_customer_id']==393020).select(['unique_product_id','quantity'])

In [50]:
test_393020.show()

+-----------------+--------+
|unique_product_id|quantity|
+-----------------+--------+
|              148|       1|
|             6947|       1|
|             7469|       1|
|             8068|       1|
|            11259|       1|
+-----------------+--------+



In [56]:
test_prediction_393020 = test_predictions.filter(test_predictions['unique_customer_id']==393020).select(['unique_product_id','quantity','prediction'])

In [57]:
test_prediction_393020.show()

+-----------------+--------+----------+
|unique_product_id|quantity|prediction|
+-----------------+--------+----------+
|              148|       1| 0.9285966|
|            11259|       1| 1.0873121|
|             8068|       1|0.87903464|
|             6947|       1| 1.1971238|
|             7469|       1|  1.128898|
+-----------------+--------+----------+



In [None]:
nrecommendations

In [55]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

model.save("C:/Users/uttas/Desktop/Resume/HBCALSModel")

In [24]:
type(model)

pyspark.ml.tuning.CrossValidatorModel

In [22]:
best_model.save("C:/Users/uttas/Desktop/Resume/HBCALSModel2")

In [23]:
type(best_model)

pyspark.ml.recommendation.ALSModel

In [21]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator,CrossValidatorModel

In [64]:
sameModel = CrossValidatorModel.load("C:/Users/uttas/Desktop/Resume/HBCALSModel")

In [65]:
type(sameModel)

pyspark.ml.tuning.CrossValidatorModel