In [35]:
import pyspark as ps
from pyspark.sql import SQLContext, SparkSession 
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf, col, when
import numpy as np
from IPython.display import Image
from IPython.display import display

In [36]:
spark = ps.sql.SparkSession.builder \
            .master("local") \
            .appName("Book_recommendation") \
            .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)



In [37]:
ratings_df = spark.read.csv('C:/Users/MyPC/Documents/PYTHON/Big_data/Big_data/goodbooks-10k/ratings.csv', header=True, inferSchema=True)
ratings_df.printSchema()

books_df = spark.read.csv('C:/Users/MyPC/Documents/PYTHON/Big_data/Big_data/goodbooks-10k/books.csv', header=True, inferSchema=True)
books_df.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- best_book_id: integer (nullable = true)
 |-- work_id: integer (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: double (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: double (nullable = true)
 |-- original_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- work_ratings_count: string (nullable = true)
 |-- work_text_reviews_count: string (nullable = true)
 |-- ratings_1: double (nullable = true)
 |-- ratings_2: integer (nullable = true)
 |-- ratings_3: integer (nullable = true)
 |-- ratings_4: integer (nullable = true)
 |-- 

In [38]:
ratings_df.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



In [40]:
books_df.show(1)

+---+-------+------------+-------+-----------+---------+----------------+---------------+-------------------------+----------------+--------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+--------------------+--------------------+
| id|book_id|best_book_id|work_id|books_count|     isbn|          isbn13|        authors|original_publication_year|  original_title|               title|language_code|average_rating|ratings_count|work_ratings_count|work_text_reviews_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|           image_url|     small_image_url|
+---+-------+------------+-------+-----------+---------+----------------+---------------+-------------------------+----------------+--------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+--------------------+--------------------+
|  1

In [41]:
training_df, validation_df = ratings_df.randomSplit([.8, .2])

In [42]:
iterations = 10
regularization_parameter = 0.1
rank = 4
errors = []
err = 0

In [72]:
als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=4, userCol="user_id", itemCol="book_id", ratingCol="rating")
model = als.fit(training_df)
predictions = model.transform(validation_df)
new_predictions = predictions.filter(col('prediction') != np.nan)
evaluator = RegressionEvaluator(metricName='rmse', labelCol="rating", predictionCol="prediction")
rmse = evaluator.eval uate(new_predictions)
print("Root Mean Square Error Value = " + str(rmse))

Root Mean Square Error Value = 0.8960000361640219


In [30]:
als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=5, userCol="user_id", itemCol="book_id", ratingCol="rating")
model = als.fit(training_df)
predictions = model.transform(validation_df)
new_predictions = predictions.filter(col('prediction') != np.nan)
evaluator = RegressionEvaluator(metricName='rmse', labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(new_predictions)
print("Root Mean Square Error Value = " + str(rmse))

Root Mean Square Error Value = 0.8992263548594341


In [43]:
for rank in range(4,7):
    als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=rank, userCol="user_id", itemCol="book_id", ratingCol="rating")
    model = als.fit(training_df)
    predictions = model.transform(validation_df)
    new_predictions = predictions.filter(col('prediction') != np.nan)
    evaluator = RegressionEvaluator(metricName='rmse', labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(new_predictions)
    print("Root Mean Square Error Value = " + str(rmse))

Root Mean Square Error Value = 0.8960000361640219
Root Mean Square Error Value = 0.9029444727550067
Root Mean Square Error Value = 0.9028908423711779


In [44]:
predictions = model.transform(validation_df)
predictions.show(n=10)

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|    148|  10727|     4| 3.5015664|
|    148|   3922|     3| 3.5108101|
|    148|  32055|     3| 3.1069179|
|    148|   7001|     4| 3.8818383|
|    148|  20967|     3| 3.9724705|
|    148|  14248|     4| 3.5841208|
|    148|  18313|     4|  3.227728|
|    148|   5461|     4| 3.9333022|
|    148|  11569|     2|  3.675868|
|    148|  27934|     4| 3.2381752|
+-------+-------+------+----------+
only showing top 10 rows



In [45]:
predictions.join(books_df, "book_id").select("user_id","title","prediction").show(5)

+-------+----------+----------+
|user_id|     title|prediction|
+-------+----------+----------+
|   6213|Lysistrata|  2.942966|
|  40820|Lysistrata| 3.5782363|
|  11326|Lysistrata| 4.1027613|
|  12466|Lysistrata| 3.9606495|
|  22034|Lysistrata| 4.2580414|
+-------+----------+----------+
only showing top 5 rows



In [66]:
one_user = predictions.join(books_df, "book_id").select("user_id", "title", "image_url", "prediction")
one_user.count()

15975

In [67]:
one_user.show()

+-------+----------+--------------------+----------+
|user_id|     title|           image_url|prediction|
+-------+----------+--------------------+----------+
|   6213|Lysistrata|https://images.gr...|  2.942966|
|  40820|Lysistrata|https://images.gr...| 3.5782363|
|  11326|Lysistrata|https://images.gr...| 4.1027613|
|  12466|Lysistrata|https://images.gr...| 3.9606495|
|  22034|Lysistrata|https://images.gr...| 4.2580414|
|  38734|Lysistrata|https://images.gr...| 4.4405894|
|  51663|Lysistrata|https://images.gr...|  3.790297|
|  41249|Lysistrata|https://images.gr...| 3.9698422|
|  12395|Lysistrata|https://images.gr...|   4.48356|
|  26812|Lysistrata|https://images.gr...|  3.753693|
|   2854|Lysistrata|https://images.gr...| 3.4827216|
|  37585|Lysistrata|https://images.gr...|  3.102296|
|  23376|Lysistrata|https://images.gr...|  3.305453|
|  41329|Lysistrata|https://images.gr...|  4.028338|
|  39423|Lysistrata|https://images.gr...| 3.6367629|
|  42508|Lysistrata|https://images.gr...|  4.2

In [55]:
for book in one_user.take(10):
    print(book.title)
    display(Image(url=book.image_url))

Notes from a Small Island


The Ultimate Hitchhiker's Guide to the Galaxy


The Lord of the Rings: The Art of The Fellowship of the Ring


Freakonomics: A Rogue Economist Explores the Hidden Side of Everything (Freakonomics, #1)


In [56]:
userRecomments = model.recommendForAllUsers(5)
bookRecomments = model.recommendForAllItems(5)

In [57]:
userRecomments.select("user_id","recommendations.book_id").show(10, False)

+-------+------------------------------+
|user_id|book_id                       |
+-------+------------------------------+
|148    |[8703, 3491, 8362, 1338, 2590]|
|463    |[9842, 7947, 4609, 1937, 4154]|
|471    |[3491, 6920, 862, 6590, 2840] |
|496    |[2051, 3457, 7480, 9182, 3920]|
|833    |[8521, 9842, 3953, 8703, 7550]|
|1088   |[1788, 6920, 5207, 3628, 7254]|
|1238   |[3802, 3482, 7157, 5286, 3459]|
|1342   |[3628, 9076, 3282, 5207, 1788]|
|1580   |[3491, 6920, 6902, 7440, 3885]|
|1591   |[3746, 7762, 6102, 5376, 8606]|
+-------+------------------------------+
only showing top 10 rows



In [58]:
userRecomments.printSchema()

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- book_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [59]:
bookRecomments.select("book_id", "recommendations.user_id").show(10, False)

+-------+-----------------------------------+
|book_id|user_id                            |
+-------+-----------------------------------+
|1580   |[30757, 23041, 28493, 31411, 41819]|
|4900   |[49360, 27206, 6505, 41168, 33351] |
|5300   |[20122, 25541, 27771, 41807, 5592] |
|6620   |[50333, 45247, 25287, 30757, 27969]|
|7240   |[49360, 41819, 1653, 19137, 51626] |
|7340   |[41819, 6975, 13033, 31186, 52453] |
|7880   |[33211, 43853, 30699, 34886, 34547]|
|9900   |[16210, 38076, 21791, 27645, 9274] |
|471    |[41819, 49360, 31411, 51614, 51190]|
|1591   |[31186, 6975, 31411, 41819, 47036] |
+-------+-----------------------------------+
only showing top 10 rows



In [60]:
users = ratings_df.select("user_id").distinct().limit(3);
users.show()

+-------+
|user_id|
+-------+
|  32592|
|  19984|
|  35982|
+-------+



In [62]:
userSubsetRecs = model.recommendForUserSubset(users, 10)
userSubsetRecs.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|  32592|[{3491, 4.756344}...|
|  35982|[{1788, 5.0571103...|
|  19984|[{8233, 6.1203356...|
+-------+--------------------+



In [63]:
userSubsetRecs.select("user_id","recommendations.book_id").show(10, False)

+-------+------------------------------------------------------------+
|user_id|book_id                                                     |
+-------+------------------------------------------------------------+
|32592  |[3491, 862, 8362, 562, 2840, 6920, 267, 852, 3885, 6590]    |
|35982  |[1788, 3628, 5207, 2205, 9076, 7254, 5880, 6591, 9504, 8757]|
|19984  |[8233, 3491, 8362, 6920, 8926, 6862, 267, 2840, 6400, 3885] |
+-------+------------------------------------------------------------+



In [64]:
books = ratings_df.select("book_id").distinct().limit(3)
books.show()

+-------+
|book_id|
+-------+
|    148|
|    463|
|    471|
+-------+

