## Demonstrate how to load a dataset suitable for recommendation systems into a PySpark DataFrame

In [1]:
# findspark has to run BEFORE the first pyspark import: init() locates the
# Spark installation and makes its Python bindings importable. Calling it
# after pyspark is already imported (as this cell used to) has no influence
# on which pyspark gets loaded.
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Path is relative to the notebook's working directory.
dataset_path = "./movies 1.json"

# Read the JSON reviews and keep only the columns used by the recommender cells below.
df = spark.read.json(dataset_path).select("user_id", "product_id", "score", "time", "helpfulness")

df.show()



+--------------+----------+-----+----------+-----------+
|       user_id|product_id|score|      time|helpfulness|
+--------------+----------+-----+----------+-----------+
|A141HP4LYPWMSR|B003AI2VGA|  3.0|1182729600|        7/7|
|A328S9RN3U5M68|B003AI2VGA|  3.0|1181952000|        4/4|
|A1I7QGUDP043DG|B003AI2VGA|  5.0|1164844800|       8/10|
|A1M5405JH9THP9|B003AI2VGA|  3.0|1197158400|        1/1|
| ATXL536YX71TR|B003AI2VGA|  3.0|1188345600|        1/1|
|A3QYDL5CDNYN66|B003AI2VGA|  2.0|1229040000|        0/0|
| AQJVNDW6YZFQS|B003AI2VGA|  1.0|1164153600|       3/11|
| AD4CDZK7D31XP|B00006HAXW|  5.0|1060473600|      64/65|
|A3Q4S5DFVPB70D|B00006HAXW|  5.0|1041292800|      26/26|
|A2P7UB02HAVEPB|B00006HAXW|  5.0|1061164800|      24/24|
|A2TX99AZKDK0V7|B00006HAXW|  4.0|1039564800|      22/23|
| AFC8IKR407HSK|B00006HAXW|  5.0|1045526400|      14/14|
|A1FRPGQYQTAOR1|B00006HAXW|  5.0|1062979200|        9/9|
|A1RSDE90N6RSZF|B00006HAXW|  5.0|1042502400|        9/9|
|A1OUBOGB5970AO|B00006HAXW|  4.

## Implement a PySpark script that splits the data and trains a recommendation model

In [2]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

# Convert the alphanumeric user/product IDs to numeric indices with StringIndexer,
# because the ALS estimator below is configured to read the *_index columns.

# Fixed seed so the train/test split and the ALS fit are repeatable across
# Restart & Run All; without it every run yields different metrics.
SEED = 42

# Strip any stale "user_index" column (left over when this cell is re-run on an
# already-indexed frame) without rebinding `df`, which belongs to the load cell.
ratings_df = df.drop("user_index")

user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index").fit(ratings_df)
df1 = user_indexer.transform(ratings_df)

product_indexer = StringIndexer(inputCol="product_id", outputCol="product_index").fit(df1)
df1 = product_indexer.transform(df1)
df1.show()

# coldStartStrategy="drop" is set so that rows the model cannot score are
# removed from transform() output instead of poisoning the evaluation metrics.
als = ALS(
    maxIter=10,
    regParam=0.5,
    userCol="user_index",
    itemCol="product_index",
    ratingCol="score",
    coldStartStrategy="drop",
    seed=SEED,
)

# 80 % of the ratings for training, 20 % held out for evaluation.
train, test = df1.randomSplit([0.8, 0.2], seed=SEED)

alsModel = als.fit(train)

+--------------+----------+-----+----------+-----------+----------+-------------+
|       user_id|product_id|score|      time|helpfulness|user_index|product_index|
+--------------+----------+-----+----------+-----------+----------+-------------+
|A141HP4LYPWMSR|B003AI2VGA|  3.0|1182729600|        7/7|      32.0|        731.0|
|A328S9RN3U5M68|B003AI2VGA|  3.0|1181952000|        4/4|       3.0|        731.0|
|A1I7QGUDP043DG|B003AI2VGA|  5.0|1164844800|       8/10|     312.0|        731.0|
|A1M5405JH9THP9|B003AI2VGA|  3.0|1197158400|        1/1|   10917.0|        731.0|
| ATXL536YX71TR|B003AI2VGA|  3.0|1188345600|        1/1|     173.0|        731.0|
|A3QYDL5CDNYN66|B003AI2VGA|  2.0|1229040000|        0/0|   28065.0|        731.0|
| AQJVNDW6YZFQS|B003AI2VGA|  1.0|1164153600|       3/11|   34353.0|        731.0|
| AD4CDZK7D31XP|B00006HAXW|  5.0|1060473600|      64/65|   31316.0|        527.0|
|A3Q4S5DFVPB70D|B00006HAXW|  5.0|1041292800|      26/26|   27884.0|        527.0|
|A2P7UB02HAVEPB|

## Implement a PySpark script using the ALS algorithm for collaborative filtering

In [3]:
# Score the held-out test split with the fitted ALS model; the result carries
# the original test columns plus a "prediction" column (see the table below).
predictions = alsModel.transform(test)
# Display the first rows of the scored frame.
predictions.show()

+--------------+----------+-----+----------+-----------+----------+-------------+----------+
|       user_id|product_id|score|      time|helpfulness|user_index|product_index|prediction|
+--------------+----------+-----+----------+-----------+----------+-------------+----------+
|A13OMT8D4GPIBV|6304286961|  4.0|1189296000|        0/0|    1143.0|         85.0| 4.0525866|
| AQ01Q3070LT29|B000063W1R|  1.0|1110240000|       0/33|      53.0|         37.0| 3.6974883|
|A1TW9ZGRDQQZ2Y|B0001G6PZC|  5.0|1070150400|      12/14|     133.0|          7.0| 3.3604755|
|A3FU2A91KSXOHZ|B002OHDRF2|  5.0|1329436800|        0/0|    4531.0|         21.0|  4.301674|
|A3OIZEXS8CGBOD|B0001G6PZC|  1.0|1082851200|      23/42|    1005.0|          7.0|0.88350415|
|A1TTN5AGHRRF2M|B002OHDRF2|  4.0|1259798400|        2/3|     613.0|         21.0| 3.1643414|
|A1HO9J4DCQDGP9|B0001G6PZC|  5.0|1111795200|        2/3|     597.0|          7.0|   3.87507|
|A2EIR50X0I6HHA|B0006FFRD4|  2.0|1221782400|       3/18|    3441.0|   

In [4]:
# Test-split rows belonging to the user whose StringIndexer index is 1.0,
# reduced to the identifier columns.
user1_columns = ['product_id', 'user_id', 'product_index', 'user_index']
user1 = test.where(test.user_index == 1.0).select(user1_columns)
user1.show()

# Recommendations for a single specific user (3 items).
# NOTE(review): the candidates are only the items this user already has in the
# test split, so this ranks known items rather than discovering unseen ones.
recommendations = alsModel.transform(user1)
recommendations.sort(recommendations.prediction.desc()).show(3)

+----------+--------------+-------------+----------+
|product_id|       user_id|product_index|user_index|
+----------+--------------+-------------+----------+
|B000063W1R|A2NJO6YE954DBH|         37.0|       1.0|
|B000UGBOT0|A2NJO6YE954DBH|         78.0|       1.0|
|B00005M1Y0|A2NJO6YE954DBH|         66.0|       1.0|
|B001QB5SCM|A2NJO6YE954DBH|         87.0|       1.0|
|B00178U3CY|A2NJO6YE954DBH|        416.0|       1.0|
|B00005Y6YB|A2NJO6YE954DBH|         32.0|       1.0|
|B00005Y6YM|A2NJO6YE954DBH|        366.0|       1.0|
|B000YIGNCW|A2NJO6YE954DBH|        226.0|       1.0|
+----------+--------------+-------------+----------+

+----------+--------------+-------------+----------+----------+
|product_id|       user_id|product_index|user_index|prediction|
+----------+--------------+-------------+----------+----------+
|B00005Y6YB|A2NJO6YE954DBH|         32.0|       1.0| 3.9981668|
|B000UGBOT0|A2NJO6YE954DBH|         78.0|       1.0| 3.8675025|
|B001QB5SCM|A2NJO6YE954DBH|         87.0|  

In [5]:
# Recommendations for all users (3 items each).
# Each output row pairs a user_index with its list of recommended items and
# their predicted scores (see the table below).
recommended_movie_df = alsModel.recommendForAllUsers(numItems=3)
recommended_movie_df.show(n=10, truncate=False)

+----------+-------------------------------------------------------+
|user_index|recommendations                                        |
+----------+-------------------------------------------------------+
|31        |[{1381, 4.221618}, {632, 4.157598}, {870, 3.9762528}]  |
|34        |[{632, 4.4065833}, {1381, 4.2414126}, {950, 4.0590825}]|
|53        |[{1381, 4.8835588}, {632, 4.7488575}, {293, 4.5494223}]|
|65        |[{818, 4.9362164}, {632, 4.9025645}, {698, 4.798117}]  |
|78        |[{1381, 4.15531}, {632, 4.0624266}, {950, 3.9154723}]  |
|81        |[{1381, 4.1988163}, {950, 4.1114035}, {632, 4.046092}] |
|85        |[{1381, 4.724676}, {632, 4.583629}, {950, 4.4981704}]  |
|101       |[{632, 4.248592}, {1338, 4.056736}, {1337, 4.056736}]  |
|108       |[{632, 4.559693}, {1381, 4.482481}, {950, 4.3110886}]  |
|115       |[{1381, 4.8148413}, {632, 4.8041644}, {193, 4.6484275}]|
+----------+-------------------------------------------------------+
only showing top 10 rows



## Implement code to evaluate the performance of the ALS model using appropriate metrics

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator

# Every evaluator compares the true "score" column against the model's
# "prediction" column; only the metric name differs.
_eval_columns = {'labelCol': 'score', 'predictionCol': 'prediction'}

# Mean squared error.
evaluator_mse = RegressionEvaluator(metricName='mse', **_eval_columns)
mse = evaluator_mse.evaluate(predictions)
print(f'Mean Squared Error(MSE) :{mse}')

# Root mean squared error.
evaluator_rmse = RegressionEvaluator(metricName='rmse', **_eval_columns)
rmse = evaluator_rmse.evaluate(predictions)
print(f'Root Mean Squared Error(RMSE): {rmse}')

# Mean absolute error.
evaluator_mae = RegressionEvaluator(metricName='mae', **_eval_columns)
mae = evaluator_mae.evaluate(predictions)
print(f'Mean Absolute Error(MAE): {mae}')

Mean Squared Error(MSE) :2.192201098242345
Root Mean Squared Error(RMSE): 1.4806083541039288
Mean Absolute Error(MAE): 1.1131567133495817


## Q1-Q4 on RDD

In [7]:
import json
from pyspark.sql import SparkSession

# Create a SparkSession
# (getOrCreate() is the same call used in the first cell of this notebook.)
spark = SparkSession.builder.getOrCreate()
# SparkContext handle; used below to read the raw file with sc.textFile().
sc = spark.sparkContext

# Field-name lists describing subsets of a review record.
# Only `fields2` is consulted by validate() below.
fields = ['product_id', 'user_id', 'score', 'time']

fields2 = [
    'product_id', 'user_id', 'review', 'profile_name',
    'helpfulness', 'score', 'time',
]

fields3 = ['product_id', 'user_id', 'time']

fields4 = ['user_id', 'score', 'time']


def validate(line):
    """Return True when `line` contains every name listed in `fields2`.

    `line` is tested with the `in` operator, so for a parsed JSON object
    (a dict) this checks that all required keys are present.
    """
    return all(field in line for field in fields2)

# Read the dataset as an RDD of text lines.
# NOTE(review): this is an absolute, machine-specific path, while the first
# cell reads "./movies 1.json" -- confirm both point at the same file and
# consider sharing a single path variable.
reviews_raw = sc.textFile('/home/lplab/Desktop/210962021/Lab4/movies 1.json')

# Parse each line as JSON, then keep only records that pass validate().
parsed_reviews = reviews_raw.map(json.loads)
reviews = parsed_reviews.filter(validate)
# Mark the validated RDD for caching; it is reused by the cells that follow.
reviews.cache()

PythonRDD[373] at RDD at PythonRDD.scala:53

In [8]:
# Peek at the first validated review record to check the parsed structure.
reviews.take(1)

[{'user_id': 'A141HP4LYPWMSR',
  'product_id': 'B003AI2VGA',
  'review': 'Synopsis: On the daily trek from Juarez, Mexico to El Paso, Texas an ever increasing number of female workers are found raped and murdered in the surrounding desert. Investigative reporter Karina Danes (Minnie Driver) arrives from Los Angeles to pursue the story and angers both the local police and the factory owners who employee the undocumented aliens with her pointed questions and relentless quest for the truth.<br /><br />Her story goes nationwide when a young girl named Mariela (Ana Claudia Talancon) survives a vicious attack and walks out of the desert crediting the Blessed Virgin for her rescue. Her story is further enhanced when the "Wounds of Christ" (stigmata) appear in her palms. She also claims to have received a message of hope for the Virgin Mary and soon a fanatical movement forms around her to fight against the evil that holds such a stranglehold on the area.<br /><br />Critique: Possessing a life

In [9]:
from pyspark.mllib.recommendation import ALS
from numpy import array
import hashlib
import math

def get_hash(s):
    """Map a bytes value to an integer in the range [0, 10**8).

    The SHA-1 digest of `s` is read as one big-endian integer and reduced
    modulo 10**8. `s` must be bytes-like (callers encode their strings first).
    Distinct inputs can collide because the output space is only 10**8 values.
    """
    digest_as_int = int.from_bytes(hashlib.sha1(s).digest(), 'big')
    return digest_as_int % (10 ** 8)

def _to_rating(entry):
    """Turn one review dict into a (user_hash, product_hash, int_score) tuple."""
    user_key = get_hash(entry['user_id'].encode('utf-8'))
    product_key = get_hash(entry['product_id'].encode('utf-8'))
    return (user_key, product_key, int(entry['score']))


def _bucket(rating):
    """Bucket 0-9 derived from the two hashed ids; drives the train/test split."""
    return (rating[0] + rating[1]) % 10


ratings = reviews.map(_to_rating)

# Buckets 2-9 go to training, buckets 0-1 to testing. The split depends only on
# the hashed ids, so it is the same on every run.
train_data = ratings.filter(lambda rating: _bucket(rating) >= 2)
test_data = ratings.filter(lambda rating: _bucket(rating) < 2)
train_data.cache()

print("Number of train samples: ", train_data.count())
print("Number of test samples: ", test_data.count())

Number of train samples:  39992
Number of test samples:  10008


In [10]:
from math import sqrt

# Import the RDD-based ALS under its own name. Earlier in this notebook `ALS`
# is also bound to pyspark.ml.recommendation.ALS, so relying on the bare name
# makes this cell depend on which import cell happened to run last.
from pyspark.mllib.recommendation import ALS as MLlibALS

rank = 20           # number of latent factors
numIterations = 20  # ALS iterations
ALS_SEED = 42       # fixed seed so the factorisation is repeatable between runs

# train_data is the RDD of (user_hash, product_hash, score) tuples built above.
model = MLlibALS.train(train_data, rank, numIterations, seed=ALS_SEED)

def convertToFloat(lines):
    """Return a new list holding float(x) for every element x of `lines`.

    Raises whatever float() raises if an element cannot be converted.
    """
    return [float(value) for value in lines]

# (user, product) pairs from the test set that the model should score.
unknown = test_data.map(lambda entry: (int(entry[0]), int(entry[1])))

# Key predictions by (user, product) so they can be joined with the true scores.
# NOTE(review): this rebinds `predictions`, which an earlier cell uses for the
# DataFrame of pyspark.ml ALS predictions; re-running that evaluation cell
# after this one will see the RDD instead.
predictions = model.predictAll(unknown).map(lambda r: ((int(r[0]), int(r[1])), r[2]))

# Each joined record is ((user, product), (true_score, predicted_score)).
true_and_predictions = test_data.map(lambda r: ((int(r[0]), int(r[1])), r[2])).join(predictions)

# Mean squared error. The previous expression had its closing parenthesis in
# the wrong place: the reduce()/count() calls ended up inside the lambda, only
# the prediction was squared, and int() truncated the predicted score, so MSE
# was a lazy RDD that could never be evaluated.
MSE = true_and_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()

In [11]:
# Four left-aligned columns, each 15 characters wide.
row_layout = "{:<15} {:<15} {:<15} {:<15}"

print(row_layout.format("User ID", "Product ID", "True Score", "Predicted Score"))

# Each record is ((user_id, product_id), (true_score, predicted_score)).
for (user_id, product_id), (true_score, predicted_score) in true_and_predictions.take(10):
    print(row_layout.format(user_id, product_id, true_score, predicted_score))


User ID         Product ID      True Score      Predicted Score
65965270        62577830        5               2.1131747398003418
7383110         62577830        5               1.6965143450188906
22477285        58302865        4               0.5464244490395593
5479805         58302865        5               0.732133386847093
39998009        30926631        3               4.005404037024607
56482098        109252          3               1.029691343377415
89302666        43077444        3               1.499224739524898
6480656         80115934        5               5.003909118993409
19486884        36531906        4               1.047248028578161
99927280        47280600        5               1.6646441378729149


In [12]:
# Signed error (true score minus predicted score) for every joined test record.
errors = true_and_predictions.map(lambda r: r[1][0] - r[1][1])

# Mean of the squared errors, its square root, and the mean absolute error.
MSE = errors.map(lambda err: err ** 2).mean()
RMSE = sqrt(MSE)
MAE = errors.map(lambda err: abs(err)).mean()

print("Mean Squared Error (MSE): {}".format(MSE))
print("Root Mean Squared Error (RMSE): {}".format(RMSE))
print("Mean Absolute Error (MAE): {}".format(MAE))


Mean Squared Error (MSE): 7.062586919528138
Root Mean Squared Error (RMSE): 2.6575528065361445
Mean Absolute Error (MAE): 1.9645283756709189
