<a href="https://colab.research.google.com/github/parksoy/toolbox2023/blob/main/Copy_of_Recommender_System_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h2><h2 align = "middle"><b>Product Recommendation System Using PySpark</b></h2>

Notebook is from https://github.com/krishangi-deka/Product-Personalization-Engine/blob/main/Recommender_System_Code.ipynb

Data is from here 
https://nijianmo.github.io/amazon/index.html


### **Initiation**

---


Installing Spark and JDK and Setting environment variables.

The following cell is only for Linux/ Tested in Google colab. 

In [1]:
%%sh
apt-get install openjdk-8-jdk-headless -qq > /dev/null
wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
tar -xvf spark-3.0.0-bin-hadoop3.2.tgz
pip install -q findspark

sh: line 1: apt-get: command not found
sh: line 2: wget: command not found
tar: Error opening archive: Failed to open 'spark-3.0.0-bin-hadoop3.2.tgz'


The following is for Mac OS.

In [5]:
#soyoungpark:23-06-0711:27@~$brew install openjdk@8  
#sudo ln -sfn /usr/local/opt/openjdk@8/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-8.jdk  

In [2]:
import os
import pandas as pd
import json
#import gzip
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [2]:
# #for MacOS local
# import os
# import pandas as pd
# import json
# #import gzip
# os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/openjdk-8.jdk" #this seems not compatible. 
# os.environ["SPARK_HOME"] ='/Applications/spark-3.0.0-bin-hadoop3.2'

Creating a local Spark Session.

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

Mounting on Google Drive

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### **Importing Libraries and Data**

---


In [5]:
#from pyspark.sql.functions import *
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
# from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
# from sklearn.neighbors import NearestNeighbors
# from sklearn.cluster import KMeans
# from sklearn.metrics import adjusted_rand_score
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pandas.core.reshape.concat import concat
from pyspark.ml.feature import PCA
import time
import pickle
import plotly.express as px
import pandas as pd

In [7]:
# Getting the data
#!wget https://nijianmo.github.io/amazon/index.html

In [None]:
#!gzip -d /content/drive/MyDrive/data/Sports_and_Outdoors_5.json.gz

In [11]:
data = spark.read.json("/content/drive/MyDrive/data/Sports_and_Outdoors_5.json")

In [28]:
data.head(2)

[Row(asin='0000032034', image=None, overall=5.0, reviewText='What a spectacular tutu! Very slimming.', reviewTime='06 3, 2015', reviewerID='A180LQZBUWVOLF', reviewerName='Michelle A', style=None, summary='Five Stars', unixReviewTime=1433289600, verified=True, vote=None),
 Row(asin='0000032034', image=None, overall=1.0, reviewText='What the heck? Is this a tutu for nuns? I know you can cut it but STILL. Also there aren\'t several layers of the tutu making it "poof out" It just lays flat. Needless to say it was returned.', reviewTime='04 1, 2015', reviewerID='ATMFGKU5SVEYY', reviewerName='Crystal R', style=None, summary='Is this a tutu for nuns?!', unixReviewTime=1427846400, verified=True, vote=None)]

### **Recommendation System Using Alternating Least Squares**

---


In [12]:
df = data.select('asin', 'reviewerID', 'overall').withColumnRenamed("asin","product_id").withColumnRenamed("overall","rating")
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()  # check for nulls

+----------+----------+------+
|product_id|reviewerID|rating|
+----------+----------+------+
|         0|         0|     0|
+----------+----------+------+



In [13]:
# Converting data type of rating to double type
df = df.withColumn("rating", df["rating"].cast(DoubleType()))

# Indexing reviewerID column into integer type
user_model = StringIndexer(inputCol="reviewerID", outputCol="reviewer_id_index").fit(df)
indexed = user_model.transform(df)
indexed_df = indexed.select(indexed["reviewer_id_index"].cast(IntegerType()).alias("reviewerID"), indexed["product_id"], indexed["rating"])

# Indexing product_id column into integer type
prod_model = StringIndexer(inputCol="product_id", outputCol="prod_id_index").fit(indexed_df)
p_indexed = prod_model.transform(indexed_df)
als_df = p_indexed.select(p_indexed["reviewerID"], p_indexed["prod_id_index"].cast(IntegerType()).alias("productID"), p_indexed["rating"])
als_df.show(5)

+----------+---------+------+
|reviewerID|productID|rating|
+----------+---------+------+
|     18054|     1608|   5.0|
|     70010|     1608|   1.0|
|     25984|     1608|   5.0|
|     31047|     1608|   5.0|
|     31142|     1608|   4.0|
+----------+---------+------+
only showing top 5 rows



In [14]:
# Randomly split the data into train and test where 80% data is in train and remaining is test
train, test = als_df.randomSplit([0.8, 0.2])

# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(userCol="reviewerID", itemCol="productID", ratingCol="rating", nonnegative= True, coldStartStrategy='drop')
model = als.fit(train)

# Print the Root Mean Square Error of ALS Model
print("RMSE = ", evaluator.evaluate(model.transform(test)))

# Print default model parameters
print("Rank: ", model.rank)
print("Max Iter: ", model._java_obj.parent().getMaxIter())
print("Reg Param: ", model._java_obj.parent().getRegParam())

RMSE =  1.9050221676271877
Rank:  10
Max Iter:  10
Reg Param:  0.1


In [15]:
#Tuning model with Parameter Grid Builder
param_grid = ParamGridBuilder() \
     .addGrid(als.rank, [25]) \
     .addGrid(als.regParam, [.1]) \
     .addGrid(als.maxIter, [10]) \
     .build()
# #print(param_grid)

#Build a 10 fold cross validation
crossvalidation = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds=10)   

#Fit ALS model to training data
best_model = crossvalidation.fit(train).bestModel

#Calculate the RMSE on test data using the best set of parameters obtained after cross validation
print("RMSE value after cross validation is: ", evaluator.evaluate(best_model.transform(test)))

RMSE value after cross validation is:  1.8779532702103516


In [16]:
# Generate n Recommendations for all users
prod_rec = best_model.recommendForAllUsers(10)
#prod_rec.show()

recommend = prod_rec.toPandas()
recommend.head(5)

# Append user_id and product_id into a list and create a dataframe
users = []
recommendations = []
#For all data iterations
for i in range(len(recommend)):

  users.append(recommend.iloc[i,0])         #Add user_id to list
  user_recs = "" 

  for item in recommend.iloc[i,1]:          #Fetching only the item ID's from the recommendations
    user_recs = user_recs + ", " + str(item.asDict()['productID'])
  
  recommendations.append(user_recs[2:])     #Append the itemID's to a list

#Create a dataframe with the appended data
recommendations_df = pd.DataFrame(data = zip(users, recommendations), columns=["UserID", "ProductID"])

#Displaying users and product recommendations
#(top 10 products) for the first 10 users
recommendations_df.head(10)

Unnamed: 0,UserID,ProductID
0,148,"1171, 2069, 1722, 1891, 1424, 938, 1834, 496, ..."
1,463,"1187, 912, 1052, 1362, 1053, 1984, 955, 880, 1..."
2,471,"886, 1327, 980, 1427, 1160, 1722, 1318, 1606, ..."
3,496,"973, 1099, 1657, 1019, 1264, 2160, 1454, 948, ..."
4,833,"2134, 2004, 1721, 1107, 659, 1613, 1955, 1106,..."
5,1088,"1072, 1955, 1171, 1144, 2007, 1505, 1622, 1659..."
6,1238,"761, 1657, 1354, 843, 421, 958, 1187, 1052, 10..."
7,1342,"1360, 1215, 1434, 1436, 958, 902, 761, 2074, 6..."
8,1580,"1663, 1650, 1440, 1182, 1188, 1354, 851, 1209,..."
9,1591,"1382, 1215, 2010, 761, 2119, 958, 1193, 1243, ..."


In [17]:
#write to a text file
recommendations_df.to_csv('/content/drive/MyDrive/data/ALSRecommendation_Output.csv', index=False)

### **Recommendation System Using K-Means Algorithm**

---


In [18]:
df = data.select('asin', 'reviewText').dropna(how = "any")
# Data cleaning using regular expressions
df1 = df.toPandas()
df1['reviewText'] = df1['reviewText'].str.lower()
df1['reviewText'] = df1['reviewText'].replace({"'ll": " "}, regex=True)
df1 = df1.replace({"[^A-Za-z0-9 ]+": ""}, regex=True)
df = spark.createDataFrame(df1)

  for column, series in pdf.iteritems():


In [20]:
# Creating a Pipeline as an estimator 
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

k=10

kmeans = KMeans().setK(k).setSeed(1)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, kmeans])

In [21]:
# Finding best k using Silhoutte Scores
wordsData = tokenizer.transform(df)
rem = remover.transform(wordsData)
featurizedData = hashingTF.transform(rem)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
t_df = rescaledData.select("asin", "features")
for k in range(3,17):
  t1 = time.time()
  kmeans = KMeans().setK(k).setSeed(1)
  model = kmeans.fit(t_df)
  predictions = model.transform(t_df)
  evaluator = ClusteringEvaluator()
  silhouette = evaluator.evaluate(predictions)
  t2 = time.time()
  print("Silhouette with squared euclidean distance with {} clusters = {} | Time Taken : {}".format(k,silhouette,(t2-t1)/60))

Silhouette with squared euclidean distance with 3 clusters = 0.7556388080145828 | Time Taken : 0.8857178330421448
Silhouette with squared euclidean distance with 4 clusters = 0.6295716836487847 | Time Taken : 0.7754782319068909
Silhouette with squared euclidean distance with 5 clusters = 0.626288543924286 | Time Taken : 0.7682438651720683
Silhouette with squared euclidean distance with 6 clusters = 0.7314356088718217 | Time Taken : 0.8696662267049153
Silhouette with squared euclidean distance with 7 clusters = 0.6337863054734664 | Time Taken : 0.7291148662567138
Silhouette with squared euclidean distance with 8 clusters = 0.5810721284195208 | Time Taken : 0.7548729419708252
Silhouette with squared euclidean distance with 9 clusters = 0.6126486572280184 | Time Taken : 0.7638101855913798
Silhouette with squared euclidean distance with 10 clusters = 0.5800971239784495 | Time Taken : 0.795718002319336
Silhouette with squared euclidean distance with 11 clusters = 0.5848268739600898 | Time T

In [22]:
# Building k-means model with best k
kmeans = KMeans().setK(10).setSeed(1)
km = pipeline.fit(df)

# Transforming dataframe
preddata = km.transform(df)

In [23]:
# Plot clusters in best k using PCA
recc_pca = preddata.select('features', 'prediction')
pca = PCA(k=2,inputCol='features',outputCol='pcaFeatures')
pca_model = pca.fit(recc_pca)
result = pca_model.transform(recc_pca)
result_df = result.toPandas()
x = result_df['pcaFeatures'].apply(lambda x:x[0])
y = result_df['pcaFeatures'].apply(lambda x:x[1])
label = result_df['prediction']
data_df = pd.concat({'pca1':x,'pca2':y,'label':label},axis=1)
fig = px.scatter(data_df,x='pca1',y='pca2',color='label')
fig.show()

In [24]:
recc_df = preddata.select("asin", "prediction").toPandas()
recc_df = recc_df.groupby(['prediction'])['asin'].apply(lambda x: ','.join(x)).reset_index()
recc_df = recc_df.rename(columns={"prediction": "Cluster Number", "asin": "ProductID"})
recc_df.head()

Unnamed: 0,Cluster Number,ProductID
0,0,"0000032034,0000032034,0000032034,0000032034,08..."
1,1,B00009V2YO
2,2,"B0001WB4PM,B0001WB4PM,B0001WB4PM,B0001WB4PM,B0..."
3,3,"0000032034,0899332757,0899332757,0899332757,08..."
4,4,B000AL6HSI


In [26]:
# Recommend top 10 unique products for each searchword 
def search(a):
  lst = [(a, )]
  mod_df = spark.createDataFrame(lst, ['reviewText'])
  pred = km.transform(mod_df)
  output = pred.collect()[0]['prediction']
  fin = recc_df.loc[recc_df['Cluster Number'] == output]
  fin1 = fin['ProductID'].values[0]
  fin1 = fin1.split(',')
  return fin1

def unique(a):
  uni_list = []
  for i in a:
    if i not in uni_list:
      uni_list.append(i)
  return uni_list

unique(search("skateboard"))[:10]

['0000032034',
 '0899332757',
 '0899333257',
 '0971100764',
 '1620878747',
 '1926644425',
 '3843518912',
 '7245456321',
 '7245456275',
 '7245456313']