In [28]:
import os
import sys
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from pyspark import SparkContext

from pyspark.mllib.recommendation import ALS
import math
import pyspark.sql
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
import pyspark.sql.functions as func
import string
import pickle
import json

In [29]:
# Spark session configuration
spark = SparkSession \
    .builder \
    .appName("Book Recommender") \
    .config("spark.recommender", "1") \
    .getOrCreate()

In [30]:
conf = SparkConf().setAppName("Book Recommender") \
                  .set("spark.executor.heartbeatInterval", "200000") \
                  .set("spark.network.timeout", "300000") # Increasing the network timeout 
sc = SparkContext.getOrCreate(conf)
sqlcontext = SQLContext(sc)

Loading the book dataset which is been preprocessed and converted to .csv file.

In [31]:
# Loading the data with userid, title, ratings
ratings_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('user_book_rating.csv')


In [32]:
# Loading the data with 500 unique users for testing
unique_user_data = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('unique_user_data.csv')

The user_id in the dataset is in base 16, but the ALS train function need the user_id in int base 10. Therefore converting user ids to have a unique int base 10 number using the StringIndexer() function and added as the new column 'int_user_id'.

In [33]:
stringIndexer = StringIndexer(inputCol="user_id", outputCol="int_user_id")
model = stringIndexer.fit(ratings_df)
ratings_df = model.transform(ratings_df)

In [34]:
ratings_df.show(5)

+--------------------+--------+------+-----------+--------------------+
|             user_id| book_id|rating|int_user_id|               title|
+--------------------+--------+------+-----------+--------------------+
|e258cbe5ccb791a14...|  789813|     3|     290145|No Colder Place (...|
|c6d23cb41fdcd4a5d...|19291332|     4|     254922|One in a Million ...|
|ba915128f7275e70f...|  261122|     5|     239184|The Dead Girls' D...|
|8bfbf1b344efdb2ca...| 9361589|     3|     179483|    The Night Circus|
|f15b6dcc2311ab919...|  234225|     4|     309302|Dune (Dune Chroni...|
+--------------------+--------+------+-----------+--------------------+
only showing top 5 rows



In [35]:
# Created a dataframe having about 500 users to get their recommendations
unique_user_data.show(5)

+--------------------+-----------+
|             user_id|int_user_id|
+--------------------+-----------+
|a6dcd679dca6344a0...|     213915|
|a16922af070d21d41...|     207010|
|c9583a77ed90d0461...|     258083|
|a9356165bd3332802...|     216948|
|632432e4fba80ee05...|     127095|
+--------------------+-----------+
only showing top 5 rows



In [36]:
# Forming a dataframe with only the required columns
required_df = ratings_df[['int_user_id', 'book_id', 'rating']]
required_df.show(5)

+-----------+--------+------+
|int_user_id| book_id|rating|
+-----------+--------+------+
|     290145|  789813|     3|
|     254922|19291332|     4|
|     239184|  261122|     5|
|     179483| 9361589|     3|
|     309302|  234225|     4|
+-----------+--------+------+
only showing top 5 rows



In [47]:
# Number of records in the dataset

required_df.count()


7567357


In [37]:
#splitting data for training, testing and validation

(trainingData,validationData,testData) = required_df.randomSplit([0.6,0.2,0.2]) 

validation_for_predict = validationData.select('int_user_id','book_id')
test_for_predict = testData.select('int_user_id','book_id')


In [38]:
testData.show(5)

+-----------+--------+------+
|int_user_id| book_id|rating|
+-----------+--------+------+
|         19|33954483|     4|
|         51|20821149|     5|
|        117|25095546|     5|
|        176|26244548|     4|
|        201|   47683|     4|
+-----------+--------+------+
only showing top 5 rows



In [39]:
# Best parameters obtained after several trials
seed = 15 # For initial matrix factorization model
iterations = 10 # after trying 5, 15, 20
reg = 0.09 #upon trails found this values as optimum for regularization
rank = 20 # [4, 8, 12, 20] number of factors to choose for matrix factorization


In [41]:

# Converting the dataframe to RDD using df.RDD

model = ALS.train(trainingData, rank, seed=seed, iterations=iterations,
                      lambda_=reg)
    
predictions = model.predictAll(validation_for_predict.rdd).map(lambda r: ((r[3], r[1]), r[2]))
ates_and_preds = validationData.rdd.map(lambda r: ((str(r[0]), int(r[1])), float(r[2]))).join(predictions)


In [42]:

# Predicting rating of all books for the users of test dataset
predictions_test = model.predictAll(test_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))

predictions_test.take(5)


[((158480, 15808474), 0.0),
 ((158480, 547448), 0.0),
 ((158480, 23512999), 0.0),
 ((146272, 10429092), 0.17743873563377147),
 ((24360, 410615), -1.0944712442902218)]

**Rating Prediction**


Training the model with the whole data, and getting the top five movie recmmendatons for randomly chosen 500 users. The output is saved in the dictionary with user_id as the key and the recommendations with predicted ratings as the value. This dictionary is pickled and used further in the cross domain recommender to show top five books and movies recommended to the chosen / test user_id.

In [43]:
def getRecommendations(user,testDf,trainDf,model):
    
    userDf = testDf.filter(testDf.int_user_id == user)
    
    # Passing only the unread books, by subtracting the read books of the user
    mov = trainDf.select('book_id').subtract(userDf.select('book_id'))
    
    # covert our dataframe into RDD as predicrtAll() takes only RDD format for arguments
    pred_rat = model.predictAll(mov.rdd.map(lambda x: (user, x[0]))).collect()
    
    # Get the top five recommendations
    recommendations = sorted(pred_rat, key=lambda x: x[2], reverse=True)[:5]

    return recommendations

#Creating a dictionary to hold the user_id as key and recommendations as value
output_dict = {}

# Call getRecommendations method
for user in unique_user_data.select('int_user_id').collect():
    
    
    derived_rec = getRecommendations(user[0],testData,trainingData,model)
    
    user_name = unique_user_data.filter(unique_user_data.int_user_id==user[0]).select('user_id')
    output_dict[user_name.collect()[0]['user_id']] = []
    #print(user_name.collect())
    for i in range(len(derived_rec)):
        
        # to get the title of the book from the book_id
        result = ratings_df.filter(ratings_df.book_id==derived_rec[i][1]).select('title')
        # appending the recommendations to the dictionary value
        output_dict[user_name.collect()[0]['user_id']].append((result.rdd.flatMap(list).first(), derived_rec[i][2]))


When the dictionary is searched by key = user_id of interest, the recommended movies and the respective predicted ratings are shown. 

In [44]:
# Getting book recommendations for the test user
id_ = '8438f7cb6879b1e684d944e8afd50ec1'
val = output_dict.get(id_) # key for the dictionary is the user_id
print(val)

[('Queen of Shadows (Throne of Glass, #4)', 4.950911418089894), ('Wallbanger (Cocktail, #1)', 4.185194464252074), ('Harry Potter and the Prisoner of Azkaban (Harry Potter, #3)', 4.1137276488376795), ('Tough Luck (Hard Rock Roots, #3)', 3.8615057209867754), ('Roman Crazy', 3.808723370533395)]


In [45]:
# Converting the output dictionary to json file
with open('book_rec.txt', 'w') as file:
    file.write(json.dumps(output_dict))

Pickling the result of recommendations, to show further in the cross domain recommender

In [46]:
# Pickling the dictionery to use the values further
with open('book_rec.pickle', 'wb') as handle:
    pickle.dump(output_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open('book_rec.pickle', 'rb') as handle:
    b = pickle.load(handle)

print (output_dict == b)

True


**References:**


https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html <br>
https://sjsu.instructure.com/files/57380718/download?download_frd=1 <br>
https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html <br>
