In [1]:
# import libraries
#!pip install dnspython #for collab
import pymongo
import pandas as pd
from pymongo import MongoClient
from pyspark.sql.types import *

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [2]:
# Initialize a Spark Session
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Memory limits intended to avoid OutOfMemoryException.
# They are given to the builder because SparkContext.getConf() hands back a
# copy of the configuration: calling .set() on that copy changes what
# getAll() prints but does not reconfigure the already-running context.
# NOTE: getOrCreate() reuses an existing session if one is alive in this
# kernel; restart the kernel for changed memory settings to take effect.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config('spark.executor.memory', '10G')
    .config('spark.driver.memory', '10G')
    .config('spark.driver.maxResultSize', '10G')
    .getOrCreate()
)

# Get configuration of SparkContext (kept under the name `conf` for inspection)
conf = spark.sparkContext.getConf()

# Display all properties
conf.getAll()

[('spark.master', 'local[4]'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '10G'),
 ('spark.driver.maxResultSize', '10G'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.port', '58896'),
 ('spark.executor.memory', '10G'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', 'LAPTOP-OC9K1UNP'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1606942165480'),
 ('spark.ui.showConsoleProgress', 'true')]

In [2]:
# Accessing data from MongoDB
import os

# The connection strings embed a username and password, so they are read from
# the environment instead of being written into the notebook.
# Expected form:
#   mongodb+srv://<user>:<password>@<host>/<database>?retryWrites=true&w=majority
# A missing variable raises KeyError here, before any connection is attempted.
cluster_rating = MongoClient(os.environ["MONGODB_RATINGS_URI"])
cluster_metaData = MongoClient(os.environ["MONGODB_METADATA_URI"])

#accessing the database
db_rating = cluster_rating["AmazonData"]
db_metaData = cluster_metaData["dbFinalProject"]

#accessing the collection
collection_rating = db_rating["Ratings"]
collection_metaData = db_metaData["amazon_meta_data"]

#converting into dataframe (every document of each collection is pulled into memory)
df_rating = pd.DataFrame(list(collection_rating.find({})))
df_metaData = pd.DataFrame(list(collection_metaData.find({})))

In [11]:
#df_rating.head()
#metadata_df.head(100)

Unnamed: 0,userID,productID,rating
0,ARMDSTEI0Z7YW,77614992,5
1,A3FYN0SZYWN74,615208479,5
2,A2J0WRZSAAHUAP,615269990,5
3,A38RKP6G5P8J63,615269990,5
4,ARENM677YXZKX,615269990,2


In [7]:
#Data Preprocessing

#Dropping unwanted columns and renaming the selected columns
metadata_df = df_metaData.drop(
    columns=['_id', 'description', 'imUrl', 'related', 'salesRank', 'price', 'brand']
)
df_rating = df_rating.drop(columns=['_id', 'timestamp'])
df_rating.columns = ["userID", "productID", "rating"]

#replace all na values with zeros
metadata_df = metadata_df.fillna(0)

# Replace every missing title (0 after the fillna above) with a pseudonym
# built from the product id. Columns are addressed by position: column 0 is
# used as the id and column 1 as the title.
# Done with a boolean mask instead of a Python loop over every row.
id_col = metadata_df.columns[0]
title_col = metadata_df.columns[1]
missing_title = metadata_df[title_col] == 0
metadata_df.loc[missing_title, title_col] = (
    "item_" + metadata_df.loc[missing_title, id_col].astype(str)
)

In [None]:
#Cleaning of Category column
HEALTH_CATEGORY = "Health & Personal Care"


def _main_category(category_paths):
    """Pick the main category out of one product's list of category paths.

    Each path is a sequence of category names. For a path whose first entry
    is "Health & Personal Care", the second entry is used when it exists,
    otherwise the first entry itself. When several paths qualify the last one
    wins. Returns 0 when no path qualifies or when the value is not a
    list/tuple (e.g. the 0 written by an earlier fillna).
    """
    chosen = 0
    if not isinstance(category_paths, (list, tuple)):
        return chosen
    for path in category_paths:
        if len(path) > 0 and path[0] == HEALTH_CATEGORY:
            chosen = path[1] if len(path) > 1 else path[0]
    return chosen


#Create a new column main category:
# Computed in one pass and assigned as a whole column; element-wise chained
# assignment (df['col'][i] = ...) is not guaranteed to write to the frame.
metadata_df['main_category'] = metadata_df['categories'].apply(_main_category)

metadata_df = metadata_df.drop('categories', axis=1)
metadata_df.columns = ["productID", "title", "category"]

In [34]:
# Randomly sample 1M rows (or every row when fewer than 1M are available,
# since DataFrame.sample raises if n exceeds the number of rows)
SAMPLE_SIZE = 1000000
df_rating = df_rating.sample(n=min(SAMPLE_SIZE, len(df_rating)), random_state=123)

# Import library for encoding string
from sklearn import preprocessing as prep

# Encode itemID in metadata: each distinct productID gets an integer code
item_encoder = prep.LabelEncoder()
encoded_items = item_encoder.fit_transform(metadata_df["productID"])

# Encode userID in ratings file: each distinct userID in the sample gets an integer code
user_encoder = prep.LabelEncoder()
encoded_users = user_encoder.fit_transform(df_rating["userID"])

In [35]:
# Map each product label to its integer code, i.e. its position in item_encoder.classes_
item_dict = dict(zip(item_encoder.classes_.tolist(), range(len(item_encoder.classes_))))
len(item_dict) #263032

263032

In [36]:
# Map each user label to its integer code, i.e. its position in user_encoder.classes_
user_dict = dict(zip(user_encoder.classes_.tolist(), range(len(user_encoder.classes_))))
len(user_dict) #1851132 unique users #775894 for 1M sample

775894

In [37]:
# Turn the item dictionary into a two-column DataFrame: productID -> Dict_value
item_dict_df = (
    pd.DataFrame.from_dict(item_dict, orient='index')
    .reset_index()
    .rename(columns={'index': 'productID', 0: 'Dict_value'})
)

# Turn the user dictionary into a two-column DataFrame: userID -> user_dict_value
user_dict_df = (
    pd.DataFrame.from_dict(user_dict, orient='index')
    .reset_index()
    .rename(columns={'index': 'userID', 0: 'user_dict_value'})
)
user_dict_df

Unnamed: 0,userID,user_dict_value
0,A000187635I595IAVSQLH,0
1,A000285218JCFNDXRN02X,1
2,A000479237YOCU69UVEX3,2
3,A000551015I52B53CYTDN,3
4,A00061202IT7XNIEW32MA,4
...,...,...
775889,AZZZMOM8BW4BO,775889
775890,AZZZPCGBZEF8T,775890
775891,AZZZRGMYLGFLM,775891
775892,AZZZV0D9D5V05,775892


In [38]:
#Step - Processing of rating table
# Attach the item index to every rating. An inner join keeps only ratings
# whose productID exists in the item dictionary, so no rows are created for
# products that were never rated (an outer join would add them with nulls
# that then have to be dropped again).
df_rating_merged = (
    df_rating
    .merge(item_dict_df, on='productID', how='inner')
    .rename(columns={'Dict_value': 'item_index'})
)

# Attach the user index the same way, then drop any row that still holds a
# null value (e.g. a rating missing in the source data).
df_rating_indexes = (
    df_rating_merged
    .merge(user_dict_df, on='userID', how='inner')
    .rename(columns={'user_dict_value': 'user_index'})
    .dropna(axis=0)
)

#dropping userid and product id since they are encoded as user index and item index.
df_rating_final = df_rating_indexes.drop(columns=['userID', 'productID'])
df_rating_final

Unnamed: 0,userID,productID,rating,Dict_value,_merge
0,A3MDOTYNV494O,B006RB5Y9I,5.0,170824,both
1,AR6DLMKS2VHR8,B006RB5Y9I,5.0,170824,both
2,A1DRSN6VRXLDQM,B006RB5Y9I,4.0,170824,both
3,A2PU48JPA8GMW9,B006RB5Y9I,5.0,170824,both
4,ASWIZQ1P36SKI,B006RB5Y9I,5.0,170824,both
...,...,...,...,...,...
1105824,,B00LNS2HKY,,263020,right_only
1105825,,B00LOZNF7U,,263021,right_only
1105826,,B00LU4MI0A,,263026,right_only
1105827,,B00LV4480W,,263029,right_only


In [46]:
#Step - Processing of metadata table
# Outer-join the metadata with the item dictionary on productID; the `_merge`
# indicator column produced by the join is discarded straight away.
metadata_merged = (
    metadata_df
    .merge(item_dict_df, on='productID', how='outer', indicator=True)
    .drop(columns='_merge')
)

# Expose the dictionary value under the name `item_index`
metadata_final = metadata_merged.rename(columns={'Dict_value': 'item_index'})
metadata_final
#_______________________

In [3]:
#metadata_final= pd.read_csv(r'C:\Users\chait\Desktop\561- Big Data\Project\Data_files\metadata_df_final.csv')
#df_rating_final=pd.read_csv(r'C:\Users\chait\Desktop\561- Big Data\Project\Data_files\df_rating_final.csv')


In [4]:
# Recommendation system using PySpark

# Cast every column of the ratings frame to float before handing it to Spark
df_rating_final = df_rating_final.astype('float64')

# Convert the pandas DataFrame into a Spark DataFrame and print the inferred schema
df_rating_spark = spark.createDataFrame(data=df_rating_final)
df_rating_spark.printSchema()

root
 |-- rating: double (nullable = true)
 |-- item_index: double (nullable = true)
 |-- user_index: double (nullable = true)



In [5]:
# Preview the first 20 rows of the Spark ratings DataFrame (show()'s defaults, spelled out)
df_rating_spark.show(n=20, truncate=True)

+------+----------+----------+
|rating|item_index|user_index|
+------+----------+----------+
|   5.0|  170824.0|  537852.0|
|   5.0|    6907.0|  474793.0|
|   3.0|   90300.0|  361854.0|
|   4.0|   73619.0|   52509.0|
|   5.0|  242330.0|  673915.0|
|   4.0|  107638.0|  546963.0|
|   5.0|   10591.0|  185958.0|
|   3.0|   23105.0|   27243.0|
|   5.0|  189380.0|  476634.0|
|   5.0|  187547.0|  560703.0|
|   4.0|   13265.0|  420195.0|
|   5.0|  227637.0|  336167.0|
|   5.0|  225506.0|  369455.0|
|   3.0|   76900.0|  666806.0|
|   4.0|  249754.0|  419964.0|
|   5.0|   11793.0|  441780.0|
|   1.0|   53940.0|  167519.0|
|   5.0|  121808.0|  656975.0|
|   5.0|  109972.0|  658145.0|
|   5.0|  161840.0|  129029.0|
+------+----------+----------+
only showing top 20 rows



In [6]:
# Cast item_index to float; the Spark schema below declares it as DoubleType
metadata_final['item_index'] = metadata_final['item_index'].astype('float64')

# Explicit Spark schema: three nullable string columns and one nullable double
sch = (
    StructType()
    .add("productID", StringType(), True)
    .add("title", StringType(), True)
    .add("category", StringType(), True)
    .add("item_index", DoubleType(), True)
)

# Convert the pandas metadata frame into a Spark DataFrame using that schema
metadata_df_spark = spark.createDataFrame(metadata_final, schema=sch)
metadata_df_spark.printSchema()

root
 |-- productID: string (nullable = true)
 |-- title: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item_index: double (nullable = true)



In [7]:
# Preview the first 20 rows of the Spark metadata DataFrame (show()'s defaults, spelled out)
metadata_df_spark.show(n=20, truncate=True)

+----------+--------------------+--------------------+----------+
| productID|               title|            category|item_index|
+----------+--------------------+--------------------+----------+
|0077614992|Principles of Mgm...|Health & Personal...|       0.0|
|0615208479|Brain Fitness Exe...|       Personal Care|       1.0|
|0615269990|       Occam's Razor|       Personal Care|       2.0|
|0615315860|101 BlenderBottle...|Health & Personal...|       3.0|
|0615406394|Aphrodite Reborn ...|Health & Personal...|       4.0|
|0615836828|Breast Cancer: My...|Health & Personal...|       5.0|
|0641710577|Foster Grant Wome...|       Personal Care|       6.0|
|0641864507|LED Travel Magnifier|Medical Supplies ...|       7.0|
|0681504498|Itty Bitty Paperb...|Medical Supplies ...|       8.0|
|0705394638|C.r. Gibson Baby ...|Stationery & Part...|       9.0|
|0736789928|Zaner-Bloser Clas...|Stationery & Part...|      10.0|
|076493211X|Wayne Thiebaud Co...|Stationery & Part...|      11.0|
|076719676

In [47]:
# Seed shared by every stochastic step in this cell so that the split, the ALS
# initialisation and the fold assignment are repeatable between runs.
SEED = 123

#Dividing the dataset into train-test samples (80% / 20%)
(training, test) = df_rating_spark.randomSplit([0.8, 0.2], seed=SEED)

# Define an evaluator which calculates RMSE between `rating` and `prediction`
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Define a model with basic definitions
# coldStartStrategy="drop" removes rows with NaN predictions (users/items
# unseen during fitting) so they do not turn the RMSE into NaN.
recsys = ALS(userCol="user_index", itemCol="item_index",
             ratingCol="rating", nonnegative=True, coldStartStrategy="drop",
             seed=SEED)

# Define a parameter grid consisting of values to work with
# (a single combination here: regParam=0.1, rank=25)
paramGrid = ParamGridBuilder() \
    .addGrid(recsys.regParam, [0.1]) \
    .addGrid(recsys.rank, [25]) \
    .build()

# Define a CrossValidator object which performs 2-fold cross validation
cvs = CrossValidator(estimator=recsys,
                     estimatorParamMaps=paramGrid,
                     evaluator=evaluator,
                     numFolds=2,
                     seed=SEED)

In [48]:
# Run the cross-validated search on the training split
model = cvs.fit(training)

# Keep the best fitted model under the name `recsys` for the later cells
recsys = model.bestModel

# Generate predictions for the held-out test split
predictions = model.transform(test)

# Score the predictions with the RMSE evaluator and report the result
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error ={}".format(rmse))

In [None]:
# Top 10 item recommendations for every user #775894 users
userRecs = recsys.recommendForAllUsers(numItems=10)
userRecs.printSchema()
userRecs.show(n=20, truncate=True)

# Collect the recommendations into a pandas DataFrame for display/post-processing
userRecsPand = userRecs.toPandas()

root
 |-- user_index: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_index: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [92]:
#To display the 10 recommendations in 10 different columns:
N_RECOMMENDATIONS = 10


def _recommendation_strings(recs):
    """Return one 'item_index=<id>, rating=<score>' string per recommendation.

    Accepts either form the `recommendations` column can take:
    * a sequence of structs with `item_index` and `rating` fields, or
    * the text form of such a sequence, e.g.
      "[Row(item_index=1, rating=2.0), Row(item_index=3, rating=1.5)]".
    """
    if isinstance(recs, str):
        # Everything before the first "Row(" is discarded; the trailing
        # "), " / ")]" left on each piece is stripped.
        return [piece.rstrip('), ]') for piece in recs.split('Row(')[1:]]
    return [
        "item_index={}, rating={}".format(rec['item_index'], rec['rating'])
        for rec in recs
    ]


rec_strings = userRecsPand['recommendations'].apply(_recommendation_strings)

# Work on a new frame so userRecsPand itself is left untouched; 'x' and 'temp'
# are dropped only if present (errors='ignore').
recmd = userRecsPand.drop(columns=['recommendations', 'x', 'temp'], errors='ignore').copy()

#Splitting to 10 columns (None where a user has fewer recommendations)
for position in range(N_RECOMMENDATIONS):
    recmd['Recommendation_{}'.format(position + 1)] = rec_strings.apply(
        lambda strings, position=position: strings[position] if position < len(strings) else None
    )

In [93]:
recmd.head(10)

Unnamed: 0,user_index,Recommendation_1,Recommendation_2,Recommendation_3,Recommendation_4,Recommendation_5,Recommendation_6,Recommendation_7,Recommendation_8,Recommendation_9,Recommendation_10
0,148,"item_index=258823, rating=1.9737238883972168","item_index=185376, rating=1.9575668573379517","item_index=57139, rating=1.9392974376678467","item_index=183884, rating=1.9349370002746582","item_index=130798, rating=1.9338281154632568","item_index=105804, rating=1.9321393966674805","item_index=103072, rating=1.9262118339538574","item_index=2015, rating=1.9223661422729492","item_index=186827, rating=1.91844642162323","item_index=153268, rating=1.91844642162323"
1,463,"item_index=6945, rating=4.920039653778076","item_index=186827, rating=4.860890865325928","item_index=153268, rating=4.860890865325928","item_index=185376, rating=4.859927654266357","item_index=214112, rating=4.809175968170166","item_index=183884, rating=4.8039021492004395","item_index=105520, rating=4.772045612335205","item_index=203250, rating=4.748579502105713","item_index=90507, rating=4.734275817871094","item_index=96111, rating=4.705621719360352"
2,471,"item_index=228192, rating=3.6455678939819336","item_index=117686, rating=3.5809648036956787","item_index=28488, rating=3.566378116607666","item_index=188371, rating=3.566378116607666","item_index=143813, rating=3.566378116607666","item_index=122234, rating=3.5656230449676514","item_index=157201, rating=3.505542755126953","item_index=89612, rating=3.505265474319458","item_index=131676, rating=3.488877058029175","item_index=192446, rating=3.477353096008301"
3,496,"item_index=162252, rating=4.776736259460449","item_index=63664, rating=4.754911422729492","item_index=231914, rating=4.584717273712158","item_index=228092, rating=4.5077972412109375","item_index=176940, rating=4.507537841796875","item_index=108633, rating=4.502350807189941","item_index=3557, rating=4.498564720153809","item_index=136607, rating=4.475340843200684","item_index=238749, rating=4.445220947265625","item_index=73697, rating=4.442404270172119"
4,833,"item_index=183884, rating=4.809751510620117","item_index=153268, rating=4.806126117706299","item_index=186827, rating=4.806126117706299","item_index=185376, rating=4.805229663848877","item_index=2015, rating=4.748161792755127","item_index=6945, rating=4.734163761138916","item_index=57139, rating=4.731656551361084","item_index=11203, rating=4.676901340484619","item_index=214112, rating=4.662258625030518","item_index=105804, rating=4.653405666351318"
5,1088,"item_index=153268, rating=3.9552128314971924","item_index=186827, rating=3.9552128314971924","item_index=185376, rating=3.9120266437530518","item_index=183884, rating=3.906235933303833","item_index=57139, rating=3.86938214302063","item_index=105804, rating=3.8395791053771973","item_index=6945, rating=3.8321568965911865","item_index=2015, rating=3.8252973556518555","item_index=130798, rating=3.8140947818756104","item_index=224052, rating=3.8104071617126465"
6,1238,"item_index=185376, rating=4.900214195251465","item_index=183884, rating=4.837372303009033","item_index=74691, rating=4.786144256591797","item_index=2015, rating=4.781135559082031","item_index=6945, rating=4.776639938354492","item_index=186827, rating=4.762091159820557","item_index=153268, rating=4.762091159820557","item_index=90507, rating=4.745449066162109","item_index=57139, rating=4.740128517150879","item_index=11203, rating=4.72135591506958"
7,1342,"item_index=74691, rating=4.704067707061768","item_index=211742, rating=4.57657527923584","item_index=183884, rating=4.526294708251953","item_index=57139, rating=4.5055131912231445","item_index=185376, rating=4.502546787261963","item_index=2015, rating=4.4889068603515625","item_index=153268, rating=4.47322940826416","item_index=186827, rating=4.47322940826416","item_index=103072, rating=4.4717488288879395","item_index=214112, rating=4.454628944396973"
8,1580,"item_index=2015, rating=3.9315924644470215","item_index=185376, rating=3.911513566970825","item_index=183884, rating=3.893906831741333","item_index=186827, rating=3.8543412685394287","item_index=153268, rating=3.8543412685394287","item_index=6945, rating=3.8270113468170166","item_index=57139, rating=3.8155641555786133","item_index=214112, rating=3.804212808609009","item_index=39687, rating=3.8001458644866943","item_index=73014, rating=3.800144672393799"
9,1591,"item_index=258823, rating=4.574864387512207","item_index=178505, rating=4.567498207092285","item_index=98876, rating=4.500478267669678","item_index=131247, rating=4.457087516784668","item_index=70247, rating=4.396417140960693","item_index=24316, rating=4.391935348510742","item_index=184867, rating=4.360193729400635","item_index=111367, rating=4.354029655456543","item_index=217597, rating=4.351689338684082","item_index=230109, rating=4.3483805656433105"


In [None]:
#Cleaning of Recommendation 1 column
# Each entry looks like "item_index=<id>, rating=<score>"; keep the integer id.
recmd1 = pd.DataFrame({
    'item_index': recmd['Recommendation_1'].apply(lambda x: int(x.split(',')[0].split('=')[1]))
})

# Attach productID/title/category. The join uses metadata_final because that
# is the frame carrying the item_index column (metadata_df only has
# productID, title and category).
recmd1 = recmd1.merge(metadata_final, on='item_index', how='inner')

#productIDs Identified using powerBI
# Most popular products from recommendation 1, in descending order of popularity
TOP_PRODUCTS_RECOMMENDATION_1 = ['B00KP1CLL0', 'B00K44T2BK', 'B00JZCPNHO', 'B00KCVG2KY', 'B00JJRMNFA']
for product_id in TOP_PRODUCTS_RECOMMENDATION_1:
    # one representative row (with title and category) per product
    print(recmd1[recmd1['productID'] == product_id].head(1))

In [None]:
#Recommendation 2
# Each entry looks like "item_index=<id>, rating=<score>"; keep the integer id.
recmd2 = pd.DataFrame({
    'item_index': recmd['Recommendation_2'].apply(lambda x: int(x.split(',')[0].split('=')[1]))
})

# Attach productID/title/category. The join uses metadata_final because that
# is the frame carrying the item_index column (metadata_df only has
# productID, title and category).
recmd2 = recmd2.merge(metadata_final, on='item_index', how='inner')

#recmd2.to_csv(r'C:\Users\chait\Desktop\561- Big Data\Project\Data_files\Recommendation_2.csv', index=False)

# Most popular products from recommendation 2, in descending order of popularity.
# The lookups read recmd2 (the frame built in this cell).
TOP_PRODUCTS_RECOMMENDATION_2 = ['B00KP1CLL0', 'B00K44T2BK', 'B00KCVG2KY', 'B00K22ZDGM', 'B00KB1VMLE']
for product_id in TOP_PRODUCTS_RECOMMENDATION_2:
    # one representative row (with title and category) per product
    print(recmd2[recmd2['productID'] == product_id].head(1))