In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import col, explode, udf, lit
from pyspark.sql.types import IntegerType, FloatType, ArrayType, StringType

# Build (or reuse) a local Spark session. The MongoDB connector package is
# pulled in and both its read and write URIs target `imdb.movies` on localhost.
spark = (
    SparkSession.builder
    .appName("Recommendation")
    .master("local")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.5")
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/imdb.movies")
    .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/imdb.movies")
    .getOrCreate()
)

In [None]:
# List the Spark configuration entries.
# NOTE(review): this constructs a new SparkConf rather than reading the conf of
# the `spark` session built above; `spark.sparkContext.getConf().getAll()` may
# be what is intended — confirm.
SparkConf().getAll()

In [4]:
# Load the four CSV tables (the first row of each file supplies the column
# names) and the multi-line JSON file. No schema is given for the CSVs here.
links = spark.read.option("header", True).csv("data/links.csv")
movies = spark.read.option("header", True).csv("data/movies.csv")
ratings = spark.read.option("header", True).csv("data/ratings.csv")
tags = spark.read.option("header", True).csv("data/tags.csv")
imdb = spark.read.option("multiline","true").json("data.json")

In [18]:
# insert movies_imdb to collection 
import json

def lower_case(x):
    """Return a lower-cased copy of a list of strings.

    This function is wrapped as a Spark UDF over an array column, so it has
    to tolerate missing values instead of raising inside the executor.

    Args:
        x: Iterable of strings (entries may be None), or None.

    Returns:
        A new list with every string lower-cased; None entries are kept as
        None. Returns None when ``x`` itself is None.
    """
    if x is None:
        return None
    return [item.lower() if item is not None else None for item in x]

# Attach the link ids to every movie. The movies' own title/genres columns are
# renamed so that the `title` and `genres` selected below are the ones coming
# from the `imdb` frame (presumably it carries columns of the same name).
movies_links = (
    movies.join(links, on="movieId", how="left")
    .withColumnRenamed("title", "old_title")
    .withColumnRenamed("genres", "old_genres")
    .drop("tmdbId")
)

# Bring in the IMDb fields, discard every row that has a null in any column,
# then keep only the columns that are exported.
movies_imdb = (
    movies_links.join(imdb, on="imdbId", how="left")
    .dropna(how="any")
    .select("imdbId", "movieId", "title", "year", "poster", "rating", "summary", "time", "genres")
)

# UDF wrapper: lower-cases each entry of an array<string> column.
to_lower_case = udf(lower_case, ArrayType(StringType()))

# Cast ids to int and rating to float, and normalise genres to lower case.
movies_imdb_convert = (
    movies_imdb
    .withColumn("imdbId", col("imdbId").cast(IntegerType()))
    .withColumn("movieId", col("movieId").cast(IntegerType()))
    .withColumn("rating", col("rating").cast(FloatType()))
    .withColumn("genres", to_lower_case(col("genres")))
    .select("imdbId", "movieId", "title", "year", "poster", "rating", "summary", "time", "genres")
)

# movies_imdb_convert.write.format("mongodb").mode("overwrite").save()

# movies_imdb_convert.toPandas().to_csv('movies_imdb.csv')

In [5]:
# Cast the ids to int and the rating to float, discard the timestamp column,
# then remove exact duplicate rows.
ratings = (
    ratings
    .withColumn("userId", col("userId").cast(IntegerType()))
    .withColumn("movieId", col("movieId").cast(IntegerType()))
    .withColumn("rating", col("rating").cast(FloatType()))
    .drop("timestamp")
    .dropDuplicates()
)

In [6]:
# check sparsity data
def get_sparsity(ratings):
    """Print how sparse the user x movie rating matrix is, as a percentage.

    Sparsity is ``100 * (1 - n_ratings / (n_users * n_movies))`` where
    ``n_ratings`` is the number of rows and ``n_users`` / ``n_movies`` are the
    distinct ``userId`` / ``movieId`` counts in ``ratings``.

    Args:
        ratings: DataFrame with ``userId``, ``movieId`` and ``rating`` columns.

    Returns:
        None. The result is printed.
    """
    # Number of observed ratings (one per row).
    rating_count = ratings.select("rating").count()

    # Size of the full user x movie matrix.
    user_count = ratings.select("userId").distinct().count()
    movie_count = ratings.select("movieId").distinct().count()
    matrix_cells = user_count * movie_count

    # An empty dataframe has no user/movie pairs, so sparsity is undefined;
    # say so instead of failing with ZeroDivisionError.
    if matrix_cells == 0:
        print("The ratings dataframe is empty; sparsity is undefined.")
        return

    sparsity = (1.0 - (rating_count * 1.0) / matrix_cells) * 100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")
    
# Print the sparsity of the ratings dataframe.
get_sparsity(ratings)

The ratings dataframe is  98.30% sparse.


In [None]:
# Number of ratings per user, most active users first
userId_ratings = (
    ratings.groupBy("userId")
    .count()
    .orderBy(col("count").desc())
)
userId_ratings.show()


In [None]:
# Number of ratings per movie, most rated movies first
movieId_ratings = (
    ratings.groupBy("movieId")
    .count()
    .orderBy(col("count").desc())
)
movieId_ratings.show()

In [7]:
# Import ALS dependencies
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [19]:
# Hold out 20% of the ratings for testing; the seed makes the split repeatable
(train, test) = ratings.randomSplit([.8, .2], seed=123)

# Base ALS estimator for explicit ratings. rank, maxIter and regParam are not
# set here.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    # Drop rows whose prediction is NaN (user or movie unseen during training)
    # so that the RMSE stays defined.
    coldStartStrategy="drop",
    # Fixed seed so factor initialisation, and hence the metrics, are the same
    # on every Restart & Run All.
    seed=432,
)

# RMSE between the true `rating` and the model's `prediction`
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

In [57]:
# Hyper-parameter grid: every combination of rank, regParam and maxIter
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [5, 10, 12, 15, 20])
    .addGrid(als.regParam, [.1, .12, .14, .16])
    .addGrid(als.maxIter, [4, 6, 8])
    .build()
)

print ("Number models to be tested: ", len(param_grid))

Number models to be tested:  60


In [None]:
# Build 5-fold cross validation over the parameter grid, scored by `evaluator`.
# The seed fixes the fold assignment so the selected "best" parameters do not
# change from one run of the notebook to the next.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=123,
)

In [None]:
# Fit the cross validator on the 'train' dataset (runs the whole parameter
# grid, so this is the expensive cell of the notebook)
model = cv.fit(train)
# Keep the model that the cross validator selected as best
best_model = model.bestModel

In [61]:
# Report the hyper-parameters of the best model. They are read from the
# model's parent estimator on the JVM side via the private `_java_obj` handle.
best_als_java = best_model._java_obj.parent()
print("  Rank:", best_als_java.getRank())
print("  MaxIter:", best_als_java.getMaxIter())
print("  RegParam:", best_als_java.getRegParam())

**Best Model**
  Rank: 20
  MaxIter: 8
  RegParam: 0.14


In [62]:
# Score the held-out test set with the best cross-validated model and print
# its RMSE
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
# test_predictions.show()

0.8754351890823613


In [None]:
# Hyper-parameters used for the final model
rank, maxIter, regParam = 20, 8, 0.14

# Re-create the ALS estimator with those hyper-parameters and a fixed seed
final_als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=rank,
    maxIter=maxIter,
    regParam=regParam,
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=432,
)

# Train on the training split only
final_model = final_als.fit(train)

# Evaluate on the held-out test split
test_predictions = final_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

# test_predictions.toPandas().to_csv('predictions.csv')


In [None]:
# final_model = ALSModel.load("final_model")

In [21]:
# Top-2 recommendations for a hand-picked user (id 148)
# users = ratings.select(als.getUserCol()).distinct().limit(3)
users = spark.createDataFrame([148], IntegerType()).toDF('userId')
userSubsetRecs = final_model.recommendForUserSubset(users, 2)

# Flatten the per-user array into one row per (user, recommended movie)
nrecommendations = (
    userSubsetRecs
    .withColumn("rec_exp", explode("recommendations"))
    .select("userId", "rec_exp.movieId", "rec_exp.rating")
)

# users.show()
# for ranking, (movieId, rating) in enumerate(userSubsetRecs[0]['recommendations']):
#     title = movies.where(movies.movieId == movieId).take(1)[0]['title']
#     print(f'Recommendation {ranking+1}: {title} | predicted score: {rating}'.format())


In [None]:
# Top-10 recommendations for every user, flattened to one row per
# (user, recommended movie) with the predicted rating
nrecommendations = (
    final_model.recommendForAllUsers(10)
    .withColumn("rec_exp", explode("recommendations"))
    .select("userId", "rec_exp.movieId", "rec_exp.rating")
)

nrecommendations.limit(10).show()

In [None]:
# check recommendation: the movies recommended to user 100, joined with the
# movie table so title and genres are shown
nrecommendations.join(movies, on='movieId').filter('userId = 100').show()

In [None]:
# check movies rated: the 10 movies user 100 rated highest, for comparison
# with the recommendations
ratings.join(movies, on='movieId').filter('userId = 100').sort('rating', ascending=False).limit(10).show()

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1101|   100|   5.0|      Top Gun (1986)|      Action|Romance|
|   1958|   100|   5.0|Terms of Endearme...|        Comedy|Drama|
|   2423|   100|   5.0|Christmas Vacatio...|              Comedy|
|   4041|   100|   5.0|Officer and a Gen...|       Drama|Romance|
|   5620|   100|   5.0|Sweet Home Alabam...|      Comedy|Romance|
|    368|   100|   4.5|     Maverick (1994)|Adventure|Comedy|...|
|    934|   100|   4.5|Father of the Bri...|              Comedy|
|    539|   100|   4.5|Sleepless in Seat...|Comedy|Drama|Romance|
|     16|   100|   4.5|       Casino (1995)|         Crime|Drama|
|    553|   100|   4.5|    Tombstone (1993)|Action|Drama|Western|
+-------+------+------+--------------------+--------------------+



In [None]:
# save model
# Use the writer's overwrite mode: a plain save() raises when "final_model"
# already exists, which breaks every re-run of this cell or of the notebook.
final_model.write().overwrite().save("final_model")

In [None]:
# Reload the persisted model from "final_model" and re-evaluate it on the
# test split. Note this reassigns `test_predictions` and `RMSE`.
load_model = ALSModel.load("final_model")

test_predictions = load_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

test_predictions.show()

0.8732556287231764
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|      6|   4.0| 4.5949717|
|     1|     47|   5.0| 4.5823603|
|     1|    163|   5.0|  4.010098|
|     1|    216|   5.0|  4.128422|
|     1|    316|   3.0|  3.763847|
|     1|    367|   4.0| 3.8843725|
|     1|    552|   4.0| 3.7210417|
|     1|    553|   5.0| 4.6246066|
|     1|    590|   4.0| 4.3124084|
|     1|    593|   4.0| 4.7583666|
|     1|    648|   3.0|  4.013749|
|     1|   1009|   3.0| 3.2530835|
|     1|   1023|   5.0|  4.094334|
|     1|   1090|   4.0|  4.602919|
|     1|   1092|   5.0| 3.6413941|
|     1|   1136|   5.0| 4.8654294|
|     1|   1196|   5.0| 4.9686337|
|     1|   1206|   5.0|  4.465945|
|     1|   1256|   5.0|  4.551629|
|     1|   1265|   4.0| 4.7060924|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# sample new user
def _movie_title(movies, movie_id):
    """Return the ``title`` of ``movie_id`` in ``movies``, or None when no row matches."""
    matches = movies.where(movies.movieId == movie_id).take(1)
    return matches[0]['title'] if matches else None


def _prompt_rating():
    """Ask for a rating until the reply is 'n' or a number.

    Returns:
        The rating as a float, or None when the user answers 'n'.
    """
    while True:
        answer = input('rate this movie 1-5, press n if you have not seen:\n').strip()
        if answer.lower() == 'n':
            return None
        try:
            return float(answer)
        except ValueError:
            # A typo should not abort the whole session, so ask again.
            print(f'"{answer}" is not a number, please try again.')


def new_user_recomendatios(user_id, ratings, movies, num_ratings, num_recs):
    """Interactively collect ratings from a new user and print recommendations.

    A fixed-seed sample of rows from ``ratings`` supplies the movies to ask
    about. The answers are appended to ``ratings``, a fresh ALS model is
    trained on the combined data, and the top ``num_recs`` movies for
    ``user_id`` are printed.

    Relies on the notebook-level names ``spark``, ``ALS`` and ``IntegerType``.

    Args:
        user_id: Integer id to assign to the new user.
        ratings: DataFrame whose columns are, in order, userId, movieId, rating.
        movies: DataFrame with ``movieId`` and ``title`` columns.
        num_ratings: Stop asking once this many ratings have been entered.
        num_recs: Number of recommendations to print.

    Returns:
        None. Output is printed.
    """
    # Fixed seed, so the same ~0.1% sample of ratings is offered on every call.
    samples = ratings.sample(False, .001, seed=100).collect()
    # De-duplicate while keeping order so a movie is never asked about twice.
    sample_movie_ids = list(dict.fromkeys(row['movieId'] for row in samples))

    # Collect the new user's ratings
    new_ratings = []
    for movie_id in sample_movie_ids:
        title = _movie_title(movies, movie_id)
        if title is None:
            # No entry in ``movies`` for this id: nothing to show the user.
            continue
        print(title)

        rating = _prompt_rating()
        if rating is None:
            continue

        new_ratings.append((user_id, movie_id, rating))
        num_ratings -= 1
        if num_ratings == 0:
            break

    # Without any rating the user would be unknown to the model (and Spark
    # cannot infer a schema from an empty list), so stop here.
    if not new_ratings:
        print('No ratings were entered, so no recommendations can be made.')
        return

    # New ratings as a dataframe using the column names of ``ratings``
    new_user_ratings = spark.createDataFrame(new_ratings, ratings.columns)
    combined_movie_ratings = ratings.union(new_user_ratings)

    # Train a fresh ALS model on the combined ratings
    als = ALS(
            rank=10,
            maxIter=50,
            regParam=0.15,
            userCol="userId",
            itemCol="movieId",
            ratingCol="rating",
            nonnegative=True,
            implicitPrefs=False,
            coldStartStrategy="drop"
    )
    model = als.fit(combined_movie_ratings)

    # Only this user's recommendations are needed, so score just that user
    # rather than every user in the dataset.
    target_user = spark.createDataFrame([user_id], IntegerType()).toDF('userId')
    recomendation_for_user = model.recommendForUserSubset(target_user, num_recs).take(1)
    if not recomendation_for_user:
        print(f'No recommendations could be generated for user {user_id}.')
        return

    # enumerate for ranking (1-based)
    for ranking, (movieId, rating) in enumerate(recomendation_for_user[0]['recommendations'], start=1):
        title = _movie_title(movies, movieId) or f'<unknown title, movieId={movieId}>'
        print(f'Recommendation {ranking}: {title} | predicted score: {rating}')

# Interactive demo: ask new user 2138 for ratings (stops after 5 have been
# entered) and print 10 recommendations. This cell waits for keyboard input.
new_user_recomendatios(2138, ratings=ratings, movies=movies, num_ratings=5, num_recs=10)

In [None]:
# test_ratings = ratings
# new_ratings = []
# new_ratings.append((8382,101,5.0))
# # new ratings into dataframe base on ratings column
# new_user_ratings = spark.createDataFrame(new_ratings, test_ratings.columns)

# combined_movie_ratings = test_ratings.union(new_user_ratings)


# # Create ALS model again
# als = ALS(
#         rank=10,
#         maxIter=50,
#         regParam=0.15,
#         userCol="userId", 
#         itemCol="movieId",
#         ratingCol="rating", 
#         nonnegative = True, 
#         implicitPrefs = False,
#         coldStartStrategy="drop"
# )

# model = als.fit(combined_movie_ratings)

# recomendations = model.recommendForAllUsers(10)
# # nrecomendations = final_model.recommendForAllUsers(1)
# users = spark.createDataFrame([148, 1], IntegerType()).toDF('userId')
# nrecomendations = final_model.recommendForUserSubset(users, 2)


# recs_for_user = nrecomendations.where(nrecomendations.userId == 100).take(1)
# recs_for_user[0]['recommendations']

# for ranking, (movieId, rating) in enumerate(recs_for_user[0]['recommendations']):
#     movie_string = movies.where(movies.movieId == movieId).take(1)[0]['title']
#     print('Recommendation {}: {} | predicted score: {}'.format(ranking+1, movie_string, rating))

# recs_users = {}

# for userId, recs in nrecomendations.collect(): 
#      recs_users[userId] = [recs]

# user_recs = recs_users[148]


# moveie = movies_links.join(user_recs, ['movieId'], 'left').dropna(how="any")\
#     .select("movieId","title","year","poster", "rating", "summary", "time", "genres" )

# print(recs_users)