SetUp Spark

In [1]:
import findspark

In [2]:
findspark.init('C:/Users/sandi/spark-2.4.3-bin-hadoop2.7')

In [3]:
findspark.find()

'C:/Users/sandi/spark-2.4.3-bin-hadoop2.7'

In [23]:
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, desc, lit, row_number, array
from pyspark.sql.types import StringType, ArrayType, IntegerType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [5]:
# spark config
spark = SparkSession \
    .builder \
    .appName("recommender system") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.master", "local[2]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

Scenario1: Build a model for batch prediction 

In [6]:
# contains data with book_id, cust_id and rating using
ratings = spark.read.load('D:/Code/Recommender system/datasets/book_ratings.csv', format='csv', header=True, inferSchema=True)

In [7]:
ratings.show(3, False)

+-------+-------+------+
|book_id|cust_id|rating|
+-------+-------+------+
|613    |49452  |5     |
|606    |22007  |1     |
|235    |72296  |3     |
+-------+-------+------+
only showing top 3 rows



In [8]:
ratings.describe().show()

+-------+-----------------+------------------+------------------+
|summary|          book_id|           cust_id|            rating|
+-------+-----------------+------------------+------------------+
|  count|           100000|            100000|            100000|
|   mean|        500.44013|       50011.57292|           2.99959|
| stddev|288.4549640034758|23104.453942277418|1.4147614100612311|
|    min|                1|             10001|                 1|
|    max|             1000|             90000|                 5|
+-------+-----------------+------------------+------------------+



In [9]:
# No of dictinct customer Ids and Book Ids
print("Distinct customer Ids: {}".format(ratings.select('cust_id').distinct().count()))
print("Distinct book Ids: {}".format(ratings.select('book_id').distinct().count()))

Distinct customer Ids: 57072
Distinct book Ids: 1000


In [10]:
# Distinct ratings
ratings.select('rating').distinct().sort('rating').show()

+------+
|rating|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
+------+



In [11]:
# Uses cross validator with hyperparameter space and k-fold validation to train the model. 
# Due to k-fold cross validation this was taking longer time. It appeared it would take longer time during demo therefore
# I only used the one with hyper parameter space. i.e. the function - train_model_als.
def train_model_als_csv(data, num_iters, reg_params, ranks, train_split, params, num_folds):
    (train_data, validation_data) = data.randomSplit([train_split, 1 - train_split])
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    als = ALS(userCol = params['userCol'], itemCol = params['itemCol'], ratingCol = params['ratingCol'], seed = 101, coldStartStrategy = "drop")
    paramGrid = ParamGridBuilder().addGrid(als.rank, ranks).addGrid(als.maxIter, [num_iters]).addGrid(als.regParam, reg_params).build()
    crossval = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=num_folds)
    cvModel = crossval.fit(train_data)
    predictions = cvModel.transform(validation_data)
    print ("The root mean squared error for final model is: " + str(evaluator.evaluate(predictions.na.drop())))

In [14]:
# Output - RMSE score
# Input - validation or test data. Validation data has been used to arrive at optimal model parameters. 
# The optimal\best model is then used with the test data
# label_col - original ground truth data
# prediction_col - prediction made by the model
def get_rmse_score(test_data, model, label_col, prediction_col):
    predictions = model.transform(test_data)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol = label_col, predictionCol=prediction_col)
    rmse = evaluator.evaluate(predictions)
    return rmse

In [15]:
# Output - Model based on multiple input parameters below
# data - Pyspark dataframe containing training data
# num_iters - No of iterations that ALS algorithm would run
# ranks - Latent factor
# train_split - It's a number between 0 and 1 based on which it would split the train & validation/test data
# params is a dictionary with the format {userCol: userId, itemCol: movieId, ratingCol: rating}
def train_model_als(data, num_iters, reg_params, ranks, train_split, params):
    if train_split <=0 and train_split >=1:
        print ("The split is not valid")
        return None
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_params:
            als = ALS(maxIter = num_iters, regParam = reg, userCol = params['userCol'], itemCol = params['itemCol'], ratingCol = params['ratingCol'], rank=rank, seed = 101, coldStartStrategy = "drop")
            (train_data, validation_data) = data.randomSplit([train_split, 1 - train_split])
            model = als.fit(train_data)
            rmse = get_rmse_score(validation_data, model, "rating", "prediction")
            print ("Latent factor: {} L2 reg: {} RMSE: {}".format(rank, reg, rmse))
            if  rmse < min_error:
                min_error = rmse
                final_rank = rank
                final_reg_param = reg
                final_model = model
    print("Final model params:: latent factors {}, regularization = {}, RMSE = {}".format(final_rank, final_reg_param, min_error)) 
    return final_model
        

In [25]:
# Returns the best model by passing multiple hyper-parameters to the function - train_model_als
# Note: For demo purpose, I've used limited hyper-params in the hyper parameter space. Ideally it should be trained on bigger set 
# params is a dictionary with the format {userCol: userId, itemCol: movieId, ratingCol: rating}
def get_best_model(train_data, params):
    num_iterations = 5
    ranks = [5, 10, 20]
    reg_params = [0.001, 0.01, 1.0]
    start_time = time.time()
    # Trains the model based on hyper parameter space
    best_model = train_model_als(train_data, num_iterations, reg_params, ranks, 0.8, params)
    # Uses k-fold cross validation with hyper parameter space for training
    # best_model = train_model_als_csv(train_data, num_iterations, reg_params, ranks, 0.8, params, 5)
    print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))
    return best_model

In [26]:
# Output: Calls the get_best_model function to get the optimal\best model that will be used for prediction
# 80% of the data is used for training and 20% for testing 
(train_data, test_data) = ratings.randomSplit([0.8, 0.2])
best_model = get_best_model(train_data, {'userCol': 'cust_id', 'itemCol': 'book_id', 'ratingCol': 'rating'})

Latent factor: 5 L2 reg: 0.001 RMSE: 4.358051925856775
Latent factor: 5 L2 reg: 0.01 RMSE: 4.103991750133299
Latent factor: 5 L2 reg: 1.0 RMSE: 3.39363214894107
Latent factor: 10 L2 reg: 0.001 RMSE: 3.6223317557045247
Latent factor: 10 L2 reg: 0.01 RMSE: 3.576915439846166
Latent factor: 10 L2 reg: 1.0 RMSE: 3.284929082656895
Latent factor: 20 L2 reg: 0.001 RMSE: 3.40146577924521
Latent factor: 20 L2 reg: 0.01 RMSE: 3.393262669058938
Latent factor: 20 L2 reg: 1.0 RMSE: 3.314399992140276
Final model params:: latent factors 10, regularization = 1.0, RMSE = 3.284929082656895
Total Runtime: 867.94 seconds


In [27]:
# Output: Predicts top n item recommendations for each User
# Input: final model, data - test data, num_items - returns the no of items for each user
def predict_top_n_items(best_model, data, num_items):
    best_model.transform(data)
    return best_model.recommendForAllUsers(num_items)

# Predicts top n user recommendations for each item
# Input: final model, data - test data, num_users - returns the no of user for each item
def predict_top_n_users(best_model, data, num_users):
    best_model.transform(data)
    return best_model.recommendForAllItems(num_users)


In [28]:
# Output: Pyspark dataframe containing Customer Id and list of recommendationsi.e [book_id, rating]
# Input: Model, test data & no of recommendations 
top_5_items_df = predict_top_n_items(best_model, test_data, 5)
top_5_items_df.show(5, False)

+-------+-------------------------------------------------------------------------------------------+
|cust_id|recommendations                                                                            |
+-------+-------------------------------------------------------------------------------------------+
|11141  |[[881, 1.4347032], [172, 1.2989947], [404, 1.1880406], [357, 1.1697912], [992, 1.1179773]] |
|11317  |[[757, 3.4713113], [744, 3.2671514], [674, 3.124947], [37, 3.022014], [655, 2.9587038]]    |
|11458  |[[223, 2.8591177], [43, 2.3309672], [82, 2.242475], [248, 2.198267], [239, 2.1922367]]     |
|11858  |[[944, 2.7403433], [659, 2.717301], [154, 2.4282458], [896, 2.4180899], [981, 2.3945622]]  |
|12046  |[[537, 0.6584311], [629, 0.6234269], [141, 0.6159075], [508, 0.6124771], [465, 0.58440864]]|
+-------+-------------------------------------------------------------------------------------------+
only showing top 5 rows



In [29]:
# Output returns: first element of every lists within a list
# Input: Lists of lists
def return_item_data(list_recommendations):
    temp_list = []
    for item in list_recommendations:
        temp_list.append(item[0])
    return temp_list

# Output: Dataframe with customer Id and list of items. THis removes the raring from the above dataframe  
def get_user_movie_recommendation(function, df):
    get_list_udf = udf(function, ArrayType(IntegerType()))
    user_item_df = df.withColumn('list_items', get_list_udf(df.recommendations)).drop('recommendations')
    return user_item_df
    

In [30]:
# Calls get_user_movie_recommendation and displays the result
user_item_df = get_user_movie_recommendation(return_item_data, top_5_items_df)
user_item_df.show(5, False)

+-------+-------------------------+
|cust_id|list_items               |
+-------+-------------------------+
|11141  |[881, 172, 404, 357, 992]|
|11317  |[757, 744, 674, 37, 655] |
|11458  |[223, 43, 82, 248, 239]  |
|11858  |[944, 659, 154, 896, 981]|
|12046  |[537, 629, 141, 508, 465]|
+-------+-------------------------+
only showing top 5 rows



In [31]:
# RMSE is calculated for the test data. Note the higher RMSE is attributed to the randomness of the data
rmse_test_data = get_rmse_score(test_data, best_model, "rating", "prediction")
print("RMSE for test data is {}".format(rmse_test_data))

RMSE for test data is 3.334720511198201


Scenario 2: Offline Model Training. This is a scenario where few users (whose UserId is not yet created) have provided their preference by giving book name 

In [32]:
# Assigns a user_id  to the new User
def generate_new_id(df, user_id_column):
    max_user_id = df.agg({user_id_column: "max"}).collect()[0]['max(' + user_id_column + ')'] + 1
    return max_user_id

In [33]:
# This data consists of Username and Book Name
user_pref_df = spark.read.load('D:/Code/Recommender system/datasets/user_pref_books.csv', format='csv', header=True, inferSchema=True)

In [34]:
user_pref_df.show(5, False)

+--------+--------------+
|username|book_name     |
+--------+--------------+
|Arvind  |Robert Garcia |
|Mac     |Michael Rivera|
|Alan    |Jaime Palmer  |
|Ram     |Matthew Jones |
|Ankit   |Adam Wright   |
+--------+--------------+
only showing top 5 rows



In [35]:
# This dataframe contains book name and book Id. **** Please pardon me for the book names listed here :) *************
books_df = spark.read.load('D:/Code/Recommender system/datasets/book.csv', format='csv', header=True, inferSchema=True)
books_df.show(3, False)

+-----------------+-------+
|book_name        |book_id|
+-----------------+-------+
|Juan Jimenez     |334    |
|Sharon George    |192    |
|Alexandra Stanley|786    |
+-----------------+-------+
only showing top 3 rows



In [36]:
books_df.count()

50000

In [37]:
# Output: returns dataframe with userId, rating and movieId for the new User. This data will then be used to train the model.
# Adds userId and ItemId to the user preference dataset
def preparing_rating_dataset(ratings_df, user_pref_df, item_df, user_id_column):
    # Add new column: userId
    max_user_id = ratings_df.agg({user_id_column: "max"}).collect()[0]['max(' + user_id_column + ')']
    w = Window.orderBy("userName")
    user_item_df = user_pref_df.withColumn(user_id_column, row_number().over(w) + max_user_id).withColumn("rating", lit(4))
    # Add ItemId
    rating_new_df = user_item_df.join(item_df, user_item_df.book_name == item_df.book_name,'inner').drop('book_name').drop('username')
    return rating_new_df

In [38]:
new_rating_df = preparing_rating_dataset(ratings, user_pref_df,books_df, 'cust_id')
new_rating_df.show(3)

+-------+------+-------+
|cust_id|rating|book_id|
+-------+------+-------+
|  90008|     4|    867|
|  90014|     4|     51|
|  90008|     4|    940|
+-------+------+-------+
only showing top 3 rows



In [39]:
# Retrains the model by adding the new user data to the original train dataset
def retrain_model(train_df, user_pref_df):
    updated_train_df = train_df.drop('timestamp').union(user_pref_df)
    best_model = get_best_model(updated_train_df, {'userCol': 'cust_id', 'itemCol': 'book_id', 'ratingCol': 'rating'})
    return best_model

In [40]:
updated_model = retrain_model(train_data, new_rating_df)

Latent factor: 5 L2 reg: 0.001 RMSE: 8.925741439535976
Latent factor: 5 L2 reg: 0.01 RMSE: 12.105380792286928
Latent factor: 5 L2 reg: 1.0 RMSE: 8.039010302992763
Latent factor: 10 L2 reg: 0.001 RMSE: 10.981460498230357
Latent factor: 10 L2 reg: 0.01 RMSE: 11.69551318316286
Latent factor: 10 L2 reg: 1.0 RMSE: 9.0794303338148
Latent factor: 20 L2 reg: 0.001 RMSE: 8.869672626283183
Latent factor: 20 L2 reg: 0.01 RMSE: 11.483837170466234
Latent factor: 20 L2 reg: 1.0 RMSE: 8.81810943018907
Final model params:: latent factors 5, regularization = 1.0, RMSE = 8.039010302992763
Total Runtime: 725.62 seconds


Scenario3: Cold Start problem. ALS Matrix factorization works for users who have rated the items\books. It will not be handled for new users. The idea is to assign the top 10 popular items to each User and keep shuffling between them while displaying the items to the User

In [41]:
# Returns top n popular items from the ratings dataframe. 
def get_n_popular_items(ratings_df, n):
    item_grouped_df = ratings_df.groupBy('book_id').count().sort('count', ascending=False).take(n)
    popular_book_id = []
    for row in item_grouped_df:
        popular_book_id.append(lit(row['book_id']))
    return popular_book_id
    

In [42]:
n_popular_items = get_n_popular_items(ratings,5)
print (n_popular_items)

[Column<b'662'>, Column<b'234'>, Column<b'247'>, Column<b'374'>, Column<b'739'>]


In [43]:
# The file consists of new set of users with customer Id and Customer name
new_users_df = spark.read.load('D:/Code/Recommender system/datasets/new_users.csv', format='csv', header=True, inferSchema=True)

In [44]:
new_users_df.show(2, False)

+-------+-------------+
|cust_id|cust_name    |
+-------+-------------+
|95022  |Thomas Thomas|
|96635  |Renee Brooks |
+-------+-------------+
only showing top 2 rows



In [45]:
# returns list of popular books against each User
new_users_recommended_books=new_users_df.withColumn("recommended_books", array(n_popular_items))
new_users_recommended_books.show(5, False)

+-------+---------------+-------------------------+
|cust_id|cust_name      |recommended_books        |
+-------+---------------+-------------------------+
|95022  |Thomas Thomas  |[662, 234, 247, 374, 739]|
|96635  |Renee Brooks   |[662, 234, 247, 374, 739]|
|96201  |Shannon Perez  |[662, 234, 247, 374, 739]|
|95788  |Gloria Williams|[662, 234, 247, 374, 739]|
|96296  |Craig Lopez    |[662, 234, 247, 374, 739]|
+-------+---------------+-------------------------+
only showing top 5 rows

