In [1]:
# Optional environment bootstrap, kept disabled. The commands install a
# headless Java 8 JDK, download and unpack Spark 2.4.4 (Hadoop 2.7 build) and
# install findspark. Presumably meant for Google Colab, given the /content
# path used below - confirm before enabling.
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
# !tar xf spark-2.4.4-bin-hadoop2.7.tgz
# !pip install -q findspark

# Environment variables that go with the disabled bootstrap above.
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
# Point findspark at the Spark distribution unpacked next to the dataset.
findspark.init("../dataset/spark-2.4.4-bin-hadoop2.7")  # SPARK_HOME

# Imported only after findspark.init() has run.
from pyspark import SparkConf, SparkContext

In [2]:
# Local-mode Spark application named "MovieLensInsights".
conf = SparkConf().setMaster("local").setAppName("MovieLensInsights")
# getOrCreate() returns the already-running context when this cell is executed
# again; constructing SparkContext(conf=conf) a second time in the same kernel
# fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# Question 5 (1)

In [3]:
def _parse_rating_line(line):
    """Split one ratings.csv data row into its leading fields.

    Non-ASCII characters are dropped, and the text is converted to the native
    ``str`` type so the same code runs on Python 2 and Python 3 (on Python 3
    the encoded value is ``bytes`` and cannot be split with a ``str``
    separator). The last comma-separated field (the timestamp) is discarded.
    """
    ascii_text = str(line.encode("ascii", "ignore").decode("ascii"))
    return ascii_text.split(',')[:-1]  # removing the timestamp column


rawLinesRDD = sc.textFile('../dataset/ml-latest-small/ratings.csv')
header = rawLinesRDD.first()  # extract header
# Keep the raw RDD under its own name so `linesRDD` always means "data rows".
linesRDD = rawLinesRDD.filter(lambda line: line != header)  # filter out header
ratingsRDD = linesRDD.map(_parse_rating_line)

In [4]:
# take(10) fetches only the first ten rows; collect()[:10] would ship the whole
# ratings RDD to the driver just to discard all but ten elements.
print(ratingsRDD.take(10))

[['1', '1', '4.0'], ['1', '3', '4.0'], ['1', '6', '4.0'], ['1', '47', '5.0'], ['1', '50', '5.0'], ['1', '70', '3.0'], ['1', '101', '5.0'], ['1', '110', '4.0'], ['1', '151', '5.0'], ['1', '157', '5.0']]


## average user rating

In [5]:
# Average rating per user: pair each rating with a count of 1 keyed by the
# first field, add sums and counts per key, then divide and prefix the key
# with "u_".
userRatingsRDD = (
    ratingsRDD
    .map(lambda rec: (rec[0], (float(rec[2]), 1)))
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    .map(lambda kv: ("u_" + kv[0], kv[1][0] / kv[1][1]))
    # .sortBy(lambda kv: -kv[1])
)
print(userRatingsRDD.collect())

[('u_199', 3.3953168044077136), ('u_198', 3.491304347826087), ('u_344', 3.7681159420289854), ('u_345', 3.903225806451613), ('u_346', 3.682926829268293), ('u_347', 3.5555555555555554), ('u_340', 4.214285714285714), ('u_341', 3.798076923076923), ('u_342', 2.9393939393939394), ('u_343', 4.059322033898305), ('u_348', 4.672727272727273), ('u_349', 3.72972972972973), ('u_298', 2.363684771033014), ('u_299', 3.652173913043478), ('u_296', 4.166666666666667), ('u_297', 2.5972222222222223), ('u_294', 2.610983981693364), ('u_295', 3.7439024390243905), ('u_292', 3.3015695067264574), ('u_293', 2.619047619047619), ('u_290', 4.142322097378277), ('u_291', 4.258064516129032), ('u_591', 3.2777777777777777), ('u_590', 3.355769230769231), ('u_593', 3.266990291262136), ('u_592', 3.5851063829787235), ('u_595', 4.2), ('u_594', 3.9245689655172415), ('u_597', 3.9774266365688487), ('u_596', 3.495133819951338), ('u_195', 3.5294117647058822), ('u_194', 3.475), ('u_197', 3.857142857142857), ('u_196', 3.9375), ('u_1

## average movie rating

In [6]:
# Average rating per movie: pair each rating with a count of 1 keyed by the
# second field, add sums and counts per key, then divide and prefix the key
# with "m_".
movieRatingsRDD = (
    ratingsRDD
    .map(lambda x: (x[1], (float(x[2]), 1)))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: ("m_" + x[0], x[1][0] / x[1][1]))
    # .sortBy(lambda x: -x[1])
)
# take(20) returns the same leading elements as collect()[:20] without pulling
# every movie average to the driver first.
print(movieRatingsRDD.take(20))

[('m_110553', 2.6363636363636362), ('m_98160', 1.5), ('m_4447', 3.15625), ('m_5745', 5.0), ('m_172793', 5.0), ('m_101765', 4.25), ('m_188751', 4.5), ('m_114074', 3.3333333333333335), ('m_55292', 0.75), ('m_5988', 2.625), ('m_5989', 3.9217391304347826), ('m_55294', 2.5), ('m_5980', 3.5), ('m_5986', 4.0), ('m_5984', 3.5), ('m_103543', 3.5), ('m_87960', 3.0), ('m_965', 4.045454545454546), ('m_43871', 2.75), ('m_27370', 4.5)]


# Question 5 (2)

In [7]:
import csv

# Map each movie id (column 0 of movies.csv) to its list of genres (column 2,
# '|'-separated).
movieGenres = {}
# The context manager closes the file even if parsing fails part-way.
with open('../dataset/ml-latest-small/movies.csv', 'r') as datafile:
    myreader = csv.reader(datafile)
    # Skip the header row; otherwise it is stored as a bogus entry
    # ('movieId' -> ['genres']).
    next(myreader, None)
    for row in myreader:
        if not row:  # csv.reader yields [] for blank lines
            continue
        movieGenres[row[0]] = row[2].split('|')

In [8]:
# list(...) keeps this working on Python 3, where dict.items() returns a view
# that cannot be sliced; on Python 2 the printed result is unchanged.
print(list(movieGenres.items())[:10])

[('110553', ['Action', 'Sci-Fi', 'IMAX']), ('60141', ['Children', 'Comedy']), ('172793', ['Adventure', 'Animation', 'Children', 'Fantasy']), ('101765', ['Adventure', 'Comedy', 'Romance']), ('188751', ['Comedy', 'Romance']), ('114074', ['Drama']), ('55292', ['Comedy']), ('5988', ['Drama']), ('5989', ['Crime', 'Drama']), ('55294', ['Comedy', 'Crime', 'Drama'])]


## This is sample code for testing

In [9]:
# Small hand-checkable input: [movie id, rating] rows and a movie -> genres map.
sampleRating = sc.parallelize([[1, 4.0], [1, 2], [2, 1.5]])
sampleGenres = {
    1: ['Action', 'Drama'],
    2: ['sci-fi', 'Action'],
}

# Fan each rating out to every genre of its movie, add (sum, count) per genre,
# then divide. The empty-string prefix leaves the genre name unchanged.
genreRatingsRDD = (
    sampleRating
    .flatMap(lambda row: [(genre, (float(row[1]), 1)) for genre in sampleGenres[row[0]]])
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    .map(lambda kv: ("" + kv[0], kv[1][0] / kv[1][1]))
    # .sortBy(lambda kv: -kv[1])
)
print(genreRatingsRDD.collect())

[('Action', 2.5), ('Drama', 3.0), ('sci-fi', 1.5)]


## average genre rating

In [10]:
# Average rating per genre: every rating is emitted once for each genre listed
# for its movie in movieGenres, then (sum, count) pairs are added per genre and
# divided.
genreRatingsRDD = (
    ratingsRDD
    .flatMap(lambda rec: [(genre, (float(rec[2]), 1)) for genre in movieGenres[rec[1]]])
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    .map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
    # .sortBy(lambda kv: -kv[1])
)
print(genreRatingsRDD.collect())

[('Mystery', 3.632460255407871), ('Romance', 3.5065107040388437), ('Sci-Fi', 3.455721162210752), ('Horror', 3.258195034974626), ('Film-Noir', 3.920114942528736), ('Crime', 3.658293867274144), ('Drama', 3.6561844113718758), ('Fantasy', 3.4910005070136894), ('Western', 3.583937823834197), ('Animation', 3.6299370349170004), ('War', 3.8082938876312), ('IMAX', 3.618335343787696), ('Children', 3.412956125108601), ('Action', 3.447984331646809), ('(no genres listed)', 3.4893617021276597), ('Comedy', 3.3847207640898267), ('Documentary', 3.797785069729286), ('Musical', 3.5636781053649105), ('Thriller', 3.4937055799183425), ('Adventure', 3.5086089151939075)]


In [11]:
import itertools

In [12]:
# Small in-memory sample of whitespace-separated "user movie rating" records.
linesRDD = sc.parallelize([
    'u1 m1 3.4',
    'u1 m2 4.0',
    'u1 m3 5.0',
    'u2 m1 3.0',
    'u2 m2 2.5',
    'u2 m3 4.0',
    'u3 m1 5.0',
    'u3 m2 4.0',
])

In [13]:
# Split every record on whitespace into its fields.
A = linesRDD.map(lambda record: record.split())
print(A.collect())

[['u1', 'm1', '3.4'], ['u1', 'm2', '4.0'], ['u1', 'm3', '5.0'], ['u2', 'm1', '3.0'], ['u2', 'm2', '2.5'], ['u2', 'm3', '4.0'], ['u3', 'm1', '5.0'], ['u3', 'm2', '4.0']]


In [14]:
# Keep only the first two fields of each record as a (key, value) pair.
B = A.map(lambda fields: (fields[0], fields[1]))
print(B.collect())

[('u1', 'm1'), ('u1', 'm2'), ('u1', 'm3'), ('u2', 'm1'), ('u2', 'm2'), ('u2', 'm3'), ('u3', 'm1'), ('u3', 'm2')]


In [15]:
# Gather all values that share a key and materialise each group as a list.
C = B.groupByKey().mapValues(list)
print(C.collect())

[('u1', ['m1', 'm2', 'm3']), ('u3', ['m1', 'm2']), ('u2', ['m1', 'm2', 'm3'])]


In [16]:
# Emit ((item_a, item_b), 1) for every 2-combination of each key's list.
# groupByKey() gives no guarantee about the order of grouped values, so the
# list is sorted first; otherwise the same pair could appear as both
# ('m1', 'm2') and ('m2', 'm1') and be counted under two different keys.
D = C.flatMap(lambda x: ((pair, 1) for pair in itertools.combinations(sorted(x[1]), 2)))
print(D.collect())

[(('m1', 'm2'), 1), (('m1', 'm3'), 1), (('m2', 'm3'), 1), (('m1', 'm2'), 1), (('m1', 'm2'), 1), (('m1', 'm3'), 1), (('m2', 'm3'), 1)]


In [17]:
# Count occurrences of each pair. Every value in D is 1, so summing per key
# gives the same count as len() of the grouped values, but reduceByKey combines
# partial sums before the shuffle instead of materialising every group.
E = D.reduceByKey(lambda left, right: left + right)
print(E.collect())

[(('m1', 'm2'), 3), (('m2', 'm3'), 2), (('m1', 'm3'), 2)]
