In [62]:
import pandas as pd
import numpy as np

# ! pip install pyspark
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import explode, col, round, abs, when
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession \
    .builder \
    .appName("Book Rec System") \
    .getOrCreate()

21/12/10 12:48:51 WARN Utils: Your hostname, cliodhna-Lenovo-ideapad-530S-14IKB resolves to a loopback address: 127.0.1.1; using 192.168.0.37 instead (on interface wlp1s0)
21/12/10 12:48:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/10 12:48:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Reading Datasets

In [42]:
interactions = spark.read.csv("data/interactions.csv", sep=',', header=True)

In [43]:
interactions = interactions.withColumn("user_id",interactions["user_id"].cast("int"))
interactions = interactions.withColumn("rating",interactions["rating"].cast("int"))
interactions = interactions.withColumn("book_id",interactions["book_id"].cast("int"))
# interactions = interactions.dropDuplicates(["book_id"])

In [44]:
interactions.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- is_read: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: string (nullable = true)
 |-- user_count: string (nullable = true)



In [45]:
interactions.show(n=5)

+-------+-------+-------+------+-----------+----------+
|user_id|book_id|is_read|rating|is_reviewed|user_count|
+-------+-------+-------+------+-----------+----------+
|      0|    915|      1|     5|        1.0|        15|
|      0|    873|      1|     4|        0.0|        15|
|      0|    871|      1|     2|        0.0|        15|
|      0|    870|      1|     3|        0.0|        15|
|      0|    824|      1|     5|        1.0|        15|
+-------+-------+-------+------+-----------+----------+
only showing top 5 rows



In [38]:
books = spark.read.csv("data/books.csv", sep=',', header=True)

In [39]:
cols = ("isbn","is_ebook","kindle_asin", "country_code", "language_code", "asin","description", "format", "link", "publication_day", "isbn13",
        "publication_month", "edition_information", "url", "image_url", "work_id", "text_reviews_count", "title_without_series")

books = books.drop(*cols)
books.show(5)
books = books.select("book_id","title","genre","authors","publisher", "average_rating", "publication_year", "popular_shelves", "similar_books", "ratings_count")
books = books.dropDuplicates(["book_id"])

+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+---------+----------------+-------+-------------+----------+--------+
|              series|     popular_shelves|average_rating|       similar_books|             authors|           publisher|num_pages|publication_year|book_id|ratings_count|     title|   genre|
+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+---------+----------------+-------+-------------+----------+--------+
|                  []|[{'count': '450',...|          4.43|['834493', '45218...|[{'author_id': '5...|      Blue Sky Press|     40.0|          1995.0|  89378|         1331|Dog Heaven|Children|
|                  []|[{'count': '90', ...|          3.95|['886410', '19743...|                null|                null|     null|            null|   null|         null|      null|    null|
|                null|                null|  

In [32]:
books.show(4)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|             book_id|               title|               genre|             authors|           publisher|average_rating|    publication_year|     popular_shelves|       similar_books|       ratings_count|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
| "" but his image...|[{'author_id': '1...|               784.0|   wild anachronisms| hallucinatory ha...|          4.12|              photos|[{'count': '522',...|['855422', '84351...|           Paperback|
|        Edwin Torres|                null|                null|          Bob Holman| Christian X. Hunter|    Kathy Ebel|        Hal Sirowitz|          Todd Colby|      Janice 

# Training ALS with Cross Validation

In [98]:
(training, test) = interactions.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="book_id", ratingCol="rating",
          coldStartStrategy="drop", nonnegative = True, implicitPrefs = False)
model = als.fit(training)

In [99]:
param_grid = ParamGridBuilder() \
 .addGrid(als.rank, [10, 50, 75, 100]) \
 .addGrid(als.maxIter, [5, 50, 75, 100]) \
 .addGrid(als.regParam, [.01, .05, .1, .15]) \
 .build()

evaluator = RegressionEvaluator(metricName = "rmse", 
 labelCol = "rating", 
 predictionCol = "prediction")

print ("Num models to be tested using param_grid: ", len(param_grid))

Num models to be tested using param_grid:  64


In [100]:
cv = CrossValidator(estimator = als, 
 estimatorParamMaps = param_grid, 
 evaluator = evaluator, 
 numFolds = 5)

model = als.fit(training)
predictions = model.transform(test)

predictions.show(n = 10)

+-------+-------+-------+------+-----------+----------+----------+
|user_id|book_id|is_read|rating|is_reviewed|user_count|prediction|
+-------+-------+-------+------+-----------+----------+----------+
|    471|   1387|      1|     3|        0.0|        19| 3.0971603|
|    471|  14706|      1|     5|        0.0|        19|  3.692721|
|    471|  28726|      1|     5|        0.0|        19| 1.7009088|
|    496|  20396|      1|     5|        0.0|        11|  3.764448|
|    496|  60220|      1|     4|        0.0|        11| 3.5646281|
|    833|   1434|      1|     1|        0.0|         7| 2.0837808|
|    833|  60834|      1|     4|        0.0|         7| 2.1608448|
|   1238|    586|      1|     3|        0.0|        26|  4.651348|
|   1238|   5402|      1|     5|        0.0|        26|  3.722077|
|   1238|   6687|      1|     5|        0.0|        26|  6.006633|
+-------+-------+-------+------+-----------+----------+----------+
only showing top 10 rows



In [101]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.6296875119269387


# Rounding Predictions

In [103]:
predictions = predictions.withColumn("rounded_prediction", round(col("prediction"), 0))
predictions = predictions.withColumn("rounded_prediction",predictions["rounded_prediction"].cast("int"))


In [105]:
new_pred = predictions.drop(col("prediction"))

new_pred = new_pred.withColumnRenamed("rounded_prediction", "prediction")
new_pred = new_pred.withColumn("prediction", when(new_pred["prediction"] > 5, 5).otherwise(new_pred['prediction']).alias("prediction"))

new_pred.show(n=10)

+-------+-------+-------+------+-----------+----------+----------+
|user_id|book_id|is_read|rating|is_reviewed|user_count|prediction|
+-------+-------+-------+------+-----------+----------+----------+
|    471|   1387|      1|     3|        0.0|        19|         3|
|    471|  14706|      1|     5|        0.0|        19|         4|
|    471|  28726|      1|     5|        0.0|        19|         2|
|    496|  20396|      1|     5|        0.0|        11|         4|
|    496|  60220|      1|     4|        0.0|        11|         4|
|    833|   1434|      1|     1|        0.0|         7|         2|
|    833|  60834|      1|     4|        0.0|         7|         2|
|   1238|    586|      1|     3|        0.0|        26|         5|
|   1238|   5402|      1|     5|        0.0|        26|         4|
|   1238|   6687|      1|     5|        0.0|        26|         5|
+-------+-------+-------+------+-----------+----------+----------+
only showing top 10 rows



In [107]:
rmse = evaluator.evaluate(new_pred)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.279026391771162


In [106]:
new_pred = new_pred.withColumn("prediction",new_pred["prediction"].cast("double"))

eval_accuracy = MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName="accuracy")
eval_precision = MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName="precisionByLabel")
eval_recall = MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName="recallByLabel")
eval_f1 = MulticlassClassificationEvaluator(labelCol="rating", predictionCol="prediction", metricName="f1")

accuracy = eval_accuracy.evaluate(new_pred)
# precision = eval_precision.evaluate(new_pred)
# recall = eval_recall.evaluate(new_pred)
# f1score = eval_f1.evaluate(new_pred)
accuracy

[Stage 1928:>                                                       (0 + 1) / 1]                                                                                

0.34344425902293774

# Recommendations

In [18]:
userRecs = model.recommendForAllUsers(10)
bookRecs = model.recommendForAllItems(5)



In [19]:
bookRecs.show(5, False)



+-------+--------------------------------------------------------------------------------------------------+
|book_id|recommendations                                                                                   |
+-------+--------------------------------------------------------------------------------------------------+
|236    |[{7115, 7.9971437}, {137, 7.3445115}, {16429, 6.667948}, {341, 6.522622}, {128, 6.5151815}]       |
|1068   |[{10344, 7.718821}, {552, 7.561115}, {1813, 7.0963063}, {9178, 7.01012}, {971, 6.8828287}]        |
|1226   |[{3132, 7.3775134}, {6814, 6.982759}, {4796, 6.948283}, {1269, 6.5885906}, {15352, 6.291789}]     |
|1265   |[{3132, 9.6307}, {4796, 8.80101}, {16863, 8.458717}, {15703, 8.347777}, {10462, 7.99389}]         |
|1363   |[{552, 12.575844}, {16863, 11.4340725}, {10683, 11.297866}, {1292, 11.236174}, {16720, 11.196016}]|
+-------+--------------------------------------------------------------------------------------------------+
only showing top 5 

                                                                                

In [20]:
userRecs.show(5, False)



+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                                                |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26     |[{553387, 14.715152}, {816582, 14.715152}, {816581, 14.715152}, {116511, 14.715152}, {864757, 13.87485}, {864758, 13.87485}, {258334, 13.384239}, {144350, 12.89467}, {377891, 12.761853}, {43488, 12.75015}]  |
|28     |[{15245, 7.530599}, {397183, 7.4533806}, {229575, 7.4522176}, {43536, 7.3541775}, {43747, 7.241123}, {116511, 7.131547}

                                                                                

# Making score column in pandas

In [210]:
user_recs = model.recommendForAllUsers(100)



In [211]:
df = user_recs.toPandas()
df

                                                                                

Unnamed: 0,user_id,recommendations
0,26,"[(864386, 17.755340576171875), (864758, 17.755..."
1,28,"[(397183, 10.6409330368042), (125232, 9.860329..."
2,31,"[(816581, 13.870176315307617), (816582, 13.870..."
3,34,"[(492565, 17.5319766998291), (432016, 16.87717..."
4,44,"[(688081, 11.493675231933594), (125232, 10.683..."
...,...,...
9917,22982,"[(43747, 21.323205947875977), (124740, 19.1596..."
9918,23098,"[(212936, 20.873018264770508), (397183, 20.474..."
9919,23161,"[(43747, 12.351412773132324), (183220, 11.7844..."
9920,23181,"[(397183, 9.964000701904297), (43747, 9.156210..."


In [212]:
df = df.assign(recommendations=list(df.recommendations)).explode('recommendations')

lst = [(x[0], x[1]) for x in df.recommendations]
df[['book_id', 'value']] = lst

In [213]:
max_values = df.groupby(['user_id'], sort=False)['value'].max()
df = df.merge(max_values, how='left', left_on='user_id', right_index=True)
df.columns = ['user_id', 'recommendations', 'book_id', 'value', 'max']
df['score'] = df['value'] / df['max']

In [214]:
df

Unnamed: 0,user_id,recommendations,book_id,value,max,score
0,26,"(864386, 17.755340576171875)",864386.0,17.755341,17.755341,1.000000
0,26,"(864758, 17.755340576171875)",864758.0,17.755341,17.755341,1.000000
0,26,"(864757, 17.755340576171875)",864757.0,17.755341,17.755341,1.000000
0,26,"(432016, 15.268040657043457)",432016.0,15.268041,17.755341,0.859913
0,26,"(492565, 14.56356143951416)",492565.0,14.563561,17.755341,0.820236
...,...,...,...,...,...,...
9921,23187,"(45684, 12.045822143554688)",45684.0,12.045822,19.840441,0.607135
9921,23187,"(38969, 12.024578094482422)",38969.0,12.024578,19.840441,0.606064
9921,23187,"(127722, 12.022064208984375)",127722.0,12.022064,19.840441,0.605937
9921,23187,"(13083, 12.01167106628418)",13083.0,12.011671,19.840441,0.605414


# Exploring recommendations

In [21]:
top_users = interactions.groupBy(interactions['user_id']).agg({'rating':"count"}).sort("count(rating)", ascending=False).dropna().limit(10)

In [22]:
top_users.show()

+-------+-------------+
|user_id|count(rating)|
+-------+-------------+
|   9880|          195|
|   6678|          150|
|  15833|          148|
|  18539|          143|
|  19445|          132|
|   7261|          131|
|  14864|          129|
|  16279|          118|
|  17077|          115|
|   9217|          115|
+-------+-------------+



In [23]:
top_user_list = [row.user_id for row in top_users.select('user_id').collect()]

In [203]:
d = {}
for user in top_user_list:
    rec = userRecs.where(userRecs.user_id == user).select("recommendations").collect()
    d[user] = [i.book_id for i in rec[0]["recommendations"]]
d.keys()

                                                                                

dict_keys([9880, 6678, 15833, 18539, 19445, 7261, 14864, 16279, 17077, 9217])

In [206]:
rec_1 = books.filter(books["book_id"].isin(d[7261]))

In [207]:
rec_1.show()

+-------+--------------------+-----------------+--------------------+------------------+--------------+----------------+--------------------+--------------------+-------------+
|book_id|               title|            genre|             authors|         publisher|average_rating|publication_year|     popular_shelves|       similar_books|ratings_count|
+-------+--------------------+-----------------+--------------------+------------------+--------------+----------------+--------------------+--------------------+-------------+
| 816582|If I Had You (de ...|History/Biography|[{'author_id': '1...|Berkley Publishing|           4.2|          2000.0|[{'count': '649',...|['50742', '121591...|         1914|
| 864758|Knocked Out by My...|      Young Adult|[{'author_id': '6...|    Harper Collins|          3.97|          2006.0|[{'count': '6704'...|['405838', '51618...|          957|
| 864757|'... then he ate ...|      Young Adult|[{'author_id': '6...|              null|          4.09|            

Young adult and history/biography. The user has read a lot of history/biography but not much young adult so thats a bit of a strange recommendation.

In [229]:
user1_books = [x[0] for x in interactions.filter(interactions.user_id == 7261).select('book_id').collect()]
user1 = books.filter(books["book_id"].isin(user1_books))
user1.show(100)

+-------+--------------------+-----------------+--------------------+--------------------+--------------+----------------+--------------------+--------------------+-------------+
|book_id|               title|            genre|             authors|           publisher|average_rating|publication_year|     popular_shelves|       similar_books|ratings_count|
+-------+--------------------+-----------------+--------------------+--------------------+--------------+----------------+--------------------+--------------------+-------------+
| 118222|The Corpse in Ooz...|          Mystery|[{'author_id': '6...|The Mysterious Press|          3.76|          1988.0|[{'count': '149',...|['673410', '86058...|          453|
| 128605|Chester Cricket's...|         Children|[{'author_id': '1...|            Yearling|           3.7|          1984.0|[{'count': '149',...|['460620', '10460...|          266|
|  12937|See You Around, S...|         Children|[{'author_id': '2...|            Yearling|          3.66|

In [230]:
rec_2 = books.filter(books["book_id"].isin(d[17077]))
rec_2.show()

+-------+--------------------+-----------------+--------------------+------------------+--------------+----------------+--------------------+--------------------+-------------+
|book_id|               title|            genre|             authors|         publisher|average_rating|publication_year|     popular_shelves|       similar_books|ratings_count|
+-------+--------------------+-----------------+--------------------+------------------+--------------+----------------+--------------------+--------------------+-------------+
| 816582|If I Had You (de ...|History/Biography|[{'author_id': '1...|Berkley Publishing|           4.2|          2000.0|[{'count': '649',...|['50742', '121591...|         1914|
| 864758|Knocked Out by My...|      Young Adult|[{'author_id': '6...|    Harper Collins|          3.97|          2006.0|[{'count': '6704'...|['405838', '51618...|          957|
| 864757|'... then he ate ...|      Young Adult|[{'author_id': '6...|              null|          4.09|            

In [231]:
user2_books = [x[0] for x in interactions.filter(interactions.user_id == 17077).select('book_id').collect()]
user2 = books.filter(books["book_id"].isin(user2_books))
user2.show(100)

+-------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|book_id|               title|            genre|             authors|           publisher|      average_rating|    publication_year|     popular_shelves|       similar_books|ratings_count|
+-------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|  14654|Encyclopedia Brow...|         Children|[{'author_id': '9...|            Yearling|                4.04|              2000.0|[{'count': '303',...|['363271', '51071...|         3043|
|   1496|  Iphigenia in Aulis|           Poetry|[{'author_id': '9...|Ivan R. Dee Publi...|                3.99|              1997.0|[{'count': '104',...|['237794', '76170...|         2211|
|  16320|Witness for the P...|          Mystery|[{'auth

Has read some of every genre so any recommendation would make sense really.