### Dataset Link : https://grouplens.org/datasets/movielens/100k/

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext
import collections

In [2]:
## Create the Spark context in local mode (master "local" runs Spark inside this process).
conf = SparkConf().setMaster("local").setAppName("RatingHistogram")
## getOrCreate() reuses a context that is already running, so re-running this cell
## does not fail with "Cannot run multiple SparkContexts at once" the way
## SparkContext(conf = conf) does.
sc = SparkContext.getOrCreate(conf=conf)

### Movie rating Histogram

In [3]:
## Load the ratings file as an RDD of text lines, then keep only the rating
## column (the third whitespace-separated field of each line).
lineRDD = sc.textFile("C:/Users/lenovo/Desktop/spark/ml-100k/u.data")
ratingRDD = lineRDD.map(lambda line: line.split()).map(lambda fields: fields[2])
## countByValue() is an RDD action: it returns, for every distinct rating,
## the number of times that rating occurs.
result = ratingRDD.countByValue()

In [4]:
## Show the histogram ordered by rating value
sortedResult = collections.OrderedDict(
    (rating, result[rating]) for rating in sorted(result)
)
print('\n rating count')

for rating, count in sortedResult.items():
    print("%s %i" % (rating, count))


 rating count
1 6110
2 11370
3 27145
4 34174
5 21201


### Movie rating distribution

In [5]:
movieid = '127'      ## movie ID used for GodFather in this dataset
movieidcolidx = 1    ## the movie ID is the 2nd column of the data file
ratingcolidx = 2     ## the rating is the 3rd column of the data file

## Keep only the lines that belong to the selected movie
movieRatingRDD = lineRDD.filter(lambda line: line.split()[movieidcolidx] == movieid)

## Emit (rating, 1) for every line, then sum the ones per rating
ratingPairs = movieRatingRDD.map(lambda line: (line.split()[ratingcolidx], 1))
ratingsHistogram = ratingPairs.reduceByKey(lambda left, right: left + right).collect()
print("\n Rating distribution is for movie GodFather", ratingsHistogram)
print('\n')


 Rating distribution is for movie GodFather [('4', 137), ('2', 23), ('5', 214), ('3', 33), ('1', 6)]




### Find the most-rated movies

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
## Obtain the Spark SQL entry point (getOrCreate reuses a session if one already exists)
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

def mapper(line):
    """Turn one tab-separated ratings line into a Row.

    The first four fields are converted to int and exposed as the columns
    user_id, movie_id, rating and timestamp, in that order.
    """
    columns = ("user_id", "movie_id", "rating", "timestamp")
    parts = line.split("\t")
    values = {name: int(parts[position]) for position, name in enumerate(columns)}
    return Row(**values)

## Parse every raw text line into a Row, giving an RDD of Row objects
ratingsRDD = lineRDD.map(mapper)
## Build a cached DataFrame from those rows and register it as the "movie_ratings" view
schemaRatings = spark.createDataFrame(ratingsRDD).cache()
schemaRatings.createOrReplaceTempView("movie_ratings")
## A DataFrame registered as a view can be queried with plain SQL
query = "SELECT movie_id, count(rating) as cnt FROM movie_ratings GROUP BY movie_id order by cnt desc limit 10"
top_rated = spark.sql(query)

## spark.sql() returns a DataFrame; collect() brings its rows back as Row objects
print("Most rated movies are:\n")
top_rows = top_rated.collect()
for top_movie in top_rows:
    print(top_movie)

print("\n")
## NOTE(review): stopping the session should also stop the underlying SparkContext,
## so `sc` and `lineRDD` from the earlier cells are presumably unusable after this line.
spark.stop()

Most rated movies are:

Row(movie_id=50, cnt=583)
Row(movie_id=258, cnt=509)
Row(movie_id=100, cnt=508)
Row(movie_id=181, cnt=507)
Row(movie_id=294, cnt=485)
Row(movie_id=286, cnt=481)
Row(movie_id=288, cnt=478)
Row(movie_id=1, cnt=452)
Row(movie_id=300, cnt=431)
Row(movie_id=121, cnt=429)




### Movie Recommendation

#### Run from here: this section is self-contained (it repeats the imports and sets up its own SparkContext)

In [1]:
import findspark
findspark.init()

import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating

In [2]:
## Local mode using all available cores.
conf = SparkConf().setMaster('local[*]').setAppName("MovieRecommendationSystem")
## getOrCreate() reuses a context that is already running, so re-running this cell
## does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
## Directory where Spark stores RDD checkpoints.
sc.setCheckpointDir('checkpoint')
 ## from the movies catalog file create a dictionary of movieID to name
def load_movie_name_dict(path="ml-100k/u.item"):
    """Build a movie-ID -> movie-title lookup from the movie catalog file.

    Every catalog line is pipe-delimited: the first field is the numeric
    movie ID and the second field is the title. Any further fields are ignored.

    Args:
        path: Location of the catalog file. Defaults to ``ml-100k/u.item``
            relative to the working directory. The lower-case file name also
            resolves on case-sensitive filesystems, unlike ``u.ITEM``.

    Returns:
        dict mapping each int movie ID to its str title.

    Raises:
        OSError: if the catalog file cannot be opened.
        ValueError: if a line's first field is not an integer.
    """
    movieNames = {}
    # Decode as ISO-8859-1 rather than ASCII with errors='ignore': the latter
    # silently drops every non-ASCII character (e.g. accented letters) from the
    # titles, whereas ISO-8859-1 maps every byte to a character and keeps them.
    with open(path, encoding="ISO-8859-1") as file:
        for line in file:
            fields = line.rstrip("\r\n").split("|")
            if len(fields) < 2:
                # Blank or malformed line: there is no ID/title pair to record.
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames
## Announce the load, then build the movie ID -> title lookup used when printing results
print("Loading movie names dictionary...")  
movieNameDict = load_movie_name_dict()


Loading movie names dictionary...


In [20]:
## Now let's get to the ratings data:
## build an RDD of Rating(user, product, rating) objects, the input MLlib's ALS expects.

ratings = sc.textFile("C:/Users/lenovo/Desktop/spark/ml-100k/u.data")
ratingsRDD = ratings.map(lambda l:l.split("\t")).map(lambda l:Rating(int(l[0]), int(l[1]), float(l[2]))).cache()
print("Training recommendation system ...")
rank =10
numIterations =6
## Fixed seed so that training starts from the same random initialisation on every
## run; without it the model (and the recommendations printed below) differ between runs.
alsSeed = 42

model = ALS.train(ratingsRDD, rank, numIterations, seed=alsSeed)

## The user to inspect is read interactively; a non-numeric entry raises ValueError.
userID = int(input("Enter UserID :"))

print("\nRating given by userID" + str(userID)+":")


## Show every movie this user has already rated, by title.
userRatings = ratingsRDD.filter(lambda l: l[0] == userID)
for rating in userRatings.collect():
    print (movieNameDict[int(rating[1])] +": "+ str(rating[2]))


Training recommendation system ...
Enter UserID :1

Rating given by userID1:
Three Colors: White (1994): 4.0
Grand Day Out, A (1992): 3.0
Desperado (1995): 4.0
Glengarry Glen Ross (1992): 4.0
Angels and Insects (1995): 4.0
Groundhog Day (1993): 5.0
Delicatessen (1991): 5.0
Hunt for Red October, The (1990): 4.0
Dirty Dancing (1987): 2.0
Rock, The (1996): 3.0
Ed Wood (1994): 4.0
Star Trek: First Contact (1996): 4.0
Pillow Book, The (1995): 5.0
Horseman on the Roof, The (Hussard sur le toit, Le) (1995): 5.0
Star Trek VI: The Undiscovered Country (1991): 4.0
From Dusk Till Dawn (1996): 3.0
So I Married an Axe Murderer (1993): 4.0
Shawshank Redemption, The (1994): 5.0
True Romance (1993): 3.0
Star Trek: The Wrath of Khan (1982): 5.0
Kull the Conqueror (1997): 1.0
Independence Day (ID4) (1996): 4.0
Wallace & Gromit: The Best of Aardman Animation (1996): 5.0
Wizard of Oz, The (1939): 4.0
Faster Pussycat! Kill! Kill! (1965): 1.0
Citizen Kane (1941): 4.0
Silence of the Lambs, The (1991): 4.0
Bl

In [19]:
# Ask the trained model for this user's ten highest-scoring movies and print them by title.
print("\n Top 10 recommendations:")
recommendations = model.recommendProducts(userID, 10)
for recommendation in recommendations:
    title = movieNameDict[int(recommendation.product)]
    score = recommendation.rating
    print(title + " Score " + str(score))
print("\n")


 Top 10 recommendations:
Story of Xinghua, The (1993) Score 6.053624579706944
Primary Colors (1998) Score 5.9906982977785015
Angel Baby (1995) Score 5.611389722762487
Schizopolis (1996) Score 5.570771146250742
Margaret's Museum (1995) Score 5.543642704322801
Truman Show, The (1998) Score 5.448789411864578
Pather Panchali (1955) Score 5.439966909166571
Night on Earth (1991) Score 5.391054895937481
Ma vie en rose (My Life in Pink) (1997) Score 5.390659524542199
Antonia's Line (1995) Score 5.377536486768896


