In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
import random


In [2]:
def loadMovieNames(path="../u.item"):
    """Build a lookup table from movie ID to movie title.

    Every non-blank line of the pipe-delimited item file is read as
    ``<movieID>|<title>|...``; only the first two fields are used.

    Args:
        path: Location of the item file. Defaults to the local demo copy.
            In production the source file should live on the Hadoop cluster
            (Hadoop format: "hdfs:///user/jlee/u.item").

    Returns:
        dict mapping the integer movie ID to its title string.

    Raises:
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no '|' separator.
    """
    movieNames = {}
    with open(path, encoding='ISO-8859-1') as f:
        for line in f:
            # A blank line (e.g. a trailing newline at end of file) carries no
            # record; without this guard int('') would abort the whole load.
            if not line.strip():
                continue
            # Only the ID and the title are needed, so stop splitting after them.
            fields = line.split('|', 2)
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    """Turn one whitespace-separated ratings record into a Row.

    The second column is taken as the integer movie ID and the third as the
    rating (converted to float); the remaining columns are ignored.
    """
    columns = line.split()
    movie_id = int(columns[1])
    score = float(columns[2])
    return Row(movieID=movie_id, rating=score)

In [3]:
# Obtain the SparkSession for this notebook under the app name "PopularMovies"
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)
# Build the movieID -> title lookup used later to print readable results
movieNames = loadMovieNames()

In [4]:
# Spot-check five random titles. Sample from the titles themselves: handing the
# dict to random.choices makes it index by position 0..len-1 as if those were
# keys, which only works by accident with integer IDs and raises KeyError
# whenever a drawn position is not a movie ID (e.g. 0 if IDs start at 1).
random.choices(list(movieNames.values()), k=5)

["Joe's Apartment (1996)",
 'Something to Talk About (1995)',
 'Convent, The (Convento, O) (1995)',
 'Eddie (1996)',
 'Selena (1997)']

In [5]:
# Load the raw ratings file into an RDD; each element is one line of text
# (tab-separated fields, as the preview in the next cell shows).
lines = spark.sparkContext.textFile("../u.data")

In [6]:
# Preview the first ten raw records to confirm the file layout
lines.take(10)

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013']

In [7]:
# Parse every raw line into a Row with movieID and rating fields
movies = lines.map(parseInput)
# Build a DataFrame from the RDD of Rows; the columns take the Row field names
movieDf = spark.createDataFrame(movies)

In [8]:
# Preview five parsed rows (movieID, rating)
movieDf.take(5)

[Row(movieID=242, rating=3.0),
 Row(movieID=302, rating=3.0),
 Row(movieID=377, rating=1.0),
 Row(movieID=51, rating=2.0),
 Row(movieID=346, rating=1.0)]

In [9]:
# Mean rating per movieID; the result column is named "avg(rating)"
averageRatings = movieDf.groupBy("movieID").avg("rating")
# Number of ratings per movieID; the result column is named "count"
counts = movieDf.groupBy("movieID").count()


In [10]:
# Preview five per-movie averages (unrounded)
averageRatings.take(5)

[Row(movieID=474, avg(rating)=4.252577319587629),
 Row(movieID=29, avg(rating)=2.6666666666666665),
 Row(movieID=26, avg(rating)=3.452054794520548),
 Row(movieID=964, avg(rating)=3.3333333333333335),
 Row(movieID=1677, avg(rating)=3.0)]

In [28]:
# Round each movie's mean rating to two decimals, storing it as avg_rating
rounded_averageRatings = averageRatings.rdd.map(
    lambda row: Row(movieID=row["movieID"], avg_rating=round(row["avg(rating)"], 2))
)

In [30]:
# Convert the RDD of rounded Rows back into a DataFrame.
# NOTE(review): this rebinds the same name from an RDD to a DataFrame, so the
# cell is not idempotent — re-run the previous cell before re-running this one.
rounded_averageRatings=rounded_averageRatings.toDF()

In [31]:
# Preview five rounded averages
rounded_averageRatings.take(5)

[Row(movieID=474, avg_rating=4.25),
 Row(movieID=29, avg_rating=2.67),
 Row(movieID=26, avg_rating=3.45),
 Row(movieID=964, avg_rating=3.33),
 Row(movieID=1677, avg_rating=3.0)]

In [32]:
# Attach each movie's rating count to its rounded average, matching on movieID
averagesAndCounts = counts.join(rounded_averageRatings, on="movieID")
# Collect the ten rows with the LOWEST average rating (ascending sort)
topTen_lowest = (
    averagesAndCounts
    .orderBy("avg_rating", ascending=True)
    .take(10)
)

In [34]:
# Show the ten lowest-rated rows (movieID, count, avg_rating)
topTen_lowest

[Row(movieID=1343, count=1, avg_rating=1.0),
 Row(movieID=1571, count=1, avg_rating=1.0),
 Row(movieID=439, count=5, avg_rating=1.0),
 Row(movieID=830, count=1, avg_rating=1.0),
 Row(movieID=1374, count=2, avg_rating=1.0),
 Row(movieID=1671, count=1, avg_rating=1.0),
 Row(movieID=1329, count=1, avg_rating=1.0),
 Row(movieID=858, count=3, avg_rating=1.0),
 Row(movieID=1567, count=1, avg_rating=1.0),
 Row(movieID=1559, count=1, avg_rating=1.0)]

In [33]:
# Print title, number of ratings and average rating for the lowest-rated rows
for movie_id, num_ratings, avg_rating in topTen_lowest:
    print(movieNames[movie_id], num_ratings, avg_rating)

Lotto Land (1995) 1 1.0
Touki Bouki (Journey of the Hyena) (1973) 1 1.0
Amityville: A New Generation (1993) 5 1.0
Power 98 (1995) 1 1.0
Falling in Love Again (1980) 2 1.0
Further Gesture, A (1996) 1 1.0
Low Life, The (1994) 1 1.0
Amityville: Dollhouse (1996) 3 1.0
Careful (1992) 1 1.0
Hostile Intentions (1994) 1 1.0


In [36]:
# Collect the ten rows with the highest average rating (descending sort)
topTen_best = (
    averagesAndCounts
    .orderBy("avg_rating", ascending=False)
    .take(10)
)
topTen_best

[Row(movieID=1293, count=3, avg_rating=5.0),
 Row(movieID=1500, count=2, avg_rating=5.0),
 Row(movieID=1653, count=1, avg_rating=5.0),
 Row(movieID=1122, count=1, avg_rating=5.0),
 Row(movieID=1201, count=1, avg_rating=5.0),
 Row(movieID=1536, count=1, avg_rating=5.0),
 Row(movieID=1189, count=3, avg_rating=5.0),
 Row(movieID=814, count=1, avg_rating=5.0),
 Row(movieID=1599, count=1, avg_rating=5.0),
 Row(movieID=1467, count=2, avg_rating=5.0)]

In [37]:
# Print title, number of ratings and average rating for the highest-rated rows
for movie_id, num_ratings, avg_rating in topTen_best:
    print(movieNames[movie_id], num_ratings, avg_rating)

Star Kid (1997) 3 5.0
Santa with Muscles (1996) 2 5.0
Entertaining Angels: The Dorothy Day Story (1996) 1 5.0
They Made Me a Criminal (1939) 1 5.0
Marlene Dietrich: Shadow and Light (1996)  1 5.0
Aiqing wansui (1994) 1 5.0
Prefontaine (1997) 3 5.0
Great Day in Harlem, A (1994) 1 5.0
Someone Else's America (1995) 1 5.0
Saint of Fort Washington, The (1993) 2 5.0


In [None]:
# The rankings above are misleading: most of these movies were rated only a few times, so their averages are not representative.

In [52]:
# Keep only movies that received more than 50 ratings
averagesAndCounts_RDD = averagesAndCounts.rdd.filter(
    lambda row: row["count"] > 50
)

In [53]:
# Preview five of the movies that passed the rating-count filter
averagesAndCounts_RDD.take(5)

[Row(movieID=26, count=73, avg_rating=3.45),
 Row(movieID=29, count=114, avg_rating=2.67),
 Row(movieID=474, count=194, avg_rating=4.25),
 Row(movieID=65, count=115, avg_rating=3.54),
 Row(movieID=191, count=276, avg_rating=4.16)]

In [54]:
# Order the filtered movies from highest to lowest average rating
results = averagesAndCounts_RDD.sortBy(
    lambda row: row["avg_rating"],
    ascending=False,
)

In [60]:
# Print title, number of ratings and average rating for the ten best filtered movies
for movie_id, num_ratings, avg_rating in results.take(10):
    print(movieNames[movie_id], num_ratings, avg_rating)

Close Shave, A (1995) 112 4.49
Schindler's List (1993) 298 4.47
Wrong Trousers, The (1993) 118 4.47
Casablanca (1942) 243 4.46
Wallace & Gromit: The Best of Aardman Animation (1996) 67 4.45
Shawshank Redemption, The (1994) 283 4.45
Usual Suspects, The (1995) 267 4.39
Rear Window (1954) 209 4.39
Star Wars (1977) 583 4.36
12 Angry Men (1957) 125 4.34


In [None]:
# Stop the Spark session now that the analysis is finished
spark.stop()