#0. Chargement des données

In [0]:
import os

# The storage account key is a secret and must not live in the notebook source.
# It is read from the AZURE_STORAGE_ACCOUNT_KEY environment variable, which has
# to be set on the cluster/driver before this cell runs (a KeyError here means
# it is missing). Any key that was previously committed in this cell should be
# treated as leaked and rotated in the storage account.
spark.conf.set(
  "fs.azure.account.key.storagestudent.blob.core.windows.net",
  os.environ["AZURE_STORAGE_ACCOUNT_KEY"]
)

In [0]:
# Location of each CSV file; {0} is replaced by the dataset name.
csv_path_template = "wasbs://default@storagestudent.blob.core.windows.net/datasets/S8-5/Exo/restaurant-data-with-consumer-ratings/{0}.csv"

# Names of the CSV files (without extension) to load.
dataset_names = [
  "chefmozaccepts",
  "chefmozcuisine",
  "chefmozhours4",
  "chefmozparking",
  "geoplaces2",
  "rating_final",
  "usercuisine",
  "userpayment",
  "userprofile"
]

# Map each dataset name to the DataFrame read from its CSV file (first line
# of every file is treated as the header).
datasets = {}
for dataset_name in dataset_names:
  datasets[dataset_name] = spark.read.load(
    csv_path_template.format(dataset_name),
    format="csv",
    header="true"
  )

In [0]:
# Show the name -> DataFrame mapping as the cell result.
datasets

In [0]:
# Pick the DataFrame loaded from rating_final.csv; it is the table used for training below.
rating = datasets["rating_final"]

In [0]:
# Preview the ratings table as loaded from the CSV file.
display(rating)

userID,placeID,rating,food_rating,service_rating
U1077,135085,2,2,2
U1077,135038,2,2,1
U1077,132825,2,2,2
U1077,135060,1,2,2
U1068,135104,1,1,2
U1068,132740,0,0,0
U1068,132663,1,1,1
U1068,132732,0,0,0
U1068,132630,1,1,1
U1067,132584,2,2,2


#1. Pre-processing

In [0]:
from pyspark.ml.feature import StringIndexer

# Encode the string userID as a numeric index in a new userIdIndex column;
# that column is what the ALS model below uses as its user column.
user_id_indexer_stage = StringIndexer(inputCol="userID", outputCol="userIdIndex")
userIdIndexer = user_id_indexer_stage.fit(rating)
rating = userIdIndexer.transform(rating)

# Preview the ratings with the added userIdIndex column.
display(rating)

userID,placeID,rating,food_rating,service_rating,userIdIndex
U1077,135085,2,2,2,112.0
U1077,135038,2,2,1,112.0
U1077,132825,2,2,2,112.0
U1077,135060,1,2,2,112.0
U1068,135104,1,1,2,81.0
U1068,132740,0,0,0,81.0
U1068,132663,1,1,1,81.0
U1068,132732,0,0,0,81.0
U1068,132630,1,1,1,81.0
U1067,132584,2,2,2,99.0


In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Keep only the three columns needed for training. placeID and rating come
# out of the CSV reader and are cast to integers here; userIdIndex is kept
# as produced by the indexer.
training_columns = [
  F.col("userIdIndex"),
  F.col("placeID").cast(IntegerType()),
  F.col("rating").cast(IntegerType())
]
rating = rating.select(*training_columns)

# Preview the reduced, typed ratings table.
display(rating)

userIdIndex,placeID,rating
112.0,135085,2
112.0,135038,2
112.0,132825,2
112.0,135060,1
81.0,135104,1
81.0,132740,0
81.0,132663,1
81.0,132732,0
81.0,132630,1
99.0,132584,2


#2. Entrainement

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 80/20 train/test split. The seed is fixed so that re-running the notebook
# uses the same sampling seed for the split; without it every run trains and
# evaluates on a different partition of the ratings and the results below are
# not reproducible.
train, test = rating.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Metric used to compare candidate models: RMSE between the observed
# `rating` column and the model's `prediction` column.
evaluator = RegressionEvaluator(
  predictionCol="prediction",
  labelCol="rating",
  metricName="rmse"
)

# ALS recommender on (userIdIndex, placeID, rating), constrained to
# non-negative factors. coldStartStrategy="drop" removes rows whose
# prediction is NaN, so they do not reach the RMSE computation.
als = ALS(
  userCol="userIdIndex",
  itemCol="placeID",
  ratingCol="rating",
  nonnegative=True,
  coldStartStrategy="drop"
)

# Hyper-parameter grid: 3 ranks x 3 iteration counts = 9 combinations.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(als.rank, [1, 5, 10])
grid_builder = grid_builder.addGrid(als.maxIter, [1, 5, 10])
paramGrid = grid_builder.build()

# 5-fold cross-validation over the grid, scored with the evaluator above.
cv = CrossValidator(
  numFolds=5,
  estimator=als,
  estimatorParamMaps=paramGrid,
  evaluator=evaluator
)

In [0]:
# Run the cross-validated grid search on the training split.
model = cv.fit(train)

In [0]:
# Model refit with the best-scoring parameter combination.
bestModel = model.bestModel

# The tuned hyper-parameters are read from the cross-validation results rather
# than from bestModel's param map: `maxIter` is a param of the ALS estimator,
# not of the fitted ALSModel, so filtering the fitted model's params does not
# reliably return the values that were selected.
# avgMetrics[i] is the mean metric over the folds for getEstimatorParamMaps()[i].
cv_avg_metrics = model.avgMetrics
pick_best = max if model.getEvaluator().isLargerBetter() else min
best_index = pick_best(range(len(cv_avg_metrics)), key=cv_avg_metrics.__getitem__)
best_param_map = model.getEstimatorParamMaps()[best_index]

# Same shape as before: {"rank": ..., "maxIter": ...}
bestModelParameters = {
  param.name: value
  for param, value in best_param_map.items()
  if param.name in ["rank", "maxIter"]
}

#3. Evaluation et analyse

In [0]:
# Score the held-out test split with the fitted cross-validation model.
predictions_test = model.transform(test)

In [0]:
# Preview observed ratings next to the predicted values.
display(predictions_test)

userIdIndex,placeID,rating,prediction
122.0,135000,0,1.3171759
48.0,135027,2,0.7652261
12.0,135066,2,1.9352859
33.0,135066,1,1.3971237
78.0,132663,1,0.4258607
30.0,135108,2,1.024524
27.0,135071,2,0.79292196
86.0,135071,2,0.78549683
91.0,132723,2,1.4061359
1.0,132723,2,1.4275413


#4. Post-processing

In [0]:
from pyspark.ml.feature import IndexToString

# Turn the numeric userIdIndex back into the original userID string, then
# drop the index column, which is no longer needed.
user_id_decoder = IndexToString(inputCol="userIdIndex", outputCol="userID")
predictions_test = user_id_decoder.transform(predictions_test)
predictions_test = predictions_test.drop("userIdIndex")

In [0]:
# Payment tables: one loaded from userpayment.csv, one from chefmozaccepts.csv.
userpayment, chefmozaccepts = (
  datasets[table_name] for table_name in ('userpayment', 'chefmozaccepts')
)

In [0]:
# A (userID, placeID) pair matches when the user's Upayment value equals the
# restaurant's Rpayment value. Pairs that match on several payment values are
# collapsed to a single row by dropDuplicates().
same_payment_method = userpayment['Upayment'] == chefmozaccepts['Rpayment']
payment_matching = (
  userpayment
  .join(chefmozaccepts, same_payment_method, how='inner')
  .select("userID", "placeID")
  .dropDuplicates()
)

In [0]:
# Preview the distinct (userID, placeID) pairs that share a payment value.
display(payment_matching)

userID,placeID
U1081,135110
U1006,135107
U1042,135107
U1029,135105
U1070,135104
U1102,135103
U1034,135103
U1027,135100
U1104,135099
U1086,135094


In [0]:
# Number of test predictions before the payment filter (baseline for the filtered count below).
predictions_test.count()

In [0]:
# Keep only the predictions whose (userID, placeID) pair also appears in
# payment_matching.
pair_keys = ["userID", "placeID"]
predictions_test_filtered = predictions_test.join(payment_matching, on=pair_keys, how='inner')

# Number of predictions that remain after the filter.
predictions_test_filtered.count()

In [0]:
# Preview the predictions that remain after the payment filter.
display(predictions_test_filtered)

userID,placeID,rating,prediction
U1072,135000,0,1.3171759
U1132,135027,2,0.7652261
U1016,135066,2,1.9352859
U1099,135066,1,1.3971237
U1115,135071,2,0.78549683
U1048,132723,2,1.4061359
U1061,132723,2,1.4275413
U1095,132723,2,1.3221822
U1018,135062,0,0.86757195
U1002,135062,1,1.4269618
