In [2]:
import time

from pyspark import SparkContext, SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

In [3]:
conf = SparkConf()
conf.setMaster('local[*]')
conf.set('spark.executor.memory', '15G')
conf.set('spark.driver.memory', '15G')
conf.setAppName("hw41")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

In [6]:
# Read in the ratings csv
ratings = spark.read.option("header", "true").csv('../data/ml-20m/ratings.csv')
ratings = ratings.withColumn('userId', F.col('userId').cast(IntegerType()))
ratings = ratings.withColumn('movieId', F.col('movieId').cast(IntegerType()))
ratings = ratings.withColumn('rating', F.col('rating').cast(DoubleType()))

In [7]:
ratings = ratings.select("userId", "movieId", "rating")
ratings.limit(10).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,2,3.5
1,1,29,3.5
2,1,32,3.5
3,1,47,3.5
4,1,50,3.5
5,1,112,3.5
6,1,151,4.0
7,1,223,4.0
8,1,253,4.0
9,1,260,4.0


In [8]:
rank = 4  # number of features
als_model = ALS(itemCol='movieId',
                userCol='userId',
                ratingCol='rating',
                nonnegative=True,
                rank=rank)

In [9]:
print("Total dataset: ", ratings.count())
ratings = ratings.limit(1000) # total dataset is 20000263
trainTest = ratings.randomSplit([0.8, 0.2])

trainingDF = trainTest[0]
testDF = trainTest[1]

Total dataset:  20000263


In [10]:
time_start = time.time()
# Run cross-validation, and choose the best set of parameters.
als_model = als_model.fit(trainingDF)

# Make predictions on test documents. cvModel uses the best model found (lrModel).
test_prediction = als_model.transform(testDF)
# test_prediction.cache()
time_end = time.time()
print("ALS predictions are done!")
print("took ", time_end - time_start, " seconds for cross validation")
test_prediction.toPandas()

ALS predictions are done!
took  3.0781431198120117  seconds for cross validation


Unnamed: 0,userId,movieId,rating,prediction
0,2,3918,3.0,
1,9,4519,2.0,
2,10,1721,4.0,3.624052
3,8,296,5.0,3.727760
4,7,3062,3.0,
...,...,...,...,...
191,1,2140,4.0,3.153115
192,5,788,3.0,3.824775
193,1,7164,3.5,
194,1,1217,3.5,


In [11]:
total = test_prediction.count()
test_prediction_with_na = test_prediction
test_prediction = test_prediction.na.drop()
non_na = test_prediction.count()
print("Total predictions: ", total, ", non-Nan predictions: ", non_na)
test_prediction.toPandas()

Total predictions:  196 , non-Nan predictions:  81


Unnamed: 0,userId,movieId,rating,prediction
0,10,1721,4.0,3.624052
1,8,296,5.0,3.727760
2,3,593,5.0,2.696532
3,1,1259,4.0,4.007985
4,7,1259,3.0,3.688163
...,...,...,...,...
76,3,1097,5.0,4.273137
77,7,1097,4.0,3.753689
78,3,2118,5.0,3.518396
79,1,2140,4.0,3.153115


In [12]:
rmse_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse")
score = rmse_evaluator.evaluate(test_prediction)
print("RMSE: ", score)

RMSE:  1.2268532211989127


In [13]:
mae_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="mae")
print("MAE: ", mae_evaluator.evaluate(test_prediction))

MAE:  0.9564313314579151


In [14]:
mse_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="mse")
print("MSE: ", mse_evaluator.evaluate(test_prediction))

MSE:  1.505168826366148


Item-Item collaborative filtering
The idea here is to find a set of movies similar to a given movie, 
and rate the given movie based on how those similar movies have been rated by the user.

In [15]:
import math
import time
from statistics import mean

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

In [16]:
def get_Matrix(data):
    unique_users = data.userId.unique()
    unique_movies = data.movieId.unique()
    utility_matrix = {}
    for user in unique_users:
        # np array of len()
        col = dict.fromkeys(unique_movies, 0)
        this_user = data[data.userId == user]
        if not this_user.empty:
            for movieID in this_user.movieId:
                # instead of this put the actual ratings in
                rating = this_user[this_user.movieId == movieID].rating.iloc[0]
                if math.isnan(rating):
                    rating = 0
                col[movieID] = rating
        utility_matrix[user] = col

    return pd.DataFrame(utility_matrix)

In [50]:
train_df = get_Matrix(ratings.toPandas())
item_item_index = train_df.index
item_similarity = cosine_similarity(train_df)
item_similarity = pd.DataFrame(item_similarity)
item_similarity.index = item_item_index
item_similarity.columns = item_item_index

k = 10
test_data = testDF.toPandas()
item_item_collaborative_labels = []
for x in test_data[:].iterrows():
    userID = x[1]['userId']
    movieID = x[1]['movieId']
    # taking only those k users that have rated the movie
    this_item_distances = item_similarity[movieID]
    sorted_distances = this_item_distances.sort_values(ascending=False)[1:]
    # get the ratings by this user
    this_user = train_df[userID]

    ratings_this_user_this_movie = []
    for key in sorted_distances.keys():
        if len(ratings_this_user_this_movie) >= k:
            break
        this_user_this_movie = this_user[key]
        if this_user_this_movie > 0:
            ratings_this_user_this_movie.append(this_user_this_movie)
    item_rating = mean(ratings_this_user_this_movie)
    item_item_collaborative_labels.append(np.float16(item_rating))
test_data['prediction-item-item-cf'] = item_item_collaborative_labels
prediction_total = test_data.merge(test_prediction_with_na.toPandas(), on=['userId', 'movieId', 'rating'])
prediction_total['avg_prediction'] = prediction_total[['prediction-item-item-cf', 'prediction']].mean(axis=1)
prediction_total

Unnamed: 0,userId,movieId,rating,prediction-item-item-cf,prediction,avg_prediction
0,1,29,3.5,3.900391,,3.900391
1,1,50,3.5,3.849609,3.941394,3.895502
2,1,318,4.0,3.949219,3.749924,3.849571
3,1,919,3.5,3.849609,3.153115,3.501362
4,1,924,3.5,3.849609,3.420195,3.634902
...,...,...,...,...,...,...
191,10,1721,4.0,3.900391,3.624052,3.762221
192,10,3062,4.0,3.800781,,3.800781
193,10,3107,3.0,3.699219,,3.699219
194,11,260,5.0,4.300781,5.162322,4.731551


In [34]:
# Function to compute Item-Item-CF using Spark
def item_item_cf(x, item_similarity, train_df):
    userID = x[0]
    movieID = x[1]
    
    # taking only those k users that have rated the movie
    this_item_distances = item_similarity[movieID]
    sorted_distances = this_item_distances.sort_values(ascending=False)[1:]
    
    # get the ratings by this user
    this_user = train_df[userID]

    ratings_this_user_this_movie = []
    for key in sorted_distances.keys():
        if len(ratings_this_user_this_movie) >= k:
            break
        this_user_this_movie = this_user[key]
        if this_user_this_movie > 0:
            ratings_this_user_this_movie.append(this_user_this_movie)
    item_rating = mean(ratings_this_user_this_movie)
    
    return item_rating

In [65]:
# Spark Implementation for Item-Item CF
train_df = get_Matrix(ratings.toPandas())
item_item_index = train_df.index
item_similarity = cosine_similarity(train_df)
item_similarity = pd.DataFrame(item_similarity)
item_similarity.index = item_item_index
item_similarity.columns = item_item_index

k = 10
test_data = testDF.rdd.map(tuple)
item_item_results = test_data.map(lambda x: (x[0], x[1], x[2], float(item_item_cf(x, item_similarity, train_df))))
item_item_results.collect()

[(1, 29, 3.5, 3.9),
 (1, 50, 3.5, 3.85),
 (1, 318, 4.0, 3.95),
 (1, 919, 3.5, 3.85),
 (1, 924, 3.5, 3.85),
 (1, 1196, 4.5, 3.95),
 (1, 1217, 3.5, 3.9),
 (1, 1246, 3.5, 3.9),
 (1, 1258, 4.0, 3.9),
 (1, 1259, 4.0, 4.0),
 (1, 1266, 4.0, 3.9),
 (1, 1291, 3.5, 3.85),
 (1, 1370, 3.0, 3.9),
 (1, 1750, 3.5, 3.9),
 (1, 1994, 3.5, 3.9),
 (1, 2140, 4.0, 3.75),
 (1, 2143, 4.0, 3.9),
 (1, 2173, 4.0, 3.9),
 (1, 2683, 3.5, 3.95),
 (1, 2692, 3.5, 3.9),
 (1, 2762, 4.0, 3.85),
 (1, 2918, 3.5, 3.85),
 (1, 2959, 4.0, 3.75),
 (1, 3438, 3.5, 3.9),
 (1, 4027, 4.0, 3.9),
 (1, 4226, 3.5, 3.9),
 (1, 4446, 3.5, 3.9),
 (1, 4571, 4.0, 3.9),
 (1, 4720, 3.5, 3.9),
 (1, 5026, 4.0, 3.9),
 (1, 5540, 4.0, 3.9),
 (1, 5898, 3.5, 3.9),
 (1, 7153, 5.0, 3.9),
 (1, 7164, 3.5, 3.9),
 (1, 7389, 4.0, 3.9),
 (1, 7757, 4.0, 3.9),
 (1, 8690, 3.5, 3.9),
 (1, 31696, 4.0, 3.9),
 (2, 908, 4.0, 4.2),
 (2, 1327, 5.0, 2.9),
 (2, 1970, 2.0, 2.9),
 (2, 1971, 2.0, 2.9),
 (2, 2454, 4.0, 2.9),
 (2, 2455, 4.0, 4.3),
 (2, 3534, 3.0, 2.9),
 (2, 3

In [66]:
# Convert to DataFrame
from pyspark.sql.types import IntegerType, StructType, StructField, FloatType
schema = StructType([StructField('userId', IntegerType(), True), 
                     StructField('movieId', IntegerType(), True), 
                     StructField('rating', FloatType(), True),
                     StructField('prediction', FloatType(), True)])
item_item_results_df = spark.createDataFrame(item_item_results, schema)

In [67]:
item_item_results_df.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|     29|   3.5|       3.9|
|     1|     50|   3.5|      3.85|
|     1|    318|   4.0|      3.95|
|     1|    919|   3.5|      3.85|
|     1|    924|   3.5|      3.85|
|     1|   1196|   4.5|      3.95|
|     1|   1217|   3.5|       3.9|
|     1|   1246|   3.5|       3.9|
|     1|   1258|   4.0|       3.9|
|     1|   1259|   4.0|       4.0|
|     1|   1266|   4.0|       3.9|
|     1|   1291|   3.5|      3.85|
|     1|   1370|   3.0|       3.9|
|     1|   1750|   3.5|       3.9|
|     1|   1994|   3.5|       3.9|
|     1|   2140|   4.0|      3.75|
|     1|   2143|   4.0|       3.9|
|     1|   2173|   4.0|       3.9|
|     1|   2683|   3.5|      3.95|
|     1|   2692|   3.5|       3.9|
+------+-------+------+----------+
only showing top 20 rows

