In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import pandas as pd

In [2]:
from pyspark.ml.feature import StringIndexer
spark = SparkSession \
    .builder \
    .appName("Recommendation App") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()
default_conf = spark.sparkContext._conf.getAll()
print(default_conf)

conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '2g'),
                                        ('spark.app.name', 'HW2 Recommendation'),
                                        ('spark.executor.cores', '4'),
                                        ('spark.cores.max', '10'),
                                        ('spark.driver.memory','16g'),
                                        ('spark.kryoserializer.buffer.max','1g'),
                                        ('spark.default.parallelism','300'),
                                       ('spark.sql.shuffle.partitions','300')])
v = spark.sparkContext._conf.getAll()
print(v)

[('spark.app.name', 'Recommendation App'), ('spark.driver.port', '63115'), ('spark.rdd.compress', 'True'), ('spark.app.startTime', '1653276693639'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.submit.pyFiles', ''), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.driver.host', 'localhost'), ('spark.app.id', 'local-1653276695198'), ('spark.ui.showConsoleProgress', 'true'), ('spark.sql.warehouse.dir', 'file:/Users/saheedadepoju/Documents/CSE272/HW2Recommendation/spark-warehouse')]
[('spark.executor.memory', '2g'), ('spark.driver.port', '63115'), ('spark.app.name', 'HW2 Recommendation'), ('spark.executor.id', 'driver'), ('spark.driver.host', 'localhost'), ('spark.default.parallelism', '300'), ('spark.driver.memory', '16g'), ('spark.app.id', 'local-1653276695198'), ('spark.executor.cores', '4'), ('spark.cores.max', '10'), ('spark.kryoserializer.buffer.max', '1g'), ('spark.rdd.compress', 'True'), ('spark.app.startTime',

In [3]:
movies_ratings_df = spark.read.json("/Users/saheedadepoju/Documents/CSE272/HW2/Movies_and_TV.json.gz")

In [4]:
movies_ratings_df_subset = movies_ratings_df.limit(100000)



In [5]:
movies_asin_index_subset=movies_ratings_df_subset.select("asin").distinct().withColumn("asin_index", monotonically_increasing_id())

In [6]:
movies_training_data_merged_1 = movies_ratings_df_subset.join(movies_asin_index_subset.select('asin', 'asin_index'), ['asin'])



In [7]:
movies_review_subset = movies_training_data_merged_1.select("reviewerID").distinct().withColumn("reviewerID_index", monotonically_increasing_id())



In [8]:
movies_training_data_merged_2 = movies_training_data_merged_1.join(movies_review_subset.select('reviewerID', 'reviewerID_index'), ['reviewerID'])



In [9]:
(training,test)=movies_training_data_merged_2.randomSplit([0.8, 0.2])



In [10]:
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="reviewerID_index",itemCol="asin_index",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)

In [11]:
model=als.fit(training)



In [12]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="overall",predictionCol="prediction")



In [13]:
evaluator_1=RegressionEvaluator(metricName="mae",labelCol="overall",predictionCol="prediction")



In [14]:
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)

print("RMSE="+str(rmse))

RMSE=1.9789857184519146


In [15]:
mae = evaluator_1.evaluate(predictions)
print("MAE="+str(mae))

MAE=1.6459459560664045


In [16]:
recs=model.recommendForAllUsers(10).toPandas()



In [17]:
nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['reviewerID_index'], value_name = "recommendation") \
            .drop("variable", axis = 1) \
            .dropna()

In [18]:
nrecs=nrecs.sort_values('reviewerID_index')

In [19]:
nrecs=pd.concat([nrecs['recommendation'].apply(pd.Series), nrecs['reviewerID_index']], axis = 1)



In [20]:
nrecs.columns = [

        'ProductID_index',
        'Rating',
        'UserID_index'

     ]
md=movies_training_data_merged_2.select(movies_training_data_merged_2['reviewerID'],movies_training_data_merged_2['reviewerID_index'],movies_training_data_merged_2['asin'],movies_training_data_merged_2['asin_index'])

In [21]:
md=md.toPandas()
dict1 =dict(zip(md['reviewerID_index'],md['reviewerID']))
dict2=dict(zip(md['asin_index'],md['asin']))
nrecs['reviewerID']=nrecs['UserID_index'].map(dict1)
nrecs['asin']=nrecs['ProductID_index'].map(dict2)
nrecs=nrecs.sort_values('reviewerID')
nrecs.reset_index(drop=True, inplace=True)
new=nrecs[['reviewerID','asin','Rating']]
new['recommendations'] = list(zip(new.asin, new.Rating))
res=new[['reviewerID','recommendations']]
res_new=res['recommendations'].groupby([res.reviewerID]).apply(list).reset_index()
review_df = spark.createDataFrame(res_new)
review_df.show(10)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  if __name__ == '__main__':


+--------------------+--------------------+
|          reviewerID|     recommendations|
+--------------------+--------------------+
|A0025276XH785Y75YRN0|[{0767000021, 1.6...|
|A0033358Q08LR9V17C3X|[{0780625390, 6.1...|
|A0034986DWR7WEDQN0GV|[{0310894913, 10....|
| A0040714X0G8QUCER7Q|[{000503860X, 5.0...|
|A0056274FAHZQC4N2ZN8|[{0310691281, 10....|
|A0093751ZA04WDR6FGNX|[{000503860X, 11....|
|A0160612BLIWRHROHLLE|[{078062386X, 14....|
|A0297244750EW7S81VID|[{0764008722, 11....|
|A0322174KPHFYVAJWTR2|[{0784011915, 9.1...|
|A0351505SE8094H4NC6F|[{0740318764, 5.9...|
+--------------------+--------------------+
only showing top 10 rows



In [22]:
review_df.count()

72193

In [23]:
review_df.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: double (nullable = true)



In [24]:
saved_review_df = review_df.limit(20)

In [25]:
saved_review_df.write.csv("recommendation.csv")

AnalysisException: CSV data source does not support array<struct<_1:string,_2:double>> data type.