In [33]:
# import libraries

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from cold_start import get_cold_start_rating
import pyspark
import time

In [2]:
# Start (or reuse) a Spark session running locally on all available cores.
# An earlier variant of this cell named the app "MoviesALS" and pinned
# spark.driver.host to "localhost"; it is no longer used.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Read the ratings JSON file into a Spark DataFrame (schema is inferred by Spark).

movie_ratings = spark.read.json('data/ratings.json')

In [4]:
# Print the inferred column names, types and nullability of the ratings data.
movie_ratings.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- user_id: long (nullable = true)



In [5]:
# Pull the ratings into a pandas DataFrame so the timestamps can be converted
# to datetimes and every column can be checked for nulls.

movies_df = movie_ratings.toPandas()
movies_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 719949 entries, 0 to 719948
Data columns (total 4 columns):
movie_id     719949 non-null int64
rating       719949 non-null int64
timestamp    719949 non-null float64
user_id      719949 non-null int64
dtypes: float64(1), int64(3)
memory usage: 22.0 MB


In [6]:
# Interpret the timestamps as epoch seconds, keep only the calendar year,
# and count ratings per year. Every rating falls in 2000.

date = (
    pd.to_datetime(movies_df['timestamp'], unit='s')
    .dt.year
)
date.value_counts()

2000    719949
Name: timestamp, dtype: int64

In [7]:
# Drop the timestamp column for now: every rating is from the year 2000
# (see the value counts above), so it is not used by the model.

movie_ratings = movie_ratings.drop('timestamp')

In [8]:
# Split data into training (80%) and test (20%) sets.
# A fixed seed keeps the split, and therefore the reported RMSE, repeatable
# across kernel restarts; 42 matches the seed passed to ALS.

(training, test) = movie_ratings.randomSplit([.8, .2], seed=42)

In [9]:
# Configure an ALS recommender over the (user_id, movie_id, rating) columns
# and fit it on the training split.

als = ALS(
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating',
    rank=10,
    maxIter=10,
    seed=42,
)

model = als.fit(training)

In [10]:
# Score the held-out test split with the fitted model and mark the resulting
# DataFrame for caching. The last expression displays the DataFrame's schema.

predictions = model.transform(test)
predictions.persist()

DataFrame[movie_id: bigint, rating: bigint, user_id: bigint, prediction: float]

In [11]:
# Bring the test predictions into pandas. The NaN predictions are filled in
# later cells before the frame is converted back to a Spark DataFrame.

pred_df = predictions.toPandas()

In [12]:
# Count missing values per column; only `prediction` is expected to contain NaNs.
pred_df.isna().sum()

movie_id       0
rating         0
user_id        0
prediction    36
dtype: int64

In [13]:
def user_average(user, df, default=3):
    """Return the mean predicted rating for a single user.

    Parameters
    ----------
    user : scalar
        Value compared against the ``user_id`` column of ``df``.
    df : pandas.DataFrame
        Frame containing ``user_id`` and ``prediction`` columns.
    default : number, optional
        Value returned when the user's mean is undefined. Defaults to 3,
        the fallback that used to be hard-coded in this function.

    Returns
    -------
    number
        Mean of the user's ``prediction`` values, or ``default`` when the
        user has no rows or only NaN predictions.
    """
    user_predictions = df.loc[df['user_id'] == user, 'prediction']
    average = user_predictions.mean()
    # mean() of an empty or all-NaN selection is NaN.
    if np.isnan(average):
        return default
    return average
    
def compute_user_average_if_null(row):
    """Return the row's prediction, or the user's average when it is NaN.

    ``row`` must provide ``'prediction'`` and ``'user_id'`` entries. The
    fallback average is computed from the notebook-level ``pred_df`` frame.
    """
    if not np.isnan(row['prediction']):
        return row['prediction']
    return user_average(row['user_id'], pred_df)
    

In [15]:
# Lookup tables handed to get_cold_start_rating in the cells below.
# NOTE(review): `user_df` is loaded from user_cluster.csv while `u_clusters`
# is loaded from u_info.csv — the names look swapped relative to the file
# names; confirm against the parameter order expected by cold_start.py.
user_df = pd.read_csv('data/user_cluster.csv', index_col=0) 
u_clusters = pd.read_csv('data/u_info.csv', index_col=0)
ratings_df = pd.read_csv('data/movie_cluster_avg.csv', index_col=0)

In [16]:
# Replace every NaN ALS prediction with the cold-start estimate, one row at a
# time, then report whether any NaN is left.
for idx, record in pred_df[pred_df['prediction'].isna()].iterrows():
    pred_df.loc[idx, 'prediction'] = get_cold_start_rating(
        record['user_id'],
        record['movie_id'],
        user_df,
        u_clusters,
        ratings_df,
    )

print(pred_df['prediction'].isna().any())
    

False


In [17]:
# Convert the filled-in pandas frame back to a Spark DataFrame for scoring.
# Note: this rebinds `predictions`; the DataFrame persisted earlier is not unpersisted.
predictions = spark.createDataFrame(pred_df)

In [18]:
# Score the filled-in predictions against the true ratings using RMSE.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

rmse = evaluator.evaluate(predictions)
print(rmse)

0.8772095007527893


# Create a parameter grid and cross-validate to find the best ALS hyperparameters.
#
# With ALS's default coldStartStrategy ('nan'), users or movies missing from a
# fold's training part get NaN predictions, which turns that fold's RMSE into
# NaN and makes the comparison between parameter sets meaningless. 'drop' is
# therefore applied to every grid entry; the `als` estimator itself is untouched.
# NOTE(review): the selected `als_model` is fitted with 'drop' too, so its
# transform() omits unseen users/movies instead of returning NaN for them.
params_score = {}

params = (ParamGridBuilder()
          .baseOn({als.coldStartStrategy: 'drop'})
          .addGrid(als.regParam, [1, 0.01, 0.001, 0.1])
          .addGrid(als.maxIter, [5, 10, 20])
          .addGrid(als.rank, [4, 10, 50])
          .build())

cv = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)

best_model = cv.fit(movie_ratings)
als_model = best_model.bestModel

# save model (overwrite so re-running the notebook does not fail on an existing path)
als_model.write().overwrite().save('als_model')

In [19]:
# Load the requests JSON file into a Spark DataFrame.
    
requests = spark.read.json("data/requests.json") 

# Predict the requested (user, movie) pairs and collect the result into pandas.
# NOTE(review): this uses `model` (the initial ALS fit), not the cross-validated
# `als_model` — confirm that is intended.
#requests_predictions = model.transform(requests)
requests_predictions = model.transform(requests).toPandas()

In [27]:
# Fill each NaN ALS prediction in the requests with the cold-start estimate.
# The recorded outputs further down show that some NaNs remain after this step.
for idx, record in requests_predictions[requests_predictions['prediction'].isna()].iterrows():
    requests_predictions.loc[idx, 'prediction'] = get_cold_start_rating(
        record['user_id'],
        record['movie_id'],
        user_df,
        u_clusters,
        ratings_df,
    )

In [28]:
    
# Report whether any prediction is still NaN.
print(requests_predictions['prediction'].isna().any())

# Fix the column order, then export the request predictions as JSON
# (one record per line).
cols = ['user_id','movie_id', 'rating', 'timestamp', 'prediction']
requests_predictions = requests_predictions[cols]

requests_predictions.to_json(
    r"data/predictions.json",
    orient='records',
    lines=True,
)

True


In [29]:
# Shell out to show the first lines of the exported predictions file.
!head data/predictions.json

{"user_id":53,"movie_id":148,"rating":null,"timestamp":977959026.0,"prediction":3.25}
{"user_id":4169,"movie_id":148,"rating":null,"timestamp":976559602.0,"prediction":2.9328947067}
{"user_id":5333,"movie_id":148,"rating":null,"timestamp":989024856.0,"prediction":2.5639445782}
{"user_id":4387,"movie_id":148,"rating":null,"timestamp":977005381.0,"prediction":2.2363648415}
{"user_id":3539,"movie_id":148,"rating":null,"timestamp":966907208.0,"prediction":2.4977426529}
{"user_id":840,"movie_id":148,"rating":null,"timestamp":976266538.0,"prediction":2.7661399841}
{"user_id":216,"movie_id":148,"rating":null,"timestamp":976841639.0,"prediction":3.2656331062}
{"user_id":482,"movie_id":148,"rating":null,"timestamp":976191154.0,"prediction":3.25}
{"user_id":752,"movie_id":148,"rating":null,"timestamp":1029283935.0,"prediction":3.0448253155}
{"user_id":424,"movie_id":148,"rating":null,"timestamp":1026978024.0,"prediction":3.25}


In [None]:
# Shell out to show the first lines of the raw requests file for comparison.
!head data/requests.json

In [30]:
# Summarise dtypes and non-null counts; `prediction` reveals how many NaNs remain.
requests_predictions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 280260 entries, 0 to 280259
Data columns (total 5 columns):
user_id       280260 non-null int64
movie_id      280260 non-null int64
rating        0 non-null float64
timestamp     280260 non-null float64
prediction    280134 non-null float32
dtypes: float32(1), float64(2), int64(2)
memory usage: 9.6 MB


In [34]:
# Count the remaining null predictions from the data instead of hard-coding the totals.
print("only {} nulls.".format(requests_predictions['prediction'].isna().sum()))

only 126 nulls.


In [48]:
# Preview the first rows of the request predictions.
requests_predictions.head()

Unnamed: 0,user_id,movie_id,rating,timestamp,prediction
0,53,148,,977959026.0,3.25
1,4169,148,,976559602.0,2.932895
2,5333,148,,989024856.0,2.563945
3,4387,148,,977005381.0,2.236365
4,3539,148,,966907208.0,2.497743


In [49]:
# DataFrame.info() prints its summary and returns None, so its result is not
# assigned; `rp` is created from an actual copy of the frame in a later cell.
requests_predictions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 280260 entries, 0 to 280259
Data columns (total 5 columns):
user_id       280260 non-null int64
movie_id      280260 non-null int64
rating        0 non-null float64
timestamp     280260 non-null float64
prediction    280134 non-null float32
dtypes: float32(1), float64(2), int64(2)
memory usage: 9.6 MB


In [63]:
# Work on a copy so the back-fill below does not modify `requests_predictions`.
rp = requests_predictions.copy()

In [64]:
# Row and column count of the copy.
rp.shape

(280260, 5)

In [65]:
# Shape of the subset whose prediction is still NaN (row count = number of nulls).
rp.loc[rp['prediction'].isna()].shape

(126, 5)

In [55]:
# Peek at the first Row of the Spark ratings DataFrame.
movie_ratings.head()

Row(movie_id=858, rating=4, user_id=6040)

In [62]:
# make df (this cell's run was interrupted — see the KeyboardInterrupt below)
# ratings_temp = movie_ratings.select('*').toPandas()

KeyboardInterrupt: 

In [None]:
# Mean rating across all ratings. `ratings` was never defined (NameError), so
# this uses `movies_df`, the pandas copy of the ratings created near the top
# of the notebook.
movies_df.agg({'rating':'mean'})

In [59]:
# Back-fill the predictions that are still NaN with a constant rating of 5.
# NOTE(review): 5 is the top of the rating scale; a mean rating may be a
# less biased fill value — confirm this choice.
rp.loc[rp['prediction'].isna(), 'prediction'] = 5

In [60]:
# Verify that no NaN predictions remain after the back-fill (expect 0 rows).
rp.loc[rp['prediction'].isna()].shape

(0, 5)

# BACKFILLED 126 with 5!

In [61]:
# Overwrite the predictions file with the back-filled frame (one JSON record per line).
rp.to_json(
    r"data/predictions.json",
    orient='records',
    lines=True,
)

# IGNORE BELOW

In [39]:
# Read the exported predictions file back into a Spark DataFrame as a sanity check.
    
pred_get_back = spark.read.json("data/predictions.json") 

In [40]:
# Display the first five rows of the re-loaded predictions.
pred_get_back.show(5)

+--------+------------+------+------------+-------+
|movie_id|  prediction|rating|   timestamp|user_id|
+--------+------------+------+------------+-------+
|     148|        3.25|  null|9.77959026E8|     53|
|     148|2.9328947067|  null|9.76559602E8|   4169|
|     148|2.5639445782|  null|9.89024856E8|   5333|
|     148|2.2363648415|  null|9.77005381E8|   4387|
|     148|2.4977426529|  null|9.66907208E8|   3539|
+--------+------------+------+------------+-------+
only showing top 5 rows



In [42]:
# Mark the re-loaded predictions for caching; the cell output shows the inferred schema.
pred_get_back.persist()

DataFrame[movie_id: bigint, prediction: double, rating: string, timestamp: double, user_id: bigint]

In [46]:
# Collect the re-loaded predictions into a pandas DataFrame.
rp_v1 = pred_get_back.toPandas()

KeyboardInterrupt: 