# Book Recommendations using PySpark

In this notebook, we build a book recommendation system using the ALS (alternating least squares) model in PySpark. We load and prepare the data, train the model, save it, and then reload it to generate recommendations.


## Contents

- [1 - Create SparkSession and Load Data](#1)
- [2 - Dataset Preparation](#2)
- [3 - Train a Recommendation System Using ALS](#3)
- [4 - Save and Load the Model](#4)

In [1]:
from pyspark.sql import SparkSession
# max is not imported by name because it would shadow Python's built-in max; use F.max instead.
from pyspark.sql.functions import count, desc, col
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
import pyspark.sql.functions as F
import warnings
warnings.filterwarnings('ignore')

<a name='1'></a>
## 1. Create SparkSession and Load Data

In [2]:
spark = SparkSession.builder.appName('books').getOrCreate()



In [3]:
bookpath = 'Books.csv'
ratepath = 'Ratings.csv'
userpath = 'Users.csv'

### We create a schema for the books dataset.

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Every column of the books file is read as a nullable string.
book_schema = StructType([
    StructField('ISBN', StringType(), True),
    StructField('Book-Title', StringType(), True),
    StructField('Book-Author', StringType(), True),
    StructField('Year-Of-Publication', StringType(), True),
    StructField('Publisher', StringType(), True),
    StructField('Image-URL-S', StringType(), True),
    StructField('Image-URL-M', StringType(), True),
    StructField('Image-URL-L', StringType(), True),
])

In [5]:
book_df = spark.read.csv(bookpath, header=True, schema=book_schema)

In [6]:
book_df.show()

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

### A '-' in a column name can cause problems in later Spark SQL queries, so we replace '-' with '_'.

In [7]:
for name in book_df.schema.names:
  book_df = book_df.withColumnRenamed(name, name.replace('-', '_'))

In [8]:
book_df.columns

['ISBN',
 'Book_Title',
 'Book_Author',
 'Year_Of_Publication',
 'Publisher',
 'Image_URL_S',
 'Image_URL_M',
 'Image_URL_L']
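
The renaming step can also be expressed as a single list transformation. The sketch below uses plain Python so it runs without a Spark session; the helper name `sanitize_column_names` is hypothetical, and the closing comment shows how the result might be applied with `DataFrame.toDF`, assuming the list covers every column in order.

```python
def sanitize_column_names(names):
    """Replace hyphens with underscores so the names work as unquoted SQL identifiers."""
    return [name.replace('-', '_') for name in names]


raw_names = ['ISBN', 'Book-Title', 'Book-Author', 'Year-Of-Publication']
clean_names = sanitize_column_names(raw_names)
print(clean_names)

# With a Spark DataFrame, the cleaned names could be applied in one call
# instead of a loop of withColumnRenamed calls (assumes all columns are listed):
#   book_df = book_df.toDF(*sanitize_column_names(book_df.columns))
```

Without the renaming, a query such as `SELECT Book-Title FROM book_df` would need backticks around the column name, because the parser otherwise reads the hyphen as a minus sign.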

### Check how many books are in this dataset.

In [9]:
book_df.count()

271360

### Create a schema for the ratings dataset.

In [10]:
rate_schema = StructType([
    StructField('User-ID', IntegerType(), True),
    StructField('ISBN', StringType(), True),
    StructField('Book-Rating', IntegerType(), True),
])

In [11]:
rate_df = spark.read.csv(ratepath, header=True, schema=rate_schema)

In [12]:
for name in rate_df.schema.names:
  rate_df = rate_df.withColumnRenamed(name, name.replace('-', '_'))

In [13]:
rate_df.count()

1149780

In [14]:
rate_df.show()

+-------+----------+-----------+
|User_ID|      ISBN|Book_Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120X|          7|
| 276745| 342310538|         10|
| 276746|0425115801|          0|
| 276746|0449006522|          0|
| 276746|0553561618|          0|
| 276746|055356451X|          0|
| 276746|0786013990|          0|
| 276746|0786014512|          0|
| 276747|0060517794|          9|
| 276747|0451192001|          0|
| 276747|0609801279|          0|
| 276747|0671537458|          9|
+-------+----------+-----------+
only showing top 20 rows



### Create a schema for the users dataset.

In [15]:
user_schema = StructType([
    StructField('User-ID', IntegerType(), True),
    StructField('Location', StringType(), True),
    StructField('Age', StringType(), True),
])

In [16]:
user_df = spark.read.csv(userpath, header=True, schema=user_schema)

In [17]:
for name in user_df.schema.names:
  user_df = user_df.withColumnRenamed(name, name.replace('-', '_'))

In [18]:
user_df.count()

278859

In [19]:
user_df.show()

+-------+--------------------+----+
|User_ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|null|
|      2|stockton, califor...|18.0|
|      3|moscow, yukon ter...|null|
|      4|porto, v.n.gaia, ...|17.0|
|      5|farnborough, hant...|null|
|      6|santa monica, cal...|61.0|
|      7| washington, dc, usa|null|
|      8|timmins, ontario,...|null|
|      9|germantown, tenne...|null|
|     10|albacete, wiscons...|26.0|
|     11|melbourne, victor...|14.0|
|     12|fort bragg, calif...|null|
|     13|barcelona, barcel...|26.0|
|     14|mediapolis, iowa,...|null|
|     15|calgary, alberta,...|null|
|     16|albuquerque, new ...|null|
|     17|chesapeake, virgi...|null|
|     18|rio de janeiro, r...|25.0|
|     19|           weston, ,|14.0|
|     20|langhorne, pennsy...|19.0|
+-------+--------------------+----+
only showing top 20 rows



<a name='2'></a>
## 2. Dataset Preparation

### First, we use SQL commands to join the data we need from these datasets.

In [20]:
book_df.createOrReplaceTempView('book_df')
rate_df.createOrReplaceTempView('rate_df')
user_df.createOrReplaceTempView('user_df')

In [21]:
query = """
    SELECT R.User_ID, B.Book_Title, R.Book_Rating
    FROM rate_df R
    INNER JOIN book_df B
        ON R.ISBN = B.ISBN
"""
data = spark.sql(query)

### To keep the notebook quick to run, we use only part of the joined dataset.

In [22]:
data = data.limit(2000)

In [23]:
data.show()



+-------+--------------------+-----------+
|User_ID|          Book_Title|Book_Rating|
+-------+--------------------+-----------+
|  23902|    The Fighting Man|          9|
|  23902|Nothing Can Be Be...|          0|
|  16319|       Which Colour?|          0|
|  26583|Huck Scarry's Ste...|          0|
| 156534|  The Big Honey Hunt|          0|
| 176062|ARE YOU MY MOTHER...|          0|
| 114216|                Bess|          0|
|  11676| THE COAL HOUSE T/PB|          0|
| 131402| THE COAL HOUSE T/PB|          0|
| 145927| THE COAL HOUSE T/PB|          0|
|  11676|Glue (First Facts...|          0|
| 181659|Glue (First Facts...|         10|
| 206979|Glue (First Facts...|          0|
|  11676|Count Duckula: Va...|          6|
| 110029|Count Duckula: Va...|          0|
| 206979|Count Duckula: Va...|          0|
| 201526|PADDINGTON GOES T...|          0|
|  93366|Frederick Street:...|          8|
|  26583|           Vancouver|          8|
| 101731|           Vancouver|          8|
+-------+--

                                                                                

### To train the model, we convert book titles to numeric indices.

In [24]:
book_indexer = StringIndexer(inputCol='Book_Title', outputCol='book_index')
book_indexer_model = book_indexer.fit(data)

idx_to_book = IndexToString(inputCol='book_index',outputCol='Book_Title').setLabels(book_indexer_model.labels)


data = book_indexer_model.transform(data)
data.show()

                                                                                

+-------+--------------------+-----------+----------+
|User_ID|          Book_Title|Book_Rating|book_index|
+-------+--------------------+-----------+----------+
|  23902|    The Fighting Man|          9|     480.0|
|  23902|Nothing Can Be Be...|          0|     391.0|
|  16319|       Which Colour?|          0|     545.0|
|  26583|Huck Scarry's Ste...|          0|     344.0|
| 156534|  The Big Honey Hunt|          0|     455.0|
| 176062|ARE YOU MY MOTHER...|          0|     238.0|
| 114216|                Bess|          0|     258.0|
|  11676| THE COAL HOUSE T/PB|          0|     121.0|
| 131402| THE COAL HOUSE T/PB|          0|     121.0|
| 145927| THE COAL HOUSE T/PB|          0|     121.0|
|  11676|Glue (First Facts...|          0|     106.0|
| 181659|Glue (First Facts...|         10|     106.0|
| 206979|Glue (First Facts...|          0|     106.0|
|  11676|Count Duckula: Va...|          6|     102.0|
| 110029|Count Duckula: Va...|          0|     102.0|
| 206979|Count Duckula: Va..
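
To make the indexing step concrete, here is a minimal plain-Python sketch of the ordering that `StringIndexer` uses by default, as far as I understand it: labels sorted by descending frequency, with ties broken alphabetically. The helper names and the short title list are illustrative assumptions, not part of the notebook.

```python
from collections import Counter


def frequency_desc_labels(values):
    """Order distinct values by descending count, breaking ties alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


def index_values(values, labels):
    """Map each value to its position in labels, as a float like the book_index column."""
    lookup = {label: float(position) for position, label in enumerate(labels)}
    return [lookup[value] for value in values]


titles = ["Vancouver", "Bess", "Vancouver", "Glue", "Glue", "Vancouver"]
labels = frequency_desc_labels(titles)
indices = index_values(titles, labels)
print(labels)
print(indices)
```

Because the indices depend on the data the indexer was fitted on, the same label list has to be reused when mapping indices back to titles, which is why `idx_to_book` is built from `book_indexer_model.labels`.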

<a name='3'></a>
## 3. Train a Recommendation System Using ALS

### We split the data into train and test sets.

In [25]:
(train, test) = data.randomSplit([0.8, 0.2])

In [26]:
user = 'User_ID'
book = 'book_index'
rate = 'Book_Rating'

als = ALS(maxIter=5, regParam=0.01, userCol=user, itemCol=book, ratingCol=rate)
model = als.fit(train)

pred = model.transform(test)



In [27]:
pred.show()


+-------+--------------------+-----------+----------+----------+
|User_ID|          Book_Title|Book_Rating|book_index|prediction|
+-------+--------------------+-----------+----------+----------+
|    387|        Rosie'S Walk|          0|      69.0|       NaN|
|   1674|Always Coming Hom...|          0|      96.0|       0.0|
|   2790|For Whome the Bel...|          6|      15.0|       NaN|
|   3363|Once upon a More ...|          0|       2.0|       0.0|
|   4157|Stupid White Men ...|          8|       1.0|       NaN|
|   5543|Mutant Message Do...|          0|      10.0|       NaN|
|   5694|     CADDIE WOODLAWN|          0|       9.0|       NaN|
|   5903|Princess in the S...|          0|      35.0|       0.0|
|   7962|The Sweet Hereaft...|          0|      52.0|       NaN|
|  10447|The Blue Jay's Da...|          0|     122.0|       NaN|
|  11676|Engaged to Die : ...|          8|      29.0| 7.0049353|
|  11676|    Guilty Creatures|          2|      19.0|-2.1854591|
|  11676|            Home
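
The `NaN` predictions above are what ALS produces for users or books that appear only in the test split (the cold-start case). Spark's `ALS` accepts `coldStartStrategy="drop"` to exclude such rows. The plain-Python sketch below shows why that matters for evaluation: a single `NaN` makes the RMSE itself `NaN`. The rating pairs are illustrative values, not taken from a real run.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# (actual rating, predicted rating); NaN marks a cold-start prediction.
scored = [(0, float("nan")), (0, 0.0), (6, float("nan")), (8, 7.0), (2, -2.0)]

rmse_all = rmse(scored)  # NaN, because NaN propagates through the sum
known = [(a, p) for a, p in scored if not math.isnan(p)]
rmse_known = rmse(known)  # finite, computed on the three usable rows

print(rmse_all, rmse_known)
```

Dropping cold-start rows gives a usable metric, at the cost of saying nothing about how the model handles unseen users or books.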


In [28]:
recs = model.recommendForAllUsers(10)

In [29]:
recs.show()



+-------+--------------------+
|User_ID|     recommendations|
+-------+--------------------+
|    507|[{108, 0.0}, {107...|
|    882|[{35, 13.790217},...|
|   1025|[{79, 8.999305}, ...|
|   1435|[{108, 0.0}, {107...|
|   1485|[{108, 0.0}, {107...|
|   1903|[{108, 0.0}, {107...|
|   2891|[{108, 0.0}, {107...|
|   3814|[{6, 4.999617}, {...|
|   5048|[{30, 24.077822},...|
|   5499|[{44, 6.9994783},...|
|   5770|[{39, 7.9323525},...|
|   5971|[{544, 7.998626},...|
|   6543|[{56, 10.658414},...|
|   7158|[{108, 0.0}, {107...|
|   7178|[{108, 0.0}, {107...|
|   7286|[{108, 0.0}, {107...|
|   7346|[{51, 14.351246},...|
|   7925|[{108, 0.0}, {107...|
|   8067|[{128, 20.385635}...|
|   8681|[{108, 0.0}, {107...|
+-------+--------------------+
only showing top 20 rows





### We generate recommendations for a few selected users.

In [30]:
user_ids = [507, 882]
test_user_df = spark.createDataFrame(user_ids, IntegerType()).toDF('User_ID')

In [31]:
test_user_rec = model.recommendForUserSubset(test_user_df, 10)  # Get the top 10 recommendations

In [32]:
test_user_rec.show()

+-------+--------------------+
|User_ID|     recommendations|
+-------+--------------------+
|    882|[{35, 13.790217},...|
|    507|[{108, 0.0}, {107...|
+-------+--------------------+
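
User 507 receives only scores of 0.0. One plausible reading, which is an assumption rather than something the notebook verifies, is that every rating this user has in the training data is 0. If the files are the Book-Crossing dataset (also an assumption, since the notebook does not name its source), a rating of 0 marks an implicit interaction rather than a low score. The sketch below separates the two kinds of rows in plain Python, using the first four rows of the ratings output shown earlier.

```python
def split_explicit(ratings):
    """Separate explicit ratings (above 0) from rows whose rating is 0."""
    explicit = [row for row in ratings if row[2] > 0]
    implicit = [row for row in ratings if row[2] == 0]
    return explicit, implicit


# (User_ID, ISBN, Book_Rating)
ratings = [
    (276725, "034545104X", 0),
    (276726, "0155061224", 5),
    (276727, "0446520802", 0),
    (276729, "052165615X", 3),
]

explicit, implicit = split_explicit(ratings)
print(len(explicit), len(implicit))
```

In Spark the same filter could be written as `data.filter(col('Book_Rating') > 0)` before training. Whether that improves the recommendations here has not been tested.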



### Next, we map each book index back to its book title.

In [33]:
exploded_user_rec = test_user_rec.select('User_ID', F.explode('recommendations').alias("exploded_recommendations"))

In [34]:
exploded_user_rec.show()

+-------+------------------------+
|User_ID|exploded_recommendations|
+-------+------------------------+
|    882|         {35, 13.790217}|
|    882|         {40, 13.247996}|
|    882|         {22, 12.294685}|
|    882|        {205, 10.917315}|
|    882|         {24, 10.766273}|
|    882|           {9, 9.995795}|
|    882|          {53, 8.617926}|
|    882|         {353, 8.194262}|
|    882|          {84, 8.107805}|
|    882|        {480, 7.9890504}|
|    507|              {108, 0.0}|
|    507|              {107, 0.0}|
|    507|              {106, 0.0}|
|    507|              {101, 0.0}|
|    507|              {100, 0.0}|
|    507|               {99, 0.0}|
|    507|               {98, 0.0}|
|    507|               {97, 0.0}|
|    507|               {96, 0.0}|
|    507|               {95, 0.0}|
+-------+------------------------+





### Here we use the IndexToString transformer created earlier.

In [35]:
rec_names = idx_to_book.transform(exploded_user_rec.select('User_ID', 'exploded_recommendations.book_index'))\
           .select('User_ID', 'Book_Title')

In [36]:
rec_names.show()

+-------+--------------------+
|User_ID|          Book_Title|
+-------+--------------------+
|    882|Princess in the S...|
|    882|Peace, Love and H...|
|    882|Keep It Simple, S...|
|    882|The First-Book Ma...|
|    882|"\Girls' Night Ou...|
|    882|     CADDIE WOODLAWN|
|    882|A Suitable Boy: A...|
|    882|Innocent ErÃ©ndir...|
|    882|Saying Grace: A N...|
|    882|    The Fighting Man|
|    507|      HOUSE OF MIRTH|
|    507|Going Postal : A ...|
|    507|Glue (First Facts...|
|    507|Cathedral, Forge,...|
|    507|Carbohydrate-Addi...|
|    507|            CHILDREN|
|    507|Bouncing Back: I'...|
|    507|               Billy|
|    507|Always Coming Hom...|
|    507|Airborne: A Senti...|
+-------+--------------------+
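
The two steps above, exploding the nested recommendations and looking up titles by index, can be sketched in plain Python as follows. The function names, the three-entry label list, and the scores are hypothetical and chosen only to show the shape of the data.

```python
def explode_recommendations(rows):
    """Turn (user, [(item, score), ...]) rows into one (user, item, score) row each."""
    return [(user, item, score) for user, recs in rows for item, score in recs]


def attach_titles(exploded, labels):
    """Replace each item index with labels[index], the inverse of the indexing step."""
    return [(user, labels[int(item)]) for user, item, _score in exploded]


labels = ["Title A", "Title B", "Title C"]  # hypothetical label list
rows = [(882, [(2, 13.8), (0, 13.2)]), (507, [(1, 0.0)])]

exploded = explode_recommendations(rows)
named = attach_titles(exploded, labels)
print(exploded)
print(named)
```

The lookup only works if `labels` is the exact list the indexer produced, in the same order.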



### Finally, we retrieve the full details of the recommended books.

In [37]:
rec_names.createOrReplaceTempView('rec_names')

query = """
    SELECT R.User_ID, B.ISBN, B.Book_Title, B.Book_Author, B.Year_Of_Publication, B.Publisher
    FROM book_df B
    INNER JOIN rec_names R
        ON R.Book_Title = B.Book_Title
"""
book_rec_info = spark.sql(query)

In [38]:
book_rec_info.show()

+-------+----------+--------------------+--------------------+-------------------+--------------------+
|User_ID|      ISBN|          Book_Title|         Book_Author|Year_Of_Publication|           Publisher|
+-------+----------+--------------------+--------------------+-------------------+--------------------+
|    882|0060195460|Keep It Simple, S...|      Judy Sheindlin|               2000|HarperCollins Pub...|
|    882|0064472795|Princess in the S...|           Meg Cabot|               2002|        HarperTrophy|
|    882|0060294655|Princess in the S...|           Meg Cabot|               2001|       HarperCollins|
|    507|006015456X|Always Coming Hom...|   Ursula K. Le Guin|               1985|         Harperaudio|
|    507|0007110928|               Billy|   Pamela Stephenson|               2002|HarperCollins Ent...|
|    507|0399135847|               Billy|    Whitley Strieber|               1990|    Putnam Pub Group|
|    507|0425129551|               Billy|    Whitley Strieber|  
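
The output above lists "Billy" several times with different ISBNs and authors. This follows from joining on the title: every catalog row that shares a recommended title is returned. The plain-Python sketch below reproduces the effect with the two "Billy" rows from the output and one hypothetical extra row; `join_on_title` is an illustrative helper, not part of the notebook.

```python
def join_on_title(recommendations, catalog):
    """Inner-join (user, title) pairs with every catalog row that has the same title."""
    return [
        (user, book["isbn"], book["title"], book["author"])
        for user, title in recommendations
        for book in catalog
        if book["title"] == title
    ]


catalog = [
    {"isbn": "0007110928", "title": "Billy", "author": "Pamela Stephenson"},
    {"isbn": "0399135847", "title": "Billy", "author": "Whitley Strieber"},
    {"isbn": "0000000000", "title": "Some Other Title", "author": "Hypothetical Author"},
]
recommendations = [(507, "Billy")]

joined = join_on_title(recommendations, catalog)
print(joined)
```

One recommendation becomes two rows, for two different books. A possible remedy, not tried here, is to index and recommend by ISBN rather than by title, so that the final join uses a unique key.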

<a name='4'></a>
## 4. Save and Load the Model

### Save the ALS model.

In [39]:
model.write().overwrite().save('book_recommendations')



### Save the IndexToString transformer.

In [40]:
idx_to_book.write().overwrite().save("index_to_books")

### We load the models and make recommendations.

In [41]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.feature import IndexToString
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('books').getOrCreate()

model_loaded = ALSModel.load('book_recommendations')
IndexString_model_loaded = IndexToString.load('index_to_books')

In [42]:
user_ids = [507, 882]
test_user_df = spark.createDataFrame([(user_id,) for user_id in user_ids], ["User_ID"])

In [43]:
loaded_rec = model_loaded.recommendForUserSubset(test_user_df, 10)

In [44]:
exploded_loaded_rec = loaded_rec.select('User_ID', F.explode('recommendations').alias("exploded_recommendations"))

In [45]:
loaded_rec_names = IndexString_model_loaded.transform(exploded_loaded_rec.select('User_ID', 'exploded_recommendations.book_index'))\
            .select('User_ID', 'Book_Title')

In [46]:
loaded_rec_names.show()

+-------+--------------------+
|User_ID|          Book_Title|
+-------+--------------------+
|    882|Princess in the S...|
|    882|Peace, Love and H...|
|    882|Keep It Simple, S...|
|    882|The First-Book Ma...|
|    882|"\Girls' Night Ou...|
|    882|     CADDIE WOODLAWN|
|    882|A Suitable Boy: A...|
|    882|Innocent ErÃ©ndir...|
|    882|Saying Grace: A N...|
|    882|    The Fighting Man|
|    507|      HOUSE OF MIRTH|
|    507|Going Postal : A ...|
|    507|Glue (First Facts...|
|    507|Cathedral, Forge,...|
|    507|Carbohydrate-Addi...|
|    507|            CHILDREN|
|    507|Bouncing Back: I'...|
|    507|               Billy|
|    507|Always Coming Hom...|
|    507|Airborne: A Senti...|
+-------+--------------------+



### Now we retrieve the complete information for the recommended books.

First we load the book information.

In [47]:
bookpath = 'Books.csv'
book_info = spark.read.csv(bookpath, header=True, inferSchema=True)

In [48]:
book_info.show()

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

In [49]:
for name in book_info.schema.names:
  book_info = book_info.withColumnRenamed(name, name.replace('-', '_'))

### Retrieve the information of recommended books.

In [50]:
book_info.createOrReplaceTempView('book_info')
loaded_rec_names.createOrReplaceTempView('loaded_rec_names')

query = """
    SELECT R.User_ID, B.ISBN, B.Book_Title, B.Book_Author, B.Year_Of_Publication, B.Publisher
    FROM book_info B
    INNER JOIN loaded_rec_names R
        ON R.Book_Title = B.Book_Title
"""
book_loaded_info = spark.sql(query)

In [51]:
book_loaded_info.show()

+-------+----------+--------------------+--------------------+-------------------+--------------------+
|User_ID|      ISBN|          Book_Title|         Book_Author|Year_Of_Publication|           Publisher|
+-------+----------+--------------------+--------------------+-------------------+--------------------+
|    882|0060195460|Keep It Simple, S...|      Judy Sheindlin|               2000|HarperCollins Pub...|
|    882|0064472795|Princess in the S...|           Meg Cabot|               2002|        HarperTrophy|
|    882|0060294655|Princess in the S...|           Meg Cabot|               2001|       HarperCollins|
|    507|006015456X|Always Coming Hom...|   Ursula K. Le Guin|               1985|         Harperaudio|
|    507|0007110928|               Billy|   Pamela Stephenson|               2002|HarperCollins Ent...|
|    507|0399135847|               Billy|    Whitley Strieber|               1990|    Putnam Pub Group|
|    507|0425129551|               Billy|    Whitley Strieber|  