In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
SparkContext.setSystemProperty('spark.executor.memory', '12g')
sc = SparkContext(master='local', appName='Recommendation')

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession(sc)

In [6]:
data = spark.read.json("reviews_Video_Games_5.json.gz")

In [7]:
data.show(5,truncate=True)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0700099867|[8, 12]|    1.0|Installing the ga...| 07 9, 2012|A2HD75EMZR8QLN|                 123|Pay to unlock con...|    1341792000|
|0700099867| [0, 0]|    4.0|If you like rally...|06 30, 2013|A3UR8NLLY1ZHCX|Alejandro Henao "...|     Good rally game|    1372550400|
|0700099867| [0, 0]|    1.0|1st shipment rece...|06 28, 2014|A1INA0F5CWW3J4|Amazon Shopper "M...|           Wrong key|    1403913600|
|0700099867|[7, 10]|    3.0|I got this versio...|09 14, 2011|A1DLMTOTHQ4AST|            ampgreen|awesome game, if ...|    1315958400|
|0700099867| [2, 2]|    4.0|I had Dirt 2 on X...|06 14, 2011|A

In [8]:
data_sub = data.select(['reviewerID', 'asin', 'overall'])

In [9]:
data_sub.count()

231780

In [10]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col, udf

In [11]:
data_sub.show(5, truncate=True)

+--------------+----------+-------+
|    reviewerID|      asin|overall|
+--------------+----------+-------+
|A2HD75EMZR8QLN|0700099867|    1.0|
|A3UR8NLLY1ZHCX|0700099867|    4.0|
|A1INA0F5CWW3J4|0700099867|    1.0|
|A1DLMTOTHQ4AST|0700099867|    3.0|
|A361M14PU2GUEG|0700099867|    4.0|
+--------------+----------+-------+
only showing top 5 rows



In [12]:
data_sub.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)



In [13]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in data_sub.columns]).toPandas().T

Unnamed: 0,0
reviewerID,0
asin,0
overall,0


##### do not have null

In [14]:
users = data_sub.select("reviewerID").distinct().count()
products = data_sub.select("asin").distinct().count()
numerator = data_sub.count()
display(numerator, users, products)

231780

24303

10672

In [15]:
denominator = users * products
denominator

259361616

In [16]:
sparsity = 1 - (numerator*1.0 / denominator)
print ("Sparsity: "), sparsity

Sparsity: 


(None, 0.9991063442479476)

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [18]:
indexer = StringIndexer(inputCol='reviewerID',outputCol='reviewerID_idx')
indexer_model = indexer.fit(data_sub)
data_indexed = indexer_model.transform(data_sub)

In [19]:
indexer = StringIndexer(inputCol='asin',outputCol='asin_idx')
indexer_model = indexer.fit(data_indexed)
data_indexed = indexer_model.transform(data_indexed)

In [20]:
data_indexed.show(5, truncate=True)

+--------------+----------+-------+--------------+--------+
|    reviewerID|      asin|overall|reviewerID_idx|asin_idx|
+--------------+----------+-------+--------------+--------+
|A2HD75EMZR8QLN|0700099867|    1.0|       14118.0|  2339.0|
|A3UR8NLLY1ZHCX|0700099867|    4.0|       23077.0|  2339.0|
|A1INA0F5CWW3J4|0700099867|    1.0|        8568.0|  2339.0|
|A1DLMTOTHQ4AST|0700099867|    3.0|        9219.0|  2339.0|
|A361M14PU2GUEG|0700099867|    4.0|         842.0|  2339.0|
+--------------+----------+-------+--------------+--------+
only showing top 5 rows



In [21]:
data_indexed.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewerID_idx: double (nullable = false)
 |-- asin_idx: double (nullable = false)



In [22]:
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in data_indexed.columns]).toPandas().T

Unnamed: 0,0
reviewerID,0
asin,0
overall,0
reviewerID_idx,0
asin_idx,0


In [24]:
(training, test) = data_indexed.randomSplit([0.8, 0.2])

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from time import time

In [26]:
als = ALS(maxIter=10,
regParam=0.09,
rank = 25,
userCol="reviewerID_idx",
itemCol="asin_idx",
ratingCol="overall",
coldStartStrategy="drop",
nonnegative=True)
model = als.fit(training)

In [27]:
predictions = model.transform(test)
predictions.show(3)

+--------------+----------+-------+--------------+--------+----------+
|    reviewerID|      asin|overall|reviewerID_idx|asin_idx|prediction|
+--------------+----------+-------+--------------+--------+----------+
|A3M6TSEV71537G|B00BMFIXT2|    4.0|        2156.0|   148.0| 3.5747972|
|A27UH9ZPW41B9N|B00BMFIXT2|    3.0|       11059.0|   148.0| 1.8266209|
|A1DXS4XEIW98ZR|B00BMFIXT2|    5.0|       10010.0|   148.0| 4.4557986|
+--------------+----------+-------+--------------+--------+----------+
only showing top 3 rows



In [28]:
evaluator = RegressionEvaluator(metricName="rmse",labelCol="overall",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [29]:
rmse

1.2147055258616

In [30]:
user_recs = model.recommendForAllUsers(5)
user_recs.show(10, truncate=False)

+--------------+-----------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                                |
+--------------+-----------------------------------------------------------------------------------------------+
|1580          |[[9976, 5.7968745], [8158, 5.7414174], [5678, 5.5667357], [2223, 5.526618], [4408, 5.5217166]] |
|4900          |[[5512, 6.133538], [6010, 6.0284367], [4484, 5.922407], [5704, 5.887791], [5518, 5.8752766]]   |
|5300          |[[5704, 6.218974], [6010, 5.9132857], [8703, 5.887152], [4484, 5.8721905], [6358, 5.7527514]]  |
|6620          |[[5704, 6.538619], [8969, 6.376055], [5512, 6.0152116], [9731, 5.996274], [1715, 5.990055]]    |
|7240          |[[5704, 5.7533484], [8969, 5.721592], [2240, 5.4041333], [2550, 5.303724], [9185, 5.300676]]   |
|7340          |[[5704, 6.0059805], [8969, 5.9056683], [6358, 5.6819654], [6010, 5.6410317], [55

In [31]:
user_recs.count()

24301

In [32]:
for user in user_recs.head(5):
    print(user)
    print("\n")

Row(reviewerID_idx=1580, recommendations=[Row(asin_idx=9976, rating=5.796874523162842), Row(asin_idx=8158, rating=5.741417407989502), Row(asin_idx=5678, rating=5.566735744476318), Row(asin_idx=2223, rating=5.526618003845215), Row(asin_idx=4408, rating=5.521716594696045)])


Row(reviewerID_idx=4900, recommendations=[Row(asin_idx=5512, rating=6.133537769317627), Row(asin_idx=6010, rating=6.028436660766602), Row(asin_idx=4484, rating=5.922407150268555), Row(asin_idx=5704, rating=5.887791156768799), Row(asin_idx=5518, rating=5.875276565551758)])


Row(reviewerID_idx=5300, recommendations=[Row(asin_idx=5704, rating=6.2189741134643555), Row(asin_idx=6010, rating=5.913285732269287), Row(asin_idx=8703, rating=5.887152194976807), Row(asin_idx=4484, rating=5.872190475463867), Row(asin_idx=6358, rating=5.752751350402832)])


Row(reviewerID_idx=6620, recommendations=[Row(asin_idx=5704, rating=6.538619041442871), Row(asin_idx=8969, rating=6.376054763793945), Row(asin_idx=5512, rating=6.015211582183

In [33]:
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()
df_reviewer_reviewer_id.show(5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|        7209.0|A293BVHBY4TIKH|
|         572.0|A3GNL2F86PUZZ6|
|       10974.0|A33CP5BH4D23Q1|
|        3516.0| ATOSQVSOA3D8Q|
|         823.0| AKFE1P1ZDBPXU|
+--------------+--------------+
only showing top 5 rows



In [34]:
df_asin_asin_idx = data_indexed.select('asin_idx', 'asin').distinct()
df_asin_asin_idx.show(5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  4763.0|B00000I1BE|
|  9133.0|B000023VUP|
|  6268.0|B00002STYU|
|  9760.0|B000035XKY|
|  8283.0|B000035Y34|
+--------+----------+
only showing top 5 rows



In [35]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['reviewerID_idx'], how='left')
new_user_recs.show(10, truncate=False)

+--------------+-----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                |reviewerID    |
+--------------+-----------------------------------------------------------------------------------------------+--------------+
|299           |[[5512, 6.896354], [8136, 6.5573545], [5704, 6.4178348], [8595, 6.3042483], [6358, 6.263135]]  |A357A1TI51VT1S|
|305           |[[5704, 6.740913], [9454, 6.645791], [8350, 6.638547], [8381, 6.5724926], [9675, 6.5681686]]   |A3V7F58M4ZXHIF|
|496           |[[8969, 5.545949], [8136, 5.276003], [5704, 5.2446284], [5377, 5.232302], [6358, 5.187225]]    |AUQQVMBVBOWL1 |
|558           |[[5704, 6.4668045], [5512, 6.39896], [3828, 6.19144], [8136, 6.151276], [6358, 6.0829864]]     |A2IF5C0I5BH11F|
|596           |[[5704, 6.7759337], [5512, 6.5030046], [8136, 6.3157387], [6358, 6.3061786], [6010, 6.29

In [36]:
new_user_recs.write.parquet('U.parquet', mode='overwrite')
df_asin_asin_idx.write.parquet('P.parquet', mode='overwrite')

In [37]:
new_user_recs = spark.read.parquet('U.parquet')
new_user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- reviewerID: string (nullable = true)



In [38]:
new_user_recs.show(2)

+--------------+--------------------+--------------+
|reviewerID_idx|     recommendations|    reviewerID|
+--------------+--------------------+--------------+
|           322|[[4728, 5.848777]...|A2G44NES0MQOAC|
|           417|[[6010, 5.440516]...|A14NA0W8ESGDSI|
+--------------+--------------------+--------------+
only showing top 2 rows



In [39]:
df_asin_asin_idx = spark.read.parquet('P.parquet')
df_asin_asin_idx.printSchema()

root
 |-- asin_idx: double (nullable = true)
 |-- asin: string (nullable = true)



In [40]:
df_asin_asin_idx.show(2)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  2619.0|9625990674|
|  7663.0|B00000I1BZ|
+--------+----------+
only showing top 2 rows



In [41]:
customers = ['A29KT7UP7DLM1J',  'A1WGVOVABHFDF3', 'A3DIS5O83SQJWW']

In [42]:
for i in customers:
    find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == i)
    find_user_rec.show(truncate=False)
    result = ''
    for user in find_user_rec.collect():
        lst = []
        for row in user['recommendations']:
            print(row)
            print('\n')
            row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])
            row_f_first = row_f.first()
            lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
        dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations' :lst}
        result = dic_user_rec
    print("Recommendation for: ", i)
    print(result)
    print('\n')

+--------------+----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                               |reviewerID    |
+--------------+----------------------------------------------------------------------------------------------+--------------+
|4706          |[[6358, 4.6549654], [7933, 4.6307373], [7913, 4.5054855], [5512, 4.4796753], [262, 4.4604006]]|A29KT7UP7DLM1J|
+--------------+----------------------------------------------------------------------------------------------+--------------+

Row(asin_idx=6358, rating=4.654965400695801)


Row(asin_idx=7933, rating=4.6307373046875)


Row(asin_idx=7913, rating=4.505485534667969)


Row(asin_idx=5512, rating=4.47967529296875)


Row(asin_idx=262, rating=4.460400581359863)


Recommendation for:  A29KT7UP7DLM1J
{'reviewerID': 'A29KT7UP7DLM1J', 'recommendations': [(6358, 'B005PI17AY', 4.654965400695801), (79