## DATASET

Amazon reviews: Kindle Store Category
    
    https://www.kaggle.com/bharadwaj6/kindle-reviews

## Initialization

In [1]:
# Import findspark to read SPARK_HOME and HADOOP_HOME

import findspark
findspark.init()

In [2]:
# Import required library

from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Recommendation Systems") \
    .getOrCreate()

In [3]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001654F1A65C0>


## Loading Data & Pre Processing

In [4]:
# Import Dataset Amazon Reviews of Kindle Store Category
data = spark.read.csv("kindle_reviews.csv", header=True, inferSchema=True)

In [5]:
data.count()

982820

In [6]:
#Drop missing value
df = data.dropna()

In [7]:
df.count()

978657

In [8]:
df.show()

+---+----------+-------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|_c0|      asin|helpful|overall|          reviewText|          reviewTime|          reviewerID|        reviewerName|             summary|      unixReviewTime|
+---+----------+-------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|B000F83SZQ| [0, 0]|      5|I enjoy vintage b...|          05 5, 2014|      A1F6404F1VG29J|          Avidreader|  Nice vintage story|          1399248000|
|  1|B000F83SZQ| [2, 2]|      4|This book is a re...|          01 6, 2014|       AN0N05A9LIJEQ|            critters|        Different...|          1388966400|
|  2|B000F83SZQ| [2, 2]|      4|This was a fairly...|          04 4, 2014|       A795DMNCJILA6|                 dot|               Oldie|          1396569600|
|  3|B000F83SZQ| [1, 1]|      5|I'd never read

In [9]:
#Select only used column for colaborative filtering
df2 = df.select('asin','reviewerID','overall')
df2.show()

+----------+--------------------+-------+
|      asin|          reviewerID|overall|
+----------+--------------------+-------+
|B000F83SZQ|      A1F6404F1VG29J|      5|
|B000F83SZQ|       AN0N05A9LIJEQ|      4|
|B000F83SZQ|       A795DMNCJILA6|      4|
|B000F83SZQ|      A1FV0SX13TWVXQ|      5|
|B000F83SZQ|      A3SPTOKDG7WBLN|      4|
|B000F83SZQ|      A1RK2OCZDSGC6R|      4|
|B000F83SZQ|      A2HSAKHC3IBRE6|      4|
|B000F83SZQ|      A3DE6XGZ2EPADS|      4|
|B000FA64PA|      A1UG4Q4D3OAH3A|      5|
|B000FA64PA|       AQZH7YTWQPOBE|      4|
|B000FA64PA|      A1ZT7WV0ZUA0OJ|      5|
|B000FA64PA|      A2ZFR72PT054YS|      4|
|B000FA64PA|       A2QK1U70OJ74P|      3|
|B000FA64PK|      A3SZMGJMV0G16C|      3|
|B000FA64PK|      A3H8PE1UFK04JZ|      5|
|B000FA64PK|      A2EN84QHDRZLP2|      5|
|B000FA64PK|      A1UG4Q4D3OAH3A|      5|
|B000FA64PK|      A38Z3Q6DTDIH9J|      3|
|B000FA64PK|      A1ZT7WV0ZUA0OJ|      5|
|B000FA64PK| it surely failed...|      4|
+----------+--------------------+-

In [10]:
# Convert string column into numberic type for ALS algorithm

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df2) for column in list(set(df2.columns)-set(['overall']))]

pipeline = Pipeline(stages=indexers)
indexed = pipeline.fit(df2).transform(df2)

indexed.show()

+----------+--------------------+-------+----------------+----------+
|      asin|          reviewerID|overall|reviewerID_index|asin_index|
+----------+--------------------+-------+----------------+----------+
|B000F83SZQ|      A1F6404F1VG29J|      5|         19786.0|   38086.0|
|B000F83SZQ|       AN0N05A9LIJEQ|      4|          2070.0|   38086.0|
|B000F83SZQ|       A795DMNCJILA6|      4|          8162.0|   38086.0|
|B000F83SZQ|      A1FV0SX13TWVXQ|      5|          9256.0|   38086.0|
|B000F83SZQ|      A3SPTOKDG7WBLN|      4|         34056.0|   38086.0|
|B000F83SZQ|      A1RK2OCZDSGC6R|      4|         35261.0|   38086.0|
|B000F83SZQ|      A2HSAKHC3IBRE6|      4|          1171.0|   38086.0|
|B000F83SZQ|      A3DE6XGZ2EPADS|      4|         15433.0|   38086.0|
|B000FA64PA|      A1UG4Q4D3OAH3A|      5|         36693.0|   58017.0|
|B000FA64PA|       AQZH7YTWQPOBE|      4|         68644.0|   58017.0|
|B000FA64PA|      A1ZT7WV0ZUA0OJ|      5|         47726.0|   58017.0|
|B000FA64PA|      A2

In [11]:
df3 = indexed.select('asin_index','reviewerID_index','overall')
df3.show()

+----------+----------------+-------+
|asin_index|reviewerID_index|overall|
+----------+----------------+-------+
|   38086.0|         19786.0|      5|
|   38086.0|          2070.0|      4|
|   38086.0|          8162.0|      4|
|   38086.0|          9256.0|      5|
|   38086.0|         34056.0|      4|
|   38086.0|         35261.0|      4|
|   38086.0|          1171.0|      4|
|   38086.0|         15433.0|      4|
|   58017.0|         36693.0|      5|
|   58017.0|         68644.0|      4|
|   58017.0|         47726.0|      5|
|   58017.0|         35831.0|      4|
|   58017.0|         60060.0|      3|
|   36896.0|         53121.0|      3|
|   36896.0|         47500.0|      5|
|   36896.0|         41707.0|      5|
|   36896.0|         36693.0|      5|
|   36896.0|         61772.0|      3|
|   36896.0|         47726.0|      5|
|   36896.0|        101445.0|      4|
+----------+----------------+-------+
only showing top 20 rows



## Model with ALS

In [12]:
(training, test) = df3.randomSplit([0.8, 0.2])

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="reviewerID_index", itemCol="asin_index", ratingCol="overall",
          coldStartStrategy="drop")
model = als.fit(training)

In [14]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 4.724202440635187


In [15]:
# Generate top 10 user recommendations for each kindle
kindleRecs = model.recommendForAllItems(10)
kindleRecs.show()

+----------+--------------------+
|asin_index|     recommendations|
+----------+--------------------+
|       148|[[14731, 23.45349...|
|       463|[[19287, 25.64172...|
|       471|[[19448, 23.35985...|
|       496|[[16873, 28.33516...|
|       833|[[14084, 41.93853...|
|      1088|[[18258, 16.16814...|
|      1238|[[28944, 25.72022...|
|      1342|[[19448, 25.47693...|
|      1580|[[13385, 29.41845...|
|      1591|[[32796, 26.12643...|
|      1645|[[12503, 30.78527...|
|      1829|[[28317, 39.39565...|
|      1959|[[15822, 23.56037...|
|      2122|[[25768, 36.85136...|
|      2142|[[14084, 49.70704...|
|      2366|[[32958, 35.49619...|
|      2659|[[17802, 30.64790...|
|      2866|[[20957, 25.81566...|
|      3175|[[28616, 30.14335...|
|      3749|[[9828, 28.906378...|
+----------+--------------------+
only showing top 20 rows



In [16]:
# Generate top 10 kindle recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show()

+----------------+--------------------+
|reviewerID_index|     recommendations|
+----------------+--------------------+
|             148|[[28268, 9.548618...|
|             463|[[9181, 13.729944...|
|             471|[[19997, 18.1739]...|
|             496|[[12659, 19.33063...|
|             833|[[15131, 13.66852...|
|            1088|[[20863, 12.56752...|
|            1238|[[34577, 11.18894...|
|            1342|[[18337, 14.15636...|
|            1580|[[15953, 15.79888...|
|            1591|[[13615, 11.84372...|
|            1645|[[29634, 12.05676...|
|            1829|[[12702, 13.27963...|
|            1959|[[8588, 21.006062...|
|            2122|[[28674, 14.98121...|
|            2142|[[39159, 19.46223...|
|            2366|[[20751, 18.72396...|
|            2659|[[41959, 18.69562...|
|            2866|[[18207, 27.77227...|
|            3175|[[21440, 20.63795...|
|            3749|[[17587, 13.57192...|
+----------------+--------------------+
only showing top 20 rows

