In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lower
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import itertools

In [2]:
# Create SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
# Load Data from csv
data = spark.read.csv("data.csv",header=True,schema="user_id INT, product_id INT, reordered INT, product_name STRING")
data_clean = data.drop("product_name")
#data_clean.show(3)

In [22]:
data_clean.show()

+-------+----------+---------+
|user_id|product_id|reordered|
+-------+----------+---------+
|      1|       196|       10|
|     15|       196|        4|
|     19|       196|        2|
|     21|       196|        0|
|     31|       196|        1|
|     43|       196|        1|
|     52|       196|       13|
|     67|       196|       19|
|     81|       196|        1|
|     82|       196|        8|
|     98|       196|        6|
|    109|       196|        0|
|    120|       196|        1|
|    185|       196|        0|
|    195|       196|       11|
|    222|       196|        1|
|    290|       196|        1|
|    331|       196|        0|
|    360|       196|        0|
|    377|       196|        0|
+-------+----------+---------+
only showing top 20 rows



In [4]:
# Splite Data
train_data, validation_data, test_data = data_clean.randomSplit([0.6,0.2,0.2])
nbValidating = validation_data.count()
nbTesting    = test_data.count()
print("Training: %d, validation: %d, test: %d" % (train_data.count(), nbValidating, test_data.count()))

Training: 8317687, validation: 2770902, test: 2775157


In [5]:
# finding best set of parameters
ranks  = [5, 10, 15, 20]
regParams = [0.1, 1.0 , 10.0]
maxIter  = [5, 10, 20]
alphas = [10, 20, 40]

# initial
min_error = float('inf')
best_rank = -1
best_iter = -1
best_regularization = 0
best_alpha = 0
best_model = None

for rank, reg, num, alpha in itertools.product(ranks, regParams, maxIter, alphas):
    # get ALS model
    als = ALS(maxIter=num, regParam=reg, rank=rank, alpha=alpha, implicitPrefs=True,\
              coldStartStrategy="drop", userCol="user_id", itemCol="product_id", ratingCol="reordered")
    # train ALS model
    model = als.fit(train_data)
    # evaluate the model by computing the RMSE on the validation data
    predictions = model.transform(validation_data)
    evaluator = RegressionEvaluator(metricName="rmse",
                                    labelCol="reordered",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print('{} latent factors, regularization = {}, maxIter = {} and alpha = {}: '
          'validation RMSE is {}'.format(rank, reg, num, alpha ,rmse))
    
    if rmse < min_error:
        min_error = rmse
        best_rank = rank
        best_iter = num
        best_regularization = reg
        best_alpha = alpha
        best_model = model
                        
    print('\nThe best model has {} latent factors, regularization = {} ,maxIter = {}, alpha = {}'\
          .format(best_rank, best_regularization, best_iter, best_alpha))

5 latent factors, regularization = 0.1, maxIter = 5 and alpha = 10: validation RMSE is 3.6903677165251256

The best model has 5 latent factors, regularization = 0.1 ,maxIter = 5, alpha = 10


In [10]:
# train ALS model
als = ALS(maxIter=best_iter, regParam=best_regularization, rank=best_rank, alpha=best_alpha, implicitPrefs=True,\
          coldStartStrategy="drop", userCol="user_id", itemCol="product_id", ratingCol="reordered")

model = als.fit(train_data)

# test model
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="reordered",
                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions)
print('The out-of-sample RMSE of the best tuned model is:', rmse)

The out-of-sample RMSE of the best tuned model is: 3.691322615742211


In [11]:
# Generate top 10 product recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(10)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|    148|[[26620, 0.055957...|
|    463|[[19660, 0.384348...|
|    471|[[24852, 0.537272...|
|    496|[[19660, 1.290631...|
|    833|[[21903, 0.901169...|
|   1088|[[47766, 0.532761...|
|   1238|[[24852, 1.201401...|
|   1342|[[10070, 0.104699...|
|   1580|[[19660, 0.476135...|
|   1591|[[24852, 0.262911...|
+-------+--------------------+
only showing top 10 rows



In [12]:
# Generate top 10 user recommendations for each product
prodRecs = model.recommendForAllItems(10)
prodRecs.show(10)

+----------+--------------------+
|product_id|     recommendations|
+----------+--------------------+
|       148|[[2335, 0.8462036...|
|       463|[[40990, 0.0], [4...|
|       471|[[151061, 0.07268...|
|       496|[[74315, 0.008597...|
|       833|[[40990, 0.0], [4...|
|      1088|[[74315, 0.002776...|
|      1238|[[40990, 0.0], [4...|
|      1342|[[77509, 0.003764...|
|      1580|[[151061, 0.00138...|
|      1591|[[148280, 6.40124...|
+----------+--------------------+
only showing top 10 rows



In [18]:
# Generate top 10 product recommendations for a specified set of users
users = data_clean.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
userSubsetRecs.show(10)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   3794|[[23909, 1.251553...|
|   8086|[[24852, 1.178453...|
|  11748|[[19348, 0.141449...|
+-------+--------------------+



In [20]:
# Generate top 10 user recommendations for a specified set of products
products = data_clean.select(als.getItemCol()).distinct().limit(3)
prodSubSetRecs = model.recommendForItemSubset(products, 10)
prodSubSetRecs.show(10)

+----------+--------------------+
|product_id|     recommendations|
+----------+--------------------+
|     23271|[[192754, 0.10195...|
|     29993|[[149869, 0.88273...|
|     29894|[[57546, 0.150543...|
+----------+--------------------+

