# Recommendation System Using pyspark and MLib

In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [2]:
spark = SparkSession \
    .builder \
    .appName("Recommendation_System") \
    .getOrCreate()

In [3]:
sc = spark.sparkContext
# Default Partitions
sc.defaultParallelism
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
df = sqlContext.read.load("Musical_Instruments_Processed.csv",format="csv", sep=",", inferSchema="true", header="true")

In [5]:
#import RegressionEvaluator and ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
#ALS accepts only integer value as parameters. Hence we need to convert asin and reviewerID column in index form.
#import modules
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

Using StringIndexer

In [6]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(df.columns)-set(['overall'])) ]
temp_data = Pipeline(stages=indexer)
updated_data = temp_data.fit(df).transform(df)
updated_data.show()

+----------+--------------+-------+----------------+----------+
|      asin|    reviewerID|overall|reviewerID_index|asin_index|
+----------+--------------+-------+----------------+----------+
|B0002D02NA|A2E64XLZSU0P5L|      5|         32297.0|    3333.0|
|B0002D02NA|A3UD50M7M72150|      4|          2855.0|    3333.0|
|B0002D02NA|A19X5GXMJZLM8Y|      5|          3824.0|    3333.0|
|B0002D02NA|A3BSMAWPYGQJ1S|      5|          7754.0|    3333.0|
|B0002D02NA| AEJZM5IXSYJXB|      5|         60764.0|    3333.0|
|B0002D02NA|A1T051828RL75U|      5|         23166.0|    3333.0|
|B0002DVBBW| AY35MQESG99FO|      2|         69626.0|    3339.0|
|B0002DVBBW|A2T6UJ8K87W4HU|      5|         38954.0|    3339.0|
|B0002DVBBW|A3QSHEJK2T6E41|      5|          8537.0|    3339.0|
|B0002DVBBW|A3LOJ8FRN12M7T|      5|         51659.0|    3339.0|
|B0002DVBBW| A136AL1BFMQEO|      5|         11968.0|    3339.0|
|B0002DVBBW|A1LFG83Q2H1JNR|      3|         19931.0|    3339.0|
|B0006IQLF4|A1STCGKIY3CAUN|      5|     

In [7]:
#Creating training and test data
(training_data,test_data)=updated_data.randomSplit([0.8, 0.2])

In [8]:
#Creating ALS model and fitting data
als=ALS(maxIter=10,regParam=0.1,rank=50,userCol="reviewerID_index",itemCol="asin_index",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)

In [9]:
#fit the model on training data
model=als.fit(training_data)

In [10]:
#Generate predictions and evaluate rmse
evaluator=RegressionEvaluator(metricName="rmse",labelCol="overall",predictionCol="prediction")
predictions=model.transform(test_data)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))

RMSE=1.6612362706914139


In [11]:
predictions.show()

+----------+--------------+-------+----------------+----------+----------+
|      asin|    reviewerID|overall|reviewerID_index|asin_index|prediction|
+----------+--------------+-------+----------------+----------+----------+
|B00P0AHRQ0|A2U57RPL3TCU3W|      3|          2339.0|    1645.0| 3.1910493|
|B00JW286K8|A170P56DL715EP|      4|          1537.0|    1395.0|  4.861015|
|B000V1K7FG| A4TMBU1YC404R|      5|          2895.0|    1460.0| 4.0801983|
|B00EOYKE0O| A35K6IKCXCWJ9|      5|          7439.0|    1483.0| 2.3130622|
|B00VHKM190|A1AS8TH7HN3BLL|      2|          3875.0|    3179.0| 3.0583022|
|B005GZ86ZA|A2VZN56IW519UY|      5|          2362.0|    2996.0| 3.6469593|
|B00U3ZIAUE|A1XYSCQR433K8M|      5|          1903.0|    3761.0| 4.1952424|
|B00CDA0IUC|A180PB68JB06JS|      5|          3731.0|     580.0|  3.943441|
|B00BLQ7M4E|A1JL94VIIZRKC3|      5|          4371.0|      65.0| 3.0215068|
|B0098JO3H0|A31KBMB92U5QIK|      5|          7243.0|    3566.0| 2.7231038|
|B01H31ITWE|A3I2JLMG5R38O

getting recomendation for the users using - recommendForAllUsers and converting Index values back to categorical values as IndexToString

In [13]:
md=updated_data.select(updated_data['reviewerID'],updated_data['reviewerID_index'],updated_data['asin'],updated_data['asin_index'])
md=md.toPandas()
dict1 =dict(zip(md['reviewerID_index'],md['reviewerID']))
dict2=dict(zip(md['asin_index'],md['asin']))

In [14]:
import pandas as pd
recs=model.recommendForAllUsers(10).toPandas()
nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['reviewerID_index'], value_name = "recommendation") \
            .drop("variable", axis = 1) \
            .dropna() 
nrecs=nrecs.sort_values('reviewerID_index')
nrecs=pd.concat([nrecs['recommendation'].apply(pd.Series), nrecs['reviewerID_index']], axis = 1)
nrecs.columns = [
        
        'ProductID_index',
        'Rating',
        'UserID_index'
       
     ]


In [15]:
nrecs['reviewerID']=nrecs['UserID_index'].map(dict1)
nrecs['asin']=nrecs['ProductID_index'].map(dict2)
nrecs=nrecs.sort_values('reviewerID')
nrecs.reset_index(drop=True, inplace=True)
new=nrecs[['reviewerID','asin','Rating']]
new['recommendations'] = list(zip(new.asin, new.Rating))
res=new[['reviewerID','recommendations']]  
res_new=res['recommendations'].groupby([res.reviewerID]).apply(list).reset_index()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [16]:
print(res_new)

                 reviewerID                                    recommendations
0      A0121853CORJ9U925T6G  [(B00QD5EDB8, 4.009557247161865), (B012N5JGGS,...
1      A0183104KMY6KZLU8C70  [(B0007SL856, 5.003383159637451), (B0037NVSYY,...
2      A0245825TGZGEHLKA09N  [(B00GBG12OK, 4.60458517074585), (B00ISQCN6M, ...
3      A02607458WUIM58SBM2D  [(B0008G26JO, 4.966275691986084), (B00ISQCN6M,...
4      A03690638RLHYCUK9KX2  [(B00TSCFIDA, 5.349193572998047), (B000RKVH0K,...
...                     ...                                                ...
58218         AZZ41SOILN3WE  [(B00AZNR6W2, 3.4219517707824707), (B01FPN5JEI...
58219         AZZHB5R8E2TK9  [(B00FW7508S, 3.271819829940796), (B00L1LQ5L0,...
58220         AZZLY8O5S2E5L  [(B00OF51CYY, 0.9842791557312012), (B00FM0MSDK...
58221         AZZO203OUUIEG  [(B00OO0LD1C, 3.4551329612731934), (B00DM4LT10...
58222         AZZZWDGMM27BT  [(B00C2DC8N0, 5.303450584411621), (B012N5JGGS,...

[58223 rows x 2 columns]


In [17]:
#save pandas dataframe to csv file for reference
res_new.to_csv(r'result.csv', index = False)