In [None]:
# Get or Create Spark Context
# `sc` is the handle every later cell uses to read the CSV files into RDDs.
# NOTE(review): `SparkContext` is not imported in the visible cells — presumably the
# kernel (e.g. a pyspark shell profile) or an earlier cell provides it; confirm, or add
# `from pyspark import SparkContext` so the notebook survives Restart & Run All.
sc = SparkContext.getOrCreate()

In [None]:
# Create RDDs from each of the CSV files
def split(s):
    """Split one CSV line into its list of field strings.

    Lines without a double quote are split on every comma. Lines containing
    a double quote are parsed with the standard ``csv`` reader, so a quoted
    field may contain commas and doubled quotes (``""`` -> ``"``), and any
    number of unquoted fields may appear before or after it.

    Args:
        s: a single line of CSV text, without its trailing newline.

    Returns:
        list of str, one entry per field, with surrounding quotes removed.
    """
    if '"' not in s:
        # Fast path: nothing is quoted, so every comma is a field separator.
        return s.split(',')
    # Imported here so the function is self-contained when it is shipped to
    # Spark executors and does not depend on a separate import cell.
    import csv
    return next(csv.reader([s]))

path = "ml-latest-small/"

def _isDataRow(row):
    """Return True when the row's first field is a numeric id (not a header or blank line)."""
    return bool(row) and row[0].isdigit()

# Column 0 of both files is an id (movieId / userId). Keeping only rows whose first field
# is numeric drops a header line and blank lines if the files contain them; otherwise the
# header would reach float(l[2]) and be counted as a movie/user by the cells below.
# On header-less files the filter removes nothing.
# NOTE(review): assumes ids are plain non-negative integers, as in MovieLens — confirm.
movies = sc.textFile(path + "movies.csv").map(split).filter(_isDataRow)    #  [movieId,title,genres]
ratings = sc.textFile(path + "ratings.csv").map(split).filter(_isDataRow)  #  [userId,movieId,rating,timestamp]

In [None]:
# Average number of users a movie is rated by
def add(x, y):
    """Return ``x + y``.

    Passed to ``reduceByKey`` in this notebook, where the operands are
    either integer counts or lists being concatenated.
    """
    return x + y
# One (movieId, 1) per rating row, summed per movie, gives the number of ratings per movie;
# the mean of those counts is the figure reported.
ratingsPerMovie = ratings.map(lambda l: (l[1], 1)).reduceByKey(add)
avg = ratingsPerMovie.values().mean()
print("The average number of users rating a movie is:",avg)

In [None]:
# Average rating of movies in each genre

def addPair(p, q):
    """Add two pairs position by position.

    Returns the tuple ``(p[0] + q[0], p[1] + q[1])``; used below to
    accumulate (ratingSum, ratingCount) pairs in ``reduceByKey``.
    """
    first = p[0] + q[0]
    second = p[1] + q[1]
    return (first, second)

# Rating sum and rating count per movie: (movieId, (totalRating, numRatings)).
totalByMovie = (
    ratings
    .map(lambda l: (l[1], (float(l[2]), 1.0)))
    .reduceByKey(addPair)
)
# (movieId, genres) where genres is the '|'-separated genre field of the movie.
movieGenre = movies.map(lambda l: (l[0], l[2]))
# (movieId, ((totalRating, numRatings), genres))
join1 = totalByMovie.join(movieGenre)

def _spreadOverGenres(record):
    """Emit one (genre, (totalRating, numRatings)) pair for each genre of the movie."""
    totals, genreField = record[1]
    return [(genre, totals) for genre in genreField.split('|')]

# Summing the per-movie totals inside each genre, then dividing sum by count,
# yields the mean of all individual ratings given to movies of that genre.
totalByGenre = join1.flatMap(_spreadOverGenres).reduceByKey(addPair)
avgList = totalByGenre.mapValues(lambda s: s[0] / s[1]).collect()
print("The average ratings of movies in each genre is as follows:\n")
for genre, meanRating in avgList:
    print(genre, ':', meanRating)

In [None]:
# Top 3 movies in each genre
genres = [x[0] for x in avgList]
avgByMovie = totalByMovie.map(lambda t: (t[0], t[1][0]/t[1][1]))  # (movieId, avgRating)
movieTitleGenre = movies.map(lambda t: (t[0], (t[1], t[2])))       # (movieId, (title, genres))
join2 = avgByMovie.join(movieTitleGenre) # (movieId, (avgRating, (title, genres)))
# One (genre, (avgRating, title)) record per genre a movie belongs to.
genreRatingTitle = join2.flatMap(lambda t: [(g, (t[1][0], t[1][1][0])) for g in t[1][1][1].split('|')])

def _keepBest(best, candidates, n=3):
    """Merge two lists of (avgRating, title) and keep the n largest, in descending order."""
    return sorted(best + candidates, reverse=True)[:n]

# Single pass over the data: each genre keeps a running list of its three largest
# (avgRating, title) tuples. This replaces one filter + top(3) job per genre, each of
# which recomputed the join above, and yields the same three tuples in the same
# descending order (rating first, title as tie-breaker).
topByGenre = (
    genreRatingTitle
    .aggregateByKey([], lambda best, item: _keepBest(best, [item]), _keepBest)
    .collectAsMap()
)

for g in genres:
    top = topByGenre.get(g, [])
    print("Top 3 movies in genre '"+g+"' are:")
    for p in top: print("\t",p[1], "(avg. rating:", p[0],")")
    print("\n")

In [None]:
# Top 10 movie watchers ranked by the number of movies they have rated
# Count rating rows per user, flip each pair to (count, userId) so that top() orders
# by count first, then keep the ten largest.
ratingsPerUser = ratings.map(lambda l: (l[0], 1)).reduceByKey(add)
countThenUser = ratingsPerUser.map(lambda p: (p[1], p[0]))
top = countThenUser.top(10)
for count, user in top:
    print("User",user, "rated",count,"movies.")

In [None]:
# top ten pairs of users ranked by the number of movies both have watched (or rated)
def getPairs(t):
    """Build the co-occurrence pairs for one keyed record.

    ``t`` is a pair whose second element is a collection of values (here, the
    users who rated one movie). For every ordered combination where the first
    value compares less than the second, emits ``((first, second), 1)``, so
    each unordered pair of distinct values appears once.

    Returns:
        list of ``((a, b), 1)`` tuples with ``a < b``.
    """
    users = t[1]
    return [
        ((first, second), 1)
        for first in users
        for second in users
        if first < second
    ]

# Gather the users of each movie into a list, expand every list into ((userA, userB), 1)
# records, sum those per pair, then flip to (count, pair) so top() orders by count.
usersByMovie = ratings.map(lambda l: (l[1],[l[0]])).reduceByKey(add)
commonCounts = usersByMovie.flatMap(getPairs).reduceByKey(add)
top = commonCounts.map(lambda p: (p[1],p[0])).top(10)
print("The top ten pairs of users ranked by no. of common movies rated  are: \n")
for count, pair in top:
    print(pair,":",count," common movies.")