In [None]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# data science imports
import math
import numpy as np
import pandas as pd

# data viz imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# Build (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("ebook recommendation")
    .config("spark.driver.maxResultSize", "96g")
    .config("spark.driver.memory", "96g")
    .config("spark.executor.memory", "8g")
    .config("spark.master", "local[12]")  # local mode, 12 worker threads
    .getOrCreate()
)
# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [None]:
# Connect to Google Cloud Storage and build a handle to the reviews CSV.
from google.cloud import storage
from io import BytesIO
bucket_name = "bdproject1" # name of the Cloud Storage bucket that holds the dataset
storage_client = storage.Client()

bucket = storage_client.get_bucket(bucket_name)

blob = storage.blob.Blob("kindle_reviews.csv",bucket)
blob

<Blob: bdproject1, kindle_reviews.csv, None>

In [None]:
# Download the CSV from Cloud Storage and parse it into a pandas DataFrame.
# download_as_bytes() replaces the deprecated download_as_string() alias;
# both return the raw object contents as bytes.
content = blob.download_as_bytes()
df0 = pd.read_csv(BytesIO(content))

In [None]:
# Remove the leftover 'Unnamed: 0' column (its values just repeat the row
# index). drop(..., errors='ignore') keeps this cell re-runnable, whereas
# pop() raises KeyError on a second run because the column is already gone.
df0 = df0.drop(columns=['Unnamed: 0'], errors='ignore')

0              0
1              1
2              2
3              3
4              4
           ...  
982614    982614
982615    982615
982616    982616
982617    982617
982618    982618
Name: Unnamed: 0, Length: 982619, dtype: int64

In [None]:
# Explicit Spark schema for the reviews table: every column is a nullable
# string except the integer star rating in "overall".
column_types = [
    ("asin", StringType()),
    ("helpful", StringType()),
    ("overall", IntegerType()),
    ("reviewText", StringType()),
    ("reviewTime", StringType()),
    ("reviewerID", StringType()),
    ("reviewerName", StringType()),
    ("summary", StringType()),
    ("unixReviewTime", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in column_types]
)
# Convert the pandas frame into a Spark DataFrame using that schema.
df = spark.createDataFrame(df0, schema=schema)

In [None]:
# Alternative loader (not used when the data is read from the GCS bucket): use this instead when kindle_reviews.csv is available as a local file.
# data_path = os.path.join('kindle_reviews.csv')
# read file
#df = spark.read.load(os.path.join(data_path), format='csv', header=True, inferSchema=True)

In [None]:
# Work on a ~10% random sample of the reviews. A fixed seed makes the sample,
# and every count/metric derived from it, repeatable between runs.
# NOTE: this rebinds `df`, so re-running the cell samples the sample again.
df = df.sample(fraction=0.1, seed=2021)

In [None]:
# drop rows with nulls
# Spark DataFrames are immutable: na.drop() returns a new DataFrame, so the
# result must be assigned back or the rows are never actually removed.
df = df.na.drop()

DataFrame[asin: string, helpful: string, overall: int, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: string]

In [None]:
# Preview the first rows of the reviews DataFrame.
df.show()

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|B000F83SZQ|  [2, 2]|      4|This book is a re...| 01 6, 2014| AN0N05A9LIJEQ|            critters|        Different...|    1388966400|
|B000FA64QO|  [2, 2]|      4|Most of the New J...|05 17, 2011| AQZH7YTWQPOBE|            Enjolras|    A Thrackan story|    1305590400|
|B000FC1BN8|  [0, 0]|      3|** Note: this sto...|05 14, 2012|A2OIGPPBTR65MR|                para|Enjoyable, if unr...|    1336953600|
|B000FDJ0FS|  [3, 3]|      1|I read nearly all...|11 23, 2012|A3L25SHGZKH8Q1|     John K. Pearson|Not quite a disaster|    1353628800|
|B000FDJ0FS|[12, 13]|      1|My Combination On...|09 14

In [None]:
# Keep only the three columns the recommender uses: item, rating, user.
ratings = df.select("asin", "overall", "reviewerID")
ratings.show()

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|B000F83SZQ|      4| AN0N05A9LIJEQ|
|B000FA64QO|      4| AQZH7YTWQPOBE|
|B000FC1BN8|      3|A2OIGPPBTR65MR|
|B000FDJ0FS|      1|A3L25SHGZKH8Q1|
|B000FDJ0FS|      1|A1AKKYXG0Y0ULL|
|B000GFK7L6|      5|A3GXR6CHHPX0JS|
|B000GFK7L6|      4| AJLPS4A9HGOQK|
|B000GFK7L6|      4|A18FT4SOVZ4CCW|
|B000HA4FKY|      4|A3AKG73AM4OKSI|
|B000HC48T0|      2|A3775OP5VTX5ON|
|B000JMKU0Y|      4| A23U4O4NX29LP|
|B000JMKX4W|      4|A2P99LRDXBF8NF|
|B000JMKX4W|      3| A9L0KCZBVZDJB|
|B000JMKXYW|      5| ALRH7EYOMZ5RH|
|B000JML1QG|      2|A1PSYTEX4HIVAC|
|B000JML5JY|      4|A1YRYH5LD5W189|
|B000JML5JY|      5| AMYS6YEEYSNB1|
|B000JMLBHU|      4|A27EEX3H2CB70G|
|B000JMLBHU|      5|A3MC71A07AH7S3|
|B000JMLBHU|      3|A25J168NSTHMYW|
+----------+-------+--------------+
only showing top 20 rows



In [None]:
# Review count per reviewer, most active reviewers first.
ratings_by_user = (
    ratings
    .groupBy("reviewerID")
    .count()
    .orderBy(desc("count"))
)
ratings_by_user.show()

+--------------+-----+
|    reviewerID|count|
+--------------+-----+
|A2WZJDFX12QXKD|  117|
|A13QTZ8CIMHHG4|  112|
|A3PTWPKPXOG8Y5|   82|
| A320TMDV6KCFU|   82|
|A1JLU5H1CCENWX|   78|
|A37LY77Q2YPJVL|   67|
|A2JZCZYHNQHSCP|   65|
| ANOSVLTGRKABQ|   64|
|A3A7FF87LEVCQ1|   62|
| A5JZNVV4TEAWU|   61|
| A2YJ8VP1SSHJ7|   60|
| A3LXRIY0HE71K|   60|
|A328S9RN3U5M68|   58|
|A2VXSQHJWZAQGY|   57|
|A33W5CVYPB8ENS|   49|
| AWAP0KEX6POQV|   48|
|A1JKGTL51HHTU1|   48|
|A23GRXCXQU3SM8|   48|
|A1U5XB6PCIDLIK|   48|
| ADDT2MU773IYL|   47|
+--------------+-----+
only showing top 20 rows



In [None]:
# Review count per ebook (asin), most reviewed ebooks first.
ratings_by_ebook = (
    ratings
    .groupBy("asin")
    .count()
    .orderBy(desc("count"))
)
ratings_by_ebook.show()

+----------+-----+
|      asin|count|
+----------+-----+
|B006GWO5WK|  106|
|B00BTIDW4S|   81|
|B00H0V069M|   55|
|B00JDYC5OI|   52|
|B007R5YDYA|   49|
|B00BTIDXVU|   48|
|B005DOK8NW|   47|
|B005ME39HU|   45|
|B00BT0J8ZS|   45|
|B004E10W0E|   43|
|B00BTIDOO6|   43|
|B00BSX4U04|   43|
|B00DJB6KE2|   42|
|B00KF0URBM|   41|
|B00HYQJPC2|   37|
|B00CCRTFSC|   37|
|B00E5JAIP4|   37|
|B008IL48BK|   37|
|B00FGFY86G|   35|
|B00BTN2DSY|   34|
+----------+-----+
only showing top 20 rows



In [None]:
# ALS needs numeric user/item ids, so map the string reviewerID and asin
# columns to numeric indices with one StringIndexer per column.
# Iterate over ratings.columns directly (instead of a set difference) so the
# stage order, and therefore the output column order, is the same on every run.
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='skip')
           for column in ratings.columns if column != 'overall']
pipeline = Pipeline(stages=indexer)

# Fit the indexers on the ratings and append the *_index columns.
transformed = pipeline.fit(ratings).transform(ratings)
transformed.show()

+----------+-------+--------------+----------------+----------+
|      asin|overall|    reviewerID|reviewerID_index|asin_index|
+----------+-------+--------------+----------------+----------+
|B000F83SZQ|      4| AN0N05A9LIJEQ|          1270.0|   21488.0|
|B000FA64QO|      4| AQZH7YTWQPOBE|         40709.0|   21489.0|
|B000FC1BN8|      3|A2OIGPPBTR65MR|          7897.0|   21490.0|
|B000FDJ0FS|      1|A3L25SHGZKH8Q1|         35299.0|   11453.0|
|B000FDJ0FS|      1|A1AKKYXG0Y0ULL|         21875.0|   11453.0|
|B000GFK7L6|      5|A3GXR6CHHPX0JS|         16720.0|    6880.0|
|B000GFK7L6|      4| AJLPS4A9HGOQK|         19037.0|    6880.0|
|B000GFK7L6|      4|A18FT4SOVZ4CCW|         21557.0|    6880.0|
|B000HA4FKY|      4|A3AKG73AM4OKSI|         33563.0|   21491.0|
|B000HC48T0|      2|A3775OP5VTX5ON|         32998.0|   21492.0|
|B000JMKU0Y|      4| A23U4O4NX29LP|          3186.0|   21493.0|
|B000JMKX4W|      4|A2P99LRDXBF8NF|          7913.0|   11454.0|
|B000JMKX4W|      3| A9L0KCZBVZDJB|     

In [None]:
# 80/20 train/test split of the indexed ratings, with a fixed seed.
(train,test) = transformed.randomSplit([0.8, 0.2], seed=2021)

In [None]:
# Explicit-feedback ALS recommender on the indexed user/item columns.
# coldStartStrategy="drop" discards NaN predictions (users/items the model
# has no factors for) so that evaluation metrics stay finite.
# seed is fixed (same value as the train/test split) so the fitted model is
# repeatable between runs instead of depending on the default seed.
als = ALS(maxIter=5,
          regParam=0.15,
          rank=100,
          userCol="reviewerID_index",
          itemCol="asin_index",
          ratingCol="overall",
          coldStartStrategy="drop",
          implicitPrefs=False,
          nonnegative=True,
          seed=2021)

In [None]:
# Fit the ALS model on the training split.
model = als.fit(train)

In [None]:
# Score the held-out split and report the root-mean-square error.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    labelCol="overall",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE = {rmse}")
predictions.show()

RMSE = 1.9171296719848283
+----------+-------+--------------+----------------+----------+----------+
|      asin|overall|    reviewerID|reviewerID_index|asin_index|prediction|
+----------+-------+--------------+----------------+----------+----------+
|B007UAUPT4|      4|A1LM14JMSUTXEZ|          4356.0|     148.0| 2.8183692|
|B00IIYHLM0|      5| AD6ROXTU7305R|          5759.0|     463.0| 3.4258456|
|B00IIYHLM0|      5| ADK827JF6CKJ0|          5764.0|     463.0| 3.9775424|
|B00IIYHLM0|      4|A3JHMDPDDFUSLV|          2631.0|     463.0| 4.2370834|
|B00IIYHLM0|      5| A396UOEM7XPB5|          8518.0|     463.0| 2.5575194|
|B0095612HK|      5|A2U8YWPP1PYHJM|            53.0|     496.0|  4.188701|
|B0095612HK|      5|A371U2QY2N2R66|         15995.0|     496.0| 4.0281844|
|B00E0QS0D4|      5|A3KG5TKIRPXXD7|         16968.0|     833.0|  3.180298|
|B00E0QS0D4|      5|A2HUVS5F8FQKBG|         14143.0|     833.0| 2.4019878|
|B00E0QS0D4|      4|A3JGY8WYUZBU6D|          1964.0|     833.0|  3.120148|

In [None]:
# Top-10 ebook recommendations for a subset of up to 20 distinct reviewers.
reviewers = (
    transformed
    .select(als.getUserCol())
    .distinct()
    .limit(20)
)
reviewer_subset_recs = model.recommendForUserSubset(reviewers, 10)
reviewer_subset_recs.show()

+----------------+--------------------+
|reviewerID_index|     recommendations|
+----------------+--------------------+
|            7554|[{31535, 5.061816...|
|           16916|[{4943, 3.8377805...|
|           29811|[{37164, 4.335366...|
|            1051|[{9460, 4.8500547...|
|             596|[{8850, 4.9752464...|
|           28153|[{21544, 4.939356...|
|           39817|[{26625, 3.695491...|
|             305|[{11787, 5.019263...|
|           30867|[{11999, 3.915433...|
|           40999|[{11824, 2.954538...|
|           10561|[{15843, 4.850876...|
|            6433|[{16218, 4.894594...|
|            4142|[{6601, 2.8786082...|
|            8649|[{27225, 3.962237...|
|            3597|[{26760, 4.831100...|
|           10129|[{2148, 4.93216},...|
|           11924|[{2369, 4.772122}...|
|            7782|[{5682, 4.693063}...|
+----------------+--------------------+



In [None]:
# Collect the per-reviewer recommendations into a pandas DataFrame.
# NOTE(review): this rebinds the name from a Spark DataFrame to a pandas one,
# so re-running this cell on its own fails (pandas has no toPandas()).
reviewer_subset_recs = reviewer_subset_recs.toPandas()
reviewer_subset_recs.head()

Unnamed: 0,reviewerID_index,recommendations
0,7554,"[(31535, 5.061816692352295), (549, 5.009945392..."
1,16916,"[(4943, 3.837780475616455), (7013, 3.618249893..."
2,29811,"[(37164, 4.3353657722473145), (23177, 4.289042..."
3,1051,"[(9460, 4.850054740905762), (1040, 4.827525138..."
4,596,"[(8850, 4.975246429443359), (5971, 4.963124752..."


In [None]:
# function to take ebook ASINs
def asin_index_taker(row):
    """Return the recommended ebook indices of ``row`` as one string.

    ``row['recommendations']`` is iterated and the ``'asin_index'`` entry of
    each element is converted with ``str`` and followed by a comma, so a
    non-empty result always ends with a trailing ``','``. An empty
    recommendation list yields ``''``.
    """
    return ''.join(str(rec['asin_index']) + ',' for rec in row['recommendations'])

In [None]:
# Add a column holding each reviewer's recommended asin_index values as a
# comma-separated string (computed row by row).
reviewer_subset_recs['recommended_ebooks'] = reviewer_subset_recs.apply(asin_index_taker, axis=1)
reviewer_subset_recs

Unnamed: 0,reviewerID_index,recommendations,recommended_ebooks
0,7554,"[(31535, 5.061816692352295), (549, 5.009945392...","31535,549,25232,8196,18559,1743,10888,5629,417..."
1,16916,"[(4943, 3.837780475616455), (7013, 3.618249893...","4943,7013,10976,1208,6062,5019,3512,2210,6253,..."
2,29811,"[(37164, 4.3353657722473145), (23177, 4.289042...","37164,23177,17410,15442,2685,34027,9597,19622,..."
3,1051,"[(9460, 4.850054740905762), (1040, 4.827525138...","9460,1040,36069,2752,11142,3217,10836,5629,111..."
4,596,"[(8850, 4.975246429443359), (5971, 4.963124752...","8850,5971,10632,16098,7009,3077,19980,6301,935..."
5,28153,"[(21544, 4.939355850219727), (2842, 3.06026959...","21544,2842,17311,18944,11856,4237,2609,4756,35..."
6,39817,"[(26625, 3.695491075515747), (14071, 3.4853115...","26625,14071,5421,7057,3490,3945,2428,15217,757..."
7,305,"[(11787, 5.019262790679932), (22347, 4.9408221...","11787,22347,11514,21607,32388,6903,3217,6978,4..."
8,30867,"[(11999, 3.915433406829834), (5920, 3.65094876...","11999,5920,9738,19328,19111,11388,9978,2884,13..."
9,40999,"[(11824, 2.954538345336914), (22131, 2.4487910...","11824,22131,6032,4720,20086,12312,13336,19865,..."


In [None]:
# Top-10 reviewer recommendations for a subset of up to 20 distinct ebooks.
ebooks = (
    transformed
    .select(als.getItemCol())
    .distinct()
    .limit(20)
)
ebook_subset_recs = model.recommendForItemSubset(ebooks, 10)
ebook_subset_recs.show()

+----------+--------------------+
|asin_index|     recommendations|
+----------+--------------------+
|     11757|[{39110, 2.906642...|
|      7171|[{27766, 4.169920...|
|     11766|[{9093, 5.500515}...|
|     22984|[{9680, 4.3154173...|
|     21606|[{27672, 3.927634...|
|     22195|[{32424, 4.936210...|
|     12172|[{7474, 4.8595257...|
|     21911|[{10662, 1.237143...|
|     22797|[{8199, 4.968968}...|
|     11935|[{19137, 4.460495...|
|     11967|[{9869, 5.423187}...|
|     22331|[{40899, 3.924944...|
|     21791|[{9687, 4.33749},...|
|     22274|[{3550, 4.893343}...|
|      7115|[{21415, 3.893912...|
|     21933|[{5391, 2.0466504...|
|     12275|[{31424, 4.942535...|
|     11924|[{2, 5.0259}, {21...|
|     21825|[{19498, 4.925064...|
+----------+--------------------+



In [None]:
# Collect the per-ebook recommendations into a pandas DataFrame.
# NOTE(review): this rebinds the name from a Spark DataFrame to a pandas one,
# so re-running this cell on its own fails (pandas has no toPandas()).
ebook_subset_recs = ebook_subset_recs.toPandas()
ebook_subset_recs.head()

Unnamed: 0,asin_index,recommendations
0,11757,"[(39110, 2.906642198562622), (9093, 2.51824808..."
1,7171,"[(27766, 4.169920444488525), (23524, 4.1699204..."
2,11766,"[(9093, 5.500514984130859), (9687, 5.385972976..."
3,22984,"[(9680, 4.315417289733887), (38615, 4.14181375..."
4,21606,"[(27672, 3.9276347160339355), (12634, 3.372845..."


In [None]:
# function to take reviewer ids
def reviewer_id_index_taker(row):
    """Return the recommended reviewer indices of ``row`` as one string.

    ``row['recommendations']`` is iterated and the ``'reviewerID_index'``
    entry of each element is converted with ``str`` and followed by a comma,
    so a non-empty result always ends with a trailing ``','``. An empty
    recommendation list yields ``''``.
    """
    return ''.join(str(rec['reviewerID_index']) + ',' for rec in row['recommendations'])

In [None]:
# Add a column holding each ebook's recommended reviewerID_index values as a
# comma-separated string (computed row by row).
ebook_subset_recs['users_recommended_to'] = ebook_subset_recs.apply(reviewer_id_index_taker, axis=1)
ebook_subset_recs

Unnamed: 0,asin_index,recommendations,users_recommended_to
0,11757,"[(39110, 2.906642198562622), (9093, 2.51824808...","39110,9093,9680,36169,41623,40854,22152,19699,..."
1,7171,"[(27766, 4.169920444488525), (23524, 4.1699204...","27766,23524,19915,7127,39082,10,20776,35806,40..."
2,11766,"[(9093, 5.500514984130859), (9687, 5.385972976...","9093,9687,38834,15808,22152,41623,40854,39082,..."
3,22984,"[(9680, 4.315417289733887), (38615, 4.14181375...","9680,38615,39082,21973,8662,7004,31887,104,233..."
4,21606,"[(27672, 3.9276347160339355), (12634, 3.372845...","27672,12634,13315,39117,21888,20639,36176,9563..."
5,22195,"[(32424, 4.936210632324219), (14559, 4.1451697...","32424,14559,11697,13123,30761,12943,7827,5678,..."
6,12172,"[(7474, 4.859525680541992), (9746, 4.616447448...","7474,9746,18080,9687,31437,15808,7701,36169,11..."
7,21911,"[(10662, 1.237143635749817), (18926, 1.2156422...","10662,18926,22969,2835,8115,19968,38037,26303,..."
8,22797,"[(8199, 4.968967914581299), (8100, 4.927923202...","8199,8100,12081,4550,39082,1271,37355,9746,968..."
9,11935,"[(19137, 4.4604949951171875), (16156, 4.246379...","19137,16156,3414,2857,4273,1420,36169,38615,52..."
