In [1]:
val bs = spark
import bs.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.recommendation.{ALS,ALSModel}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};

bs = org.apache.spark.sql.SparkSession@41cf1731


# Introduction

In the second part of this project we would like to make our recommendations using a model built with Spark.

Once again our aim is to recommend books to users. To do this we will train a model on the dataset of books available to us, predict how users would rate books they have not yet rated, and then recommend to each user the unrated books with the highest predicted ratings.
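
The final recommendation step can be sketched with plain Scala collections. This is only an illustration: the `Prediction` case class, the `recommend` helper and the sample data are hypothetical and are not part of the Spark pipeline used later in this notebook.

```scala
// Hypothetical container for one predicted rating; not part of the notebook.
case class Prediction(user: Int, isbn: String, rating: Double)

// Return the n books with the highest predicted rating that the user has not rated yet.
def recommend(user: Int,
              predictions: Seq[Prediction],
              alreadyRated: Set[String],
              n: Int): Seq[String] =
  predictions
    .filter(p => p.user == user && !alreadyRated.contains(p.isbn))
    .sortBy(p => -p.rating)
    .take(n)
    .map(_.isbn)

// Made-up predictions, for illustration only.
val preds = Seq(
  Prediction(1, "A", 9.1),
  Prediction(1, "B", 7.4),
  Prediction(1, "C", 8.2),
  Prediction(2, "A", 5.0))

// User 1 has already rated book "A", so it is excluded from the result.
val top2 = recommend(1, preds, alreadyRated = Set("A"), n = 2)
```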

The rationale for using Spark is that it should achieve better performance on large datasets. The Python implementation only used roughly the top 0.1% of the books available in the dataset. With Spark we expect to be able to increase the size of the dataset without significant performance penalties.



# Dataset

The dataset is the same Book-Crossing dataset, restricted to explicit ratings.

In [2]:
//For switching between local and AWS deployments
val prepend ="s3://colon/E4751/"
//val prepend =""

prepend = s3://colon/E4751/


s3://colon/E4751/

In [3]:
//This is super important to know
spark.version

2.2.0

In [4]:
val desc_file = "E4751/BX-Books.csv"

val ratings_file = "E4751/BX-Book-Ratings.csv"

desc_file = E4751/BX-Books.csv
ratings_file = E4751/BX-Book-Ratings.csv


E4751/BX-Book-Ratings.csv

In [5]:
//Create a schema for the csv to save reading the file twice to infer the schema
val ratings_schema = StructType(Array(
    StructField("User-ID", IntegerType, true),
    StructField("ISBN", StringType, true),
    StructField("Book-Rating", IntegerType, true)))


ratings_schema = StructType(StructField(User-ID,IntegerType,true), StructField(ISBN,StringType,true), StructField(Book-Rating,IntegerType,true))


StructType(StructField(User-ID,IntegerType,true), StructField(ISBN,StringType,true), StructField(Book-Rating,IntegerType,true))

In [6]:
val df = spark.read.
    option("sep",";").
    option("mode","FAILFAST").
    option("header",true).
    option("encoding","IBM850").
    schema(ratings_schema).
    csv(ratings_file)

df = [User-ID: int, ISBN: string ... 1 more field]


[User-ID: int, ISBN: string ... 1 more field]

In [7]:
//Sanity check on the data
df.show(5)

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
+-------+----------+-----------+
only showing top 5 rows



In [8]:
val rows = df.count()
println("we have %d rows".format(rows))

we have 1149780 rows                                                            


rows = 1149780


1149780

In [9]:
val unique_rows = df.dropDuplicates(Array("User-ID","ISBN")).count()
val duplicates = rows - unique_rows

println("we have %d duplicate rows".format(duplicates))

we have 0 duplicate rows                                                        


unique_rows = 1149780
duplicates = 0


0

In [10]:
//We're using explicit ratings so only greater than 0
val df_explicit = df.filter($"Book-Rating">0)

df_explicit = [User-ID: int, ISBN: string ... 1 more field]


[User-ID: int, ISBN: string ... 1 more field]

In [11]:
df_explicit.show(5)

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276726|0155061224|          5|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
+-------+----------+-----------+
only showing top 5 rows



In [12]:
// a quick check on what the data looks like
val unique_users = df_explicit.select("User-ID").distinct().count()
val unique_books = df_explicit.select("ISBN").distinct().count()
val total_ratings = df_explicit.count()




unique_users = 77805
unique_books = 185973
total_ratings = 433671


433671

In [13]:
println("The number of unique users is %d".format(unique_users))
println("The number of unique books is %d".format(unique_books))
println("The number of ratings is %d".format(total_ratings))

The number of unique users is 77805
The number of unique books is 185973
The number of ratings is 433671


In [14]:
//what do ratings look like
df_explicit.describe("Book-Rating").show()

+-------+------------------+
|summary|       Book-Rating|
+-------+------------------+
|  count|            433671|
|   mean| 7.601066246071331|
| stddev|1.8437976309993231|
|    min|                 1|
|    max|                10|
+-------+------------------+



In [15]:
//how many times books have been rated
df_explicit.groupBy("ISBN").count().describe("count").show()

+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|            185973|
|   mean|2.3319030181800584|
| stddev| 6.834667418109391|
|    min|                 1|
|    max|               707|
+-------+------------------+



In [16]:
//how many books have users rated
df_explicit.groupBy("User-ID").count().describe("count").show()

+-------+-----------------+
|summary|            count|
+-------+-----------------+
|  count|            77805|
|   mean|5.573819163292847|
| stddev|44.00187870029838|
|    min|                1|
|    max|             8524|
+-------+-----------------+



In [17]:
//Take a look at the distributions of 
//the number of times books have been rated
//the number of books users have rated 
//75th to 99th quantiles

val desired_quantiles= (.75 to .99 by.01).toArray
//https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/DataFrameStatFunctions.html#approxQuantile(java.lang.String,%20double[],%20double)
val book_counts_quantile_values = df_explicit.groupBy("ISBN").count().stat.approxQuantile("count",desired_quantiles,0)
val book_counts_quantile_map = (desired_quantiles zip book_counts_quantile_values).toMap




desired_quantiles = Array(0.75, 0.76, 0.77, 0.78, 0.79, 0.8, 0.81, 0.8200000000000001, 0.83, 0.84, 0.85, 0.86, 0.87, 0.88, 0.89, 0.9, 0.91, 0.92, 0.9299999999999999, 0.94, 0.95, 0.96, 0.97, 0.98, 0.99)
book_counts_quantile_values = Array(2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 5.0, 6.0, 6.0, 8.0, 10.0, 13.0, 22.0)
book_counts_quantile_map = Map(0.84 -> 3.0, 0.86 -> 3.0, 0.88 -> 3.0, 0.89 -> 3.0, 0.96 -> 8.0, 0.91 -> 4.0, 0.9299999999999999 -> 5.0, 0.8 -> 2.0, 0.99 -> 22.0, 0.75 -> 2.0, 0.78 -> 2.0, 0.81 -> 2.0, 0.87 -> 3.0, 0.79 -> 2.0, 0.77 -> 2.0, 0.98 -> 13.0, 0.83 -> 2.0, 0.85 -> 3.0, 0.76 -> 2.0, 0.9 -> 4.0, 0.94 -> 6.0, 0.92 -> 4.0, 0.97 -> 10.0, 0.95 -> 6...


Map(0.84 -> 3.0, 0.86 -> 3.0, 0.88 -> 3.0, 0.89 -> 3.0, 0.96 -> 8.0, 0.91 -> 4.0, 0.9299999999999999 -> 5.0, 0.8 -> 2.0, 0.99 -> 22.0, 0.75 -> 2.0, 0.78 -> 2.0, 0.81 -> 2.0, 0.87 -> 3.0, 0.79 -> 2.0, 0.77 -> 2.0, 0.98 -> 13.0, 0.83 -> 2.0, 0.85 -> 3.0, 0.76 -> 2.0, 0.9 -> 4.0, 0.94 -> 6.0, 0.92 -> 4.0, 0.97 -> 10.0, 0.95 -> 6.0, 0.8200000000000001 -> 2.0)

In [18]:
//quantiles for the number of times a book has been rated
for (i <- desired_quantiles.indices){
    println("%f : %f".format(desired_quantiles(i),book_counts_quantile_values(i)))
}

0.750000 : 2.000000
0.760000 : 2.000000
0.770000 : 2.000000
0.780000 : 2.000000
0.790000 : 2.000000
0.800000 : 2.000000
0.810000 : 2.000000
0.820000 : 2.000000
0.830000 : 2.000000
0.840000 : 3.000000
0.850000 : 3.000000
0.860000 : 3.000000
0.870000 : 3.000000
0.880000 : 3.000000
0.890000 : 3.000000
0.900000 : 4.000000
0.910000 : 4.000000
0.920000 : 4.000000
0.930000 : 5.000000
0.940000 : 6.000000
0.950000 : 6.000000
0.960000 : 8.000000
0.970000 : 10.000000
0.980000 : 13.000000
0.990000 : 22.000000


In [19]:
val user_counts_quantile_values = df_explicit.groupBy("User-ID").count().stat.approxQuantile("count",desired_quantiles,0)
val user_counts_quantile_map = (desired_quantiles zip user_counts_quantile_values).toMap




user_counts_quantile_values = Array(3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 5.0, 5.0, 5.0, 6.0, 6.0, 7.0, 7.0, 8.0, 9.0, 10.0, 11.0, 13.0, 16.0, 19.0, 23.0, 30.0, 44.0, 74.0)
user_counts_quantile_map = Map(0.84 -> 5.0, 0.86 -> 6.0, 0.88 -> 7.0, 0.89 -> 8.0, 0.96 -> 23.0, 0.91 -> 10.0, 0.9299999999999999 -> 13.0, 0.8 -> 4.0, 0.99 -> 74.0, 0.75 -> 3.0, 0.78 -> 4.0, 0.81 -> 4.0, 0.87 -> 7.0, 0.79 -> 4.0, 0.77 -> 3.0, 0.98 -> 44.0, 0.83 -> 5.0, 0.85 -> 6.0, 0.76 -> 3.0, 0.9 -> 9.0, 0.94 -> 16.0, 0.92 -> 11.0, 0.97 -> 30.0, 0.95 -> 19.0, 0.8200000000000001 -> 5.0)


Map(0.84 -> 5.0, 0.86 -> 6.0, 0.88 -> 7.0, 0.89 -> 8.0, 0.96 -> 23.0, 0.91 -> 10.0, 0.9299999999999999 -> 13.0, 0.8 -> 4.0, 0.99 -> 74.0, 0.75 -> 3.0, 0.78 -> 4.0, 0.81 -> 4.0, 0.87 -> 7.0, 0.79 -> 4.0, 0.77 -> 3.0, 0.98 -> 44.0, 0.83 -> 5.0, 0.85 -> 6.0, 0.76 -> 3.0, 0.9 -> 9.0, 0.94 -> 16.0, 0.92 -> 11.0, 0.97 -> 30.0, 0.95 -> 19.0, 0.8200000000000001 -> 5.0)

In [20]:
//quantiles for the number of books a user has rated
for (i <- desired_quantiles.indices){
    println("%f : %f".format(desired_quantiles(i),user_counts_quantile_values(i)))
}

0.750000 : 3.000000
0.760000 : 3.000000
0.770000 : 3.000000
0.780000 : 4.000000
0.790000 : 4.000000
0.800000 : 4.000000
0.810000 : 4.000000
0.820000 : 5.000000
0.830000 : 5.000000
0.840000 : 5.000000
0.850000 : 6.000000
0.860000 : 6.000000
0.870000 : 7.000000
0.880000 : 7.000000
0.890000 : 8.000000
0.900000 : 9.000000
0.910000 : 10.000000
0.920000 : 11.000000
0.930000 : 13.000000
0.940000 : 16.000000
0.950000 : 19.000000
0.960000 : 23.000000
0.970000 : 30.000000
0.980000 : 44.000000
0.990000 : 74.000000
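
To make the quantile tables above easier to read, here is a minimal sketch of a nearest-rank quantile on an in-memory sequence. It is an assumption-laden illustration, not Spark's algorithm: `approxQuantile` has its own implementation and may pick a neighbouring element. The helper name `nearestRankQuantile` and the sample counts are hypothetical. The sketch also builds the quantile list from integers, which avoids floating-point artefacts such as `0.8200000000000001` in the output above.

```scala
// Nearest-rank quantile: the smallest value such that at least a fraction q
// of the data is less than or equal to it.
def nearestRankQuantile(values: Seq[Double], q: Double): Double = {
  require(values.nonEmpty && q >= 0.0 && q <= 1.0)
  val sorted = values.sorted
  val rank = math.ceil(q * sorted.length).toInt
  sorted(math.max(rank, 1) - 1)
}

// Quantiles 0.75 .. 0.99 built from integers to avoid accumulated rounding error.
val quantiles = (75 to 99).map(_ / 100.0)

// Made-up ratings-per-book counts with a long tail, for illustration only.
val counts = Seq(1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 4.0, 10.0, 50.0)
val q75 = nearestRankQuantile(counts, 0.75)
val q99 = nearestRankQuantile(counts, 0.99)
```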


In [21]:
//Create a schema for the books csv to save reading the file twice to infer the schema
//(note: this rebinds the name ratings_schema that was used for the ratings file above)
val ratings_schema = StructType(Array(
    StructField("ISBN", StringType, true),
    StructField("Book-Title", StringType, true),
    StructField("Book-Author", StringType, true),
    StructField("Year-Of-Publication", IntegerType, true),
    StructField("Publisher", StringType, true),
    StructField("Image-URL-S", StringType, true),
    StructField("Image-URL-M", StringType, true),
    StructField("Image-URL-L", StringType, true)))

ratings_schema = StructType(StructField(ISBN,StringType,true), StructField(Book-Title,StringType,true), StructField(Book-Author,StringType,true), StructField(Year-Of-Publication,IntegerType,true), StructField(Publisher,StringType,true), StructField(Image-URL-S,StringType,true), StructField(Image-URL-M,StringType,true), StructField(Image-URL-L,StringType,true))


StructType(StructField(ISBN,StringType,true), StructField(Book-Title,StringType,true), StructField(Book-Author,StringType,true), StructField(Year-Of-Publication,IntegerType,true), StructField(Publisher,StringType,true), StructField(Image-URL-S,StringType,true), StructField(Image-URL-M,StringType,true), StructField(Image-URL-L,StringType,true))

In [22]:
//Can we get some idea of what the most frequently rated books are?
//Can we see what the most positively rated book is 

//read in a dataset of book titles
val df_desc = spark.read.option("sep",";").
    option("header",true).
    option("encoding","IBM850").
    option("escape","\\").
    schema(ratings_schema).
    csv(desc_file)

df_desc = [ISBN: string, Book-Title: string ... 6 more fields]


[ISBN: string, Book-Title: string ... 6 more fields]

In [23]:
//quick sanity check

df_desc.show(3)

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
+----------+--------------------+-------------

In [24]:
//merge the ratings and book descriptions

val df_merged = df_explicit.join(df_desc,Array("ISBN"))
df_merged.cache

df_merged = [ISBN: string, User-ID: int ... 8 more fields]


[ISBN: string, User-ID: int ... 8 more fields]

In [25]:
//what's the most frequently rated book

val ratings_count = df_merged.groupBy("ISBN").count().orderBy($"count".desc)
ratings_count.cache
ratings_count.show(10,false)

+----------+-----+
|ISBN      |count|
+----------+-----+
|0316666343|707  |
|0971880107|581  |
|0385504209|487  |
|0312195516|383  |
|0060928336|320  |
|059035342X|313  |
|0142001740|307  |
|0446672211|295  |
|044023722X|281  |
|0452282152|278  |
+----------+-----+
only showing top 10 rows



ratings_count = [ISBN: string, count: bigint]


[ISBN: string, count: bigint]

In [26]:
val users_count = df_merged.groupBy("User-ID").count()
users_count.cache
users_count.show(3)

+-------+-----+
|User-ID|count|
+-------+-----+
| 161234|   18|
| 178199|   94|
|  41751|    6|
+-------+-----+
only showing top 3 rows



users_count = [User-ID: int, count: bigint]


[User-ID: int, count: bigint]

In [27]:
//The books with the highest average rating
val avg_ratings = df_merged.groupBy("ISBN").avg("Book-Rating").withColumnRenamed("avg(Book-Rating)","avg_rating").orderBy($"avg_rating".desc)
avg_ratings.show(10,false)

+----------+----------+
|ISBN      |avg_rating|
+----------+----------+
|0140096655|10.0      |
|0226752275|10.0      |
|0140144196|10.0      |
|0006717047|10.0      |
|0140165886|10.0      |
|0131453580|10.0      |
|0192827529|10.0      |
|0226504646|10.0      |
|0062511327|10.0      |
|0195084829|10.0      |
+----------+----------+
only showing top 10 rows



avg_ratings = [ISBN: string, avg_rating: double]


[ISBN: string, avg_rating: double]

In [28]:
val combined_rating = avg_ratings.join(ratings_count,Array("ISBN")) 

combined_rating = [ISBN: string, avg_rating: double ... 1 more field]


[ISBN: string, avg_rating: double ... 1 more field]

In [29]:
//the most frequently rated books and their average rating 
combined_rating.orderBy($"count".desc).show(10,false)

+----------+------------------+-----+
|ISBN      |avg_rating        |count|
+----------+------------------+-----+
|0316666343|8.185289957567186 |707  |
|0971880107|4.3907056798623065|581  |
|0385504209|8.435318275154003 |487  |
|0312195516|8.182767624020888 |383  |
|0060928336|7.8875            |320  |
|059035342X|8.939297124600639 |313  |
|0142001740|8.452768729641694 |307  |
|0446672211|8.142372881355932 |295  |
|044023722X|7.338078291814947 |281  |
|0452282152|7.982014388489208 |278  |
+----------+------------------+-----+
only showing top 10 rows



In [30]:
//the highest rated books and their number of times rated
combined_rating.orderBy($"avg_rating".desc).show(10,false)

+----------+----------+-----+                                                   
|ISBN      |avg_rating|count|
+----------+----------+-----+
|0310385709|10.0      |2    |
|0307124568|10.0      |1    |
|0192834096|10.0      |1    |
|014200135X|10.0      |1    |
|0140109269|10.0      |1    |
|0140512047|10.0      |1    |
|0140447113|10.0      |1    |
|0060803312|10.0      |2    |
|0060173890|10.0      |1    |
|0060595264|10.0      |1    |
+----------+----------+-----+
only showing top 10 rows



We filter the ratings so that we only look at the most popular books and keep only users who have rated at least 2 of them.  
To allow comparison with the previous Python implementation we use the same cutoff values.
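
The two-step filter can be sketched on plain Scala collections. The data and the small thresholds below are made up for illustration; the point is only that user counts are computed after the book filter has been applied, which is the order assumed here.

```scala
// (user, isbn) pairs; made-up data for illustration only.
val ratings = Seq(
  (1, "A"), (2, "A"), (3, "A"),
  (1, "B"), (2, "B"),
  (3, "C"))

// Hypothetical small thresholds so the effect is visible on six rows.
val minBookCount = 1
val minUserCount = 1

// Step 1: keep books rated more than minBookCount times.
val bookCounts = ratings.groupBy(_._2).map { case (isbn, rs) => isbn -> rs.size }
val popular = ratings.filter { case (_, isbn) => bookCounts(isbn) > minBookCount }

// Step 2: among the remaining rows, keep users with more than minUserCount ratings.
val userCounts = popular.groupBy(_._1).map { case (user, rs) => user -> rs.size }
val trimmed = popular.filter { case (user, _) => userCounts(user) > minUserCount }
```

User 3 rated two books overall but only one popular book, so user 3 is dropped in the second step.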

In [31]:
//based on the quantiles computed earlier these will be the cutoffs for filtering the ratings records
//books with more than 90 ratings
//users with more than 1 book rated
val book_quantile = 90

val user_quantile = 1


book_quantile = 90
user_quantile = 1


1

In [32]:
val top_percentile_books = df_merged.join(ratings_count,Seq("ISBN")).
filter($"count">book_quantile).
drop("count")

val top_percentile_books_user = top_percentile_books.join(top_percentile_books.groupBy("User-ID").count(),Seq("User-ID")).
filter($"count">user_quantile).
drop("count")

val temp = top_percentile_books_user



top_percentile_books = [ISBN: string, User-ID: int ... 8 more fields]
top_percentile_books_user = [User-ID: int, ISBN: string ... 8 more fields]
temp = [User-ID: int, ISBN: string ... 8 more fields]


[User-ID: int, ISBN: string ... 8 more fields]

In [33]:
temp.cache

[User-ID: int, ISBN: string ... 8 more fields]

In [34]:
//check that every book has at least one rating after filtering on users
temp.groupBy("ISBN").count().filter($"count"===0).show()

+----+-----+
|ISBN|count|
+----+-----+
+----+-----+



In [35]:
val N = temp.select("User-ID").distinct().count()
println("The number of unique users in the trimmed dataset is %d".format(N))

val M = temp.select("ISBN").distinct().count()
println("The number of unique books in the trimmed dataset is %d".format(M))
println("The number of ratings in the trimmed dataset is %d".format(temp.count()))


The number of unique books in the trimmed dataset is 178                        
The number of ratings in the trimmed dataset is 17195


N = 4328
M = 178


178

In [36]:
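//generate new numeric ids for books and users
//(StringIndexer assigns indices by label frequency; because the indexers are fitted on df_merged,
//the ids are not necessarily consecutive within the trimmed data)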
val uid_indexer = new StringIndexer().
    setInputCol("User-ID").
    setOutputCol("uid")
    
val bid_indexer = new StringIndexer().
    setInputCol("ISBN").
    setOutputCol("bid")
      
val uid_indexer_fitted = uid_indexer.fit(df_merged)

val bid_indexer_fitted = bid_indexer.fit(df_merged)

val t1 = uid_indexer_fitted.transform(temp)

val t2 = bid_indexer_fitted.transform(t1)

val data = t2





[User-ID: int, ISBN: string ... 10 more fields]

uid_indexer = strIdx_99696053c4b8
bid_indexer = strIdx_787076725cf2
uid_indexer_fitted = strIdx_99696053c4b8
bid_indexer_fitted = strIdx_787076725cf2
t1 = [User-ID: int, ISBN: string ... 9 more fields]
t2 = [User-ID: int, ISBN: string ... 10 more fields]
data = [User-ID: int, ISBN: string ... 10 more fields]


In [37]:
//train test split 
val Array(training, test) = data.randomSplit(Array(0.8, 0.2))

training = [User-ID: int, ISBN: string ... 10 more fields]
test = [User-ID: int, ISBN: string ... 10 more fields]


[User-ID: int, ISBN: string ... 10 more fields]

# Baseline model

Before we start using collaborative filtering we first implement a baseline model that simply uses the mean rating of the training set as the predicted rating for every book.
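
As a minimal sketch of what this baseline computes, the following uses plain Scala collections and made-up ratings. The helper names `meanOf` and `constantRmse` are hypothetical and are not used elsewhere in the notebook.

```scala
// Arithmetic mean of a non-empty sequence.
def meanOf(xs: Seq[Double]): Double = xs.sum / xs.size

// RMSE obtained when the same constant is predicted for every test rating.
def constantRmse(prediction: Double, actual: Seq[Double]): Double =
  math.sqrt(meanOf(actual.map(r => math.pow(r - prediction, 2))))

// Made-up ratings, for illustration only.
val trainRatings = Seq(8.0, 6.0, 10.0, 8.0)
val testRatings = Seq(7.0, 9.0)

// The baseline prediction is learned from the training split only.
val baseline = meanOf(trainRatings)
val baselineRmse = constantRmse(baseline, testRatings)
```

Any collaborative filtering model should ideally beat this number on the same test split; otherwise the learned factors add nothing over a constant prediction.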

In [38]:
//I suspect there is some inefficiency in the way in which the mean is calculated and distributed to workers
val avg_rating = training.select(mean($"Book-Rating")).head().getDouble(0)

val rmse_baseline = math.sqrt(test.select("Book-Rating").as[(Double)].rdd.map(x=>math.pow(avg_rating-x,2)).mean)



avg_rating = 7.9347084029341275
rmse_baseline = 1.7784020259751816


1.7784020259751816

In [39]:
println("RMSE for baseline model is %f".format(rmse_baseline))

RMSE for baseline model is 1.778402


In [40]:
//a regression evaluator that does rmse
val evaluator = new RegressionEvaluator().
    setMetricName("rmse").
    setLabelCol("Book-Rating").
    setPredictionCol("prediction")

evaluator = regEval_25d7d1c9a5b9


regEval_25d7d1c9a5b9

In [52]:
val als = new ALS().
    setMaxIter(10).
    setUserCol("uid").
    setItemCol("bid").
    setRatingCol("Book-Rating").
    setColdStartStrategy("drop")
    

val implicit_als = new ALS().
    setMaxIter(10).
    setUserCol("uid").
    setItemCol("bid").
    setRatingCol("Book-Rating").
    setImplicitPrefs(true).
    setColdStartStrategy("drop")

val nn_als = new ALS().
    setMaxIter(10).
    setUserCol("uid").
    setItemCol("bid").
    setRatingCol("Book-Rating").
    setColdStartStrategy("drop").
    setNonnegative(true)
    
    


als = als_cc7eb7096bc9
implicit_als = als_40414a6ea62c
nn_als = als_97c85689df9f


als_97c85689df9f

In [53]:
val als_rmse = evaluator.evaluate(als.fit(training).transform(test))
val implicit_als_rmse = evaluator.evaluate(implicit_als.fit(training).transform(test))
val nn_als_rmse = evaluator.evaluate(nn_als.fit(training).transform(test))
println("RMSE for explicit als is %f".format(als_rmse))
println("RMSE for implicit als is %f".format(implicit_als_rmse))
println("RMSE for nonnegative explicit als is %f".format(nn_als_rmse))

RMSE for explicit als is 2.920287                                               
RMSE for implicit als is 7.953853
RMSE for nonnegative explicit als is 2.509765


als_rmse = 2.920286503259058
implicit_als_rmse = 7.953852766799263
nn_als_rmse = 2.509765207804227


2.509765207804227

# Improvements

We want to illustrate some of the problems with the dataset we are working with.  
Here we see that the same book appears under several different ISBNs. Even ignoring audio books and other formats, we still have several versions.  
This does not even consider the same book in different languages.  
There is one easy correction we can make: convert all ISBNs to a single case (upper or lower) to unify those instances where a book is split across ISBNs only because of letter case.
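
The effect of case unification can be sketched on a handful of ISBN strings. The values are a small sample chosen for illustration, and the helper `normalise` is a hypothetical name.

```scala
// A small sample of ISBNs, some of which differ only in the case of the trailing check character.
val isbns = Seq("059035342x", "059035342X", "043936213X", "043936213x", "0590353403")

// Unify case so that 'x' and 'X' compare equal.
def normalise(isbn: String): String = isbn.toUpperCase

val distinctBefore = isbns.distinct.size
val distinctAfter = isbns.map(normalise).distinct.size
```

This only merges ISBNs that differ by case. Different editions, formats and translations still carry genuinely different ISBNs and stay separate.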


In [43]:
val cols = Seq("ISBN","Book-Title")
df_merged.dropDuplicates("ISBN").filter($"Book-Title" rlike ".*Harry Potter and the Sorcerer's Stone.*").select(cols.map(x=>col(x)):_*).show(100,false)

+----------+----------------------------------------------------------------+   
|ISBN      |Book-Title                                                      |
+----------+----------------------------------------------------------------+
|0807281956|Harry Potter and the Sorcerer's Stone (Book 1 Audio CD)         |
|1594130000|Harry Potter and the Sorcerer's Stone                           |
|059035342x|Harry Potter and the Sorcerer's Stone (Harry Potter (Paperback))|
|0807281751|Harry Potter and the Sorcerer's Stone (Book 1, Audio)           |
|043920352X|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|043936213X|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|043936213x|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|0590353403|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|0439294827|Harry Potter and the Sorcerer's Stone: A Deluxe Pop-up Book     |
|0786222727|Harry Potter and the Sorcerer's Stone (Book 1, La

cols = List(ISBN, Book-Title)


List(ISBN, Book-Title)

In [44]:
//make a new df unified and use that from now on with ISBN transformed to uppercase
//count unique books again after doing that
val df_unified =df_merged.withColumn("ISBN", upper(col("ISBN")));

df_unified = [ISBN: string, User-ID: int ... 8 more fields]


[ISBN: string, User-ID: int ... 8 more fields]

In [45]:
df_unified.dropDuplicates("ISBN").filter($"Book-Title" rlike ".*Harry Potter and the Sorcerer's Stone.*").select(cols.map(x=>col(x)):_*).show(100,false)

+----------+----------------------------------------------------------------+   
|ISBN      |Book-Title                                                      |
+----------+----------------------------------------------------------------+
|0807281956|Harry Potter and the Sorcerer's Stone (Book 1 Audio CD)         |
|1594130000|Harry Potter and the Sorcerer's Stone                           |
|0807281751|Harry Potter and the Sorcerer's Stone (Book 1, Audio)           |
|043920352X|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|043936213X|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|0590353403|Harry Potter and the Sorcerer's Stone (Book 1)                  |
|0439294827|Harry Potter and the Sorcerer's Stone: A Deluxe Pop-up Book     |
|0786222727|Harry Potter and the Sorcerer's Stone (Book 1, Large Print)     |
|0439286239|Harry Potter and the Sorcerer's Stone Movie Poster Book         |
|059035342X|Harry Potter and the Sorcerer's Stone (Harry Pott

In [46]:
//count the number of distinct books again

println("The number of unique books is now %d".format(df_unified.select("ISBN").distinct().count()))

The number of unique books is now 149723                                        


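We've managed to reduce the number of unique books from 185,973 to 149,723, a reduction of about 20%.  
Note that the earlier count was taken before the join with the book descriptions, so part of this reduction may come from ISBNs with no matching description rather than from case unification alone.  
Moving forward we will use this unified dataset.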
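
The size of the reduction can be checked with a short calculation using the two counts printed in this notebook. The variable names are illustrative only.

```scala
// Distinct ISBN counts reported earlier in this notebook.
val booksBefore = 185973L
val booksAfter = 149723L

// Relative reduction in the number of distinct ISBNs.
val reduction = (booksBefore - booksAfter).toDouble / booksBefore

println(f"reduction: ${reduction * 100}%.1f%%")
```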

In [47]:
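//split into new train and test based on the unified dataset
//NOTE: ratings_count and the fitted indexers were built from df_merged (original-case ISBNs),
//so the counts used below are not combined across case variants of the same ISBN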
val top_percentile_books_unified = df_unified.join(ratings_count,Seq("ISBN")).
filter($"count">book_quantile).
drop("count")

val top_percentile_books_user_unified = top_percentile_books_unified.join(top_percentile_books_unified.groupBy("User-ID").count(),Seq("User-ID")).
filter($"count">user_quantile).
drop("count")

val temp_2 = top_percentile_books_user_unified

val t3 = uid_indexer_fitted.transform(temp_2)

val t4 = bid_indexer_fitted.transform(t3)

val data_unified = t4

data_unified.cache

val model_cols = Seq("uid","bid","Book-Rating")

val Array(training_unified, test_unified) = data_unified.select(model_cols.map(x=>col(x)):_*).randomSplit(Array(0.8, 0.2))

top_percentile_books_unified = [ISBN: string, User-ID: int ... 8 more fields]
top_percentile_books_user_unified = [User-ID: int, ISBN: string ... 8 more fields]
temp_2 = [User-ID: int, ISBN: string ... 8 more fields]
t3 = [User-ID: int, ISBN: string ... 9 more fields]
t4 = [User-ID: int, ISBN: string ... 10 more fields]
data_unified = [User-ID: int, ISBN: string ... 10 more fields]
model_cols = List(uid, bid, Book-Rating)
training_unified = [uid: double, bid: double ... 1 more field]


test_unified: org.apache.spark.sql.Dataset[org....


[uid: double, bid: double ... 1 more field]

We will now tune the models by searching for optimal hyperparameters.
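
To give a sense of the cost of such a search, the following sketch counts how many model fits a cross-validated grid search needs. It assumes three candidate values for each of the two hyperparameters and three folds, and it relies on the usual cross-validation behaviour of fitting every candidate once per fold and then refitting the best candidate on the full training data. The names are illustrative only.

```scala
// Assumed candidate values for the two hyperparameters.
val regCandidates = Seq(0.01, 1.0, 10.0)
val rankCandidates = Seq(2, 6, 10)
val numFolds = 3

// Every (regParam, rank) combination is one candidate model.
val grid = for (reg <- regCandidates; rank <- rankCandidates) yield (reg, rank)

// Each candidate is fitted once per fold; the best one is then refitted on all training data.
val fitsPerEstimator = grid.size * numFolds + 1

// Three estimators (explicit, implicit, non-negative) are searched separately.
val totalFits = 3 * fitsPerEstimator
```

Each of these fits is itself an iterative ALS run, which is why the search quickly becomes expensive even on a small grid.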

In [54]:
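//grid search using cross validation to find parameters for regularization and rank

//NOTE
//grid searching is incredibly time-intensive; even though the following code runs correctly it was not actually used
//instead the models below are fitted with the default parameters (rank 10)
//regParam is never set; check als.getRegParam, which for spark.ml ALS should report the default 0.1 rather than 1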

val regs = Array(.01,1,10)
val ranks = Array(2,6,10)

val param_grid = new ParamGridBuilder().
    addGrid(als.regParam,regs).
    addGrid(als.rank,ranks).
    build()

val nn_param_grid = new ParamGridBuilder().
    addGrid(nn_als.regParam,regs).
    addGrid(nn_als.rank,ranks).
    build()

val implicit_param_grid = new ParamGridBuilder().
    addGrid(implicit_als.regParam,regs).
    addGrid(implicit_als.rank,ranks).
    build()

val cv = new CrossValidator().
    setEstimator(als).
    setEvaluator(evaluator).
    setEstimatorParamMaps(param_grid).
    setNumFolds(3) 

val nn_cv = new CrossValidator().
    setEstimator(nn_als).
    setEvaluator(evaluator).
    setEstimatorParamMaps(nn_param_grid).
    setNumFolds(3)

val implicit_cv = new CrossValidator().
    setEstimator(implicit_als).
    setEvaluator(evaluator).
    setEstimatorParamMaps(implicit_param_grid).
    setNumFolds(3)

regs = Array(0.01, 1.0, 10.0)
ranks = Array(2, 6, 10)
param_grid = 
nn_param_grid = 


Array({
	als_cc7eb7096bc9-rank: 2,
	als_cc7eb7096bc9-regParam: 0.01
}, {
	als_cc7eb7096bc9-rank: 2,
	als_cc7eb7096bc9-regParam: 1.0
}, {
	als_cc7eb7096bc9-rank: 2,
	als_cc7eb7096bc9-regParam: 10.0
}, {
	als_cc7eb7096bc9-rank: 6,
	als_cc7eb7096bc9-regParam: 0.01
}, {
	als_cc7eb7096bc9-rank: 6,
	als_cc7eb7096bc9-regParam: 1.0
}, {
	als_cc7eb7096bc9-rank: 6,
	als_cc7eb7096bc9-regParam: 10.0
}, {
	als_cc7eb7096bc9-rank: 10,
	als_cc7eb7096bc9-regParam: 0.01
}, {
	als_cc7eb7096bc9-rank: 10,
	als_cc7eb7096bc9-regParam: 1.0
}, {
	als_cc7eb7096bc9-rank: 10,
	als_cc7eb7096bc9-regParam: 10.0
})
Array({
	a...


[{
	als_97c85689df9f-rank: 2,
	als_97c85689df9f-regParam: 0.01
}, {
	als_97c85689df9f-rank: 2,
	als_97c85689df9f-regParam: 1.0
}, {
	als_97c85689df9f-rank: 2,
	als_97c85689df9f-regParam: 10.0
}, {
	als_97c85689df9f-rank: 6,
	als_97c85689df9f-regParam: 0.01
}, {
	als_97c85689df9f-rank: 6,
	als_97c85689df9f-regParam: 1.0
}, {
	als_97c85689df9f-rank: 6,
	als_97c85689df9f-regParam: 10.0
}, {
	als_97c85689df9f-rank: 10,
	als_97c85689df9f-regParam: 0.01
}, {
	als_97c85689df9f-rank: 10,
	als_97c85689df9f-regParam: 1.0
}, {
	als_97c85689df9f-rank: 10,
	als_97c85689df9f-regParam: 10.0
}]

In [55]:
val cv_model = als.fit(training_unified)



cv_model = als_cc7eb7096bc9


als_cc7eb7096bc9

In [56]:
val nn_cv_model = nn_als.fit(training_unified)



nn_cv_model = als_97c85689df9f


als_97c85689df9f

In [57]:
val implicit_cv_model = implicit_als.fit(training_unified)



implicit_cv_model = als_40414a6ea62c


als_40414a6ea62c

In [58]:
val predictions = cv_model.transform(test_unified)
val train_predictions = cv_model.transform(training_unified)
val rmse = evaluator.evaluate(predictions)
val train_rmse = evaluator.evaluate(train_predictions)



predictions = [uid: double, bid: double ... 2 more fields]
train_predictions = [uid: double, bid: double ... 2 more fields]
rmse = 2.9177706075757435
train_rmse = 0.3518713085472958


0.3518713085472958

In [59]:
val nn_predictions = nn_cv_model.transform(test_unified)
val nn_train_predictions = nn_cv_model.transform(training_unified)
val nn_rmse = evaluator.evaluate(nn_predictions)
val nn_train_rmse = evaluator.evaluate(nn_train_predictions)



nn_predictions = [uid: double, bid: double ... 2 more fields]
nn_train_predictions = [uid: double, bid: double ... 2 more fields]
nn_rmse = 2.541436381885685
nn_train_rmse = 0.36550044872860676


0.36550044872860676

In [60]:
val implicit_predictions = implicit_cv_model.transform(test_unified)
val implicit_train_predictions = implicit_cv_model.transform(training_unified)
val implicit_rmse = evaluator.evaluate(implicit_predictions)
val implicit_train_rmse = evaluator.evaluate(implicit_train_predictions)



implicit_predictions = [uid: double, bid: double ... 2 more fields]
implicit_train_predictions = [uid: double, bid: double ... 2 more fields]
implicit_rmse = 7.912597543101461
implicit_train_rmse = 7.642094651564277


7.642094651564277

In [None]:
//Only used for grid searching (requires cv_model to be the result of cv.fit(training_unified))
val best_reg_param = cv_model.bestModel.parent.getParam("regParam")
val best_rank_param = cv_model.bestModel.parent.getParam("rank")
val best_param_map = cv_model.bestModel.parent.extractParamMap()
val best_reg = best_param_map.get(best_reg_param).get.asInstanceOf[Double]
val best_rank = best_param_map.get(best_rank_param).get.asInstanceOf[Int]

In [None]:
//Only used for grid searching (requires nn_cv_model to be the result of nn_cv.fit(training_unified))
val nn_best_reg_param = nn_cv_model.bestModel.parent.getParam("regParam")
val nn_best_rank_param = nn_cv_model.bestModel.parent.getParam("rank")
val nn_best_param_map = nn_cv_model.bestModel.parent.extractParamMap()
val nn_best_reg = nn_best_param_map.get(nn_best_reg_param).get.asInstanceOf[Double]
val nn_best_rank = nn_best_param_map.get(nn_best_rank_param).get.asInstanceOf[Int]

In [None]:
//Only used for grid searching (requires implicit_cv_model to be the result of implicit_cv.fit(training_unified))
val implicit_best_reg_param = implicit_cv_model.bestModel.parent.getParam("regParam")
val implicit_best_rank_param = implicit_cv_model.bestModel.parent.getParam("rank")
val implicit_best_param_map = implicit_cv_model.bestModel.parent.extractParamMap()
val implicit_best_reg = implicit_best_param_map.get(implicit_best_reg_param).get.asInstanceOf[Double]
val implicit_best_rank = implicit_best_param_map.get(implicit_best_rank_param).get.asInstanceOf[Int]

In [61]:
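//NOTE: rank and regularization are hard-coded here rather than read from the estimators (e.g. als.getRank, als.getRegParam)
val model_df = List(("explicit",10,1,train_rmse,rmse),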
                   ("implicit",10,1,implicit_train_rmse,implicit_rmse),
                   ("explicit_nonnegative",10,1,nn_train_rmse,nn_rmse)).
    toDF("model_type","rank","regularization","train_rmse","test_rmse")

model_df = [model_type: string, rank: int ... 3 more fields]


[model_type: string, rank: int ... 3 more fields]

In [62]:
model_df.show()

+--------------------+----+--------------+-------------------+------------------+
|          model_type|rank|regularization|         train_rmse|         test_rmse|
+--------------------+----+--------------+-------------------+------------------+
|            explicit|  10|             1| 0.3518713085472958|2.9177706075757435|
|            implicit|  10|             1|  7.642094651564277| 7.912597543101461|
|explicit_nonnegative|  10|             1|0.36550044872860676| 2.541436381885685|
+--------------------+----+--------------+-------------------+------------------+



In [63]:
//save the dataframe of results
model_df.coalesce(1).write.option("header",true).mode("overwrite").csv(prepend+"output")

In [64]:
//save a copy of the model in case we want to use it without refitting
cv_model.write.overwrite().save(prepend+"model")



# Dataset size
We now want to consider what happens when we expand the dataset from the top .001% of books to all books that have at least two ratings, keeping only users who have rated at least two of those books.

In [65]:
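//Keep books with at least 2 ratings (counted in df_merged), then keep users who rated at least 2 of those books.
//Each filter runs only once, so a book can fall back below 2 ratings after the user filter.
//Earlier top-percentile filter, for comparison:
//val top_percentile_books = df_merged.join(ratings_count,Seq("ISBN","Book-Title")).
//filter($"count">book_quantile).
//drop("count")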

val full_dataset_books_filtered = df_unified.join(df_merged.groupBy("ISBN").count(),Seq("ISBN")).
    filter($"count">1).
    drop("count")
val full_dataset_users_filtered = full_dataset_books_filtered.
    join(full_dataset_books_filtered.groupBy("User-ID").count(),Seq("User-ID")).
    filter($"count">1).
    drop("count")
    
val temp_3 = full_dataset_users_filtered

full_dataset_books_filtered = [ISBN: string, User-ID: int ... 8 more fields]
full_dataset_users_filtered = [User-ID: int, ISBN: string ... 8 more fields]
temp_3 = [User-ID: int, ISBN: string ... 8 more fields]


[User-ID: int, ISBN: string ... 8 more fields]
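
The cell above applies each `count > 1` filter once, so removing users can leave some books with a single rating again. If a strict guarantee is wanted, the two filters can be repeated until nothing changes. The following is a minimal sketch of that idea on plain Scala collections rather than DataFrames; `KCoreFilter`, its `filter` method and the `(userId, isbn)` pair layout are hypothetical names chosen for illustration, not part of this notebook.

```scala
import scala.annotation.tailrec

object KCoreFilter {
  /** Repeatedly drops books and then users with fewer than `minCount`
    * ratings, stopping once a full pass removes nothing.
    * Each rating is a (userId, isbn) pair (hypothetical layout). */
  @tailrec
  def filter(ratings: Seq[(Int, String)], minCount: Int = 2): Seq[(Int, String)] = {
    // Count ratings per book and keep only sufficiently rated books
    val bookCounts = ratings.groupBy(_._2).map { case (isbn, rs) => isbn -> rs.size }
    val byBook = ratings.filter { case (_, isbn) => bookCounts(isbn) >= minCount }

    // Count ratings per user on what is left and keep only active users
    val userCounts = byBook.groupBy(_._1).map { case (user, rs) => user -> rs.size }
    val byUser = byBook.filter { case (user, _) => userCounts(user) >= minCount }

    // A pass that removes nothing means both constraints hold at once
    if (byUser.size == ratings.size) byUser else filter(byUser, minCount)
  }
}
```

On a DataFrame the same loop would alternate the two joins from the cell above until `count()` stops changing, at the cost of one extra Spark job per pass.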

In [66]:
val full_N = temp_3.select("User-ID").distinct().count()
println("The number of unique users in the 'full' dataset is %d".format(full_N))

val full_M = temp_3.select("ISBN").distinct().count()
println("The number of unique books in the 'full' dataset is %d".format(full_M))
println("The number of ratings in the 'full' dataset is %d".format(temp_3.count()))

The number of unique users in the 'full' dataset is 24754
The number of unique books in the 'full' dataset is 49803
The number of ratings in the 'full' dataset is 250076


full_N = 24754
full_M = 49803


49803

In [67]:
val t5 = uid_indexer_fitted.transform(temp_3)

val t6 = bid_indexer_fitted.transform(t5)

val full_data = t6

t5 = [User-ID: int, ISBN: string ... 9 more fields]
t6 = [User-ID: int, ISBN: string ... 10 more fields]
full_data = [User-ID: int, ISBN: string ... 10 more fields]


[User-ID: int, ISBN: string ... 10 more fields]

In [68]:
val Array(full_training, full_test) = full_data.select(model_cols.map(x=>col(x)):_*).randomSplit(Array(0.8, 0.2))

full_training = [uid: double, bid: double ... 1 more field]
full_test = [uid: double, bid: double ... 1 more field]


[uid: double, bid: double ... 1 more field]

In [69]:
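//Explicit ALS with the non-negativity constraint.
//rank and regParam are not set here, so Spark's defaults (rank = 10, regParam = 0.1) apply.
val best_als = new ALS().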
    setMaxIter(10).
    setUserCol("uid").
    setItemCol("bid").
    setRatingCol("Book-Rating").
    setColdStartStrategy("drop").
    setNonnegative(true)

best_als = als_4026c48a5a3b


als_4026c48a5a3b

In [70]:
val best_als_fitted = best_als.fit(full_training)



best_als_fitted = als_4026c48a5a3b


als_4026c48a5a3b

In [71]:
val full_predictions = best_als_fitted.transform(full_test)

full_predictions = [uid: double, bid: double ... 2 more fields]


[uid: double, bid: double ... 2 more fields]

In [72]:
val full_test_rmse = evaluator.evaluate(full_predictions)



full_test_rmse = 2.4238782074852407


2.4238782074852407

In [73]:
println("The RMSE for the test set for a model trained on the 'full' dataset is %f".format(full_test_rmse))

The RMSE for the test set for a model trained on the 'full' dataset is 2.423878


In [74]:
//RMSE on the training split of the 'full' dataset, for comparison with full_test_rmse
evaluator.evaluate(best_als_fitted.transform(full_training))



0.3862291077569126

In [75]:
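//sparsity calculation
//Note: the value below is the percentage of possible ratings that are present (the density); sparsity is 100 minus this value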
val total_possible_ratings = unique_users*unique_books*1.0

val sparsity = (total_ratings/total_possible_ratings)*100

total_possible_ratings = 1.4469629265E10
sparsity = 0.002997112034162404


0.002997112034162404
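
The figure above is the share of possible user-book pairs that actually carry a rating, expressed as a percentage. As a rough sketch of the same arithmetic, the helper below computes both that density and the complementary sparsity; `Sparsity`, `densityPercent` and `sparsityPercent` are hypothetical names, and the counts in `main` are the ones printed for the 'full' dataset earlier in this notebook.

```scala
object Sparsity {
  /** Percentage of user-book cells that hold a rating. */
  def densityPercent(ratings: Long, users: Long, books: Long): Double =
    ratings.toDouble / (users.toDouble * books.toDouble) * 100.0

  /** Percentage of user-book cells that are empty. */
  def sparsityPercent(ratings: Long, users: Long, books: Long): Double =
    100.0 - densityPercent(ratings, users, books)

  def main(args: Array[String]): Unit = {
    // Counts reported for the 'full' dataset: 250076 ratings, 24754 users, 49803 books
    val density = densityPercent(250076L, 24754L, 49803L)
    val sparsity = sparsityPercent(250076L, 24754L, 49803L)
    println(f"density = $density%.4f%%, sparsity = $sparsity%.4f%%")
  }
}
```

Filtering out books and users with a single rating makes the matrix denser than the unfiltered one, but it remains extremely sparse.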

# Conclusion

Our models have not been able to outperform our baseline.  

The best-performing model was the explicit-rating ALS with the non-negativity constraint (test RMSE 2.54, against 2.92 for unconstrained explicit ALS and 7.91 for implicit ALS). 

One reason for this may be the sparsity of this particular dataset. Collaborative filtering does not perform well on sparse datasets, and only about 0.003% of the possible user-book ratings in the Book-Crossing dataset are present.

The models might also benefit from better tuning of some of their parameters.
The large gap between training and test RMSE (roughly 0.35 to 0.39 on the training sets against 2.4 to 2.9 on the test sets for the explicit models) indicates that the algorithm is overfitting to the training set.  
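
To make the size of that gap concrete, the sketch below recomputes the test-to-train RMSE ratio from the three rows of the results table shown earlier. `RmseGap`, `ModelResult` and `subsetResults` are hypothetical names introduced for illustration; the numbers are copied from the table.

```scala
object RmseGap {
  final case class ModelResult(name: String, trainRmse: Double, testRmse: Double) {
    /** How many times larger the test error is than the training error. */
    def ratio: Double = testRmse / trainRmse
  }

  // Values copied from the model_df output above
  val subsetResults: Seq[ModelResult] = Seq(
    ModelResult("explicit", 0.3518713085472958, 2.9177706075757435),
    ModelResult("implicit", 7.642094651564277, 7.912597543101461),
    ModelResult("explicit_nonnegative", 0.36550044872860676, 2.541436381885685)
  )

  def main(args: Array[String]): Unit =
    subsetResults.foreach { r =>
      println(f"${r.name}%-22s test/train = ${r.ratio}%.1f")
    }
}
```

Both explicit models have a test error several times their training error, while the implicit model has similar errors on both sets but a much higher error overall.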

The Spark implementation is probably not worth it for this particular dataset.  
The baseline implementation achieves better results for a fraction of the computing resources and time.  
A different recommender algorithm that uses other features of a book might be able to produce better recommendations. 

Compared with the Python implementation, the Spark one achieves better coverage, since we are able to use more of the books from the dataset.  

The Spark implementation, however, takes a very long time to grid search parameters; there may be some optimizations or cluster configurations that would speed things up.  
Despite how common parameter tuning is in other libraries like scikit-learn, most Spark ALS tutorials don't do any parameter tuning. 
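
Part of the cost is simply the number of models that have to be trained. Spark's `CrossValidator` fits one model per fold for every parameter combination and then refits the best combination on the whole training set. The sketch below counts those fits; `GridSearchCost` and `modelFits` are hypothetical names, and the grid in the usage note is an assumed example, not the grid used in this notebook.

```scala
object GridSearchCost {
  /** Number of model fits for one cross-validated grid search:
    * numFolds fits per parameter combination, plus one final refit
    * of the best combination on the full training set.
    * `gridSizes` holds the number of candidate values per parameter. */
  def modelFits(numFolds: Int, gridSizes: Seq[Int]): Int =
    numFolds * gridSizes.product + 1
}
```

For an assumed grid of 3 ranks and 3 regularization values with 3 folds, `GridSearchCost.modelFits(3, Seq(3, 3))` gives 28 ALS fits, each running its full set of iterations. Searching the explicit, non-negative and implicit variants separately triples that.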

Considering the poor performance and significant technical hurdles, the Spark option would only be viable for a model that needs to cover a large number of the items in the dataset and does not need a lot of tuning. 