**Goodbooks Recommendation Engine**

Yiyan Zhang

**1. Business Understanding**

The purpose of this project is to build a simple recommendation engine to recommend books based on reader's ratings.

**2. Data Understanding**

The dataset is from [Kaggle](https://www.kaggle.com/zygmunt/goodbooks-10k).

The dataset contains the following files:
* ratings.csv: Users' ratings for books that they have read.
* books.csv: Metadata for each book.
* to_read.csv: Book marked "to read" by users, like a reading wish list.
* book_tags.csv: User assigned tags/generes to books.
* tags.csv: Translatre tag IDs to names.

**Data Importing**

In [5]:
# import ratings data
ratings = spark.read.csv('/FileStore/tables/ratings.csv',
                        header=True,
                        sep=',',
                        inferSchema=True)
ratings.printSchema()

In [6]:
# import to_read data
to_read = spark.read.csv('/FileStore/tables/to_read.csv',
                        header=True,
                        sep=',',
                        inferSchema=True)
to_read.printSchema()

In [7]:
#import books data
books = spark.read.csv('/FileStore/tables/books.csv',
                        header=True,
                        sep=',',
                        inferSchema=True)
books.printSchema()

The "original_title" column in books.csv has strings with commas in them, when importing using comma as delimiter, some entries in the resulted tables were misaligned to the column header.

I tried to format the CSV file and put quotation marks around titles, but couldn't solve the problem. I also tried saving the file as a TSV file and used tab as delimiter, but still couldn't achieve what I wanted.

The quotation marks existed after uploding the CSV file to Databricks FileStore but were missing after importing into the cluster attached to the notebook.

However this problem did not affect the modelling process.

In [9]:
# import book_tags data
book_tags = spark.read.csv('/FileStore/tables/book_tags.csv',
                        header=True,
                        sep=',',
                        inferSchema=True)
book_tags.printSchema()

In [10]:
#import tags data
tags = spark.read.csv('/FileStore/tables/tags.csv',
                        header=True,
                        sep=',',
                        inferSchema=True)
tags.printSchema()

Once data was imported into Databricks, I conducted exploratory data analysis (EDA) on the datasets to get familiar with the data, to identify data sanity issues, and to discover first-hand insights.

**EDA on ratings**

In [13]:
import pandas as pd

# convert ratings to pandas dataframe
ratings_df = ratings.toPandas()
ratings_df.head()

Unnamed: 0,book_id,user_id,rating
0,1,314,5
1,1,439,3
2,1,588,5
3,1,1169,4
4,1,1185,4


In [14]:
# check uniques and missing values
total = ratings_df['rating'].count()
b_num = ratings_df['book_id'].nunique()
u_num = ratings_df['user_id'].nunique()
na = ratings_df.isnull().values.any()

# average ratings
b_rating = round((u_num/b_num), 2)
avg_rating = round((total/b_num), 2)
u_rating = round((total/u_num), 2)

# missing ratings
missed_rating = (1 - (total/(b_num * u_num))) * 100

print('There are', b_num, 'distinct books and', u_num, 'individual users.\n', 
      'Are there missing values?:', na, '\n', 
      'On average, one book is rated by', b_rating, 
      'distinct users and has', avg_rating, 'ratings.\n',
      'One user gives out', u_rating, 'ratings.\n',
      'Not all users rate all books, so', '%.2f' % missed_rating, '% of the ratings is missing.')

In [15]:
# check ratings with histogram to see distribution
ratings_df.hist(column='rating', bins=5)

In [16]:
# get summary of ratings
ratings_df['rating'].describe()

Ratings range from 1 to 5 with an average of 3.86 and a standard deviation of 0.98.
There is no outlier in the data and majority of the ratings are high.

**EDA on to_read**

In [19]:
# convert to_read to pandas dataframe
read_df = to_read.toPandas()
read_df.head()

Unnamed: 0,user_id,book_id
0,1,112
1,1,235
2,1,533
3,1,1198
4,1,1874


In [20]:
# check uniques
rows = len(read_df)
b_num = read_df['book_id'].nunique()
u_num = read_df['user_id'].nunique()
avg_read = round((rows/b_num),2)

print('On average, one user has', avg_read, 'book in the to read list.')

In [21]:
from pyspark.sql import Row

# create temporary tables
to_read.createOrReplaceTempView('to_read')
books.createOrReplaceTempView('books')

In [22]:
%sql
-- top 10 users with the most number of books in their to read list --
SELECT user_id, COUNT(*) as Num
FROM to_read
GROUP BY user_id
ORDER BY Num DESC LIMIT 10;

user_id,Num
38457,117
28259,114
38076,110
44530,108
46555,107
46000,107
34162,105
34487,104
39174,103
40362,102


In [23]:
%sql
-- top 10 saved books in users' to read list --
SELECT a.book_id, b.title, COUNT(*) as Num
FROM to_read a
INNER JOIN books b
ON a.book_id = b.book_id
GROUP BY a.book_id, b.title
ORDER BY Num DESC LIMIT 10;

book_id,title,Num
13,The Ultimate Hitchhiker's Guide to the Galaxy,1812
11,"The Hitchhiker's Guide to the Galaxy (Hitchhiker's Guide to the Galaxy, #1)",1767
119,The Lord of the Rings: The Art of The Fellowship of the Ring,1499
8,"Harry Potter Boxed Set, Books 1-5 (Harry Potter, #1-5)",1498
6,"Harry Potter and the Goblet of Fire (Harry Potter, #4)",1484
67,The Known World,1352
5,"Harry Potter and the Prisoner of Azkaban (Harry Potter, #3)",1293
36,The Lord of the Rings: Weapons and Warfare,1211
28,Notes from a Small Island,1148
10,"Harry Potter Collection (Harry Potter, #1-6)",1110


**EDA on books**

In [25]:
# convert to pandas dataframe
books_df = books.toPandas()
books_df.head()

# remove entries due to data importing issue
books_df = books_df[books_df.ratings_count.str.isdigit() == True]
books_df.ratings_count = books_df.ratings_count.astype('int')
books_df.average_rating = books_df.average_rating.astype('float')

I originally wanted to use SQL to generate the tables below, but I could not solve the data importing issue. Instead I utilized Pandas and manipulated the dataframe to get the resulted tables.

In [27]:
# top 10 popular books' ratings
books_df.sort_values(by = 'ratings_count', ascending = False)[['original_title',
                                                               'ratings_count', 
                                                               'average_rating' ]][0:10]

Unnamed: 0,original_title,ratings_count,average_rating
0,The Hunger Games,4780653,4.34
1,Harry Potter and the Philosopher's Stone,4602479,4.44
2,Twilight,3866839,3.57
3,To Kill a Mockingbird,3198671,4.25
4,The Great Gatsby,2683664,3.89
5,The Fault in Our Stars,2346404,4.26
6,The Hobbit or There and Back Again,2071616,4.25
7,The Catcher in the Rye,2044241,3.79
9,Pride and Prejudice,2035490,4.24
8,Angels & Demons,2001311,3.85


In [28]:
# top 10 rated books
books_df.sort_values(by = 'average_rating', ascending = False)[['original_title',
                                                                'average_rating' ]][0:10]

Unnamed: 0,original_title,average_rating
3627,The Complete Calvin and Hobbes,4.82
861,Words of Radiance,4.77
3274,,4.77
8853,Mark of the Lion Trilogy,4.76
7946,,4.76
4482,It's a Magical World: A Calvin and Hobbes Coll...,4.75
6360,There's Treasure Everywhere: A Calvin and Hobb...,4.74
421,Complete Harry Potter Boxed Set,4.74
6589,The Authoritative Calvin and Hobbes,4.73
6919,The Indispensable Calvin and Hobbes: A Calvin ...,4.73


**EDA on book_tags and tags**

In [30]:
# EDA on book_tags
book_tags.show(n=5)

In [31]:
# EDA on tags
tags.show(n=5)

In [32]:
# create temporary tables
book_tags.createOrReplaceTempView('book_tags')
tags.createOrReplaceTempView('tags')

In [33]:
%sql
-- 10 most popular tag names --
SELECT DISTINCT a.tag_name, SUM(b.count) as Total
FROM tags a
INNER JOIN book_tags b
ON a.tag_id = b.tag_id
GROUP BY a.tag_name
ORDER BY Total DESC LIMIT 10;

tag_name,Total
to-read,140718761
currently-reading,7507958
favorites,4503173
fiction,3688819
fantasy,3548157
young-adult,1848306
classics,1756920
books-i-own,1317235
romance,1231926
owned,1224279


**3. Data Preparation**

The raw datasets is in healthy shape and is ready to start modeling.

**4. Modeling**

I utilized collaborative filtering and used Alternating Least Square (ALS) to build the recommendation engine. 

Collaborative filtering is an important and popular concept behind recommendation engine, it basicly means that people with similar taste tend to consume the same kind of product. 

Collaborative filtering can be achieved by matrix factorization, where a user-item matrix can be factored into a user-feature matrix and an item-feature matrix. While ALS is used to find the latent factor, the link between users and their ratings, and to approximate the factor weights to minimize the least squares between prediction and actual ratings. As a result, ALS will fill in missing ratings with its predictions.

I set coldStartStrategy to 'drop' because I want to make sure that there is no NAs in the predictions and the evaluator will return meaningful results. ImplicitPrefs is set to False because the data has users' ratings and ALS will recognize them as explicit preferences. NonNegative is just to make sure there will be no negative numbers in the predictions.

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator 

# split data into training and test set
(train, test) = ratings.randomSplit([0.8, 0.2])

# create ALS model
als = ALS(userCol='user_id', itemCol='book_id', ratingCol='rating',
          coldStartStrategy='drop', nonnegative=True, implicitPrefs=False)

# define evaluator as RMSE
evaluator = RegressionEvaluator(metricName = 'rmse', 
                                labelCol = 'rating', 
                                predictionCol = 'prediction')

In [37]:
# tune model
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 20, 75, 150]) \
            .addGrid(als.maxIter, [10, 50, 100]) \
            .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
            .build()

# build cross validator
cv = CrossValidator(estimator = als, 
                    estimatorParamMaps = param_grid, 
                    evaluator = evaluator, 
                    numFolds = 5)

In [38]:
# fit model
model = als.fit(train)

# make predictions
pred = model.transform(test)

In [39]:
# create function to tune hyper parameter and find the best model configuration
def tune_als(train_data, test_data, ranks, iters, params):
  """
  train_data: dataset for training
  test_data: dataset for testing
  ranks: number of latent factors
  iters: MaxIter
  params: RegParam
  return: the best fitted ALS model with the lowest RMSE
  """
  
  # initialize place holders
  min_error = float('inf')
  best_rank = -1
  best_iter = 1
  best_reg = 0
  best_model = None
  
  for rank in ranks:
    for reg in params:
      
      # configure ALS model
      als = ALS()\
      .setRank(rank)\
      .setMaxIter(iters)\
      .setRegParam(reg)\
      .setUserCol("user_id")\
      .setItemCol("book_id")\
      .setRatingCol("rating")
      
      # train model
      model = als.fit(train_data)
      
      # predict and evaluate model
      pred = model.transform(test_data)
      evaluator = RegressionEvaluator(metricName = 'rmse', 
                              labelCol = 'rating', 
                              predictionCol = 'prediction')
      rmse = evaluator.evaluate(pred)

      if rmse < min_error:
        min_error = rmse
        best_rank = rank
        best_reg = reg
        best_model = model
  print('Rank:', best_rank)
  print('Iter:', best_iter)
  print('Param:', best_reg)
  return best_model


# hyper param config
iters = 50
ranks = [10, 50, 100]
reg_param = [0.01, 0.05, 0.1]

final_model = tune_als(train, test, ranks, iters, reg_param)

**5. Evaluation**

I wanted to find the best hyper parameters for the ALS model and tried cross validation and user defined function, but the process took extremely long time to complete and Databricks automatically logged me out during the session. I think the reason might be that I am running the free community version.

Instead I manually tested several groups of hyper parameters and found that the default setting has a RMSE of around 0.91, which I believe is acceptable and reasonable given the context that the engine is recommending books to readers. This number indicates that the model's predictions on ratings are 0.91 unit away from the actual ratings.

In [42]:
# calculate RMSE
rmse = evaluator.evaluate(pred)
print('RMSE is ' + str(rmse))

In [43]:
# create temporary tables
pred.createOrReplaceTempView('pred')
books.createOrReplaceTempView('books')

In [44]:
%sql
-- check model predictions --
SELECT pred.user_id, pred.book_id, pred.rating, pred.prediction, books.title 
FROM pred 
LEFT JOIN books 
ON pred.book_id = books.id

user_id,book_id,rating,prediction,title
588,148,4,3.365916,Girl with a Pearl Earring
9731,148,3,3.031473,Girl with a Pearl Earring
10727,148,4,3.300964,Girl with a Pearl Earring
9165,148,3,3.758632,Girl with a Pearl Earring
32055,148,3,3.12183,Girl with a Pearl Earring
10140,148,3,2.8816402,Girl with a Pearl Earring
7001,148,4,3.8797023,Girl with a Pearl Earring
20967,148,3,3.8291576,Girl with a Pearl Earring
10610,148,4,3.8230343,Girl with a Pearl Earring
14603,148,4,3.6024997,Girl with a Pearl Earring


**Generate recommendations**

In [46]:
# generate 10 recommendations for all users
rec = model.recommendForAllUsers(10)
rec.show()

In [47]:
# create temporary tables
rec.createOrReplaceTempView('rec')

# transform data frame into cleaner format
tidy_rec = spark.sql(
  """
  SELECT user_id,
  x.book_id AS book_id,
  x.rating AS prediction
  FROM rec
  LATERAL VIEW explode(recommendations) exploded_table AS x
  """)

# create temporary tables
tidy_rec.createOrReplaceTempView('tidy_rec')

In [48]:
%sql
-- select the final recommendations --
SELECT a.user_id, a.book_id, a.prediction, b.title
FROM tidy_rec a
LEFT JOIN books b
ON a.book_id = b.book_id
WHERE title <> 'null'
ORDER BY a.user_id, a.prediction DESC;

user_id,book_id,prediction,title
1,4264,3.7688138,Fever Pitch
1,5344,3.7579236,Hard Times
2,5344,5.3379664,Hard Times
3,5872,1.0544994,"Regeneration (Regeneration, #1)"
5,3885,5.175308,The Taste of Home Cookbook
7,5346,4.8078547,The Last Juror
8,9531,5.530328,"Peter and the Shadow Thieves (Peter and the Starcatchers, #2)"
11,5344,4.734245,Hard Times
12,4264,4.2636156,Fever Pitch
12,7455,4.123007,Sex and the City


In [49]:
%sql
-- predicted ratings for to read list --
SELECT a.user_id, a.book_id, b.prediction AS rating_pred
FROM to_read a
INNER JOIN tidy_rec b
ON a.user_id = b.user_id AND a.book_id = b.book_id;

user_id,book_id,rating_pred
7833,862,4.504927
21715,8563,4.0605936
51454,8362,5.307687
15173,4104,4.103098
19619,6582,5.0904326
20052,2290,3.9824455
25106,3459,4.981147
52100,3093,4.691145
3876,9830,5.014241
13898,964,4.0606694


It is interesting to see that some predictions made by ALS are outside the scale of 1 to 5, the rating scale in the original data set. I searched around and found that this has to do with ALS's approximation methods and it makes more sense to use the predictions as rankings for recommendations rather than simple predicted rating scores. A higher predicted rating score implies that the user is more interested in this item than that of a lower rating.

In [51]:
%sql
-- select minimum, maximum and average of prediction in to read list
SELECT MIN(b.prediction), MAX(b.prediction), AVG(b.prediction)
FROM to_read a
INNER JOIN tidy_rec b
ON a.user_id = b.user_id AND a.book_id = b.book_id;

min(prediction),max(prediction),avg(prediction)
0.96303517,6.084344,4.591680483160807


We can see that the average predicted rating in the to read list is 4.59, which is high and reasonable given that this is a reader's to do list and books in this list are those a reader has interests in. The minimum rating 0.96 is not necessarily a mistake or poor performance of the model, I believe there are circumstances that a reader may want to try something different but the model predicts that he/she will not like it or he/she will give a low rating score to that.