In [None]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# data science imports
import math
import numpy as np
import pandas as pd

# data viz imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# Build (or reuse) the notebook's SparkSession: local master "local[12]",
# 96g for both driver memory and max collected result size, 8g executor memory.
# NOTE(review): with a local master the executor memory setting may have no
# practical effect — confirm against the Spark configuration docs.
spark = (
    SparkSession.builder
    .master("local[12]")
    .appName("ebook recommendation")
    .config("spark.driver.memory", "96g")
    .config("spark.driver.maxResultSize", "96g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)
# handle to the underlying SparkContext
sc = spark.sparkContext

In [None]:
# Location of the reviews CSV; replace the placeholder with the real path.
data_path = 'Your_file_here'

In [None]:
# Read the reviews CSV into a Spark DataFrame.
# The free-text columns (reviewText, summary, reviewerName) can contain commas,
# embedded newlines and doubled quotes. With the default CSV options such rows
# are split in the wrong place, which is consistent with review text showing up
# in the reviewerID column of the sample output. multiLine=True lets a quoted
# field span several physical lines; escape='"' treats "" inside a quoted field
# as a literal quote instead of Spark's default backslash escape.
df = spark.read.csv(
    data_path,
    header=True,
    inferSchema=True,
    multiLine=True,
    quote='"',
    escape='"',
)

In [None]:
# Work on a ~10% random sample (without replacement) to keep the run fast.
# The seed is fixed so the sample, and every number derived from it, is
# reproducible across kernel restarts.
df = df.sample(withReplacement=False, fraction=0.1, seed=2021)

In [None]:
# drop rows with nulls
# Spark DataFrames are immutable: na.drop() returns a NEW DataFrame, so the
# result has to be bound back to `df`, otherwise the nulls are never removed.
df = df.na.drop()
# keep the schema repr as the cell output
df

DataFrame[ : string, asin: string, helpful: string, overall: int, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: string]

In [None]:
# Preview the first 20 rows with long cell values truncated (the show() defaults).
df.show(n=20, truncate=True)

+---+----------+--------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|   |      asin| helpful|overall|          reviewText|          reviewTime|          reviewerID|        reviewerName|             summary|unixReviewTime|
+---+----------+--------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|  1|B000F83SZQ|  [2, 2]|      4|This book is a re...|          01 6, 2014|       AN0N05A9LIJEQ|            critters|        Different...|    1388966400|
|  7|B000F83SZQ|  [1, 1]|      4|Never heard of Am...|         03 22, 2014|      A3DE6XGZ2EPADS|                 WPY|Enjoyable reading...|    1395446400|
| 13|B000FA64PK|  [0, 0]|      3|Troy Denning's no...|         03 15, 2012|      A3SZMGJMV0G16C|"Andrew Pruette "...|Han and Leia reun...|    1331769600|
| 32|B000FBFMVG|  [0, 0]|      5|I love the storie...|         01 27, 2014| 

In [None]:
# Keep only the three columns used downstream:
# item id (asin), rating (overall) and user id (reviewerID).
ratings = df.select('asin', 'overall', 'reviewerID')
ratings.show()

+----------+-------+--------------------+
|      asin|overall|          reviewerID|
+----------+-------+--------------------+
|B000F83SZQ|      4|       AN0N05A9LIJEQ|
|B000F83SZQ|      4|      A3DE6XGZ2EPADS|
|B000FA64PK|      3|      A3SZMGJMV0G16C|
|B000FBFMVG|      5|      A1ZT7WV0ZUA0OJ|
|B000FC1BN8|      3| Twister sees a p...|
|B000FC26RI|      4|      A2Y1X56N8NPH8G|
|B000FC2MB8|      5|      A1CQ8WG6CUDBNV|
|B000FC2MB8|      5|       AEC18J7P03ZBP|
|B000GFK7L6|      5|      A3GXR6CHHPX0JS|
|B000GFK7L6|      1|      A3BBW3E3I1QWU1|
|B000GFK7L6|      4|       AJLPS4A9HGOQK|
|B000GFK7L6|      5|      A1W5C4LPCVTPXE|
|B000HA4FKY|      5|      A32Q5HN7RQN6GL|
|B000HC48T0|      4|      A3AKG73AM4OKSI|
|B000HC48T0|      5|      A3DKP8M0GSP8UK|
|B000JMKNQ0|      5|      A1ZT7WV0ZUA0OJ|
|B000JMKX4W|      2|       AV4VME0NQTIH3|
|B000JMKXYW|      1|      A2G3C7LJGXLEGF|
|B000JML1QG|      5|      A3PMV0LPE6HTLQ|
|B000JMLBHU|      5|      A260HE0AYLYBWA|
+----------+-------+--------------

In [None]:
# Number of ratings per reviewer, most active reviewers first.
ratings_by_user = (
    ratings
    .groupBy("reviewerID")
    .count()
    .orderBy(desc("count"))
)
ratings_by_user.show()

+--------------+-----+
|    reviewerID|count|
+--------------+-----+
|A2WZJDFX12QXKD|   97|
|A13QTZ8CIMHHG4|   87|
|A3PTWPKPXOG8Y5|   81|
| A2YJ8VP1SSHJ7|   72|
| A320TMDV6KCFU|   71|
|A37LY77Q2YPJVL|   62|
|A2VXSQHJWZAQGY|   61|
|A2JZCZYHNQHSCP|   60|
|A328S9RN3U5M68|   57|
| AR9RKLQQC2L6K|   55|
|A3A7FF87LEVCQ1|   51|
| A5JZNVV4TEAWU|   49|
|A20R37WRPLUM1D|   49|
| AWAP0KEX6POQV|   47|
|A14PRVP4JK88E7|   47|
|A3U41P3MHAEXYU|   46|
| A3LXRIY0HE71K|   46|
|A2G5IFYYHFIQNB|   45|
|A3KH1OB5BYYQ8H|   45|
|A3GSSX9XHONVCD|   45|
+--------------+-----+
only showing top 20 rows



In [None]:
# Number of ratings per ebook (asin), most frequently reviewed first.
ratings_by_ebook = (
    ratings
    .groupBy("asin")
    .count()
    .orderBy(desc("count"))
)
ratings_by_ebook.show()

+----------+-----+
|      asin|count|
+----------+-----+
|B006GWO5WK|  127|
|B00BTIDW4S|   80|
|B00BT0J8ZS|   52|
|B00BSX4U04|   51|
|B00H0V069M|   47|
|B005DOK8NW|   46|
|B00BTIDXVU|   45|
|B00KF0URBM|   40|
|B004E10W0E|   39|
|B007R5YDYA|   38|
|B00BTIDOO6|   37|
|B0050PJZLK|   36|
|B0038KX8S0|   35|
|B00EYMXM2I|   35|
|B00KOSOX2O|   35|
|B005QEA87A|   34|
|B007YJ3JV2|   34|
|B00HYQJPC2|   33|
|B00E5JAIP4|   33|
|B005ME39HU|   33|
+----------+-----+
only showing top 20 rows



In [None]:
# ALS needs numeric user/item ids, but reviewerID and asin are strings.
# Build one StringIndexer per id column (every column except the rating) that
# writes a numeric "<column>_index" column; handleInvalid='skip' drops rows
# whose value was not seen when the indexer was fitted.
# The columns are filtered in their original order rather than via a set
# difference, so the stage order and the order of the appended index columns
# are the same on every run.
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid='skip')
    for column in ratings.columns
    if column != 'overall'
]
pipeline = Pipeline(stages=indexer)

# fit the indexers on all ratings and append the index columns
transformed = pipeline.fit(ratings).transform(ratings)
transformed.show()

+----------+-------+--------------------+----------+----------------+
|      asin|overall|          reviewerID|asin_index|reviewerID_index|
+----------+-------+--------------------+----------+----------------+
|B000F83SZQ|      4|       AN0N05A9LIJEQ|   11468.0|          1107.0|
|B000F83SZQ|      4|      A3DE6XGZ2EPADS|   11468.0|         15790.0|
|B000FA64PK|      3|      A3SZMGJMV0G16C|   21625.0|         40904.0|
|B000FBFMVG|      5|      A1ZT7WV0ZUA0OJ|   21626.0|         12235.0|
|B000FC1BN8|      3| Twister sees a p...|   21627.0|         20268.0|
|B000FC26RI|      4|      A2Y1X56N8NPH8G|   21628.0|          2217.0|
|B000FC2MB8|      5|      A1CQ8WG6CUDBNV|   11469.0|         26949.0|
|B000FC2MB8|      5|       AEC18J7P03ZBP|   11469.0|         43045.0|
|B000GFK7L6|      5|      A3GXR6CHHPX0JS|    4400.0|         16048.0|
|B000GFK7L6|      1|      A3BBW3E3I1QWU1|    4400.0|         15640.0|
|B000GFK7L6|      4|       AJLPS4A9HGOQK|    4400.0|         43868.0|
|B000GFK7L6|      5|

In [None]:
# 80/20 train/test split with a fixed seed.
train, test = transformed.randomSplit(weights=[0.8, 0.2], seed=2021)

In [None]:
# Explicit-feedback ALS estimator over the indexed user/item columns.
als = ALS(
    # which columns hold user id, item id and rating
    userCol="reviewerID_index",
    itemCol="asin_index",
    ratingCol="overall",
    # model hyper-parameters
    rank=100,
    maxIter=5,
    regParam=0.15,
    implicitPrefs=False,
    nonnegative=True,
    # handling of rows that cannot be predicted at transform time
    coldStartStrategy="drop",
)

In [None]:
# Fit the ALS estimator on the training split.
model = als.fit(dataset=train)

In [None]:
# Score the held-out split and report RMSE between the true rating
# ("overall") and the model output ("prediction").
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    labelCol="overall",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE = {rmse}")
predictions.show()

RMSE = 1.9897225225710997
+----------+-------+--------------+----------+----------------+----------+
|      asin|overall|    reviewerID|asin_index|reviewerID_index|prediction|
+----------+-------+--------------+----------+----------------+----------+
|B00H5XAQOU|      5|A2GH8KS5V49L1W|     148.0|         13390.0| 2.1768775|
|B00H5XAQOU|      5|A1A34DY20PYGDY|     148.0|          1907.0| 4.5533795|
|B00H5XAQOU|      5| ANXJZG2443VYJ|     148.0|         18642.0| 3.8712943|
|B00H5XAQOU|      4|A3MWMOHKRH0LL1|     148.0|          2340.0| 4.4260373|
|B00586XP5S|      5|A1EU8RXIQIDKD8|     463.0|         10753.0|  2.927475|
|B00ASH7SJG|      5| ALN8WLYCEK22X|     496.0|           711.0| 3.7765102|
|B00EBGBKN0|      5|A3UMD6AT2JDDOS|     833.0|         17020.0| 3.2867382|
|B00EBGBKN0|      5|A13H6BLWC6IGOX|     833.0|           390.0| 4.1971483|
|B00EBGBKN0|      5|A1J0CIRFPTG4N8|     833.0|         11087.0| 3.7641933|
|B00EBGBKN0|      5|A2AQB7O9JOGM78|     833.0|         12984.0| 3.6036403|

In [None]:
# Top-10 ebook recommendations for a subset of 20 distinct reviewers.
reviewers = (
    transformed
    .select(als.getUserCol())
    .distinct()
    .limit(20)
)
reviewer_subset_recs = model.recommendForUserSubset(dataset=reviewers, numItems=10)
reviewer_subset_recs.show()

+----------------+--------------------+
|reviewerID_index|     recommendations|
+----------------+--------------------+
|           15846|[{15941, 4.932573...|
|            1051|[{4439, 4.956513}...|
|             596|[{13249, 4.841562...|
|           28153|[{10181, 2.224671...|
|           24801|[{19179, 4.347111...|
|           20593|[{22221, 3.926091...|
|           10338|[{10111, 4.321809...|
|             692|[{6140, 4.9419117...|
|           21606|[{22499, 2.908265...|
|             299|[{21932, 4.919957...|
|            9753|[{4445, 3.883203}...|
|            4066|[{1464, 4.9105616...|
|           36797|[{18464, 4.424031...|
|            7487|[{3211, 4.048864}...|
|           25175|[{6886, 3.9506187...|
|            3597|[{12882, 4.983201...|
|             769|[{14429, 5.018352...|
|           10129|[{6868, 4.368421}...|
|           17633|[{12118, 5.012356...|
|             934|[{31956, 5.138658...|
+----------------+--------------------+



In [None]:
# Collect the Spark recommendations into a pandas DataFrame on the driver.
# The same name is rebound from a Spark frame to a pandas frame, so the
# conversion is guarded: re-running this cell would otherwise fail because a
# pandas DataFrame has no toPandas().
if not isinstance(reviewer_subset_recs, pd.DataFrame):
    reviewer_subset_recs = reviewer_subset_recs.toPandas()
reviewer_subset_recs.head()

Unnamed: 0,reviewerID_index,recommendations
0,49202,"[(45660, 4.652079105377197), (57352, 4.5650959..."
1,6067,"[(39900, 7.333075523376465), (54358, 7.2294416..."
2,692,"[(39900, 6.409720420837402), (40344, 6.3910088..."
3,44711,"[(53624, 5.558377265930176), (56525, 5.3301939..."
4,42470,"[(45968, 8.464125633239746), (39900, 8.3477182..."


In [None]:
# function to collect the recommended ebook indexes (asin_index) of one row
def asin_index_taker(row):
    """Return a row's recommended ebook indexes as a comma-separated string.

    Parameters
    ----------
    row : mapping-like
        Must support ``row['recommendations']`` and yield an iterable of
        records, each of which supports ``rec['asin_index']``.

    Returns
    -------
    str
        The ``asin_index`` values, in their original order, joined with ``,``
        and with no trailing separator. An empty iterable gives ``''``.

    Notes
    -----
    The values are the numeric indexes read from ``asin_index``, not the
    original ASIN strings.
    """
    # str.join avoids both the repeated string concatenation and the dangling
    # trailing comma that a manual "+= value + ','" loop leaves behind.
    return ','.join(str(rec['asin_index']) for rec in row['recommendations'])

In [None]:
# Add a 'recommended_ebooks' column by applying asin_index_taker to every row.
reviewer_subset_recs['recommended_ebooks'] = reviewer_subset_recs.apply(
    func=asin_index_taker,
    axis='columns',
)
reviewer_subset_recs

Unnamed: 0,reviewerID_index,recommendations,recommended_ebooks
0,49202,"[(45660, 4.652079105377197), (57352, 4.5650959...","45660,57352,35294,45753,41317,28804,40031,5558..."
1,6067,"[(39900, 7.333075523376465), (54358, 7.2294416...","39900,54358,49692,50738,43645,43644,45968,5197..."
2,692,"[(39900, 6.409720420837402), (40344, 6.3910088...","39900,40344,35294,45968,39879,39736,35410,4859..."
3,44711,"[(53624, 5.558377265930176), (56525, 5.3301939...","53624,56525,56538,57836,59307,55505,53388,5558..."
4,42470,"[(45968, 8.464125633239746), (39900, 8.3477182...","45968,39900,45225,39736,46474,45753,45319,4547..."
5,10561,"[(40344, 7.181835174560547), (45968, 6.9910750...","40344,45968,40170,39900,45660,46474,25663,3225..."
6,64502,"[(45968, 7.220042705535889), (45225, 7.1862854...","45968,45225,39900,40549,41317,46660,51965,5197..."
7,11935,"[(24821, 6.852869987487793), (56435, 6.7208681...","24821,56435,32250,25663,45231,48596,51459,4977..."
8,11967,"[(40344, 6.112934112548828), (40170, 5.5954408...","40344,40170,54035,52554,56305,43125,45660,3529..."
9,38249,"[(53877, 7.046797752380371), (45968, 6.9446749...","53877,45968,54177,40344,40703,46265,53898,4666..."


In [None]:
# Top-10 reviewer recommendations for a subset of 20 distinct ebooks.
ebooks = (
    transformed
    .select(als.getItemCol())
    .distinct()
    .limit(20)
)
ebook_subset_recs = model.recommendForItemSubset(dataset=ebooks, numUsers=10)
ebook_subset_recs.show()

+----------+--------------------+
|asin_index|     recommendations|
+----------+--------------------+
|       496|[{14888, 5.697975...|
|     16916|[{19077, 5.598465...|
|     52567|[{39117, 5.958514...|
|     52435|[{23615, 5.552301...|
|     39252|[{33733, 5.203089...|
|     12467|[{4250, 5.721651}...|
|     34614|[{3903, 6.417584}...|
|     27240|[{3903, 5.943151}...|
|     24533|[{64595, 6.794214...|
|     27352|[{9061, 5.4639626...|
|     22195|[{40675, 6.327536...|
|       299|[{64595, 5.368647...|
|     14452|[{64595, 6.192099...|
|     45177|[{14888, 7.275992...|
|     18436|[{49696, 5.884264...|
|     52633|[{16336, 7.35494}...|
|     52546|[{10929, 5.082058...|
|      1761|[{61872, 7.091876...|
|     39221|[{45916, 6.358160...|
|     52827|[{25977, 5.326917...|
+----------+--------------------+



In [None]:
# Collect the Spark recommendations into a pandas DataFrame on the driver.
# The same name is rebound from a Spark frame to a pandas frame, so the
# conversion is guarded: re-running this cell would otherwise fail because a
# pandas DataFrame has no toPandas().
if not isinstance(ebook_subset_recs, pd.DataFrame):
    ebook_subset_recs = ebook_subset_recs.toPandas()
ebook_subset_recs.head()

Unnamed: 0,asin_index,recommendations
0,496,"[(14888, 5.697975158691406), (18901, 5.6228938..."
1,16916,"[(19077, 5.598465442657471), (42474, 5.5538649..."
2,52567,"[(39117, 5.95851469039917), (5504, 5.848324298..."
3,52435,"[(23615, 5.552301406860352), (7545, 5.51210021..."
4,39252,"[(33733, 5.203089237213135), (5543, 5.12551689..."


In [None]:
# function to collect the recommended reviewer indexes (reviewerID_index) of one row
def reviewer_id_index_taker(row):
    """Return a row's recommended reviewer indexes as a comma-separated string.

    Parameters
    ----------
    row : mapping-like
        Must support ``row['recommendations']`` and yield an iterable of
        records, each of which supports ``rec['reviewerID_index']``.

    Returns
    -------
    str
        The ``reviewerID_index`` values, in their original order, joined with
        ``,`` and with no trailing separator. An empty iterable gives ``''``.

    Notes
    -----
    The values are the numeric indexes read from ``reviewerID_index``, not the
    original reviewerID strings.
    """
    # str.join avoids both the repeated string concatenation and the dangling
    # trailing comma that a manual "+= value + ','" loop leaves behind.
    return ','.join(str(rec['reviewerID_index']) for rec in row['recommendations'])

In [None]:
# Add a 'users_recommended_to' column by applying reviewer_id_index_taker to every row.
# NOTE(review): the saved output of this cell shows the column as
# 'recommended_reviewers', so that output looks stale relative to this code.
ebook_subset_recs['users_recommended_to'] = ebook_subset_recs.apply(
    func=reviewer_id_index_taker,
    axis='columns',
)
ebook_subset_recs

Unnamed: 0,asin_index,recommendations,recommended_reviewers
0,496,"[(14888, 5.697975158691406), (18901, 5.6228938...","14888,18901,16403,5228,38438,62898,40086,30723..."
1,16916,"[(19077, 5.598465442657471), (42474, 5.5538649...","19077,42474,25775,10929,10273,23615,56328,4245..."
2,52567,"[(39117, 5.95851469039917), (5504, 5.848324298...","39117,5504,4734,3511,34855,59316,55864,10095,3..."
3,52435,"[(23615, 5.552301406860352), (7545, 5.51210021...","23615,7545,2226,62585,12253,10929,32355,32341,..."
4,39252,"[(33733, 5.203089237213135), (5543, 5.12551689...","33733,5543,41170,9061,55390,584,29902,12253,97..."
5,12467,"[(4250, 5.721651077270508), (4734, 5.653865337...","4250,4734,16118,2546,2333,20987,17121,59744,51..."
6,34614,"[(3903, 6.41758394241333), (44458, 6.357718944...","3903,44458,10929,7545,42474,14888,5228,2420,35..."
7,27240,"[(3903, 5.943150997161865), (33830, 5.92713689...","3903,33830,42474,28742,5228,1874,12253,56328,4..."
8,24533,"[(64595, 6.794214725494385), (12204, 6.6569657...","64595,12204,49748,10929,32355,63523,18538,3950..."
9,27352,"[(9061, 5.463962554931641), (14888, 5.41079139...","9061,14888,34799,37852,63888,10929,2420,64027,..."
