# Big Data Processing Coursework





In this short notebook, we will load and explore the movielens dataset. Specifically, this notebook covers:

Loading data in memory
Creating SQLContext
Creating Spark DataFrame
Group data by columns
Operating on columns
Running SQL Queries from a Spark DataFrame
Loading in a DataFrame

Build a recommendation system which uses transactional data linking a user and an item to get a list of items to recommend to the user. There are 2 approaches:
## Collaborative Filtering
In the MovieLens dataset, we have movies previously rated by a user, and we attempt to identify if users who previously behaved similarly, ie liked/ disliked similar movies in the past, will have similar behaviors in the future. 
## Content based Filtering


Importing External files/Libraries¶

In [None]:
#!/usr/bin/python
from __future__ import print_function 

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext,sql

import sys
import re
import random
import array
import numpy as np
import scipy.sparse as sps

## Load PySpark

In [None]:
sc = SparkContext(appName = "MovieLens").getOrCreate()
sqlContext = sql.SQLContext(sc)

## Custom Functions 

Functions for parsing movielens data and functions for comparing item similarity

In [None]:
sc.addPyFile("similarity.py")
sc.addPyFile("movielensfcn.py")
from movielensfcn import parseMovies, removeDuplicates, itemItem
from similarity import cosine_similarity, jaccard_similarity

First, let's get the data that we will working with in this notebook. We are using two files from the MovieLens dataset

In [None]:
#!wget --quiet http://www.grouplens.org/system/files/ml-100k.zip | unzip -q -o -d /data/movie-ratings/ | hadoop fs -put - /data/movie-ratings/

In [None]:
!hadoop fs -ls  '/data/movie-ratings'

In [None]:
!hadoop fs -ls  '/data/movie-ratings/ml-10M100K'

In [None]:
!hadoop fs -ls  '/data/movie-ratings/cv'

Review the contents of the movies.dat and ratings.dat files

In [7]:
ratings_file = "/data/movie-ratings/ratings.dat"
movies_file = "/data/movie-ratings/movies.dat"

In [8]:
ratings_raw = sc.textFile(ratings_file)
movies_raw = sc.textFile(movies_file)

Inspect the files to see what we are dealing with

In [9]:
print(ratings_raw.take(5))

[u'1::122::5::838985046', u'1::185::5::838983525', u'1::231::5::838983392', u'1::292::5::838983421', u'1::316::5::838983392']


## dat files
There is no header file
Notice that the columns are separated by :: 
Observe also that the field is in the following format:
user::movie::rating::timestamp

In [None]:
print(movies_raw.take(5))

There is no header file
Notice that the columns are separated by :: Observe also that the field is in the following format
movie::titleandyear::genre

In [None]:
print(movies_raw.count())

In [None]:
print(ratings_raw.count())

Since there are approximately 1M records, it may be faster to set the number of partitions on spark.  Since the movie file is relatively small we can hold in memory using collect

In [14]:
numPartitions =1000

In [36]:
if (ratings_file.find('.dat')):
	movies= movies_raw.map(lambda line: re.split(r'::',line))\
        .map(lambda line: (int(line[0]),(line[1],line[2]))).collect()
	ratings = ratings_raw.map(lambda line: re.split(r'::',line))\
                        .map(lambda line: (int(line[0]),(int(line[1]),float(line[2]))))\
                        .partitionBy(numPartitions)
else:
	ratings_header = ratings_raw.take(1)[0]
	movies_header = movies_raw.take(1)[0]
	movies= movies_raw.filter(lambda line: line!=movies_header)\
                    .map(lambda line: re.split(r',',line)).map(lambda line: (int(line[1]),(line[0],line[2])))
	ratings = ratings_raw.filter(lambda line: line!=ratings_header)\
                    .map(lambda line: re.split(r',',line))\
                    .map(lambda x: (int(line[1]),(int(line[0]),float(line[2]))))\
                    .partitionBy(numPartitions)


In [37]:
ratings.take(5)

[(36000, (1, 2.0)),
 (36000, (32, 4.5)),
 (36000, (34, 2.0)),
 (36000, (50, 4.0)),
 (36000, (107, 3.5))]

In [38]:
RatingsDF = ratings.toDF(['item_id','userid_rating'])

Let's check the type of RatingsDF

In [39]:
type(RatingsDF)

pyspark.sql.dataframe.DataFrame

The printSchema() method gives more details about the DataFrame’s schema and structure:

In [40]:
RatingsDF.printSchema()

root
 |-- item_id: long (nullable = true)
 |-- userid_rating: struct (nullable = true)
 |    |-- _1: long (nullable = true)
 |    |-- _2: double (nullable = true)



In [41]:
RatingsDF.show(5)

+-------+-------------+
|item_id|userid_rating|
+-------+-------------+
|  36000|      [1,2.0]|
|  36000|     [32,4.5]|
|  36000|     [34,2.0]|
|  36000|     [50,4.0]|
|  36000|    [107,3.5]|
+-------+-------------+
only showing top 5 rows



How many movies do we have in the movies file?

In [26]:
numMovies = ratings.values().map(lambda line: line[1]).distinct().count()
print("number of movies: {0}".format(numMovies))

number of movies: 10677


How many users have rated our movies?

In [27]:
numUsers = ratings.values().map(lambda line: line[0]).distinct().count()
print("number of users: {0}".format(numUsers))

number of users: 10


In [29]:
numRatings = ratings.count()
print("total number of ratings: {0}".format(numRatings))

total number of ratings: 10000054


Joining RDDs
Create RDDs for the same ratings and the movies files.

In [31]:
user_ratings_data = ratings.join(ratings)

Join two dataframes and get only one 'item_id' column 

In [59]:
user_ratingsDF = RatingsDF.join(RatingsDF,'item_id')

In [43]:
user_ratings_data.take(5)

[(48000, ((1, 1.0), (1, 1.0))),
 (48000, ((1, 1.0), (2, 3.0))),
 (48000, ((1, 1.0), (19, 0.5))),
 (48000, ((1, 1.0), (34, 1.5))),
 (48000, ((1, 1.0), (39, 1.5)))]

In [60]:
user_ratingsDF.show(5)

+-------+-------------+-------------+
|item_id|userid_rating|userid_rating|
+-------+-------------+-------------+
|    231|     [50,5.0]|     [50,5.0]|
|    231|     [50,5.0]|    [135,3.0]|
|    231|     [50,5.0]|    [318,4.0]|
|    231|     [50,5.0]|   [1136,5.0]|
|    231|     [50,5.0]|   [1394,2.0]|
+-------+-------------+-------------+
only showing top 5 rows



In [45]:
user_ratingsDF2 = sqlContext.createDataFrame(user_ratings_data,['item_id','userid_rating'])

In [46]:
user_ratingsDF2.show(5)

+-------+------------------+
|item_id|     userid_rating|
+-------+------------------+
|  48000| [[1,1.0],[1,1.0]]|
|  48000| [[1,1.0],[2,3.0]]|
|  48000|[[1,1.0],[19,0.5]]|
|  48000|[[1,1.0],[34,1.5]]|
|  48000|[[1,1.0],[39,1.5]]|
+-------+------------------+
only showing top 5 rows




Remove a rating if a user gives the same value for the same movie

In [None]:
unique_joined_ratings = user_ratings_data.filter(removeDuplicates)

Map RDDs

In [None]:
movie_pairs = unique_joined_ratings.map(itemItem).partitionBy(numPartitions)


Now group all ratings together for the same movie

In [None]:
movie_pairs_ratings= movie_pairs.groupByKey()

In [2]:
from math import sqrt
def cosine_similarity(ratingPairs):

    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0

    for ratingX, ratingY in ratingPairs:

        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = ((float(numerator)) / (float(denominator)))

    return (score, numPairs)

def jaccard_similarity(ratingPairs):
 #   "The Jaccard similarity coefficient is a commonly used indicator of the similarity between two sets. For sets A and B it is defined to be the ratio of the number of elements of their intersection and the number of elements of their union If A and B are both empty, we define Jaccard_Similarity(A,B) = 1."

    numPairs = 0
    intersect_xy=setX=setY={}
    for ratingX, ratingY in ratingPairs:
        setX =set(ratingX).union(setX)
        setY =set(ratingY).union(setY)
        intersect_xy = setX.intersect(setY)
        numPairs += 1

    numerator = intersect_xy
    denominator = len(setX) + len(setY) - len(intersectXandY)

    score = 0
    if (denominator):
        score = ((float(numerator)) / (float(denominator)))

    return (score, numPairs)

In [9]:
if algorithm == "JACCARD" :
	item_item_similarities = movie_pairs_ratings.mapValues(jaccard_similarity).persist()
elif algorithm == "COSINE" :
	item_item_similarities = movie_pairs_ratings.mapValues(cosine_similarity).persist()
else:
	item_item_similarities = movie_pairs_ratings.mapValues(cosine_similarity).persist()

KeyboardInterrupt: 

In [None]:
threshold = float(0.97)
topN= int(50)


In [None]:
item_item_sorted=item_item_similarities.sortByKey()

In [None]:
item_item_sorted.persist()

# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above
filteredResults = item_item_sorted.filter(lambda((item_pair,similarity_occurence)): \
        (item_pair[0] == movie_id or item_pair[1] == movie_id) \
        and similarity_occurence[0] > threshold and similarity_occurence[1] > minOccurence)

if (topN==0):
    topN=10

results = filteredResults.map(lambda((x,y)): (y,x)).sortByKey(ascending = False)
resultsTopN = results.take(topN)
results.coalese(1).saveAsTextFile("movielens")




The join function combines two datasets (Key,ValueV) and (Key,ValueW) together to get (Key, (ValueV,ValueW)).  Let's join the movie and ratings file together to get meaningful recommendations

In [None]:

   print "Top 10 similar movies for " + nameDict[movieID]
   for result in resultsTopN:
       (sim, pair) = result
#         Display the similarity result that isn't the movie we're looking at
       similarMovieID = pair[0]
       if (similarMovieID == movieID):
           similarMovieID = pair[1]
       print nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])
	

In [None]:
sc.stop()