In [1]:
# Make the local Spark installation's pyspark importable; this must run before the
# pyspark imports in the next cell.
import findspark
findspark.init()

In [2]:
import sys
import os
from time import time

import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark.sql import SparkSession
from pyspark.mllib.recommendation import ALS

topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""

parentDir = os.path.abspath('')
ratingsFile = join(parentDir, "personalRatings.txt")

if isfile(ratingsFile):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        #remove(ratingsFile)
        a_file = open(ratingsFile, "w")
        a_file.truncate()
        a_file.close()
    else:
        sys.exit()

prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print(prompt)
print(topMovies)

now = int(time())
n = 0

f = open(ratingsFile, 'w')
for line in topMovies.split("\n"):
    ls = line.strip().split(",")
    valid = False
    while not valid:
        rStr = input(ls[1] + ": ")
        r = int(rStr) if rStr.isdigit() else -1
        if r < 0 or r > 5:
            print(prompt)
        else:
            valid = True
            if r > 0:
                f.write("0::%s::%d::%d\n" % (ls[0], r, now))
                n += 1
f.close()

if n == 0:
    print("No rating provided!")

Looks like you've already rated the movies. Overwrite ratings (y/N)? y
Please rate the following movie (1-5 (best), or 0 if not seen): 
1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)
Toy Story (1995): 2
Independence Day (a.k.a. ID4) (1996): 3
Dances with Wolves (1990): 4
Star Wars: Episode VI - Return of the Jedi (1983): 3
Mission: Impossible (1996): 2
Ace Ventura: Pet Detective (1994): 3
Die Hard: With a Vengeance (1995): 2
Batman Forever (1995): 3
Pretty Woman (1990): 2
Men in Black (1997): 3
Dumb & Dumber (1994): 2


In [4]:
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import col

def parseRating(line):
    """
    Parse one MovieLens rating line of the form userId::movieId::rating::timestamp .

    Returns a pair ``(bucket, (userId, movieId, rating))`` where ``bucket`` is the last
    decimal digit of the timestamp (``timestamp % 10``), the ids are ints and the
    rating is a float.
    """
    parts = line.strip().split("::")
    bucket = int(parts[3]) % 10
    record = (int(parts[0]), int(parts[1]), float(parts[2]))
    return bucket, record

#Second rating parser, added because (per the original author) parseRating's output caused
#issues in the RDD-to-DataFrame conversion; this one returns a single flat tuple instead.
def parseRating2(line):
    """
    Parse a MovieLens rating line userId::movieId::rating::timestamp into a flat tuple.

    Returns ``(userId, movieId, rating, timestamp)``: userId, movieId and rating as
    ``int`` and the timestamp as ``float``.
    """
    parts = line.strip().split("::")
    userId = int(parts[0])
    movieId = int(parts[1])
    rating = int(parts[2])
    timestamp = float(parts[3])
    return userId, movieId, rating, timestamp


def parseMovie(line):
    """
    Parse a MovieLens movie line of the form movieId::title::genres .

    Returns ``(movieId, title, genres)`` with movieId as ``int``; genres is kept as the
    raw pipe-separated string (e.g. "Comedy|Romance").
    """
    parts = line.strip().split("::")
    movieId = int(parts[0])
    title = str(parts[1])
    genres = str(parts[2])
    return movieId, title, genres

def loadRatings(ratingsFile):
    """
    Load personal ratings from a userId::movieId::rating::timestamp file.

    Each non-blank line is parsed with parseRating and only its (userId, movieId, rating)
    record is kept; records with a rating <= 0 are discarded.

    Returns a non-empty list of (userId, movieId, rating) tuples. Prints a message and
    calls sys.exit(1) if the file is missing or holds no positive ratings.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    # `with` guarantees the file is closed even if a line fails to parse.
    with open(ratingsFile, 'r') as f:
        # parseRating returns (bucket, record); skip blank lines, which it cannot parse.
        records = [parseRating(line)[1] for line in f if line.strip()]
    # Build a real list: a lazy filter object is always truthy, so the emptiness
    # check below would never fire.
    ratings = [rec for rec in records if rec[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings

def computeRmse(model, data, n):
    """
    Compute the RMSE (Root Mean Squared Error) of ``model`` on ``data``.

    ``data`` records are indexed as (userId, movieId, rating). ``model.predictAll`` scores
    every (userId, movieId) pair, the predictions are joined back to the true ratings on
    that pair, and the summed squared error is divided by ``n`` before the square root.
    ``n`` is supplied by the caller (presumably the number of records in data — TODO confirm).
    """
    keyedPredictions = model.predictAll(data.map(lambda rec: (rec[0], rec[1]))) \
        .map(lambda pred: ((pred[0], pred[1]), pred[2]))
    keyedActuals = data.map(lambda rec: ((rec[0], rec[1]), rec[2]))
    squaredErrors = keyedPredictions.join(keyedActuals).values() \
        .map(lambda pair: (pair[0] - pair[1]) ** 2)
    total = squaredErrors.reduce(add)
    return sqrt(total / float(n))

if __name__ == "__main__":
    
    from pyspark.sql import SparkSession
    # Build the SparkSession
    
    spark = SparkSession.builder \
    .master("local") \
    .appName("Movie Recommendation Engine") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()
   
    sc = spark.sparkContext
    # load personal ratings
    myRatings = loadRatings(os.path.abspath('./personalRatings.txt'))
    myRatingsRDD = sc.parallelize(myRatings)
    
    # load ratings and movie titles

    movieLensHomeDir = os.path.abspath('.')

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = list(sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating2).collect())
    RatingsRDD = sc.parallelize(ratings)
    
    
    # movies is an RDD of (movieId, movieTitle)
    movies = list(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())
    moviesStorage = sc.parallelize(movies)

<h1>Data Exploration</h1>

In [5]:
#examine the data: make sure the movies RDD was built correctly (first 3 records)
moviesStorage.take(num=3)

[(1, 'Toy Story (1995)', "Animation|Children's|Comedy"),
 (2, 'Jumanji (1995)', "Adventure|Children's|Fantasy"),
 (3, 'Grumpier Old Men (1995)', 'Comedy|Romance')]

In [6]:
RatingsRDD.take(num=3)  # first 3 parsed (userId, movieId, rating, timestamp) records

[(1, 1193, 5, 978300760.0), (1, 661, 3, 978302109.0), (1, 914, 3, 978301968.0)]

In [7]:
myRatingsRDD.take(num=3)  # first 3 personal (userId, movieId, rating) records

[(0, 1, 2.0), (0, 780, 3.0), (0, 590, 4.0)]

In [8]:
#Convert all RDDs to DataFrames

from pyspark.sql import SQLContext, Row

def _toRatingRow(rec):
    """Build a rating Row from a tuple whose first three fields are (userId, movieId, rating)."""
    return Row(UserID=rec[0], MovieID=rec[1], Rating=rec[2])

#dataframe of ratings from personal file
myratingsDF = myRatingsRDD.map(_toRatingRow).toDF()

#dataframe of movies
moviesDF = moviesStorage.map(
    lambda movie: Row(MovieID=movie[0], Title=movie[1], Genres=movie[2])
).toDF()

#dataframe of ratings from the MovieLens users (the timestamp field is not kept)
ratingsDF = RatingsRDD.map(_toRatingRow).toDF()

In [9]:
#check the results: display the first rows of each DataFrame
for frame in (myratingsDF, moviesDF, ratingsDF):
    frame.show()

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     0|      1|   2.0|
|     0|    780|   3.0|
|     0|    590|   4.0|
|     0|   1210|   3.0|
|     0|    648|   2.0|
|     0|    344|   3.0|
|     0|    165|   2.0|
|     0|    153|   3.0|
|     0|    597|   2.0|
|     0|   1580|   3.0|
|     0|    231|   2.0|
+------+-------+------+

+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|Adventure|Children's|
|      9| Sudden Death (1995)|              Action|
|     10|    Go

<h1>Data Preprocessing</h1>

In [10]:
#examine the data for any errors and spot the data the ALS algorithm needs

#print each schema to examine the column types
for frame in (myratingsDF, moviesDF, ratingsDF):
    frame.printSchema()

root
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: double (nullable = true)

root
 |-- MovieID: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)

root
 |-- UserID: long (nullable = true)
 |-- MovieID: long (nullable = true)
 |-- Rating: long (nullable = true)



In [11]:
#stack my personal ratings on top of the MovieLens ratings (a row-wise union, not a join).
#This lets ALS learn user 0 alongside every other user so it can recommend movies for me.
#unionByName matches columns by name, so it stays correct even if the two DataFrames
#ever list their columns in a different order (plain union() is purely positional).

allRatingsDF = myratingsDF.unionByName(ratingsDF)

In [12]:
#check the union worked: user 0's personal ratings should appear first, followed by other users

allRatingsDF.show(n=20)

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     0|      1|   2.0|
|     0|    780|   3.0|
|     0|    590|   4.0|
|     0|   1210|   3.0|
|     0|    648|   2.0|
|     0|    344|   3.0|
|     0|    165|   2.0|
|     0|    153|   3.0|
|     0|    597|   2.0|
|     0|   1580|   3.0|
|     0|    231|   2.0|
|     1|   1193|   5.0|
|     1|    661|   3.0|
|     1|    914|   3.0|
|     1|   3408|   4.0|
|     1|   2355|   5.0|
|     1|   1197|   3.0|
|     1|   1287|   5.0|
|     1|   2804|   5.0|
|     1|    594|   4.0|
+------+-------+------+
only showing top 20 rows



In [13]:
#check for missing data: count rows that have a null in any of the rating columns.
#Only dropna is applied; chaining dropDuplicates() here would also count duplicate rows
#as "missing", mixing two different checks (duplicates are checked in the next cell).

MissingData = allRatingsDF.dropna(
    how='any', subset=['UserID', 'MovieID', 'Rating'])  # rows with no nulls

numberOfmissingData = allRatingsDF.count() - MissingData.count()
print(numberOfmissingData) #0 means there is no missing data and no further action is needed

0


In [14]:
#check data for duplicates: compare the row count before and after removing identical rows

countOrignalData = allRatingsDF.count()  # total number of rows
countDuplicatedData = allRatingsDF.distinct().count()  # number of distinct rows

# matching numbers mean there are no duplicate rows in the data set
print(countOrignalData)
print(countDuplicatedData)

1000220
1000220


<h1>Standardization</h1>

In [15]:
#the following code scales the values between 0-1 to improve the machine learning model's performance
#NOTE(review): the scaler is fitted on every rating, including rows that randomSplit later
#assigns to the test set; for ratings the min/max are fixed anyway, but in general fit on training data only.

print("Before Scaling :") #before conversion
allRatingsDF.show(5)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import round as spark_round

# Iterating over the columns to be scaled
for colName in ["Rating"]:
    vectCol = colName + "_Vect"
    scaledCol = colName + "_Scaled"

    # VectorAssembler wraps the numeric column in a one-element vector, which MinMaxScaler requires
    assembler = VectorAssembler(inputCols=[colName], outputCol=vectCol)

    # MinMaxScaler rescales that vector to [0, 1] (its default range) using the column's min and max
    scaler = MinMaxScaler(inputCol=vectCol, outputCol=scaledCol)

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Unwrap the scaled one-element vector back to a double, rounded to 3 decimals, using
    # built-in functions. This replaces a Python UDF, which shipped every row to a Python worker.
    allRatingsDF = pipeline.fit(allRatingsDF).transform(allRatingsDF) \
        .withColumn(scaledCol, spark_round(vector_to_array(scaledCol)[0], 3)) \
        .drop(vectCol)

print("After Scaling :") #after conversion
allRatingsDF.show(5)

Before Scaling :
+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     0|      1|   2.0|
|     0|    780|   3.0|
|     0|    590|   4.0|
|     0|   1210|   3.0|
|     0|    648|   2.0|
+------+-------+------+
only showing top 5 rows

After Scaling :
+------+-------+------+-------------+
|UserID|MovieID|Rating|Rating_Scaled|
+------+-------+------+-------------+
|     0|      1|   2.0|         0.25|
|     0|    780|   3.0|          0.5|
|     0|    590|   4.0|         0.75|
|     0|   1210|   3.0|          0.5|
|     0|    648|   2.0|         0.25|
+------+-------+------+-------------+
only showing top 5 rows



In [16]:
# replace the raw rating with its scaled version, keeping the column name "Rating"
allRatingsDF = (
    allRatingsDF
    .drop('Rating')
    .withColumnRenamed("Rating_Scaled", "Rating")
)
allRatingsDF.show()

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     0|      1|  0.25|
|     0|    780|   0.5|
|     0|    590|  0.75|
|     0|   1210|   0.5|
|     0|    648|  0.25|
|     0|    344|   0.5|
|     0|    165|  0.25|
|     0|    153|   0.5|
|     0|    597|  0.25|
|     0|   1580|   0.5|
|     0|    231|  0.25|
|     1|   1193|   1.0|
|     1|    661|   0.5|
|     1|    914|   0.5|
|     1|   3408|  0.75|
|     1|   2355|   1.0|
|     1|   1197|   0.5|
|     1|   1287|   1.0|
|     1|   2804|   1.0|
|     1|    594|  0.75|
+------+-------+------+
only showing top 20 rows



<h1>Movie Recommendation Algorithm (ALS)</h1>

In [17]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Fixed seed so the train/test split and the ALS factor initialisation are repeatable between runs.
SEED = 42

#data split 0.8-0.2
train_data, test_data = allRatingsDF.randomSplit([0.8, 0.2], seed=SEED)

# Build the recommendation model using ALS on the training data:
#  - maxIter=10 iterations, regParam=0.05 regularization
#  - rank=3 is the number of latent factors learned for each user and each movie
#    (hidden features the model discovers itself, not predefined attributes)
#  - coldStartStrategy="drop" removes predictions for users/movies unseen in training,
#    which would otherwise be NaN and break the evaluator
#  - nonnegative=True constrains the learned factors to be non-negative
als = ALS(maxIter=10, regParam=0.05, userCol='UserID', rank=3, itemCol='MovieID',
          ratingCol='Rating', coldStartStrategy="drop", nonnegative=True, seed=SEED)

#fit the training data to the model
model = als.fit(train_data)

In [18]:
prediction = model.transform(dataset=test_data)  # score the held-out test set with the trained model

In [19]:
prediction.show(n=30)  # compare predicted ratings with the actual (scaled) ratings

+------+-------+------+----------+
|UserID|MovieID|Rating|prediction|
+------+-------+------+----------+
|  4169|    148|   0.5| 0.4941019|
|  5333|    148|   0.5|0.39152807|
|  4387|    148|   0.0| 0.3419803|
|   482|    148|  0.25|0.40790772|
|  3829|    148|  0.25| 0.3979937|
|  4858|    463|   0.5|0.41530326|
|  3328|    463|  0.75|0.46033332|
|  4277|    463|  0.75| 0.5164586|
|   202|    463|   0.5|0.41611063|
|  5795|    463|   0.0|0.32072073|
|  1962|    463|   0.5|0.30541745|
|   392|    471|  0.75|0.60837865|
|   516|    471|  0.25|  0.464225|
|   588|    471|  0.75| 0.6749089|
|  5847|    471|  0.75| 0.6936333|
|  5178|    471|  0.75|  0.655625|
|  1496|    471|  0.25|0.52126634|
|  1243|    471|   0.5|  0.620561|
|  5841|    471|   0.5| 0.7037088|
|  2168|    471|   1.0|0.71343344|
|  3519|    471|  0.75|  0.513861|
|  1125|    471|   0.5| 0.5343544|
|  2042|    471|  0.25|0.54307103|
|  2414|    471|  0.75| 0.6783346|
|  2724|    471|   0.5|0.65997326|
|  5786|    471|  0.

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean squared error between predictions and the held-out ratings (both on the scaled 0-1 range);
# the lower the mse, the better the prediction.
mseEvaluator = RegressionEvaluator(metricName="mse", labelCol="Rating", predictionCol="prediction")
mse = mseEvaluator.evaluate(prediction)
print(mse)

0.0525719833437006


In [21]:
recommended_movie_df = model.recommendForAllUsers(numItems=5)  # top 5 movie recommendations for every user

In [22]:
from pyspark.sql.functions import explode

# Flatten the nested recommendations into one row per (user, movie) pair. The result goes into
# a new DataFrame instead of overwriting recommended_movie_df, so re-running this cell still works.
recommended_movies_exploded_df = recommended_movie_df \
    .withColumn("rec_exp", explode("recommendations")) \
    .select('UserID', col("rec_exp.MovieID"), col("rec_exp.Rating"))

# left join on MovieID to attach each recommended movie's Title and Genres
theRecommendedMoviesDF = recommended_movies_exploded_df.join(moviesDF, ['MovieID'], 'left')

theRecommendedMoviesDF.show()

# recommendations for my own user (UserID 0), highest predicted rating first
personalRatings = theRecommendedMoviesDF \
    .filter(col('UserID') == 0) \
    .select(['MovieID', 'UserID', 'Title', 'Rating']) \
    .sort(col("Rating").desc())

+-------+------+----------+--------------------+------+
|MovieID|UserID|    Rating|               Title|Genres|
+-------+------+----------+--------------------+------+
|   3382|  1580|  0.985421|Song of Freedom (...| Drama|
|   3382|  4900| 1.2948103|Song of Freedom (...| Drama|
|   3382|  5300| 1.0974194|Song of Freedom (...| Drama|
|   3382|   471| 0.9067986|Song of Freedom (...| Drama|
|   3382|  1591| 1.4301045|Song of Freedom (...| Drama|
|   3382|  4101| 1.4704807|Song of Freedom (...| Drama|
|   3382|  1342| 1.1182377|Song of Freedom (...| Drama|
|   3382|  2122| 1.0881228|Song of Freedom (...| Drama|
|   3382|  2142| 1.1738149|Song of Freedom (...| Drama|
|   3382|   463|0.72880965|Song of Freedom (...| Drama|
|   3382|   833| 1.4198496|Song of Freedom (...| Drama|
|   3382|  5803|0.72812986|Song of Freedom (...| Drama|
|   3382|  3794|0.92622286|Song of Freedom (...| Drama|
|   3382|  1645| 1.8548088|Song of Freedom (...| Drama|
|   3382|  3175| 1.0355017|Song of Freedom (...|

In [23]:
import numpy as np

#Finally, would you like to see your recommended movies :)

r = input("Would you like to see movies Recommended for you (y/N)? ")
if r and r[0].lower() == "y":
    print("Movies Recommended for you:")
    # Read each title directly from its collected Row rather than stripping brackets off a
    # numpy array repr (which also left quote characters around every title).
    for rank, row in enumerate(personalRatings.select("Title").collect(), start=1):
        print("%d: %s" % (rank, row["Title"]))
else:
    # Just skip the listing. sys.exit() here raised SystemExit, which in Jupyter surfaces as an
    # error and stops "Run All" before the final cell can stop the SparkContext.
    print("Skipping recommendations.")

Would you like to see movies Recommended for you (y/N)? y
Movies Recommended for you:
1: 'Song of Freedom (1936)'
2: 'Foreign Student (1994)'
3: 'Mamma Roma (1962)'
4: 'Bells, The (1926)'
5: 'Smashing Time (1967)'


In [None]:
# Shut down the SparkContext obtained from the SparkSession in the setup cell.
sc.stop()