## Overview

In this notebook, a recommendation system to recommend books to user is implemented using Apache Spark and Spark ML library's Alternating Least Square's method in Python.

Load the data files and create tables

In [3]:
# File location and type
file_location = "/FileStore/tables/BX_Book_Ratings-05a61.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"
load_mode = "DROPMALFORMED"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option("mode",load_mode) \
  .load(file_location)

df.show(n=10)

In [4]:
# Create a view or table

temp_table_name = "bx_book_ratings"

df.createOrReplaceTempView(temp_table_name)

In [5]:
# File location and type
file_location = "/FileStore/tables/BX_Books-c72a2.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"
load_mode = "DROPMALFORMED"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option("mode",load_mode) \
  .load(file_location)

df.show(n=10)

In [6]:
# Create a view or table

temp_table_name = "bx_books"

df.createOrReplaceTempView(temp_table_name)

In [7]:
# File location and type
file_location = "/FileStore/tables/BX_Users-53089.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"
load_mode = "DROPMALFORMED"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option("mode",load_mode) \
  .load(file_location)

df.show(n=10)

In [8]:
# Create a view or table

temp_table_name = "bx_users"

df.createOrReplaceTempView(temp_table_name)

Get the famous books which have atleast 50 ratings and average rating greater than five. Rank the books per author and get the top book per author

In [10]:
%sql
WITH books_author AS (
SELECT bo.`Book-Title`, bo.`ISBN`, COUNT(br.`Book-Rating`) number, AVG(br.`Book-Rating`) average, bo.`Book-Author`,
  RANK() OVER(PARTITION BY bo.`Book-Author` ORDER BY COUNT(br.`Book-Rating`) DESC) book_rank
FROM bx_book_ratings br INNER JOIN bx_books bo 
ON bo.ISBN = br.ISBN
GROUP BY bo.`Book-Title`, bo.`Book-Author`, bo.`ISBN`
HAVING COUNT(br.`Book-Rating`) > 50 AND AVG(br.`Book-Rating`) > 5
ORDER BY number DESC
)

SELECT * FROM books_author ba WHERE ba.book_rank = 1
ORDER BY ba.average DESC
--LIMIT 10

Book-Title,ISBN,number,average,Book-Author,book_rank
Free,1844262553,54,7.962962962962963,Paul Vincent,1
"The Fellowship of the Ring (The Lord of the Rings, Part 1)",0618002227,63,6.206349206349207,J. R. R. Tolkien,1
The Stand (The Complete and Uncut Edition),0385199570,57,6.175438596491228,Stephen King,1
Griffin & Sabine: An Extraordinary Correspondence,0877017883,72,6.041666666666667,Nick Bantock,1
The Little Prince,0156528207,79,5.79746835443038,Antoine de Saint-Exupéry,1
The Cat in the Hat,039480001X,52,5.730769230769231,Dr. Seuss,1
Harry Potter and the Order of the Phoenix (Book 5),043935806X,334,5.57185628742515,J. K. Rowling,1
The Godfather,0451167716,54,5.518518518518518,Mario Puzo,1
The Secret Life of Bees,0670894605,96,5.5,Sue Monk Kidd,1
Fear and Loathing in Las Vegas : A Savage Journey to the Heart of the American Dream,0679785892,57,5.385964912280702,HUNTER S. THOMPSON,1


Combine all the files and create a table for easier analysis

In [12]:
bru_df = spark.sql("SELECT b.ISBN, b.`Book-Title`, b.`Book-Author`, b.`Year-Of-Publication`, b.Publisher, b.`Image-URL-S`, b.`Image-URL-M`, b.`Image-URL-L`, r.`User-ID`, r.`Book-Rating`, u.Location, u.Age FROM bx_books b LEFT JOIN bx_book_ratings r ON b.ISBN=r.ISBN LEFT JOIN bx_users u ON u.`User-ID`=r.`User-ID`")
bru_df.show(n=5)

In [13]:
bru_df.createOrReplaceTempView("CombinedData")
data_df = spark.sql("SELECT `User-ID`, INT(`Book-Rating`), ISBN FROM CombinedData")
data_df.show(n=5)
#data_df.printSchema()

###Collaborative Filtering

In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, lit, avg

Convert String type into integer using StringIndexer to build ALS model

In [17]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='keep') for column in list(set(data_df.columns)-set(['Book-Rating'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(data_df).transform(data_df)
transformed.show(5)

Drop NAs

In [19]:
print(f"Number of rows before dropping NAs: {transformed.count()}")
transformed = transformed.na.drop()
print(f"Number of rows after dropping NAs: {transformed.count()}")

In [20]:
transformed.createOrReplaceTempView('trans')

Replace String ids of books and users with integer ids for easy analysis and save the csv file

In [22]:
data_write = spark.sql("SELECT br.`Book-Title`, br.`Book-Author`, br.`Year-Of-Publication`, br.Publisher, br.`Image-URL-S`, br.`Image-URL-M`, br.`Image-URL-L`,  br.`Book-Rating`, tr.`User-ID_index`, tr.`ISBN_index` FROM CombinedData br, trans tr WHERE br.`User-ID`= tr.`User-ID` AND br.ISBN = tr.ISBN")
data_write.show(5)

In [23]:
data_write.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("sep",";").save("dbfs:/FileStore/df/data")

In [24]:
data_write.createOrReplaceTempView('new_data')

Check the top books with atleast 50 ratings and average rating greater than 5

In [26]:
%sql
WITH books_author AS (
SELECT nd.`Book-Title`, nd.`ISBN_index`, COUNT(nd.`Book-Rating`) number, AVG(nd.`Book-Rating`) average, nd.`Book-Author`,
  RANK() OVER(PARTITION BY nd.`Book-Author` ORDER BY COUNT(nd.`Book-Rating`) DESC) book_rank
FROM new_data nd 
GROUP BY nd.`Book-Title`, nd.`Book-Author`, nd.`ISBN_index`
HAVING COUNT(nd.`Book-Rating`) > 50 AND AVG(nd.`Book-Rating`) > 5
ORDER BY number DESC
)

SELECT * FROM books_author ba WHERE ba.book_rank = 1
ORDER BY ba.average DESC

Book-Title,ISBN_index,number,average,Book-Author,book_rank
Free,1939.0,54,7.962962962962963,Paul Vincent,1
"The Fellowship of the Ring (The Lord of the Rings, Part 1)",1526.0,63,6.206349206349207,J. R. R. Tolkien,1
The Stand (The Complete and Uncut Edition),1772.0,57,6.175438596491228,Stephen King,1
Griffin & Sabine: An Extraordinary Correspondence,1225.0,72,6.041666666666667,Nick Bantock,1
The Little Prince,1060.0,79,5.79746835443038,Antoine de Saint-Exupéry,1
The Cat in the Hat,1993.0,52,5.730769230769231,Dr. Seuss,1
Harry Potter and the Order of the Phoenix (Book 5),67.0,334,5.57185628742515,J. K. Rowling,1
The Godfather,1895.0,54,5.518518518518518,Mario Puzo,1
The Secret Life of Bees,767.0,96,5.5,Sue Monk Kidd,1
Fear and Loathing in Las Vegas : A Savage Journey to the Heart of the American Dream,1749.0,57,5.385964912280702,HUNTER S. THOMPSON,1


Split into training and testing data

In [28]:
(training, test, valid) = transformed.randomSplit([0.6,0.2,0.2], seed=42)

In [29]:
print(training.count())
print(valid.count())
print(test.count())

In [30]:
train_df = training.cache()
test_df = test.cache()
valid_df = valid.cache()

In [31]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [32]:
als = ALS()

seed = 42
ranks = [2, 6, 12]
reg_params = [0.1, 0.5, 1]
errors = [0, 0, 0, 0, 0, 0, 0, 0, 0]
models = [0, 0, 0, 0, 0, 0, 0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
best_reg = -1
best_ind = -1

als.setMaxIter(5) \
   .setUserCol("User-ID_index") \
   .setItemCol("ISBN_index") \
   .setRatingCol("Book-Rating") \
   .setColdStartStrategy("drop") \
   .setNonnegative(True) \
   .setImplicitPrefs(False) \
   .setSeed(seed) 

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Book-Rating", metricName="rmse")

for reg_param in reg_params:
  for rank in ranks:
    als.setRegParam(reg_param) \
       .setRank(rank)
    
    model = als.fit(train_df)
    predict_df = model.transform(valid_df)
    error = reg_eval.evaluate(predict_df)
    errors[err] = error
    models[err] = model
    print(f'For rank: {rank} and reg_param: {reg_param}, error is {error}')
    if error < min_error:
      min_error = error
      best_rank = rank
      best_reg = reg_param
      best_ind = err
    err += 1
    
als.setRank(best_rank) \
   .setRegParam(best_reg)
print(f'Best rank: {best_rank}, Best reg: {best_reg}, Best model index: {best_ind}')

best_model = models[best_ind]

In [33]:
predictions = best_model.transform(test_df)
predictions.show(10)

In [34]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Book-Rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Test Root-mean-square error = " + str(rmse))

In [35]:
avg_rating_df = train_df.agg(avg(col("Book-Rating")))

# Extract the average rating value. (This is row 0, column 0.)
training_avg_rating = avg_rating_df.collect()[0][0]

print(f'The average rating for movies in the training set is {training_avg_rating}')

# Add a column with the average rating
test_for_avg_df = test_df.withColumn('prediction', lit(training_avg_rating))
test_for_avg_df.show(5)

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = evaluator.evaluate(test_for_avg_df)

print(f"Test RMSE on average set is {test_avg_RMSE}")


Top Ten recommendations for all users

In [37]:
user_recs = model.recommendForAllUsers(10)
user_recs.show(10)

Predict recommendations for new user

In [39]:
new_user_id = 0
new_user_ratings = [(0, 1939, 6), (0, 1526, 7), (0, 1772, 3), (0, 67, 8), (0, 1895, 6), (0, 767, 4)]
df = spark.createDataFrame(new_user_ratings, ['User-ID_index', 'ISBN_index','Book-Rating'])
df.show()                        

In [40]:
train_new_df = train_df.select(["User-ID_index","ISBN_index","Book-Rating"]).unionAll(df)
print(train_new_df.count())

In [41]:
als.setMaxIter(5) \
   .setPredictionCol("prediction") \
   .setUserCol("User-ID_index") \
   .setItemCol("ISBN_index") \
   .setRatingCol("Book-Rating") \
   .setColdStartStrategy("drop") \
   .setNonnegative(True) \
   .setImplicitPrefs(False) \
   .setSeed(seed) \
   .setRank(best_rank) \
   .setRegParam(best_reg) 
    
new_model = als.fit(train_new_df)

In [42]:
new_preds = new_model.transform(test_df)
test_rmse_new = evaluator.evaluate(new_preds)
print(f'Test RMSE with new user ratings: {test_rmse_new}')

In [43]:
new_user_ratings_ids = [x[1] for x in new_user_ratings] 
new_user_unrated_df = transformed.filter(~col('ISBN_index').isin(new_user_ratings_ids)).select(['User-ID_index','ISBN_index']).dropDuplicates(['ISBN_index'])
new_df = new_user_unrated_df.withColumn('User-ID_index', lit(new_user_id))
new_df.show(10)

In [44]:
spark.conf.set( "spark.sql.crossJoin.enabled" , "true" )

In [45]:
new_user_preds = new_model.transform(new_df)
new_user_preds.orderBy('prediction',ascending=False).show(10)

In [46]:
new_user_preds.createOrReplaceTempView('nu_recomms')


In [47]:
%sql
SELECT DISTINCT nu.`ISBN_index`, nd.`Book-Title`, nu.prediction 
FROM nu_recomms nu LEFT JOIN new_data nd 
ON nu.`ISBN_index` = nd.`ISBN_index`
ORDER BY nu.prediction DESC
LIMIT 25

ISBN_index,Book-Title,prediction
189831.0,The Godfather,10.991946
55107.0,She Who Hears the Sun,10.991946
47029.0,Acts of God (Book Three of The Christ Clone Trilogy),10.991946
206706.0,Valley of Horses (Thorndike Large Print Basic Series),10.991946
127583.0,"Increase Your Web Traffic In a Weekend, Revised Edition",10.991946
119412.0,Even Angels Ask: A Journey to Islam in America,10.991946
123619.0,The Song of Eve: Mythology and Symbols of the Goddess,10.927781
195346.0,Gulliver's Travels (The World's Classics),10.588649
122664.0,Deadwood,10.588649
164376.0,Love Pirate (Volume 70),10.58313


In [48]:
als = ALS()

seed = 42
best_rank = 12
best_reg = 1

als.setMaxIter(5) \
   .setPredictionCol("prediction") \
   .setUserCol("User-ID_index") \
   .setItemCol("ISBN_index") \
   .setRatingCol("Book-Rating") \
   .setColdStartStrategy("drop") \
   .setNonnegative(True) \
   .setImplicitPrefs(False) \
   .setSeed(seed) \
   .setRank(best_rank) \
   .setRegParam(best_reg) 

In [49]:
full_model = als.fit(transformed)

In [50]:
sparkTransformed = full_model.transform(transformed)
display(sparkTransformed)

User-ID,Book-Rating,ISBN,ISBN_index,User-ID_index,prediction
170513,0,0060987529,148.0,463.0,2.3855572
28523,0,0060987529,148.0,392.0,1.16596
78783,5,0060987529,148.0,31.0,0.5156915
231237,10,0060987529,148.0,516.0,2.4990902
69648,0,0060987529,148.0,9383.0,2.3373637
174304,0,0060987529,148.0,85.0,1.4919932
61042,9,0060987529,148.0,7644.0,4.106174
142121,7,0060987529,148.0,879.0,2.1210964
76814,0,0060987529,148.0,18334.0,2.7850966
174375,8,0060987529,148.0,6176.0,2.4178317


Save model trained on whole data

In [52]:
model_path = "/FileStore/models/recomm"

In [53]:
full_model.save(model_path)

Load model and test it

In [55]:
from pyspark.ml.recommendation import ALSModel

In [56]:
test_model = ALSModel.load(model_path)

In [57]:
test_df.show(n=5)

In [58]:
test_results = test_model.transform(test_df)
test_results.show(n=10)

In [59]:
# Generate top 10 user recommendations for each book
#book_recs = model.recommendForAllItems(10)
#book_recs.show()