# Recommendation model
In this task we use ALS in Spark to generate recommendations based on item similarity.

When a user likes an item, we recommend the most similar items.


In [0]:
import os

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.sql.functions import col
from pyspark.ml.recommendation import ALS

## 1 Loading data

In [0]:
dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'
ratings_filename = dbfs_dir + '/ratings.csv' 

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)   

ratings_df = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header=True, inferSchema=False).schema(ratings_df_schema)\
    .load(ratings_filename)
    
# Train on a 1% sample to keep the example fast; the seed makes the sample reproducible.
ratings_df_sample = ratings_df.sample(fraction=0.01, seed=42)
ratings_df.show(4)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
+------+-------+------+
only showing top 4 rows



## 2 Modelling

In [0]:
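# coldStartStrategy="drop" discards predictions for users or movies unseen during fitting.
als = ALS(maxIter=7, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(ratings_df_sample)
# Top 10 movies for each user.
userRecs = model.recommendForAllUsers(10)
# Top 10 users for each movie (the structs carry userId, not movieId).
movieRecs = model.recommendForAllItems(10)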

In [0]:
userRecs.show(5)
movieRecs.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    12|[{93721, 7.427467...|
|    27|[{1893, 5.3181906...|
|    31|[{483, 8.968879},...|
|    53|[{3224, 7.6661563...|
|    65|[{991, 8.375018},...|
+------+--------------------+
only showing top 5 rows

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     12|[{22762, 8.872498...|
|     26|[{127120, 11.0851...|
|     27|[{75195, 13.78014...|
|     28|[{49392, 12.51699...|
|     31|[{4295, 7.3399525...|
+-------+--------------------+
only showing top 5 rows
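
Note that `recommendForAllItems` returns, for each movie, the users predicted to rate it highest, so the `recommendations` column of `movieRecs` holds user IDs. If item-to-item similarity is the goal, one possible approach (a sketch, not part of the original notebook) is to compare the latent item vectors in `model.itemFactors` by cosine similarity. The helper name `top_similar_items` and the toy vectors are assumptions for illustration; on real data the factors could be gathered with something like `{row['id']: row['features'] for row in model.itemFactors.collect()}`, which is only feasible when the item catalogue fits in driver memory.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)

def top_similar_items(item_factors, target_id, k=10):
    """Return the k item ids most similar to target_id, best first."""
    target = item_factors[target_id]
    scored = [
        (other_id, cosine_similarity(target, vec))
        for other_id, vec in item_factors.items()
        if other_id != target_id
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [other_id for other_id, _ in scored[:k]]

# Toy factors standing in for model.itemFactors (hypothetical values).
toy_factors = {
    1: [1.0, 0.0],
    2: [0.9, 0.1],
    3: [0.0, 1.0],
    4: [-1.0, 0.0],
}
print(top_similar_items(toy_factors, target_id=1, k=2))
```

For a full catalogue, an exhaustive pairwise comparison grows quadratically with the number of items, so an approximate nearest-neighbour index would likely be a better fit than this loop.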



## 3 Persistence

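In production we would usually write the key/value recommendation results to Redis from the executors with `foreachPartition()`, rather than pulling them to the driver with `collect()`. Here we collect them and store them locally for simplicity.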

In [0]:
# collect() pulls every row to the driver; fine for a demo, not for large catalogues.
local_movie_recs = movieRecs.collect()
recommendations_dict = {}
for row in local_movie_recs:
    # recommendForAllItems yields (userId, rating) structs for each movie.
    recommended_ids = [str(rec["userId"]) for rec in row["recommendations"]]
    recommendations_dict[row["movieId"]] = ",".join(recommended_ids)
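
The Redis route mentioned at the start of this section could look roughly like the sketch below. It assumes the `redis-py` package and a server on `localhost:6379`; the `movie_recs:` key prefix, the helper names, and the sample ratings are hypothetical. A small in-memory fake client is used so the logic can be exercised without a running Redis server.

```python
def make_partition_writer(client_factory, key_prefix="movie_recs:"):
    """Build a function suitable for DataFrame.foreachPartition.

    client_factory is called once per partition, so each task opens its
    own connection instead of trying to share one created on the driver.
    """
    def write_partition(rows):
        client = client_factory()
        for row in rows:
            ids = ",".join(str(rec["userId"]) for rec in row["recommendations"])
            client.set(f"{key_prefix}{row['movieId']}", ids)
    return write_partition

def redis_client_factory():
    # Assumption: redis-py is installed and a server listens on localhost:6379.
    import redis
    return redis.Redis(host="localhost", port=6379)

# On a cluster (not run here):
# movieRecs.foreachPartition(make_partition_writer(redis_client_factory))

class FakeClient:
    """In-memory stand-in for a Redis client, used only for this demo."""
    store = {}

    def set(self, key, value):
        FakeClient.store[key] = value

# Hypothetical rows shaped like movieRecs; the ratings are made up.
sample_rows = [
    {"movieId": 2, "recommendations": [
        {"userId": 57475, "rating": 8.1},
        {"userId": 35981, "rating": 7.9},
    ]},
]
make_partition_writer(FakeClient)(iter(sample_rows))
print(FakeClient.store)
```

Creating the client inside the partition function matters because connection objects generally cannot be serialized and shipped from the driver to the executors.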

## 4 Recommendation

After a user clicks or likes an item, we recommend the most similar items from the dictionary.

In [0]:
target_id = 2
print('If a user liked/clicked movie %d, we will recommend them these movies: %s' % (target_id, recommendations_dict[target_id]))

If a user liked/clicked movie 2, we will recommend them these movies: 57475,35981,124725,133365,11308,81093,119709,90462,94819,137510
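
A direct dictionary lookup raises `KeyError` for a movie that fell outside the 1% sample or was dropped by the cold-start strategy. A small helper, sketched here under the assumption that values are comma-separated ID strings as built in the persistence step, returns an empty list instead. The name `lookup_recommendations` and the sample dictionary are hypothetical.

```python
def lookup_recommendations(recs, target_id, limit=10):
    """Return up to `limit` recommended ids for target_id, or [] if unknown."""
    raw = recs.get(target_id, "")
    ids = [int(token) for token in raw.split(",") if token]
    return ids[:limit]

# Stand-in for recommendations_dict, using the first IDs from the output above.
sample = {2: "57475,35981,124725"}
print(lookup_recommendations(sample, 2, limit=2))
print(lookup_recommendations(sample, 999))
```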
