In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.functions import count, col
import math
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row


In [2]:
spark = SparkSession.builder.master("local")\
        .appName("Recommendation Engine")\
        .getOrCreate()

In [3]:
 # load ratings csv file
ratingSchema =StructType([StructField("user_id", IntegerType(), True),
                 StructField("book_id", IntegerType(), True),
                 StructField("rating", DoubleType(), True)])

df = spark.read.load("bds/ratings.csv",
                     format="csv", sep=",", header="true", schema=ratingSchema)
df = df.cache()
df.show(n=5, truncate=False)
df.printSchema()

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|1      |258    |5.0   |
|2      |4081   |4.0   |
|2      |260    |5.0   |
|2      |9296   |5.0   |
|2      |2318   |3.0   |
+-------+-------+------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: double (nullable = true)



In [4]:
# Q1 : find out unique users 
unq_user_count = df.select("user_id").drop_duplicates().count()
print(unq_user_count)

53424


In [4]:
# Q1 : find out unique books
unq_book_count = df.select("book_id").drop_duplicates().count()
print(unq_book_count)

10000


In [5]:
# Q2: percentage of books which got <3 rating ever.
books_with_rating_le3 = df.filter("rating <= 3").groupBy("book_id").agg(F.count("book_id")).drop_duplicates()
print('% of books with rating <=3 :: ', (books_with_rating_le3.count()/unq_book_count)*100, '%')


% of books with rating <=3 ::  100.0 %


In [6]:
# Q3 :
# since traing on whole data to find best RMSE is time consuming, we are taking only 10% of data
# and dividing it on training and test data to train & test the model.

small_df = df.limit(500000)
splits = small_df.randomSplit([0.7, 0.3], 1)
training_data = splits[0]
test_data = splits[1]

print(small_df.count(), splits[0].count(), splits[1].count())
print(small_df.select("user_id").drop_duplicates().count())
print(small_df.select("book_id").drop_duplicates().count())

500000 350115 149885
6923
6575


In [7]:
# Q3 continued. 
# we will train the model on different values of ranks, reg param & iterations to get least RMSE. 
# then the values obtained for rank, iterations & reg param will be used to create model on full data.
# this approach is taken to reduce the time taken to fine tune the model to reduce th RMSE.

# technically this is done using 3 nested for loops so that we can check RMSE on each combination of ranks, iterations,
# and reg params.
# after each iteration, RMSE is checked with previous value, if it is less then the values of rank, iterations 
# & reg params are saved. if it is more or unchanged, then continue to next combinations.

import time
seed = 5

iterations =[4, 10, 20]  
regularization_parameters = [0.01, 0.04, 0.08, 0.1]
ranks = [2, 4, 8, 12]

best_rmse = None
best_rank= None
best_reg_param = None 
best_iter = None

require_training = True  # make it True to Train the model on different values

if require_training:
    for r in ranks:
        for rp in regularization_parameters:
            for itr in iterations:
                start= time.time()
                
                print('trying with combination of rank = ', r, ' reg params = ', rp, ' iteratins = ', itr)
                
                als = ALS(maxIter=itr, regParam=rp, rank=r, userCol="user_id", 
                          itemCol="book_id", ratingCol="rating", coldStartStrategy="drop")
                model = als.fit(training_data)

                # Evaluate the model by computing the RMSE on the test data
                predictions = model.transform(test_data)

                evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
                rmse = evaluator.evaluate(predictions)
                print("Root-mean-square error = " + str(rmse))

                if best_rmse is None or rmse < best_rmse:
                    best_rank= r
                    best_reg_param = rp
                    best_iter = itr
                    best_rmse = rmse

                print('rmse: ', best_rmse, 'rank: ', r, 'reg param: ', rp, ' iterations: ', itr, ' time taken: ', time.time()- start)
else:
    best_rank= 2
    best_reg_param = 0.08
    best_iter = 20


trying with combination of rank =  2  reg params =  0.01  iteratins =  4
Root-mean-square error = 1.0360830548133249
rmse:  1.0360830548133249 rank:  2 reg param:  0.01  iterations:  4  time taken:  298.15836238861084
trying with combination of rank =  2  reg params =  0.01  iteratins =  10
Root-mean-square error = 0.9530167235594668
rmse:  0.9530167235594668 rank:  2 reg param:  0.01  iterations:  10  time taken:  199.80846166610718
trying with combination of rank =  2  reg params =  0.01  iteratins =  20
Root-mean-square error = 0.9114236472181662
rmse:  0.9114236472181662 rank:  2 reg param:  0.01  iterations:  20  time taken:  261.4558551311493
trying with combination of rank =  2  reg params =  0.04  iteratins =  4
Root-mean-square error = 0.9093694916655924
rmse:  0.9093694916655924 rank:  2 reg param:  0.04  iterations:  4  time taken:  293.6180076599121
trying with combination of rank =  2  reg params =  0.04  iteratins =  10
Root-mean-square error = 0.8847762516515298
rmse:  0

Root-mean-square error = 1.100059968535926
rmse:  0.8812732507680017 rank:  12 reg param:  0.01  iterations:  20  time taken:  195.40132880210876
trying with combination of rank =  12  reg params =  0.04  iteratins =  4
Root-mean-square error = 0.9357421696478719
rmse:  0.8812732507680017 rank:  12 reg param:  0.04  iterations:  4  time taken:  151.768221616745
trying with combination of rank =  12  reg params =  0.04  iteratins =  10
Root-mean-square error = 0.9443112770575045
rmse:  0.8812732507680017 rank:  12 reg param:  0.04  iterations:  10  time taken:  167.32115292549133
trying with combination of rank =  12  reg params =  0.04  iteratins =  20
Root-mean-square error = 0.9499422727507031
rmse:  0.8812732507680017 rank:  12 reg param:  0.04  iterations:  20  time taken:  238.44780158996582
trying with combination of rank =  12  reg params =  0.08  iteratins =  4
Root-mean-square error = 0.8971213625680267
rmse:  0.8812732507680017 rank:  12 reg param:  0.08  iterations:  4  time

In [8]:
#Q3:  print best RMSE, rank, iterations and reg param
print('best rmse :: ', best_rmse, ' rank :: ', best_rank, ' reg_params :: ', best_reg_param, ' best iteration :: ', best_iter )

best rmse ::  0.8812732507680017  rank ::  8  reg_params ::  0.1  best iteration ::  10


In [20]:
# Q4.a  find out top 5 recommended books for each user

# using old library since it makes flattening of the records a lot easier.
from pyspark.mllib.recommendation import ALS

# train on all the records now using the parameters like rank, number of iterations & regularisation parameter 
#  obtained from training with least RMSE
complete_model = ALS.train(df, seed=seed,  rank=best_rank,
                                   iterations=best_iter, lambda_=best_reg_param)

#top 5 recommended books for each user
recommended_books = complete_model.recommendProductsForUsers(5)
recommended_books = recommended_books.flatMap(lambda d: Row(d[1][0], d[1][1], d[1][2], d[1][3], d[1][4]))
recommended_books_df = spark.createDataFrame(recommended_books, ratingSchema) # convert from rdd to dataframe
recommended_books_df = recommended_books_df.orderBy("rating", ascending=False)

books_df = spark.read.load("bds/books.csv",
                     format="csv", sep=",", header="true").select("book_id", "title")
books_df = books_df.withColumn("book_id", F.col("book_id").cast(IntegerType()))

# join with books on book_id to get name of recommended books
recommended_title = recommended_books_df.alias("recommended_books_df").join(books_df.alias("books_df"),
                                                    (recommended_books_df.book_id == books_df.book_id))

recommended_title = recommended_title.select("user_id", "title")

# group by user id and collect recommended books as list
gdf = recommended_title.groupBy("user_id").agg(F.collect_list("title"))

# display for 100 users the book recommended by model
gdf.show(n=100, truncate=False)


+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|collect_list(title)                                                                                                                                                                                                                                                                                                                                  |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [19]:
#  IGNORE THIS BLOCK. THIS WAS TO ENSURE THAT BOTH OLD LIBRARY & NEW LIBRARY GIVES SIMILAR RECOMMENDATIONS

# from pyspark.ml.recommendation import ALS
# als = ALS(maxIter=itr, regParam=rp, rank=r, userCol="user_id", 
#           itemCol="book_id", ratingCol="rating", coldStartStrategy="drop")
# model = als.fit(df)

#df2 = model.recommendForAllUsers(5)
#df2 = df2.cache()

#df2.show(n=3, truncate=False)


# df2.filter("user_id==148").show(truncate=False)
# recommended_books_df.filter("user_id==148").orderBy("rating", ascending=False).show(truncate=False)

+-------+----------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                               |
+-------+----------------------------------------------------------------------------------------------+
|148    |[[3628, 4.347618], [6920, 4.3459067], [8946, 4.3413296], [9566, 4.3284607], [6590, 4.3181863]]|
+-------+----------------------------------------------------------------------------------------------+

+-------+-------+-----------------+
|user_id|book_id|rating           |
+-------+-------+-----------------+
|148    |9566   |4.461114568364309|
|148    |6920   |4.449642645691551|
|148    |3628   |4.437684306284717|
|148    |6590   |4.423544299399061|
|148    |5207   |4.360593674803507|
+-------+-------+-----------------+



In [21]:
# Q4.b  find out top 5 recommended user for each book title

recommended_users = complete_model.recommendUsersForProducts(5)
recommended_users = recommended_users.flatMap(lambda d: Row(d[1][0], d[1][1], d[1][2], d[1][3], d[1][4]))
recommended_users_df = spark.createDataFrame(recommended_users, ratingSchema) # convert from rdd to dataframe
recommended_users_df = recommended_users_df.orderBy("rating", ascending=False)

gdf = recommended_users_df.select("user_id", "book_id").groupBy("book_id").agg(F.collect_list("user_id").alias("users"))

books_df = spark.read.load("bds/books.csv",
                     format="csv", sep=",", header="true").select("book_id", "title")
books_df = books_df.withColumn("book_id", F.col("book_id").cast(IntegerType()))

recommended_users = gdf.alias("gdf").join(books_df.alias("books_df"), (gdf.book_id == books_df.book_id))

recommended_users = recommended_users.select("title", "users")

# display for 100 books the recommended users
recommended_users.show(n=100, truncate=False)



+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------+
|title                                                                                                                                                                  |users                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------+
|Revenge of the Spellmans (The Spellmans, #3)                                                                                                                           |[38076, 30440, 41375, 41623, 38217]|
|Dragonfly in Amber (Outlander, #2)                                                                                                                                     |[38076,

In [23]:
#Q5: find out what books in to_read list of users matches with model recommended books for users(1).
to_read_df = spark.read.load("bds/to_read.csv",
                     format="csv", sep=",", header="true")
to_read_df.show(n=3)
to_read_df.filter("user_id == 1").show()

#join on model recommended books and users to read to find out model predictability
matched_df = recommended_books_df.alias("recommended_books_df").join(to_read_df.alias("to_read_df"),
            (recommended_books_df.user_id == to_read_df.user_id) &
            (recommended_books_df.book_id == to_read_df.book_id))

matched_grp = matched_df.groupBy("recommended_books_df.user_id")\
                    .agg(F.count("recommended_books_df.book_id").alias("matched_books"))

#Q5 check for user id 1
matched_grp.filter("user_id ==1").show(n=10, truncate=False)


# find out user id where more than 1 books recommended by model matches with user to_read list
matched_grp.filter("matched_books > 1").show()


+-------+-------+
|user_id|book_id|
+-------+-------+
|      9|      8|
|     15|    398|
|     15|    275|
+-------+-------+
only showing top 3 rows

+-------+-------+
|user_id|book_id|
+-------+-------+
|      1|   1874|
|      1|    235|
|      1|   1198|
|      1|    533|
|      1|    112|
|      1|   2058|
|      1|   3334|
+-------+-------+

+-------+-------------+
|user_id|matched_books|
+-------+-------------+
+-------+-------------+

+-------+-------------+
|user_id|matched_books|
+-------+-------------+
|  20497|            2|
|   9405|            2|
|  51414|            2|
|  29847|            2|
|  51910|            2|
|  51895|            2|
|  47832|            2|
|  11144|            2|
|  16247|            2|
|  41748|            2|
|   4506|            2|
|  30785|            2|
|   9576|            2|
|   9987|            3|
|  26780|            2|
|  17867|            2|
|  51810|            2|
|  24902|            2|
|  19312|            2|
|  49950|            2|
+