# Spark Recommender System

In [66]:
import os
import shutil
import pyspark as ps
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import Row
from pyspark.sql.types import DoubleType

In [67]:
## ensure the spark context is available
spark = (ps.sql.SparkSession.builder
        .appName("sandbox")
        .getOrCreate()
        )
sc = spark.sparkContext
print(spark.version)

3.0.0


In [68]:
data_dir = os.path.join(".", "")
file = os.path.join(data_dir, "item_features_clustered.csv")

df_user = spark.read.format("csv")\
               .option("multiline", "true")\
               .option("quote", '"')\
               .option("header", "true")\
               .option("escape", "\\")\
               .option("escape", '"')\
               .load(file)

In [69]:
data_dir = os.path.join(".", "")
file = os.path.join(data_dir, "item_features.csv")

df_item = spark.read.format("csv")\
               .option("multiline", "true")\
               .option("quote", '"')\
               .option("header", "true")\
               .option("escape", "\\")\
               .option("escape", '"')\
               .load(file)

In [70]:
user_features = df_user.select(df_user['customer_unique_id'], 
                               df_user['product_id'], 
                               df_user['product_category_count'], 
                               df_user['cluster_id'])

In [71]:
user_features.show(4)

+--------------------+--------------------+----------------------+----------+
|  customer_unique_id|          product_id|product_category_count|cluster_id|
+--------------------+--------------------+----------------------+----------+
|7c396fd4830fd0422...|87285b34884572647...|                     1|        12|
|7c396fd4830fd0422...|9abb00920aae319ef...|                     1|        12|
|e781fdcc107d13d86...|87285b34884572647...|                     1|         3|
|3a51803cc0d012c3b...|87285b34884572647...|                     1|         3|
+--------------------+--------------------+----------------------+----------+
only showing top 4 rows



In [72]:
user_features = user_features.sort("customer_unique_id")

In [73]:
item_features = df_item.select(df_item['product_id'], 
                               df_item['product_category_name'], 
                               df_item['avg_price_binned'])

In [74]:
item_features.show(4)

+--------------------+---------------------+----------------+
|          product_id|product_category_name|avg_price_binned|
+--------------------+---------------------+----------------+
|372645c7439f9661f...|       bed_bath_table| (74.9, 135.833]|
|5099f7000472b634f...|        health_beauty|   (0.849, 39.9]|
|64b488de448a5324c...|           stationery|    (39.9, 74.9]|
|2345a354a6f203360...|            telephony|   (0.849, 39.9]|
+--------------------+---------------------+----------------+
only showing top 4 rows



In [75]:
from pyspark.ml.feature import StringIndexer

# create object of StringIndexer class and specify input and output column
SI_customer = StringIndexer(inputCol='customer_unique_id',outputCol='customer_index')
SI_product = StringIndexer(inputCol='product_id',outputCol='product_index')

# transform the data
user_features = SI_customer.fit(user_features).transform(user_features)
user_features = SI_product.fit(user_features).transform(user_features)
item_features = SI_product.fit(item_features).transform(item_features)

# view the transformed data
user_features.select('customer_unique_id', 'customer_index', 'product_id', 'product_index').show(10)
item_features.select('product_id', 'product_index').show(10)

+--------------------+--------------+--------------------+-------------+
|  customer_unique_id|customer_index|          product_id|product_index|
+--------------------+--------------+--------------------+-------------+
|0000366f3b9a7992b...|       11614.0|372645c7439f9661f...|        380.0|
|0000b849f77a49e4a...|       11615.0|5099f7000472b634f...|       2737.0|
|0000f46a3911fa3c0...|       11616.0|64b488de448a5324c...|       4156.0|
|0000f6ccb0745a6a4...|       11617.0|2345a354a6f203360...|       6611.0|
|0004aac84e0df4da2...|       11618.0|c72e18b3fe2739b8d...|      28356.0|
|0004bd2a26a76fe21...|       11619.0|25cf184645f3fae66...|       6625.0|
|00050ab1314c0e55a...|       11620.0|8cefe1c6f2304e7e6...|       2384.0|
|00053a61a98854899...|        2805.0|62984ea1bba7fcea1...|       5331.0|
|00053a61a98854899...|        2805.0|58727e154e8e85d84...|       1357.0|
|0005e1862207bf6cc...|       11621.0|e24f73b7631ee3fbb...|        801.0|
+--------------------+--------------+--------------

In [81]:
from pyspark.sql.types import IntegerType
user_features = user_features.withColumn("product_category_count",
                                        user_features["product_category_count"].cast(IntegerType()))

In [284]:
(training, test) = user_features.randomSplit([0.8, 0.2])

## Model Training

In [83]:
# train the recommender with als
als_alg = ALS(maxIter=5, 
              regParam=0.01, 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop')

model=als_alg.fit(training)

# evaluate with the holdout set
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol='product_category_count',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)

print("Root-mean-squared-error = " + str(round(rmse, 3)))

Root-mean-squared-error = 0.273


## Generate user and product recommendations

In [84]:
# generate top 5 product recommendations for user
user_recs = model.recommendForAllUsers(5)
user_recs.show(4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[3776, 8.908216]...|
|           463|[[586, 4.9994254]...|
|           471|[[23114, 11.97792...|
|           496|[[1075, 5.677565]...|
+--------------+--------------------+
only showing top 4 rows



In [196]:
product_recs = model.recommendForAllItems(10)
product_recs.show()

+-------------+--------------------+
|product_index|     recommendations|
+-------------+--------------------+
|          148|[[93, 4.552056], ...|
|          463|[[2087, 6.8684244...|
|          471|[[55, 7.1659684],...|
|          496|[[55, 5.2572684],...|
+-------------+--------------------+
only showing top 4 rows



In [215]:
users = user_features.select(als_alg.getUserCol()).distinct().limit(3)
user_subset_recs = model.recommendForUserSubset(users, 10)
user_subset_recs.show(n=4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|         11757|[[68, 3.4271233],...|
|           558|[[1075, 20.42704]...|
|          2815|[[352, 3.225371],...|
+--------------+--------------------+



In [217]:
users.show()

+--------------+
|customer_index|
+--------------+
|        2815.0|
|         558.0|
|       11757.0|
+--------------+



## Recommender Function

In [233]:
# Generate pandas df for accessing products in recommender function
products = item_features.toPandas()

In [156]:
def user_recommendations(user_id, top_n = 3):
    
    print("User: {}\n".format(user_id))
    print("Top {} Recommendations: \n".format(top_n))
    rec_products = []
    for n in range(top_n):
        rec_products.append(recs[recs['customer_index'] == user_id]['recommendations'][0][n][0])
        
        print("{}. \n".format(n+1), products[products['product_index'] == rec_products[n]]\
                                                  [['product_id', 'product_category_name']].iloc[0][0])
        
        print(products[products['product_index'] == rec_products[n]]\
                                                  [['product_id', 'product_category_name']].iloc[0][1])

In [157]:
user_recommendations(user_id = 11757, top_n = 3)

User: 11757

Top 3 Recommendations: 

1. 
 e7cc48a9daff5436f63d3aad9426f28b
telephony
2. 
 f25b4b41194b370120c42e1b19bd2a8a
bed_bath_table
3. 
 04c4a4b9c924494fcf82e0fba966f955
watches_gifts


### Model with all data

In [236]:
# train the recommender with als
als_alg = ALS(maxIter=5, 
              regParam=0.01, 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop')

model=als_alg.fit(user_features)

In [237]:
# generate top_n product recommendations for user
nrecommend = 5
user_recs = model.recommendForAllUsers(nrecommend)
user_recs.show(4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[1651, 12.715227...|
|           463|[[586, 5.005967],...|
|           471|[[692, 11.821859]...|
|           496|[[596, 7.749952],...|
+--------------+--------------------+
only showing top 4 rows



In [245]:
recs = user_recs.toPandas()

In [250]:
recs.head()

Unnamed: 0,customer_index,recommendations
0,148,"[(1651, 12.715227127075195), (23114, 12.715227..."
1,463,"[(586, 5.005967140197754), (956, 4.98965263366..."
2,471,"[(692, 11.821859359741211), (901, 11.619313240..."
3,496,"[(596, 7.7499518394470215), (508, 6.2985320091..."
4,833,"[(676, 13.120061874389648), (3776, 12.81623840..."


In [251]:
customer_index = user_features_df[user_features_df['customer_unique_id'] == '698e1cf81d01a3d389d96145f7fa6df8']\
                                                            ['customer_index'].unique()[0]

In [268]:
list(recs[recs['customer_index'] == customer_index]['recommendations'])[0][0][0]

678

In [269]:
def user_recommendations(user_id, top_n = 3):
    
    if top_n > nrecommend:
        print("Please select up to {} items to recommend".format(nrecommend))
        return; 
    
    
    print("User: {}\n".format(user_id))
    print("Known positives: ")
    known_like_product = user_features_df[user_features_df['customer_unique_id'] == user_id]\
                                                            ['product_id'].unique()[0]
    known_like_category = products[products['product_id'] == known_like_product]\
                                                            ['product_category_name'].unique()[0]
    
    print("\t", known_like_product)
    print("\t", known_like_category, "\n")
    
    customer_index = user_features_df[user_features_df['customer_unique_id'] == user_id]\
                                                            ['customer_index'].unique()[0]
    
    print("Top {} Recommendations: \n".format(top_n))
    rec_products = []
    
    for n in range(top_n):
        
        rec_products.append(list(recs[recs['customer_index'] == customer_index]['recommendations'])[0][n][0])
        
        print("{}.\n".format(n+1), products[products['product_index'] == rec_products[n]]\
                                                  [['product_id', 'product_category_name']].iloc[0][0])
        
        print(products[products['product_index'] == rec_products[n]]\
                                                  [['product_id', 'product_category_name']].iloc[0][1])

In [274]:
user_recommendations('698e1cf81d01a3d389d96145f7fa6df8', top_n=5)

User: 698e1cf81d01a3d389d96145f7fa6df8

Known positives: 
	 9571759451b1d780ee7c15012ea109d4
	 auto 

Top 5 Recommendations: 

1.
 9571759451b1d780ee7c15012ea109d4
auto
2.
 f3720bc68555b1bff49b9ffd41b017ac
computers_accessories
3.
 42189544021ccb7369862e7ee218d828
health_beauty
4.
 837b5c6df9ceb8a9c604e78fde0e60a2
computers_accessories
5.
 70c1bce00b24bfd21332f7f8ebe2217f
housewares


### Parameter Tuning

In [275]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [278]:
# train the recommender with als
als = ALS(maxIter=5, 
              regParam=0.01, 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop')

In [279]:
# Import the requisite packages
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

In [280]:
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="product_category_count", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [281]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [285]:
#Fit cross validator to the 'train' dataset
model = cv.fit(training)
#Extract best model from the cv model above
best_model = model.bestModel
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.2198100620768705


In [286]:
print("**Best Model**")
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 150
  MaxIter: 5
  RegParam: 0.05


In [287]:
# Generate n Recommendations for all users
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[1358, 5.986023]...|
|           463|[[956, 5.001903],...|
|           471|[[353, 4.9678936]...|
|           496|[[11226, 3.976293...|
|           833|[[5976, 3.981684]...|
|          1088|[[8835, 4.0598054...|
|          1238|[[19280, 1.956732...|
|          1342|[[3733, 2.9773977...|
|          1580|[[678, 4.1014385]...|
|          1591|[[1687, 4.0513353...|
|          1645|[[8578, 2.9761338...|
|          1829|[[8550, 2.976134]...|
|          1959|[[352, 3.8325496]...|
|          2122|[[460, 1.9567739]...|
|          2142|[[13069, 3.029988...|
|          2366|[[1620, 2.968525]...|
|          2659|[[1923, 2.9697392...|
|          2866|[[6917, 1.9673104...|
|          3175|[[369, 1.954752],...|
|          3749|[[5388, 1.4065131...|
+--------------+--------------------+
only showing top 20 rows



### Re-run model for all users

In [289]:
# train the recommender with als
als_alg = ALS(maxIter=best_model._java_obj.parent().getMaxIter(), 
              regParam=best_model._java_obj.parent().getRegParam(), 
              userCol='customer_index', 
              itemCol="product_index", 
              ratingCol='product_category_count',
              coldStartStrategy='drop')

model=als_alg.fit(user_features)

In [290]:
# generate top_n product recommendations for user
nrecommend = 5
user_recs = model.recommendForAllUsers(nrecommend)
user_recs.show(4)

+--------------+--------------------+
|customer_index|     recommendations|
+--------------+--------------------+
|           148|[[23114, 14.96976...|
|           463|[[23114, 6.162423...|
|           471|[[692, 12.891839]...|
|           496|[[1651, 6.5149956...|
+--------------+--------------------+
only showing top 4 rows



In [291]:
recs = user_recs.toPandas()

In [292]:
user_recommendations('698e1cf81d01a3d389d96145f7fa6df8', top_n=5)

User: 698e1cf81d01a3d389d96145f7fa6df8

Known positives: 
	 9571759451b1d780ee7c15012ea109d4
	 auto 

Top 5 Recommendations: 

1.
 9571759451b1d780ee7c15012ea109d4
auto
2.
 837b5c6df9ceb8a9c604e78fde0e60a2
computers_accessories
3.
 f3720bc68555b1bff49b9ffd41b017ac
computers_accessories
4.
 42189544021ccb7369862e7ee218d828
health_beauty
5.
 70c1bce00b24bfd21332f7f8ebe2217f
housewares
