In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

In [None]:
!wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip ml-100k.zip

In [None]:
def loadMovieNames(path="./ml-100k/u.item"):
    """Build a movie ID -> movie title lookup from a MovieLens ``u.item`` file.

    Each line of the file is pipe-delimited: the first field is the numeric
    movie ID and the second field is the title. Remaining fields are ignored.

    Args:
        path: Location of the ``u.item`` file. The default points at the file
            extracted from ``ml-100k.zip`` into the working directory; note
            that the name is lowercase, which matters on case-sensitive
            filesystems.

    Returns:
        dict mapping int movie ID to str movie title.

    Raises:
        OSError: if ``path`` cannot be opened.
        ValueError: if the first field of a line is not an integer.
    """
    movieNames = {}
    # ISO-8859-1 maps every possible byte to a character, so decoding can
    # never fail and no error handler is needed.
    with open(path, "r", encoding="ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            # Skip blank or truncated lines that do not carry an ID|title pair.
            if len(fields) < 2:
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [None]:
# Obtain the notebook's Spark session: getOrCreate() returns an existing
# session if there is one, otherwise it builds a new one named "PopularMovies".
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

In [None]:
# Read the ID -> title dictionary on the driver, then wrap it in a Spark
# broadcast variable; code that needs the dictionary reads it via nameDict.value.
movieNamesById = loadMovieNames()
nameDict = spark.sparkContext.broadcast(movieNamesById)

Define an explicit schema to use when reading u.data

In [None]:
# Explicit column names and types applied when reading u.data.
# Every field is declared nullable (third argument True).
schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

Load the ratings data (u.data) into a DataFrame

In [None]:
# Read u.data as tab-separated text, applying the explicit schema rather than
# letting Spark infer column names and types.
moviesDF = (
    spark.read
    .option("sep", "\t")
    .schema(schema)
    .csv("./ml-100k/u.data")
)

In [None]:
# One output row per movieID, with a "count" column holding the number of
# input rows that share that ID.
rowsGroupedByMovie = moviesDF.groupBy("movieID")
movieCounts = rowsGroupedByMovie.count()

Create a user-defined function that looks up movie names in our broadcast dictionary

In [None]:
def lookupName(movieID):
    """Return the title stored for ``movieID`` in the broadcast ``nameDict``.

    Args:
        movieID: Integer movie ID. May be None when this function runs as a
            Spark UDF over a null column value (the schema declares movieID
            as nullable).

    Returns:
        The movie title, or None when ``movieID`` is None or has no entry in
        the dictionary. Returning None instead of raising KeyError keeps one
        unknown ID from failing the whole Spark job; the title simply comes
        out as null for that row.
    """
    return nameDict.value.get(movieID)

In [None]:
# Wrap the plain Python lookup so it can be applied to DataFrame columns.
# No return type is passed, so the UDF uses func.udf's default.
lookupNameUDF = func.udf(f=lookupName)

Add a movieTitle column using our new udf

In [None]:
# Column expression that runs the title lookup on each row's movieID.
movieTitleColumn = lookupNameUDF(func.col("movieID"))
# Attach it to the per-movie counts under the name "movieTitle".
moviesWithNames = movieCounts.withColumn("movieTitle", movieTitleColumn)

Sort the results

In [None]:
# Order rows by the "count" column, largest first.
byCountDescending = func.desc("count")
sortedMoviesWithNames = moviesWithNames.orderBy(byCountDescending)

Grab the top 10

In [None]:
# Print the first 10 rows of the sorted result; truncate=False shows full
# column values (long titles are not cut off).
sortedMoviesWithNames.show(n=10, truncate=False)

Stop the session

In [None]:
# Stop the Spark session now that the results have been displayed.
spark.stop()