# Init Spark

In [None]:
import findspark
findspark.init()

from pyspark import SparkContext

# Init sparkcontext
sc = SparkContext(master="local", appName="app recommendation")

from pyspark.sql import SparkSession
spark = SparkSession(sc)
spark

25/01/27 10:37:35 WARN Utils: Your hostname, Sophies-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
25/01/27 10:37:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/27 10:37:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Import library

In [2]:
import pyspark.sql.functions as f
import pyspark.sql.types as t

import matplotlib.pyplot as plt 
import seaborn as sns
import pandas as pd
import pyspark.sql.types as t


from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Creating ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Import df

In [None]:
df = spark.read.json('Data/reviews_Video_Games_5.json.gz')
df.show(5)

                                                                                

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0700099867|[8, 12]|    1.0|Installing the ga...| 07 9, 2012|A2HD75EMZR8QLN|                 123|Pay to unlock con...|    1341792000|
|0700099867| [0, 0]|    4.0|If you like rally...|06 30, 2013|A3UR8NLLY1ZHCX|Alejandro Henao "...|     Good rally game|    1372550400|
|0700099867| [0, 0]|    1.0|1st shipment rece...|06 28, 2014|A1INA0F5CWW3J4|Amazon Shopper "M...|           Wrong key|    1403913600|
|0700099867|[7, 10]|    3.0|I got this versio...|09 14, 2011|A1DLMTOTHQ4AST|            ampgreen|awesome game, if ...|    1315958400|
|0700099867| [2, 2]|    4.0|I had Dirt 2 on X...|06 14, 2011|A

In [4]:
final_data = df.select('asin', 'reviewerID', 'overall')
final_data.show(5)

+----------+--------------+-------+
|      asin|    reviewerID|overall|
+----------+--------------+-------+
|0700099867|A2HD75EMZR8QLN|    1.0|
|0700099867|A3UR8NLLY1ZHCX|    4.0|
|0700099867|A1INA0F5CWW3J4|    1.0|
|0700099867|A1DLMTOTHQ4AST|    3.0|
|0700099867|A361M14PU2GUEG|    4.0|
+----------+--------------+-------+
only showing top 5 rows



# Explore data

In [5]:
final_data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- overall: double (nullable = true)



In [6]:
final_data.describe().show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+-------------------+--------------------+------------------+
|summary|               asin|          reviewerID|           overall|
+-------+-------------------+--------------------+------------------+
|  count|             231780|              231780|            231780|
|   mean|7.198617864286957E9|                NULL| 4.086396582966606|
| stddev|3.628023820739709E9|                NULL|1.2023296087789057|
|    min|         0700099867|A00263941WP7WCIL7...|               1.0|
|    max|         B00KHECZXO|       AZZTC2OYVNE2Q|               5.0|
+-------+-------------------+--------------------+------------------+



                                                                                

In [7]:
# Check null values
for col in final_data.columns:
    print(col, final_data.where(final_data[col].isNull()).count())

                                                                                

asin 0


                                                                                

reviewerID 0


[Stage 12:>                                                         (0 + 1) / 1]

overall 0


                                                                                

We can see that there is no NA values in columns

In [8]:
# drop duplicated values (if any)
df = df.dropDuplicates()

## Calculate sparsity

In [None]:
# Count the number of unique users in the dataset
users = final_data.select('reviewerID').distinct().count()
# Count the number of unique products in the dataset
products = final_data.select('asin').distinct().count()
# Count total number of user–product interactions (ratings/reviews)
numerator = final_data.count()

25/01/27 10:37:48 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [None]:
# Display the interaction count along with the number of users and products
display(numerator, users, products)

231780

24303

10672

In [11]:
# number of ratings matrix could contain if no empty cellers
denominator = users * products 
denominator

259361616

In [None]:
# Compute sparsity of the user–item matrix
sparsity = 1 - (numerator*1.0/ denominator)
print('sparsity: ', sparsity)

sparsity:  0.9991063442479476


# Transform data

In [13]:
# Create an indexer
indexer = StringIndexer(inputCol='asin',
                        outputCol='asin_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(final_data) 

# Indexer creates a new column with numeric index values
data_indexed = indexer_model.transform(final_data)

# Repeat the process for the other categorical feature
indexer1 = StringIndexer(inputCol='reviewerID',
                         outputCol='reviewerID_idx')
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

                                                                                

In [None]:
# Display the first 5 rows
data_indexed.show(5, truncate=True)

25/01/27 10:37:55 WARN DAGScheduler: Broadcasting large task binary with size 1434.3 KiB


+----------+--------------+-------+--------+--------------+
|      asin|    reviewerID|overall|asin_idx|reviewerID_idx|
+----------+--------------+-------+--------+--------------+
|0700099867|A2HD75EMZR8QLN|    1.0|  2269.0|       14157.0|
|0700099867|A3UR8NLLY1ZHCX|    4.0|  2269.0|       22489.0|
|0700099867|A1INA0F5CWW3J4|    1.0|  2269.0|        7934.0|
|0700099867|A1DLMTOTHQ4AST|    3.0|  2269.0|        7852.0|
|0700099867|A361M14PU2GUEG|    4.0|  2269.0|         847.0|
+----------+--------------+-------+--------+--------------+
only showing top 5 rows



In [15]:
# Check NA values again
data_indexed.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in
           data_indexed.columns]).toPandas().T

                                                                                

Unnamed: 0,0
asin,0
reviewerID,0
overall,0
asin_idx,0
reviewerID_idx,0


# Split dataset

In [16]:
# Smaller dataset so we will use 0.8 / 0.2
(training, test) = data_indexed.randomSplit([0.8, 0.2])

# Build model

In [None]:
# Initialize the ALS (Alternating Least Squares) recommendation model
# - maxIter: number of optimization iterations
# - regParam: regularization parameter to prevent overfitting
# - rank: number of latent factors
# - userCol / itemCol: indexed user and item columns
# - ratingCol: column containing rating values
# - coldStartStrategy="drop": remove rows with unseen users/items during prediction
# - nonnegative=True: enforce non-negative latent factors
als = ALS(maxIter=10,
          regParam=0.1,
          rank = 15,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(training)

25/01/27 10:37:57 WARN DAGScheduler: Broadcasting large task binary with size 1462.2 KiB
25/01/27 10:37:59 WARN DAGScheduler: Broadcasting large task binary with size 1464.5 KiB
25/01/27 10:38:01 WARN DAGScheduler: Broadcasting large task binary with size 1466.0 KiB
25/01/27 10:38:01 WARN DAGScheduler: Broadcasting large task binary with size 1467.3 KiB
25/01/27 10:38:01 WARN DAGScheduler: Broadcasting large task binary with size 1466.3 KiB
25/01/27 10:38:02 WARN DAGScheduler: Broadcasting large task binary with size 1467.6 KiB
25/01/27 10:38:02 WARN DAGScheduler: Broadcasting large task binary with size 1468.4 KiB
25/01/27 10:38:02 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/01/27 10:38:02 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/01/27 10:38:02 WARN DAGScheduler: Broadcasting large task binary with size 1471.5 KiB
25/01/27 10:38:03 WARN DAGScheduler: Broadcasting large task binary wit

In [18]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [19]:
predictions.show(5)

25/01/27 10:38:10 WARN DAGScheduler: Broadcasting large task binary with size 1447.0 KiB
25/01/27 10:38:10 WARN DAGScheduler: Broadcasting large task binary with size 1506.4 KiB
25/01/27 10:38:10 WARN DAGScheduler: Broadcasting large task binary with size 1505.1 KiB
                                                                                

+----------+--------------+-------+--------+--------------+----------+
|      asin|    reviewerID|overall|asin_idx|reviewerID_idx|prediction|
+----------+--------------+-------+--------+--------------+----------+
|B00000DMAR|A1QHGON6QDTX2K|    5.0|  1621.0|       13285.0| 3.7676222|
|B00000DMAX|A2AV2TR28DGSGC|    5.0|   621.0|        1645.0|  3.969711|
|B00000F1GM|A2AV2TR28DGSGC|    5.0|   290.0|        1645.0| 3.4763653|
|B00000I1BY|A313Y959E605NE|    1.0|  1148.0|        4818.0| 3.8026812|
|B00000K10O|A313Y959E605NE|    4.0|  4417.0|        4818.0| 2.5002816|
+----------+--------------+-------+--------+--------------+----------+
only showing top 5 rows



# Evaluate model

In [None]:
# Create a RegressionEvaluator to compute the RMSE (Root Mean Squared Error)
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="overall",
                                predictionCol="prediction")

# Evaluate the predictions and calculate RMSE.
rmse = evaluator.evaluate(predictions)

25/01/27 10:38:12 WARN DAGScheduler: Broadcasting large task binary with size 1446.3 KiB
25/01/27 10:38:12 WARN DAGScheduler: Broadcasting large task binary with size 1506.4 KiB
25/01/27 10:38:12 WARN DAGScheduler: Broadcasting large task binary with size 1505.1 KiB
25/01/27 10:38:15 WARN DAGScheduler: Broadcasting large task binary with size 1554.1 KiB


In [21]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.2366237557457707


With an RMSE of 1.22, the error is relatively low. However, considering the rating scale ranges from 1 to 5, a gap of 1 to 2 can be considered significant.

In [22]:
# get 5 recommendations which have highest rating.
user_recs = model.recommendForAllUsers(5)

In [23]:
user_recs.show(10, truncate=False)

25/01/27 10:38:15 WARN DAGScheduler: Broadcasting large task binary with size 1553.6 KiB
25/01/27 10:38:26 WARN DAGScheduler: Broadcasting large task binary with size 1546.9 KiB


+--------------+-----------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                                |
+--------------+-----------------------------------------------------------------------------------------------+
|0             |[{9197, 5.780767}, {10136, 5.7159014}, {2587, 5.702335}, {7944, 5.6433306}, {10028, 5.623686}] |
|1             |[{7944, 5.403418}, {7425, 5.371918}, {6024, 5.297246}, {6067, 5.2707233}, {4108, 5.2683897}]   |
|2             |[{2931, 5.1611834}, {6067, 5.076249}, {6024, 5.0754766}, {10460, 4.9933643}, {10286, 4.987646}]|
|3             |[{4651, 5.407189}, {3136, 5.301354}, {7944, 5.2374344}, {3964, 5.181002}, {6067, 5.139053}]    |
|4             |[{7944, 5.4843287}, {10434, 5.4207206}, {6067, 5.399769}, {7425, 5.3823752}, {6024, 5.3648324}]|
|5             |[{6067, 6.2858877}, {2931, 6.148175}, {6024, 6.102043}, {9084, 6.075832}, {6008,

                                                                                

# Recommend for users

 (A29KT7UP7DLM1J, A1WGVOVABHFDF3, A3DIS5O83SQJWW)

In [None]:
# Extract a mapping table of unique reviewer index → original reviewer ID
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()

In [25]:
df_reviewer_reviewer_id.count()

25/01/27 10:38:26 WARN DAGScheduler: Broadcasting large task binary with size 1124.3 KiB
25/01/27 10:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1132.6 KiB
                                                                                

24303

In [26]:
df_reviewer_reviewer_id.show(5)

25/01/27 10:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1124.3 KiB
[Stage 215:>                                                        (0 + 1) / 1]

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|       20806.0|A2ZYJOZO6BPV6K|
|         735.0|A3TQTYD0D6AUO3|
|        2580.0|A2QVKLB1VT903K|
|        9117.0|A3OMBKL5EOHA36|
|        2945.0|A2NWQA506BES77|
+--------------+--------------+
only showing top 5 rows



25/01/27 10:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1129.2 KiB
                                                                                

In [None]:
# Join the user recommendations with the mapping table
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['reviewerID_idx'], how='left')

In [28]:
new_user_recs.show(10, truncate=False)

25/01/27 10:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1553.6 KiB
25/01/27 10:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1124.3 KiB
25/01/27 10:38:39 WARN DAGScheduler: Broadcasting large task binary with size 1548.9 KiB
[Stage 242:>                (0 + 1) / 1][Stage 267:>                (0 + 0) / 1]

+--------------+-----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                |reviewerID    |
+--------------+-----------------------------------------------------------------------------------------------+--------------+
|8             |[{6067, 5.776828}, {6024, 5.709651}, {4651, 5.6591644}, {5114, 5.6488113}, {2587, 5.6484017}]  |A1AISPOIIHTHXX|
|0             |[{9197, 5.780767}, {10136, 5.7159014}, {2587, 5.702335}, {7944, 5.6433306}, {10028, 5.623686}] |A3V6Z4RCDGRC44|
|7             |[{7509, 5.3638835}, {3880, 5.2723613}, {7947, 5.14127}, {4768, 5.1395497}, {261, 5.088324}]    |A20DZX38KRBIT8|
|1             |[{7944, 5.403418}, {7425, 5.371918}, {6024, 5.297246}, {6067, 5.2707233}, {4108, 5.2683897}]   |AJKWF4W7QD4NS |
|4             |[{7944, 5.4843287}, {10434, 5.4207206}, {6067, 5.399769}, {7425, 5.3823752}, {6024, 5.36

25/01/27 10:38:41 WARN DAGScheduler: Broadcasting large task binary with size 1128.4 KiB
                                                                                

In [29]:
# Recommendation for specific users'
reviewerID = ['A29KT7UP7DLM1J', 'A1WGVOVABHFDF3', 'A3DIS5O83SQJWW']
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'].isin(reviewerID))
find_user_rec.show(truncate=False)

25/01/27 10:38:41 WARN DAGScheduler: Broadcasting large task binary with size 1553.6 KiB
25/01/27 10:38:41 WARN DAGScheduler: Broadcasting large task binary with size 1126.6 KiB
25/01/27 10:38:50 WARN DAGScheduler: Broadcasting large task binary with size 1548.3 KiB
25/01/27 10:38:52 WARN DAGScheduler: Broadcasting large task binary with size 1129.6 KiB
[Stage 345:>                                                        (0 + 1) / 1]

+--------------+-----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                |reviewerID    |
+--------------+-----------------------------------------------------------------------------------------------+--------------+
|6623          |[{4651, 6.1226444}, {7824, 6.078095}, {10434, 6.0312996}, {6067, 5.9631543}, {3831, 5.947102}] |A1WGVOVABHFDF3|
|4622          |[{9398, 5.3144712}, {7618, 5.2516456}, {2931, 5.1145115}, {8935, 5.107025}, {10155, 5.0728745}]|A29KT7UP7DLM1J|
|780           |[{4651, 5.919368}, {9156, 5.4918456}, {2371, 5.466826}, {10662, 5.4606595}, {5916, 5.3917036}] |A3DIS5O83SQJWW|
+--------------+-----------------------------------------------------------------------------------------------+--------------+



                                                                                