# Setup

In [3]:
pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [4]:
from pyspark.sql import SparkSession

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Load data

In [5]:
spark = SparkSession.builder.appName("ALS Book Recommendation System").getOrCreate()

In [6]:
bookDF = spark.read.csv(path="./data/books.csv", header=True, inferSchema=True)
bookDF.show(5)

+---+-------+------------+-------+-----------+---------+----------------+--------------------+-------------------------+--------------------+--------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+--------------------+--------------------+
| id|book_id|best_book_id|work_id|books_count|     isbn|          isbn13|             authors|original_publication_year|      original_title|               title|language_code|average_rating|ratings_count|work_ratings_count|work_text_reviews_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|           image_url|     small_image_url|
+---+-------+------------+-------+-----------+---------+----------------+--------------------+-------------------------+--------------------+--------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+--------------------

In [7]:
ratingDF = spark.read.csv(path="./data/ratings.csv", header=True, inferSchema=True)
ratingDF.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



# Data preprocessing

In [8]:
(train_data, test_data) = ratingDF.randomSplit([0.8, 0.2], seed=42)

# Model selection


In [9]:
als = ALS(userCol="user_id", itemCol="book_id", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False)

In [10]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [1, 10]) \
            .addGrid(als.regParam, [.01, .1]) \
            .build()

In [11]:
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(
           labelCol="rating",
           predictionCol="prediction",
           metricName="rmse"
)
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  4


In [12]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

In [13]:
#Fit cross validator to the 'train' dataset
cv_model = cv.fit(train_data)

#Extract best model from the cv model above
best_model = cv_model.bestModel

In [14]:
print("**Best Model**")
print("  Rank:", best_model._java_obj.parent().getRank())
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 1
  MaxIter: 10
  RegParam: 0.01


# Predictions

In [15]:
# View the predictions
predictions = best_model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print(rmse)

0.8505858846376764


# Recommendations

In [16]:
bookrecommend = best_model.recommendForAllUsers(numItems=5)
bookrecommend.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{4868, 4.315791}...|
|      3|[{4868, 1.2045949...|
|      5|[{4868, 5.4236927...|
|      6|[{4868, 5.539203}...|
|      9|[{4868, 4.0334263...|
|     12|[{4868, 5.058095}...|
|     13|[{4868, 5.178205}...|
|     15|[{4868, 4.267346}...|
|     16|[{4868, 4.0462804...|
|     17|[{4868, 5.9262886...|
|     19|[{4868, 3.9333553...|
|     20|[{4868, 5.8160963...|
|     22|[{4868, 4.4196696...|
|     26|[{4868, 4.368599}...|
|     27|[{4868, 4.8119326...|
|     28|[{4868, 4.267009}...|
|     31|[{4868, 3.8295481...|
|     34|[{4868, 3.697308}...|
|     35|[{4868, 3.8327618...|
|     37|[{4868, 5.253877}...|
+-------+--------------------+
only showing top 20 rows



In [17]:
userrecommend = best_model.recommendForAllItems(numUsers=5)
userrecommend.show(5, truncate=False)

+-------+--------------------------------------------------------------------------------------------------+
|book_id|recommendations                                                                                   |
+-------+--------------------------------------------------------------------------------------------------+
|1      |[{38076, 8.849665}, {43252, 8.665192}, {23353, 8.5885}, {33914, 8.021262}, {21791, 7.849293}]     |
|3      |[{38076, 6.2262554}, {43252, 6.096468}, {23353, 6.0425115}, {33914, 5.643426}, {21791, 5.522436}] |
|5      |[{38076, 8.006217}, {43252, 7.839326}, {23353, 7.769944}, {33914, 7.256768}, {21791, 7.10119}]    |
|6      |[{38076, 8.082029}, {43252, 7.9135575}, {23353, 7.843519}, {33914, 7.325484}, {21791, 7.168432}]  |
|9      |[{38076, 7.019982}, {43252, 6.8736486}, {23353, 6.8128138}, {33914, 6.3628526}, {21791, 6.226439}]|
+-------+--------------------------------------------------------------------------------------------------+
only showing top 5 

In [18]:
userrecommend.first()

Row(book_id=1, recommendations=[Row(user_id=38076, rating=8.849664688110352), Row(user_id=43252, rating=8.665191650390625), Row(user_id=23353, rating=8.588500022888184), Row(user_id=33914, rating=8.021262168884277), Row(user_id=21791, rating=7.849293231964111)])