Creating a local Spark Session.

In [None]:
# findspark.init() is called before the pyspark imports so that the local
# Spark installation is discoverable when pyspark is imported.
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F

# Local-mode session ("local[*]" master); getOrCreate() hands back an
# existing session if one is already running in this kernel.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Low-level SparkContext handle backing the session.
sc = spark.sparkContext

Mounting Google Drive

In [None]:
from google.colab import drive

# Make the user's Google Drive visible inside the Colab filesystem.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


### **Importing Libraries and Data**

---


In [None]:
#from pyspark.sql.functions import *
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pandas.core.reshape.concat import concat
from pyspark.ml.feature import PCA
import time
import pickle
import plotly.express as px
import pandas as pd

In [None]:
# Read every JSON file matching the glob into a single Spark DataFrame.
# NOTE(review): this is an absolute path outside the Drive mount point
# ('/content/drive') - confirm it is where the review files actually live.
reviews_glob = "/Amazon/350k/*.json"
data = spark.read.json(reviews_glob)

### **Recommendation System Using Alternating Least Squares**

---


In [None]:
# Keep only the three fields the recommender needs, giving 'asin' and
# 'overall' more descriptive names.
df = data.select(
    col('asin').alias("product_id"),
    col('reviewerID'),
    col('overall').alias("rating"),
)

# Sanity check: per column, count the rows whose value is NaN or null.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(missing_counts).show()

+----------+----------+------+
|product_id|reviewerID|rating|
+----------+----------+------+
|         0|         0|     0|
+----------+----------+------+



In [None]:
# Ratings are cast to double for ALS.
df = df.withColumn("rating", col("rating").cast(DoubleType()))

# ALS requires integer user ids: encode the string reviewerID with a
# StringIndexer, then cast the resulting index to int and expose it under
# the original column name.
user_indexer = StringIndexer(inputCol="reviewerID", outputCol="reviewer_id_index")
user_model = user_indexer.fit(df)
indexed = user_model.transform(df)
indexed_df = indexed.select(
    col("reviewer_id_index").cast(IntegerType()).alias("reviewerID"),
    col("product_id"),
    col("rating"),
)

# Same treatment for the product ids; the integer index becomes 'productID'.
prod_indexer = StringIndexer(inputCol="product_id", outputCol="prod_id_index")
prod_model = prod_indexer.fit(indexed_df)
p_indexed = prod_model.transform(indexed_df)
als_df = p_indexed.select(
    col("reviewerID"),
    col("prod_id_index").cast(IntegerType()).alias("productID"),
    col("rating"),
)

# Peek at the final (reviewerID, productID, rating) frame fed to ALS.
als_df.show(5)

+----------+---------+------+
|reviewerID|productID|rating|
+----------+---------+------+
|     65306|    39691|   5.0|
|    217184|    28657|   5.0|
|     64176|    28657|   5.0|
|     19590|    28658|   5.0|
|     11180|    28658|   5.0|
+----------+---------+------+
only showing top 5 rows



In [None]:
# Randomly split the data: 80% for training, the remaining 20% for testing.
# A fixed seed makes the split repeatable for the same input data, so the
# RMSE printed below can be reproduced after a kernel restart.
train, test = als_df.randomSplit([0.8, 0.2], seed=42)

# Evaluate predictions against the true ratings using RMSE.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build the recommendation model using ALS on the training data.
# - coldStartStrategy='drop' removes rows whose prediction would be NaN
#   (users/items unseen during training) so the RMSE is not NaN.
# - seed fixes the random factor initialisation for reproducibility.
als = ALS(
    userCol="reviewerID",
    itemCol="productID",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy='drop',
    seed=42,
)
model = als.fit(train)

# Print the Root Mean Square Error of the ALS model on the held-out data.
print("RMSE = ", evaluator.evaluate(model.transform(test)))

# Print the (default) hyper-parameters the model was trained with. They are
# read through the estimator's public getters rather than the private
# `_java_obj` handle of the fitted model.
print("Rank: ", model.rank)
print("Max Iter: ", als.getMaxIter())
print("Reg Param: ", als.getRegParam())

RMSE =  1.869301479197581
Rank:  10
Max Iter:  10
Reg Param:  0.1


In [None]:
# Hyper-parameter grid for cross-validation.
# NOTE: each parameter currently has a single candidate value, so the grid
# contains exactly one combination and cross-validation only re-estimates
# the error of that one setting; add more values per parameter to actually
# tune the model.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [25])
    .addGrid(als.regParam, [.1])
    .addGrid(als.maxIter, [10])
    .build()
)

# 10-fold cross-validation scored with the RMSE evaluator. The seed fixes
# the assignment of rows to folds so repeated runs are comparable.
crossvalidation = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=10,
    seed=42,
)

# Fit on the training data and keep the model trained with the best
# parameter combination.
cv_model = crossvalidation.fit(train)
best_model = cv_model.bestModel

# RMSE of the selected model on the held-out test data.
print("RMSE value after cross validation is: ", evaluator.evaluate(best_model.transform(test)))

RMSE value after cross validation is:  1.769650981519195


In [None]:
# Generate the top-10 product recommendations for every user.
prod_rec = best_model.recommendForAllUsers(10)

# Collect to the driver as a pandas DataFrame: column 0 holds the user id,
# column 1 holds that user's list of recommendation records.
recommend = prod_rec.toPandas()

# NOTE: both ids are the integer StringIndexer indices used to train ALS,
# not the original reviewerID / asin strings. The fitted indexers
# (`user_model`, `prod_model`) hold the mapping back to the raw ids.
users = recommend.iloc[:, 0].tolist()

# For each user keep only the recommended product ids (dropping the predicted
# ratings) and render them as one comma-separated string. Built in a single
# pass with str.join instead of per-row .iloc lookups and repeated string
# concatenation.
recommendations = [
    ", ".join(str(item["productID"]) for item in user_recs)
    for user_recs in recommend.iloc[:, 1]
]

# One row per user: the user id and their recommended product ids.
recommendations_df = pd.DataFrame(data=zip(users, recommendations), columns=["UserID", "ProductID"])

# Display the recommendations (top 10 products) for the first 10 users.
recommendations_df.head(10)

Unnamed: 0,UserID,ProductID
0,148,"56694, 51047, 21976, 59743, 32237, 19953, 2449..."
1,463,"24992, 24828, 26397, 28977, 8789, 18503, 45389..."
2,471,"29700, 42556, 66960, 49079, 25174, 17528, 1073..."
3,496,"14487, 28539, 28663, 25273, 37174, 39229, 4277..."
4,833,"59792, 29301, 35926, 22937, 17691, 26233, 6807..."
5,1088,"30686, 16985, 5670, 35524, 17529, 39062, 23701..."
6,1238,"21669, 22536, 38732, 27213, 52148, 47825, 6605..."
7,1342,"71462, 62300, 32783, 37731, 14990, 62552, 3413..."
8,1580,"48698, 83204, 55618, 23596, 16985, 5670, 31111..."
9,1591,"37911, 82603, 83885, 64470, 63807, 62671, 5790..."


In [None]:
# Persist the per-user recommendations as a CSV file (without the pandas
# row index) in the current working directory.
output_csv = 'ALSRecommendation_Output.csv'
recommendations_df.to_csv(output_csv, index=False)