### Amazon Review Recommender System based on Ratings (Book categories)
### dataset: http://jmcauley.ucsd.edu/data/amazon/links.html
### Collaborative Filtering & Spark ALS method
### Sijie Li

In [1]:
!pip install pyspark



In [2]:
#import statements
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


In [3]:
# Create (or reuse) the Spark session.
# The resource settings are handed to the builder so they are part of the
# configuration the session is created with. Writing them into the context's
# private `_conf` after getOrCreate() does not reconfigure an application that
# is already running.
# The application name stays 'MovieRecommender', which is the name the session
# was actually started with before this change.
spark = (
    SparkSession.builder
    .appName('MovieRecommender')
    .config('spark.executor.memory', '4g')
    .config('spark.executor.cores', '4')
    .config('spark.cores.max', '4')
    .config('spark.driver.memory', '4g')
    .getOrCreate()
)

# Configuration of the underlying SparkContext, kept under the original name
# `conf` and obtained through the public accessor.
conf = spark.sparkContext.getConf()

In [4]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


In [4]:
#names = ["userId", "itemId", "rating","timestamp"]

# Schema with four nullable string columns: userId, itemId, rating, timestamp.
# Line continuations are unnecessary inside the brackets.
names = StructType([
    StructField("userId", StringType(), True),
    StructField("itemId", StringType(), True),
    StructField("rating", StringType(), True),
    StructField("timestamp", StringType(), True),
])

In [5]:
# Load the labelled book-review ratings once. The first row is the header and
# column types are inferred from the data.
# (A second cell repeated this exact read; it was redundant and has been folded
# into this one.)
ratings = spark.read.csv("review_book_labledf.csv", inferSchema=True, header=True)

In [6]:
# Preview the first five rows of the raw ratings table
ratings.show(n=5)

+---+-------+------+------+
|_c0| userId|itemId|rating|
+---+-------+------+------+
|  0|6913414|     0|   4.0|
|  1|3220076|     0|   1.0|
|  2|1733392|     1|   4.0|
|  3|7818254|     2|   4.0|
|  4|6060841|     2|   5.0|
+---+-------+------+------+
only showing top 5 rows



In [7]:
# Keep only the three columns the recommender uses
ratings2 = ratings.select('userId', 'itemId', 'rating')

In [8]:
# Preview the first five rows of the trimmed ratings table
ratings2.show(n=5)

+-------+------+------+
| userId|itemId|rating|
+-------+------+------+
|6913414|     0|   4.0|
|3220076|     0|   1.0|
|1733392|     1|   4.0|
|7818254|     2|   4.0|
|6060841|     2|   5.0|
+-------+------+------+
only showing top 5 rows



In [63]:
#ratings3 = pd.read_csv("ratings_Books.csv", names = ['userId', 'itemId', 'rating','timestamp'])

In [11]:
#string_indexer = StringIndexer(inputCol="userId", outputCol="userId_index")
#string_indexer_model = string_indexer.fit(ratings2)
#data = string_indexer_model.transform(ratings2)

In [12]:
#stringIndexer = StringIndexer(inputCol="userId", outputCol="userId_index")
#stringIndexer.setHandleInvalid("error")

In [48]:
#indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['itemId', 'userId']]
#pipeline = Pipeline(stages=indexer)
#transformed = pipeline.fit(ratings2).transform(ratings2)
#transformed.select(['business_id', 'user_id','business_id_index', 'user_id_index'])

In [9]:
# 80/20 train/test split. The seed is fixed so that re-running the notebook
# produces the same split, which keeps the reported metrics comparable
# between runs.
training, test = ratings2.randomSplit([0.8, 0.2], seed=42)

In [10]:
# Configure the ALS estimator that will be fitted on the training data.
# - coldStartStrategy="drop": rows whose prediction would be NaN are dropped,
#   so the evaluation metric is not NaN.
# - nonnegative=True: factors are constrained to be non-negative.
# - seed: fixed so the random factor initialisation, and therefore the fitted
#   model, is reproducible across runs.
als = ALS(
    maxIter=10,
    regParam=0.01,
    rank=10,
    userCol="userId",
    itemCol="itemId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)

In [11]:
# Train the ALS model on the training split ...
model = als.fit(dataset=training)
# ... and score the held-out test split (adds a `prediction` column)
predictions = model.transform(dataset=test)

In [12]:
# Show the documentation and current value of every model parameter.
# explainParams() returns one newline-separated string; printing it renders one
# parameter per line instead of a single repr with literal "\n" escapes.
print(model.explainParams())

"blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)\ncoldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)\nitemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: itemId)\npredictionCol: prediction column name. (default: prediction)\nuserCol: column name for user ids. Ids must be within the integer value range. (default: user, current: userId)"

In [None]:
# Inspect the learned latent factor vectors for the first ten items
model.itemFactors.show(n=10, truncate=False)

In [19]:
# Compare actual ratings with the model's predictions on ten test rows
predictions.show(n=10, truncate=False)

+-------+------+------+----------+
|userId |itemId|rating|prediction|
+-------+------+------+----------+
|8523   |65    |5.0   |1.1309323 |
|118222 |91    |1.0   |2.329087  |
|1905782|91    |5.0   |5.811846  |
|7930535|91    |5.0   |7.0274143 |
|4991337|91    |5.0   |6.8522778 |
|7643917|91    |5.0   |6.609948  |
|6698895|103   |5.0   |11.253433 |
|4738494|108   |5.0   |2.136536  |
|1922918|148   |5.0   |0.8499359 |
|2020240|192   |5.0   |3.2126093 |
+-------+------+------+----------+
only showing top 10 rows



In [22]:
# Score the test-set predictions with root-mean-square error
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 2.658003936956168


In [None]:
# For every item (books in this dataset, despite the variable name), generate
# the top 10 recommended users, then display the first ten result rows.
movieRecs = model.recommendForAllItems(numUsers=10)
movieRecs.show(n=10, truncate=False)

In [14]:
# Take up to three distinct user ids from the ratings table and ask the model
# for the top 2 item recommendations for each of them.
users = (
    ratings2
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(users, numItems=2)

# Display at most two of the resulting rows, untruncated
userSubsetRecs.show(n=2, truncate=False)



+------+-------------------------------------------+
|userId|recommendations                            |
+------+-------------------------------------------+
|1     |[{40836, 6.119079}, {406113, 6.0965824}]   |
|12    |[{973624, 16.178135}, {1123290, 15.787145}]|
+------+-------------------------------------------+



In [15]:
# Take up to three distinct item ids from the ratings table and ask the model
# for the top 2 user recommendations for each of them.
items = (
    ratings2
    .select(als.getItemCol())
    .distinct()
    .limit(3)
)
itemSubSetRecs = model.recommendForItemSubset(items, numUsers=2)

# Display up to ten of the resulting rows, untruncated
itemSubSetRecs.show(n=10, truncate=False)

+------+--------------------------------------------+
|itemId|recommendations                             |
+------+--------------------------------------------+
|12    |[{902608, 7.098238}, {3193333, 6.5842714}]  |
|6     |[{2956461, 36.892284}, {3061942, 35.400402}]|
+------+--------------------------------------------+

