# This tutorial for recommender engine usingCollaborative Filtering

it uses python 3.5.5 

In [1]:
! python --version

Python 3.5.6 :: Anaconda, Inc.


# importing needed packages 

In [166]:
import os
import pyspark
import urllib.request
import zipfile
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
# from pyspark.mllib.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pandas import Series, DataFrame

import pandas as pd

import math
from time import time
from pyspark.mllib.recommendation import MatrixFactorizationModel

# Dataset info
    .Small: 100,000 ratings and 2,488 tag applications applied to 8,570 movies by 706 users. Last updated 4/2015.
    .Full: 21,000,000 ratings and 470,000 tag applications applied to 27,000 movies by 230,000 users. Last updated 4/2015.

In [3]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

# Defining dataset location

In [4]:
if not os.path.exists('datasets'):
    os.makedirs('datasets')
cwd = os.getcwd()
datasets_path = os.path.join(cwd, 'datasets')
complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

# Download dataset
    . small dataset size = 955 kb 
    . complete dataset size = 264 mb
   in this tutorial we will use the small dataset

In [5]:
if not os.path.isfile(small_dataset_path):
    small_f = urllib.request.urlretrieve(small_dataset_url,small_dataset_path)
# complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

# Unzip datasets

In [6]:

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

# with zipfile.ZipFile(complete_dataset_path, "r") as z:
#     z.extractall(datasets_path)

# Intializing spark context

In [7]:
spark = SparkSession \
   .builder \
    .master("local[*]") \
   .appName("Recommender-system") \
   .getOrCreate()

sc =  spark.sparkContext

# Load data set 
this tables we will use from dataset

Each line in the ratings dataset (ratings.csv) is formatted as: userId,movieId,rating,timestamp

Each line in the movies (movies.csv) dataset is formatted as: movieId,title,genres

In [8]:
# find ratings.csv path 
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
# load ratings.csv file to dataframe 
rating_df = spark.read.format("csv").option("header", "true").load(small_ratings_file).drop('timestamp')

In [None]:
# Ensuring that rating dataframe is loaded correctly 

In [None]:
rating_df.show()

In [None]:
# Casting rating dataframe coloumns to int

In [9]:
# casting userId to int
rating_df = rating_df.withColumn('userId', rating_df['userId'].cast(IntegerType()))
# casting movieId to int 
rating_df = rating_df.withColumn('movieId', rating_df['movieId'].cast(IntegerType()))
#casting rating to float 
rating_df = rating_df.withColumn('rating', rating_df['rating'].cast(FloatType()))


In [None]:
# print schema of rating dataframe

In [None]:
rating_df.printSchema()

# parsing and removing irrelevant data from rating.csv 

In [None]:
# # add file to rdd and removing header from it and removing timestamp from each row 
# small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
#     .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

# Ensuring that data loaded correctly 
    RDD format -> ('user_id','movie_id','actual rating')

In [None]:
# small_ratings_data.take(3)

# load movies.csv file

In [10]:
# finding movies.csv file 
movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
# load movies.csv file to dataframe
# rating_df = spark.read.format("csv").option("header", "true").load(small_ratings_file).drop('timestamp')
movies_df = spark.read.format("csv").option("header", "true").load(movies_file).drop('genres')
# small_movies_raw_data = sc.textFile(small_movies_file)
# find rating .csv header 
# small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

In [None]:
# ensuring that movies dataframe is loaded correctly

In [11]:
movies_df.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
|      6|         Heat (1995)|
|      7|      Sabrina (1995)|
|      8| Tom and Huck (1995)|
|      9| Sudden Death (1995)|
|     10|    GoldenEye (1995)|
|     11|American Presiden...|
|     12|Dracula: Dead and...|
|     13|        Balto (1995)|
|     14|        Nixon (1995)|
|     15|Cutthroat Island ...|
|     16|       Casino (1995)|
|     17|Sense and Sensibi...|
|     18|   Four Rooms (1995)|
|     19|Ace Ventura: When...|
|     20|  Money Train (1995)|
+-------+--------------------+
only showing top 20 rows



In [None]:
#casting movies id to int 

In [12]:
movies_df = movies_df.withColumn('movieId' ,movies_df['movieId'].cast(IntegerType()))

In [None]:
# print movies_df schema 

In [13]:
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)



# parsing and removing irrelevant data from movies.csv to RDD

In [None]:
# # add file to rdd and removing header and genere from it 
# small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
#     .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

# Ensuring that movies.cs file is loaded correctly 
    RDD formate -> ('movie_id','movie_name(production_year)')

In [None]:
# small_movies_data.take(3)

# Parsing and spliting  dataset  to rdds 
    split dataset into
        . Traning dataset = 60%
        . Validation dataset = 20%
        . Test Dataset = 20%
note that we removed actual ratings from testing and validation dataset 

In [14]:
# # split data randomly to training, validation and test data sets with ratio 60-20-20
# training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
training_df, validation_df, test_df = rating_df.randomSplit([0.6, 0.2, 0.2])
# # remove actual user rating from validation dataset
## validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
# validation_for_predict_df = validation_df.drop("rating")
# # remove actual user rating from testing dataset
# test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
# test_for_predict_df = test_df.drop("rating")


# Configuring and tunning our model

In [None]:
rating_df.printSchema()

In [15]:

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")

param_grid = ParamGridBuilder().addGrid(
    als.rank,
    [10, 15],
).addGrid(
    als.maxIter,
    [10, 15],
).addGrid(
    als.regParam,
    [0.1,0.01,0.2],
).build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
)
tvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
)


model = tvs.fit(training_df)

bestmodel=model.bestModel

In [16]:
(bestmodel
    ._java_obj     # Get Java object
    .parent()      # Get parent (ALS estimator)
    .getRegParam()) 
# print ('Best Param {}:'.format(bestmodel._java_obj.parent().getRegParam()))
# print ('Best Param {MaxIter}:'.format( bestmodel._java_obj.parent().getMaxIter()))

0.2

In [17]:
best_rank=bestmodel.rank
best_regParm=bestmodel._java_obj.parent().getRegParam()
best_iterations=bestmodel._java_obj.parent().getMaxIter()

In [18]:
print(best_rank)

10


In [19]:
# seed is used for reproducability 
# seed = 5
iterations = 10
# is used to avoid over and under fitting 
regularization_parameter = 0.1
tolerance = 0.02
ranks = [4, 8, 10]
min_error = float('inf')
best_rank = -1
best_model = None

for rank in ranks:
    als = ALS(maxIter=iterations, regParam=regularization_parameter,rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")
    model = als.fit(training_df)
    predictions = model.transform(validation_df)

    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    error = evaluator.evaluate(predictions)

    print ('For rank {} the RMSE is {}'.format(rank, error))

    if error < min_error:
            min_error = error
            best_rank = rank
            best_model = model

    print ('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.9033439282282294
The best model was trained with rank 4
For rank 8 the RMSE is 0.911418196740271
The best model was trained with rank 4
For rank 10 the RMSE is 0.9145175660427705
The best model was trained with rank 4


In [20]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   474|    471|   3.0|  2.508234|
|   462|    471|   2.5|  2.392996|
|   555|    471|   3.0| 4.4965034|
|   216|    471|   3.0| 3.2247887|
|   287|    471|   4.5| 2.6791642|
|   541|    471|   3.0|  3.425363|
|   357|    471|   3.5|  3.823547|
|   606|   1088|   3.0| 3.1558466|
|   554|   1088|   5.0| 3.5911787|
|   200|   1088|   4.0| 3.6109958|
|   600|   1088|   3.5|  2.649283|
|   517|   1088|   1.0|  3.050231|
|   587|   1238|   4.0| 3.9528525|
|   268|   1238|   5.0|  3.875886|
|   312|   1342|   4.0|  2.720895|
|   137|   1580|   3.5| 3.0223484|
|   580|   1580|   4.0|  3.486576|
|   593|   1580|   1.5| 2.4402766|
|   606|   1580|   2.5|  3.051666|
|    91|   1580|   3.5|  3.549459|
+------+-------+------+----------+
only showing top 20 rows



# Saving the BEST model for future use 

In [None]:
# model_path = os.path.join('/home/tabdalla/Development/code/col-filtering_recommender-system', 'models', 'movie_lens_als_2')
# best_model.save(sc, model_path)

In [None]:
# model_path = os.path.join('/home/tabdalla/Development/code/col-filtering_recommender-system', 'models', 'movie_lens_als_2')
# loaded_model = MatrixFactorizationModel.load(sc, model_path)
# predictions = loaded_model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# predictions.take(5)

# See predected results
    RDD format ((user_id,movie_is),predicted_rating)

In [21]:
predictions.take(5)

[Row(userId=474, movieId=471, rating=3.0, prediction=2.5082340240478516),
 Row(userId=462, movieId=471, rating=2.5, prediction=2.392996072769165),
 Row(userId=555, movieId=471, rating=3.0, prediction=4.4965033531188965),
 Row(userId=216, movieId=471, rating=3.0, prediction=3.2247886657714844),
 Row(userId=287, movieId=471, rating=4.5, prediction=2.679164171218872)]

# Compare between predicted result and actual result in dataset
    RDD format ((user_id,movie_id),(actual_rating,predected_rating))

In [None]:
predictions.take(5)

# Test model using test dataset

In [22]:
als = ALS(maxIter=iterations, regParam=regularization_parameter,rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")
model = als.fit(training_df)
predictions = model.transform(test_df)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
error = evaluator.evaluate(predictions)




# model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
#                       lambda_=regularization_parameter)
# predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
# error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9174855406146327


# Building the  model using the complete dataset instead of small data set
  in order to get better results we shall use the complete dataset but here we used the small one due to memory limitation 
  all we need to change 'ml-latest-small' -> 'ml-latest'

In [23]:
# Load the complete dataset file
## TAKE CARE HERE WE CHANGED USAGE OF COMPLETE PATH TO USE SMALL DUE TO MEMORY LIMITATION 
complete_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
# complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_df = spark.read.format("csv").option("header", "true").load(complete_ratings_file).drop('timestamp')
# complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]


In [24]:
complete_ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)



# Parse and modify ratings.csv file and parse it to rdd 
modification here means convert string results to int as we did before for accurate computation 

In [25]:

# complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
#     .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
# casting userId to int
complete_ratings_df = complete_ratings_df.withColumn('userId', complete_ratings_df['userId'].cast(IntegerType()))
# casting movieId to int 
complete_ratings_df = complete_ratings_df.withColumn('movieId', complete_ratings_df['movieId'].cast(IntegerType()))
#casting rating to float 
complete_ratings_df = complete_ratings_df.withColumn('rating', complete_ratings_df['rating'].cast(FloatType()))
    
print ("There are {} ratings in the small dataset".format((complete_ratings_df.count())))

There are 100836 ratings in the small dataset


# Train model using complete dataset with chosen parameters
here we did't need validation dataset we need only test to find the error of our model so we splited our dataset to 
    Training -> 70%
    test -> 30% 

In [26]:
# split dataset to training and testing data sets with ration 70% - 30%  
# training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)
training_df, test_df = complete_ratings_df.randomSplit([0.7,0.3])

# complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
#                            iterations=iterations, lambda_=regularization_parameter)

als = ALS(maxIter=iterations, regParam=regularization_parameter,rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")
model = als.fit(training_df)
#     predictions = model.transform(validation_df)

# Test using testing dataset to calculate error

In [27]:
# test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

# predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
# error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
# print ('For testing data the RMSE is {}'.format(error))

# als = ALS(maxIter=iterations, regParam=regularization_parameter,rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")
# model = als.fit(training_df)
predictions = model.transform(test_df)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
error = evaluator.evaluate(predictions)

print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8977186728990895


In [None]:
################################################################################################
#   HERE ALSO I CHANGED THE USAGE OF COMPLETE DATASET TO SMALL ONE DUE TO MEMORY LIMITATOIN    #
################################################################################################

In [28]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
# complete_movies_raw_data = sc.textFile(complete_movies_file)
# complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]
complete_movies_df = spark.read.format("csv").option("header", "true").load(complete_movies_file).drop('genres')

# parsing movies.csv file 

In [29]:
complete_movies_df = complete_movies_df.withColumn('movieId' ,complete_movies_df['movieId'].cast(IntegerType()))
print ("There are {}movies in the complete dataset ".format(complete_movies_df.count()))


There are 9742movies in the complete dataset 


# Calculate average rating for each movie

In [139]:
# def get_counts_and_averages(ID_and_ratings_tuple):
#     nratings = len(ID_and_ratings_tuple[1])
#     return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

# complete_ratings_data = complete_ratings_df.rdd.map(list)
# # complete_ratings_df.show()

movie_ID_with_avg_ratings_df=complete_ratings_df.groupby('movieId').agg({'rating':'avg'}).show()
movies_rating_counts_df=complete_ratings_df.groupby("movieId").count()


# movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
# movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
# movie_ID_with_avg_ratings_RDD.collect()
# movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
# movies_rating_counts_df = spark.createDataFrame(movie_rating_counts_RDD)
# movies_rating_counts_df.show()

+-------+------------------+
|movieId|       avg(rating)|
+-------+------------------+
|   1580| 3.487878787878788|
|   2366|              3.64|
|   3175|              3.58|
|   1088| 3.369047619047619|
|  32460|              4.25|
|  44022| 3.217391304347826|
|  96488|              4.25|
|   1238| 4.055555555555555|
|   1342|               2.5|
|   1591|2.6346153846153846|
|   1645| 3.411764705882353|
|   4519|3.3333333333333335|
|   2142|               2.7|
|    471|              3.55|
|   3997|1.8333333333333333|
|    833|               2.0|
|   3918|3.2777777777777777|
|   7982|              3.25|
|   1959|3.6666666666666665|
|  68135|              3.55|
+-------+------------------+
only showing top 20 rows



In [141]:
movies_rating_counts_df

DataFrame[movieId: int, count: bigint]

In [None]:
movie_ID_with_avg_ratings_RDD.take(2)

In [None]:
movies_rating_counts_df.show(2)

In [None]:
# Add new user to use it in our model 

In [31]:
new_user_ID = 1

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,2), # Casino (1995)
     (0,25,3), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,2), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,3) # Usual Suspects, The (1995)
    ]

new_user_ratings_df = spark.createDataFrame(new_user_ratings)

new_user_ratings_df = new_user_ratings_df.withColumn('userId', new_user_ratings_df[0].cast(IntegerType()))
new_user_ratings_df = new_user_ratings_df.withColumn('movieId', new_user_ratings_df[1].cast(IntegerType()))
new_user_ratings_df = new_user_ratings_df.withColumn('rating', new_user_ratings_df[2].cast(FloatType()))

new_user_ratings_df = new_user_ratings_df.selectExpr("userId","movieId","rating")

new_user_ratings_df.show()
new_user_ratings_df.printSchema
# df = spark.createDataFrame(new_user_ratings_df)
# new_user_ratings_df=new_user_ratings_df.drop('_1').drop('_2').drop('_3')
# new_user_ratings_df.drop('_1',inplace=True)

# new_user_ratings_RDD = sc.parallelize(new_user_ratings)
# new_user_ratings_df.show()
# new_user_ratings_df.createOrReplaceTempView(new_user_ratings)
# df.rename(columns={'oldName1': 'newName1', 'oldName2': 'newName2'})
# df.rename(columns={ df.columns[1]: "your value" })
# new_user_ratings_df.rename(columns={new_user_ratings.columns[1]: 'movieId'})
# new_user_ratings_df.rename(columns={'_1': 'userId', '_2': 'movieId','_3': 'rating'},inplace= True)

# column_indices = [0,1,2]
# new_names = ['userId','movieId','rating']
# old_names = new_user_ratings_df.columns[column_indices]
# new_user_ratings_df.rename(columns=dict(zip(old_names, new_names)), inplace=True)

# new_user_ratings_df.printSchema



+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|    260|   4.0|
|     0|      1|   3.0|
|     0|     16|   2.0|
|     0|     25|   3.0|
|     0|     32|   4.0|
|     0|    335|   4.0|
|     0|    379|   3.0|
|     0|    296|   2.0|
|     0|    858|   5.0|
|     0|     50|   3.0|
+------+-------+------+



<bound method DataFrame.printSchema of DataFrame[userId: int, movieId: int, rating: float]>

# Join new user rdd with oold one 

In [32]:
complete_data_with_new_ratings_df = complete_ratings_df.union(new_user_ratings_df)

In [102]:


# type(complete_data_with_new_ratings_RDD)

[Row(userId=0)]

In [None]:
# Train new ALS Model 

In [33]:
t0 = time()
als = ALS(maxIter=iterations, regParam=best_regParm,rank=best_rank, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")
new_ratings_model = als.fit(complete_data_with_new_ratings_df)
# new_ratings_model = ALS.train(complete_data_with_new_ratings_df, best_rank, seed=seed, 
#                               iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in {} seconds".format(round(tt,3)))

New model trained in 1.745 seconds


# predict ratings for user id 

In [34]:
new_user_ratings

[(0, 260, 4),
 (0, 1, 3),
 (0, 16, 2),
 (0, 25, 3),
 (0, 32, 4),
 (0, 335, 4),
 (0, 379, 3),
 (0, 296, 2),
 (0, 858, 5),
 (0, 50, 3)]

NameError: name 'new_user_rating' is not defined

In [123]:
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import  BooleanType, StringType


invalid_predictions = list(map(lambda x: x[1], new_user_ratings))
print(invalid_predictions)
#print(invalid_predictions
#def valid_movie(movie):
#    return not (movie in invalid_predictions)
#valid_movie_udf = udf(valid_movie, BooleanType())
#complete_movies_df.filter(valid_movie_udf('movieId')).show()


user_id=new_user_ratings_df.select('userId').take(1)[0]['userId']


new_movies_df=complete_movies_df.filter(~complete_movies_df['movieId'].isin(invalid_predictions)).withColumn('userId', lit(user_id)).drop('title')

new_movies_df.printSchema


# new_movies_df = new_movies_df.insert(0,'userId','def')

# new_movies_df['ttt']= new_movies_df['moviesId']
# new_movies_df.show()
# new_user_ratings_df
# new_movies_df.assign(userId=0)
# withColumn('userId', new_user_unrated_movies_df[0].cast(IntegerType()))
# new_movies_df['My new column'] = 'default value'
# new_movies_df.show()


# complete_movies_df.filter(complete_movies_df[1] not in new_user_ratings_ids)




[260, 1, 16, 25, 32, 335, 379, 296, 858, 50]


<bound method DataFrame.printSchema of DataFrame[movieId: int, userId: int]>

In [61]:
new_user_ratings[0][0]

0

In [128]:
# new_user_ratings_ids = map(lambda x: x[1], new_user_ratings)
# complete_movies_data=complete_movies_df.rdd
# new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# # new_user_unrated_moves_df = complete_movies_df.

# # Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
# # new_user_unrated_movies_df=new_user_unrated_movies_RDD.DF
# new_user_unrated_movies_df = spark.createDataFrame(new_user_unrated_movies_RDD)

new_movies_df = new_user_unrated_movies_df.withColumn('userId', new_user_unrated_movies_df[0].cast(IntegerType()))
new_movies_df = new_user_unrated_movies_df.withColumn('movieId', new_user_unrated_movies_df[1].cast(IntegerType()))

# new_user_unrated_movies_df = new_user_unrated_movies_df.selectExpr("userId","movieId")

# # new_user_unrated_movies_RDD.take(5)
# new_user_unrated_movies_df.count()
new_user_recommendations_df = new_ratings_model.transform(new_movies_df)


In [130]:
new_user_recommendations_df.show()
# new_movies_df.show()
# new_user_unrated_movies_df.show(800)

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|     1|    148| 4.3277755|
|     1|    471| 4.1512995|
|     1|    496| 4.3277755|
|     1|    833|  2.403976|
|     1|   1088| 3.8973567|
|     1|   1238| 4.5507135|
|     1|   1342| 3.1059513|
|     1|   1580|  4.013965|
|     1|   1591|  3.070634|
|     1|   1645| 3.9537828|
|     1|   1829| 3.6319897|
|     1|   1959| 4.2179365|
|     1|   2122|  2.936361|
|     1|   2142| 3.3981826|
|     1|   2366|  4.204904|
|     1|   2659|  2.076134|
|     1|   2866|  4.070711|
|     1|   3175|  4.174765|
|     1|   3794| 2.8586311|
|     1|   3918|   3.49266|
+------+-------+----------+
only showing top 20 rows



In [174]:
movies_rating_counts_df.count()
df=movies_rating_counts_df.sort('movieId')
df.show()

+-------+-----+
|movieId|count|
+-------+-----+
|      1|  215|
|      2|  110|
|      3|   52|
|      4|    7|
|      5|   49|
|      6|  102|
|      7|   54|
|      8|    8|
|      9|   16|
|     10|  132|
|     11|   70|
|     12|   19|
|     13|    8|
|     14|   18|
|     15|   13|
|     16|   82|
|     17|   67|
|     18|   20|
|     19|   88|
|     20|   15|
+-------+-----+
only showing top 20 rows



In [183]:
# df2= new_user_recommendations_df.sort('movieId')
# df2.withColumn('count',df["count"])

df3 =new_user_recommendations_df.join(movies_rating_counts_df, on='movieId')


### 

In [181]:
pd.__version__

'0.23.4'

In [169]:
# new_user_recommendations_df.withColumn('movie_rating_counts',movies_rating_counts_df['count'].cast(IntegerType()))
df=new_user_recommendations_df.join(movies_rating_counts_df)

type(new_user_recommendations_df)
# type(movies_rating_counts_df)
result=pd.merge(new_user_recommendations_df,movies_rating_counts_df,on='movieId')

ValueError: can not merge DataFrame with instance of type <class 'pyspark.sql.dataframe.DataFrame'>

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)

In [None]:
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

# New user recommendation RDD 

In [None]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [None]:
# Find top rated movies with more than 25 ratings 

In [None]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

In [None]:
# Predict rating for user ID = 0

In [None]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

In [None]:
#save model for future use 

In [None]:

model_path = os.path.join(cwd, 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)