In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [6]:
def loadMovieNames():
    movieNames = {}
    with open('u.item',encoding = "ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames
# def loadMovieNames():
#     movieNames = {}
#     with open('u.item') as f:
#         for line in f:
#             fields = line.split('|')
#             movieNames[int(fields[0])] = fields[1]
#     return movieNames

In [7]:
def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

In [8]:
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

In [9]:
movieNames = loadMovieNames()

In [10]:
lines = spark.sparkContext.textFile("u.data")


In [11]:
movies = lines.map(parseInput)
movies

PythonRDD[2] at RDD at PythonRDD.scala:53

In [12]:
movieDataset = spark.createDataFrame(movies)

In [13]:
averageRatings = movieDataset.groupBy("movieID").avg("rating")
averageRatings.show()

+-------+------------------+
|movieID|       avg(rating)|
+-------+------------------+
|    474| 4.252577319587629|
|     29|2.6666666666666665|
|     26| 3.452054794520548|
|    964|3.3333333333333335|
|   1677|               3.0|
|     65|3.5391304347826087|
|    191| 4.163043478260869|
|   1224|2.6666666666666665|
|    558|3.6714285714285713|
|   1010|              3.25|
|    418|3.5813953488372094|
|   1277|3.4210526315789473|
|   1258|2.5217391304347827|
|    541| 2.877551020408163|
|   1360|               1.5|
|    222|  3.66027397260274|
|    938|              2.88|
|    293| 3.802721088435374|
|    270|3.5955882352941178|
|   1127| 2.909090909090909|
+-------+------------------+
only showing top 20 rows



In [14]:
counts = movieDataset.groupBy("movieID").count()
counts.show()

+-------+-----+
|movieID|count|
+-------+-----+
|    474|  194|
|     29|  114|
|     26|   73|
|    964|    9|
|   1677|    1|
|     65|  115|
|    191|  276|
|   1224|   12|
|    558|   70|
|   1010|   44|
|    418|  129|
|   1277|   19|
|   1258|   23|
|    541|   49|
|   1360|    2|
|    222|  365|
|    938|   25|
|    293|  147|
|    270|  136|
|   1127|   11|
+-------+-----+
only showing top 20 rows



In [15]:
averagesAndCounts = counts.join(averageRatings, "movieID")
averagesAndCounts.show()

+-------+-----+------------------+
|movieID|count|       avg(rating)|
+-------+-----+------------------+
|     26|   73| 3.452054794520548|
|     29|  114|2.6666666666666665|
|    474|  194| 4.252577319587629|
|    964|    9|3.3333333333333335|
|   1677|    1|               3.0|
|     65|  115|3.5391304347826087|
|    191|  276| 4.163043478260869|
|    418|  129|3.5813953488372094|
|    541|   49| 2.877551020408163|
|    558|   70|3.6714285714285713|
|   1010|   44|              3.25|
|   1224|   12|2.6666666666666665|
|   1258|   23|2.5217391304347827|
|   1277|   19|3.4210526315789473|
|   1360|    2|               1.5|
|    222|  365|  3.66027397260274|
|    270|  136|3.5955882352941178|
|    293|  147| 3.802721088435374|
|    730|   24|               3.5|
|    938|   25|              2.88|
+-------+-----+------------------+
only showing top 20 rows



In [16]:
topTen = averagesAndCounts.orderBy("avg(rating)").take(10)

In [17]:
for movie in topTen:
        print (movieNames[movie[0]], movie[1], movie[2])

The Courtyard (1995) 1 1.0
Amityville: A New Generation (1993) 5 1.0
Low Life, The (1994) 1 1.0
Power 98 (1995) 1 1.0
Touki Bouki (Journey of the Hyena) (1973) 1 1.0
Careful (1992) 1 1.0
Falling in Love Again (1980) 2 1.0
Further Gesture, A (1996) 1 1.0
Amityville: Dollhouse (1996) 3 1.0
Lotto Land (1995) 1 1.0
