## Recommender Systems - Solutions

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('recommend').getOrCreate()

In [0]:
df = spark.read.csv('dbfs:/FileStore/movielens_ratings.csv', inferSchema=True, header=True)

In [0]:
df_pd = df.toPandas()

In [0]:
df_pd.head()

Unnamed: 0,movieId,rating,userId
0,2,3.0,0
1,3,1.0,0
2,5,2.0,0
3,9,4.0,0
4,11,1.0,0


In [0]:
df_pd.shape

(1501, 3)

In [0]:
df_pd.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
movieId,1501.0,49.40573,28.937034,0.0,24.0,50.0,74.0,99.0
rating,1501.0,1.774151,1.187276,1.0,1.0,1.0,2.0,5.0
userId,1501.0,14.383744,8.59104,0.0,7.0,14.0,22.0,29.0


In [0]:
df_pd.corr()

Unnamed: 0,movieId,rating,userId
movieId,1.0,0.036569,0.003267
rating,0.036569,1.0,0.056411
userId,0.003267,0.056411,1.0


In [0]:
import numpy as np

In [0]:
df_pd['mealskew'] = df_pd['movieId'].apply(lambda id :np.nan if id > 31 else id)

In [0]:
df_pd.head()

Unnamed: 0,movieId,rating,userId,mealskew
0,2,3.0,0,2.0
1,3,1.0,0,3.0
2,5,2.0,0,5.0
3,9,4.0,0,9.0
4,11,1.0,0,11.0


In [0]:
df_pd.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
movieId,1501.0,49.40573,28.937034,0.0,24.0,50.0,74.0,99.0
rating,1501.0,1.774151,1.187276,1.0,1.0,1.0,2.0,5.0
userId,1501.0,14.383744,8.59104,0.0,7.0,14.0,22.0,29.0
mealskew,486.0,15.502058,9.250634,0.0,7.0,15.0,23.0,31.0


In [0]:
mealmap = { 2. : "Chicken Curry",
           3. : "Spicy Chicken Nuggest",
           5. : "Hamburger",
           9. : "Taco Surprise",
           11. : "Meatloaf",
           12. : "Ceaser Salad",
           15. : "BBQ Ribs",
           17. : "Sushi Plate",
           19. : "Cheesesteak Sandwhich",
           21. : "Lasagna",
           23. : "Orange Chicken",
           26. : "Spicy Beef Plate",
           27. : "Salmon with Mashed Potatoes",
           28. : "Penne Tomatoe Pasta",
           29. : "Pork Sliders",
           30. : "Vietnamese Sandwich",
           31. : "Chicken Wrap",
           np.nan: "Cowboy Burger",
           4. : "Pretzels and Cheese Plate",
           6. : "Spicy Pork Sliders",
           13. : "Mandarin Chicken PLate",
           14. : "Kung Pao Chicken",
           16. : "Fried Rice Plate",
           8. : "Chicken Chow Mein",
           10. : "Roasted Eggplant ",
           18. : "Pepperoni Pizza",
           22. : "Pulled Pork Plate",
           0. : "Cheese Pizza",
           1. : "Burrito",
           7. : "Nachos",
           24. : "Chili",
           20. : "Southwest Salad",
           25.: "Roast Beef Sandwich"}

In [0]:
df_pd['meal_name'] = df_pd['mealskew'].map(mealmap)

In [0]:
df_pd.head()

Unnamed: 0,movieId,rating,userId,mealskew,meal_name
0,2,3.0,0,2.0,Chicken Curry
1,3,1.0,0,3.0,Spicy Chicken Nuggest
2,5,2.0,0,5.0,Hamburger
3,9,4.0,0,9.0,Taco Surprise
4,11,1.0,0,11.0,Meatloaf


In [0]:
# convert panda dataframe to pysaprk dataframe
df = spark.createDataFrame(df_pd)

In [0]:
df.show()

+-------+------+------+--------+--------------------+
|movieId|rating|userId|mealskew|           meal_name|
+-------+------+------+--------+--------------------+
|      2|   3.0|     0|     2.0|       Chicken Curry|
|      3|   1.0|     0|     3.0|Spicy Chicken Nug...|
|      5|   2.0|     0|     5.0|           Hamburger|
|      9|   4.0|     0|     9.0|       Taco Surprise|
|     11|   1.0|     0|    11.0|            Meatloaf|
|     12|   2.0|     0|    12.0|        Ceaser Salad|
|     15|   1.0|     0|    15.0|            BBQ Ribs|
|     17|   1.0|     0|    17.0|         Sushi Plate|
|     19|   1.0|     0|    19.0|Cheesesteak Sandw...|
|     21|   1.0|     0|    21.0|             Lasagna|
|     23|   1.0|     0|    23.0|      Orange Chicken|
|     26|   3.0|     0|    26.0|    Spicy Beef Plate|
|     27|   1.0|     0|    27.0|Salmon with Mashe...|
|     28|   1.0|     0|    28.0| Penne Tomatoe Pasta|
|     29|   1.0|     0|    29.0|        Pork Sliders|
|     30|   1.0|     0|    3

In [0]:
from pyspark.sql.functions import col, sum as _sum

In [0]:
# check missing values
missing_val = df.select([_sum(col(c).isNull().cast('int')).alias(c) for c in df.columns])
missing_val.show()

+-------+------+------+--------+---------+
|movieId|rating|userId|mealskew|meal_name|
+-------+------+------+--------+---------+
|      0|     0|     0|    1015|        0|
+-------+------+------+--------+---------+



In [0]:
# handling missing values fill median 
from pyspark.ml.feature import Imputer

In [0]:
imputer = Imputer(inputCol='mealskew', outputCol='mealskew_median', strategy='median')
output = imputer.fit(df).transform(df)
output.show()

+-------+------+------+--------+--------------------+---------------+
|movieId|rating|userId|mealskew|           meal_name|mealskew_median|
+-------+------+------+--------+--------------------+---------------+
|      2|   3.0|     0|     2.0|       Chicken Curry|            2.0|
|      3|   1.0|     0|     3.0|Spicy Chicken Nug...|            3.0|
|      5|   2.0|     0|     5.0|           Hamburger|            5.0|
|      9|   4.0|     0|     9.0|       Taco Surprise|            9.0|
|     11|   1.0|     0|    11.0|            Meatloaf|           11.0|
|     12|   2.0|     0|    12.0|        Ceaser Salad|           12.0|
|     15|   1.0|     0|    15.0|            BBQ Ribs|           15.0|
|     17|   1.0|     0|    17.0|         Sushi Plate|           17.0|
|     19|   1.0|     0|    19.0|Cheesesteak Sandw...|           19.0|
|     21|   1.0|     0|    21.0|             Lasagna|           21.0|
|     23|   1.0|     0|    23.0|      Orange Chicken|           23.0|
|     26|   3.0|    

In [0]:
output.columns

['movieId', 'rating', 'userId', 'mealskew', 'meal_name', 'mealskew_median']

In [0]:
final_data = output.select( ['movieId', 'rating', 'userId', 'mealskew_median'])
final_data.show()

+-------+------+------+---------------+
|movieId|rating|userId|mealskew_median|
+-------+------+------+---------------+
|      2|   3.0|     0|            2.0|
|      3|   1.0|     0|            3.0|
|      5|   2.0|     0|            5.0|
|      9|   4.0|     0|            9.0|
|     11|   1.0|     0|           11.0|
|     12|   2.0|     0|           12.0|
|     15|   1.0|     0|           15.0|
|     17|   1.0|     0|           17.0|
|     19|   1.0|     0|           19.0|
|     21|   1.0|     0|           21.0|
|     23|   1.0|     0|           23.0|
|     26|   3.0|     0|           26.0|
|     27|   1.0|     0|           27.0|
|     28|   1.0|     0|           28.0|
|     29|   1.0|     0|           29.0|
|     30|   1.0|     0|           30.0|
|     31|   1.0|     0|           31.0|
|     34|   1.0|     0|           15.0|
|     37|   1.0|     0|           15.0|
|     41|   2.0|     0|           15.0|
+-------+------+------+---------------+
only showing top 20 rows



## Format MLlib

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# split data train test 
train, test = final_data.randomSplit([0.8, 0.2])

In [0]:
als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId',ratingCol='rating')

In [0]:
alsModel = als.fit(train)

In [0]:
predictions = alsModel.transform(test)

In [0]:
predictions.show()

+-------+------+------+---------------+----------+
|movieId|rating|userId|mealskew_median|prediction|
+-------+------+------+---------------+----------+
|      3|   1.0|     1|            3.0|0.42258722|
|      4|   2.0|     1|            4.0| 2.3814287|
|      9|   3.0|     1|            9.0| 0.8902099|
|      9|   4.0|     0|            9.0| 1.1913786|
|     12|   1.0|     1|           12.0|0.12478562|
|     13|   1.0|     1|           13.0| 1.6433841|
|     15|   2.0|     2|           15.0| 0.7476189|
|     18|   2.0|     2|           18.0|  -4.35146|
|     21|   1.0|     0|           21.0| 1.8454365|
|     22|   1.0|     2|           22.0|-1.8958981|
|     28|   1.0|     2|           28.0|  3.757442|
|     33|   1.0|     1|           15.0|-2.7339153|
|     35|   1.0|     2|           15.0| 2.3783596|
|     35|   1.0|     3|           15.0|0.44643486|
|     36|   2.0|     1|           15.0| 2.5524783|
|     37|   1.0|     1|           15.0| 1.2370214|
|     38|   1.0|     2|        

In [0]:
# evaluation
regEval = RegressionEvaluator(predictionCol='prediction', labelCol='rating', metricName='rmse')
rmse = regEval.evaluate(predictions)
print("RMSE : " + str(rmse))

RMSE : 1.9200378456484328


In [0]:
single_user = test.filter(test['userId']== 11).select('movieId','userId')

In [0]:
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      0|    11|
|     12|    11|
|     16|    11|
|     22|    11|
|     27|    11|
|     50|    11|
|     51|    11|
|     59|    11|
|     61|    11|
|     67|    11|
|     89|    11|
+-------+------+



In [0]:
recommendation = alsModel.transform(single_user)

In [0]:
recommendation.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     12|    11| 3.0954523|
|     51|    11| 1.6909355|
|     50|    11| 1.3408356|
|     22|    11| 1.2882675|
|     67|    11|0.98723763|
|     61|    11|0.83714885|
|     16|    11| 0.6517198|
|     27|    11|-0.6428449|
|     59|    11| -1.168501|
|      0|    11|-2.9192219|
|     89|    11| -4.906834|
+-------+------+----------+



## Good job..!!