In [26]:
import pandas as pd
import numpy as np
import os

from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql

from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer

#scripts
import rank_metrics as rm
import helpers

In [27]:
spark = SparkSession(SparkContext())

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-2-15f8d37a362f>:1 

In [28]:
data_path = '../data/csv/'
ratings_df = pd.DataFrame(pd.read_csv(os.path.join(data_path, 'ratings.csv')))
movies_df = pd.DataFrame(pd.read_csv(os.path.join(data_path, 'movies.csv')))
tags_df = pd.DataFrame(pd.read_csv(os.path.join(data_path, 'tags.csv')))

In [29]:
movies_df

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
...,...,...,...
9737,193581,Black Butler: Book of the Atlantic (2017),Action|Animation|Comedy|Fantasy
9738,193583,No Game No Life: Zero (2017),Animation|Comedy|Fantasy
9739,193585,Flint (2017),Drama
9740,193587,Bungo Stray Dogs: Dead Apple (2018),Action|Animation


In [30]:
ratings_df = ratings_df.drop('timestamp', axis=1)

In [31]:
ratings_df.head(3)

Unnamed: 0,userId,movieId,rating
0,1,1,4.0
1,1,3,4.0
2,1,6,4.0


In [32]:
ratings = spark.createDataFrame(ratings_df)
(training, test) = ratings.randomSplit([0.8, 0.2])

In [33]:
training.head()

Row(userId=1, movieId=1, rating=4.0)

In [34]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

In [35]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("RMSE = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(5)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(5)

RMSE = 1.0782645889208922


In [36]:
userRecs_df = helpers.spark_to_pandas(userRecs)


In [42]:
top_movies_and_rankings = helpers.get_top_movies_and_ratings(userRecs_df, 90, movies_df)
top_movie_ratings = list(top_movies_and_rankings.values())
top_movies = list(top_movies_and_rankings.keys())

In [43]:
import rank_metrics

rank_metrics.ndcg_at_k(top_movie_ratings, 5, 0)

1.0

In [44]:
movies_df[movies_df['movieId']==1111]['title'].values[0]

"Microcosmos (Microcosmos: Le peuple de l'herbe) (1996)"

In [45]:
movies_df[movies_df['movieId']==1111]['title'].index[0]

844

In [46]:
top_movie_ratings

[7.435305118560791,
 7.113680362701416,
 6.620959758758545,
 6.472896575927734,
 6.442361354827881]