# Rating some movies
#### To make recommendations for you, we are going to learn your taste by asking you to rate a few movies.

In [1]:
import sys
import os.path
from os.path import isfile
from time import time
import pandas as pd
import math

In [2]:


# Movies the user is asked to rate: one "movieId,title" record per line.
topMovies = "\n".join((
    "1,Toy Story (1995)",
    "780,Independence Day (a.k.a. ID4) (1996)",
    "590,Dances with Wolves (1990)",
    "1210,Star Wars: Episode VI - Return of the Jedi (1983)",
    "648,Mission: Impossible (1996)",
    "344,Ace Ventura: Pet Detective (1994)",
    "165,Die Hard: With a Vengeance (1995)",
    "153,Batman Forever (1995)",
    "597,Pretty Woman (1990)",
    "1580,Men in Black (1997)",
    "231,Dumb & Dumber (1994)",
))

# Location of the personal ratings file written by this cell.
ratingsFilePath = os.path.abspath('C:\\cs4337\\personalRatings.txt')

# Test for the file *before* opening it: mode 'r+' requires the file to exist,
# so opening first would fail on the very first run, when there is nothing to
# overwrite yet.
if isfile(ratingsFilePath):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        # Empty the old file; the context manager closes the handle right away
        # so no open descriptor lingers while the file is rewritten below.
        with open(ratingsFilePath, 'w'):
            pass
    else:
        # Anything other than an explicit "y..." keeps the existing ratings.
        sys.exit()

prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print(prompt)

# A single timestamp is shared by every rating recorded in this session.
now = int(time())
# Number of movies that received a non-zero rating.
n = 0

# The context manager closes the file even if input() is interrupted
# part-way through the list.
with open(ratingsFilePath, 'w') as f:
    for line in topMovies.split("\n"):
        # Split on the first comma only, so a title containing commas is kept whole.
        ls = line.strip().split(",", 1)
        valid = False
        while not valid:
            rStr = input(ls[1] + ": ").strip()
            try:
                # str.isdigit() is also true for characters such as superscript
                # digits that int() rejects; treat those as invalid input
                # instead of letting ValueError abort the cell.
                r = int(rStr) if rStr.isdigit() else -1
            except ValueError:
                r = -1
            if r < 0 or r > 5:
                # Out of range or not a number: show the instructions again.
                print(prompt)
            else:
                valid = True
                # 0 means "not seen" and is not recorded.
                if r > 0:
                    # Record format: userId::movieId::rating::timestamp, with userId fixed at 0.
                    f.write("0::%s::%d::%d\n" % (ls[0], r, now))
                    n += 1

if n == 0:
    print("No rating provided!")


Please rate the following movie (1-5 (best), or 0 if not seen): 


# Solution Structure

In [3]:
#!/usr/bin/env python

import sys
import os
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:



def parseRating(line):
    """
    Convert one "userId::movieId::rating::timestamp" record into the pair
    (timestamp % 10, (userId, movieId, rating)).

    userId and movieId are returned as ints and rating as a float.
    """
    parts = line.strip().split("::")
    # Last decimal digit of the timestamp.
    bucket = int(parts[3]) % 10
    record = (int(parts[0]), int(parts[1]), float(parts[2]))
    return bucket, record

def parseMovie(line):
    """
    Convert one "movieId::movieTitle" record into (movieId, movieTitle).

    movieId is returned as an int; any fields after the title are ignored.
    """
    parts = line.strip().split("::")
    movieId = int(parts[0])
    title = parts[1]
    return movieId, title

def loadRatings(ratingsFile):
    """
    Load the ratings greater than 0 from a ratings file.

    Every line is parsed with parseRating; records whose rating is not
    greater than 0 are discarded.

    ratingsFile -- path of the file to read.

    Returns a list of (userId, movieId, rating) tuples.  Prints a message and
    exits with status 1 (raising SystemExit) when the file does not exist or
    contains no rating above 0.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    # 'with' closes the file even if a malformed line makes parseRating raise.
    with open(ratingsFile, 'r') as f:
        parsed = [parseRating(line)[1] for line in f]
    # Build a real list: a lazy filter object is always truthy in Python 3,
    # which would make the emptiness check below unreachable.
    ratings = [rec for rec in parsed if rec[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings

def computeRmse(model, data, n):
    """
    Root Mean Squared Error of the model's predictions over `data`.

    The first two fields of each `data` record form the lookup key and the
    third is the observed rating.  Predictions are joined to the observed
    ratings on that key, and the summed squared differences are divided by `n`
    before taking the square root.

    NOTE(review): this relies on `model.predictAll` and on RDD-style
    map/join/values/reduce; confirm the model passed in exposes that method.
    """
    def keyed(rec):
        # (first field, second field) -> third field
        return (rec[0], rec[1]), rec[2]

    lookupKeys = data.map(lambda rec: (rec[0], rec[1]))
    predicted = model.predictAll(lookupKeys).map(keyed)
    observed = data.map(keyed)
    # Each joined value is a (predicted, observed) pair for one key.
    pairs = predicted.join(observed).values()
    squaredErrorSum = pairs.map(lambda pair: (pair[0] - pair[1]) ** 2).reduce(add)
    return sqrt(squaredErrorSum / float(n))

if __name__ == "__main__":

    # Create (or reuse) the Spark session for this notebook, running on a
    # local master, then keep a handle on its SparkContext for the RDD steps.
    spark = (
        SparkSession.builder
        .master("local")
        .appName("Movie Recommendation Engine")
        .config("spark.executor.memory", "1gb")
        .getOrCreate()
    )

    sc = spark.sparkContext


In [5]:
# Personal ratings written by the rating cell, as a single-partition RDD.
myRatings = loadRatings(ratingsFilePath)
myRatingsRDD = sc.parallelize(myRatings, numSlices=1)

# ratingsRDD holds (last digit of timestamp, (userId, movieId, rating)) records.
ratingsRDD = (
    sc.textFile(os.path.abspath('C:\\cs4337\\ratings.dat'))
    .map(parseRating)
)

# moviesRDD holds (movieId, movieTitle) records.
moviesRDD = (
    sc.textFile(os.path.abspath('C:\\cs4337\\movies.dat'))
    .map(parseMovie)
)


In [6]:
# Fixed seed so the train/test split (and every result derived from it) is the
# same on each Restart & Run All.
SPLIT_SEED = 42

# Personal ratings: name the tuple fields directly instead of renaming and
# dropping the auto-generated _1/_2/_3 columns.
myRatingsDF = myRatingsRDD.toDF(['userId', 'movieId', 'rating'])

# ratingsRDD records are (key, (userId, movieId, rating)); keep only the inner
# fields, in the same column order as myRatingsDF because union() pairs
# columns by position, not by name.
ratingsDF = ratingsRDD.toDF().select(
    col('_2').getItem('_1').alias('userId'),
    col('_2').getItem('_2').alias('movieId'),
    col('_2').getItem('_3').alias('rating'),
)

# Add the personal ratings to the full ratings set.
ratingsDF = ratingsDF.union(myRatingsDF).orderBy('userId', ascending=True)

# Split ratings into train (80%) and test (20%).
(training, test) = ratingsDF.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [7]:
# Fixed seed so repeated runs on the same training data fit the same model;
# without it the RMSE and the recommendations differ from run to run.
ALS_SEED = 42

# Build the recommendation model.  coldStartStrategy="drop" removes rows
# without a usable prediction (users or movies not seen in training) so they
# do not turn the RMSE into NaN.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop", seed=ALS_SEED)
model = als.fit(training)

# Accuracy of the model on the held-out test split.
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.889519424214175


In [8]:
# Get recommendations for userId 0 (the personal ratings).

# Top-10 recommendations for every user, narrowed to the row for userId 0.
userRecs = (
    model.recommendForAllUsers(10)
    .where(col('userId') == 0)
    .select('recommendations')
)

# One row per recommended movie, flattened to (movieId, rating) columns.
newUserRecs = (
    userRecs
    .select(explode('recommendations').alias('rec'))
    .select(
        col('rec').getItem('movieId').alias('movieId'),
        col('rec').getItem('rating').alias('rating'),
    )
)

In [9]:
# Convert moviesRDD to a DataFrame, naming the tuple fields directly.
moviesDF = moviesRDD.toDF(['movieId', 'movieTitle'])

# Attach titles to the recommendations, highest predicted rating first.
# The result gets its own name rather than overwriting newUserRecs, so this
# cell can be re-run without joining already-joined data a second time.
rankedRecs = (
    newUserRecs
    .join(moviesDF, 'movieId')
    .orderBy('rating', ascending=False)
)

# Only the titles are needed for display.
movieTitles = rankedRecs.select('movieTitle')

In [10]:
print("Movies recommended for you:")
# Fetch at most the first five rows.
result = movieTitles.take(5)

# enumerate supplies the 1-based rank, replacing the hand-maintained counter.
for count, row in enumerate(result, start=1):
    print("%i: %s" % (count, row[0]))

Movies recommended for you:
1: Nekromantik (1987)
2: Spirits of the Dead (Tre Passi nel Delirio) (1968)
3: From the Journals of Jean Seberg (1995)
4: Barenaked in America (1999)
5: Boys Life 2 (1997)


In [11]:
# Clean up: stop the SparkContext (`sc`) taken from the Spark session earlier in the notebook.
sc.stop()