## Movies!

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

<b>Create the Spark session</b>

In [19]:
# Obtain a session for the "PopularMovies" application via getOrCreate().
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

<b>Define a function that loads the movie ID → title mapping from the movie-names file</b>

In [20]:
def loadMovieNames(path="C:u.ITEM"):
    """Build a dictionary mapping movie ID to movie title.

    Every non-blank line of the file is split on '|'. The first field is
    parsed as the integer movie ID and the second field is used as the title;
    any remaining fields are ignored.

    Args:
        path: Location of the pipe-delimited movie file. Defaults to the
            path that was previously hard-coded, so existing calls with no
            arguments behave as before.

    Returns:
        dict mapping int movie ID to str title.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no '|' separator.
    """
    movieNames = {}
    # Built-in open() only treats \n, \r and \r\n as line endings. codecs.open
    # iterates with str.splitlines semantics, which would also break a record
    # at characters such as U+0085 or U+001C that can occur inside a title.
    # ISO-8859-1 maps every byte to a character, so no errors= handler is needed.
    with open(path, "r", encoding="ISO-8859-1") as f:
        for line in f:
            # Skip blank lines (e.g. a trailing empty line) instead of
            # letting int('') raise.
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

<b>Broadcast the dictionary returned by the function</b>

In [21]:
# Load the ID -> title dictionary and wrap it in a broadcast variable;
# lookupName reads it back through nameDict.value.
nameDict = spark.sparkContext.broadcast(
    loadMovieNames()
)

<b>Creating schema</b>

In [22]:
# Explicit schema for the data file: four columns, each declared nullable.
schema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in [
        ("userID", IntegerType()),
        ("movieID", IntegerType()),
        ("rating", IntegerType()),
        ("timestamp", LongType()),
    ]
])

<b>Read the data file into a DataFrame using the schema</b>

In [23]:
# Read the tab-separated file, applying the explicit schema defined above.
moviesDF = spark.read.csv("file:///u.data", schema=schema, sep="\t")

In [24]:
# One row per movieID, with the number of rows for that ID in a "count" column.
movieCounts = (
    moviesDF
    .groupBy("movieID")
    .count()
)

<b>Convert the Python function into a UDF for Spark SQL</b>

In [25]:
def lookupName(movieID):
    """Return the title stored for `movieID` in the broadcast dictionary.

    Args:
        movieID: Key to look up in nameDict.value. May be None, since the
            movieID column is declared nullable in the schema.

    Returns:
        The title string, or None when the ID is not present in the
        dictionary. Returning None yields a null in the resulting column
        instead of raising KeyError and failing the whole job.
    """
    return nameDict.value.get(movieID)

# No return type is passed, so the UDF uses udf()'s default (string).
lookupNameUDF = func.udf(lookupName)

<b>Add movie title column</b>

In [26]:
# Append a "movieTitle" column computed by applying the UDF to each movieID.
moviesWithNames = movieCounts.withColumn(
    "movieTitle",
    lookupNameUDF(func.col("movieID")),
)

<b>Sort the results</b>

In [27]:
# Order by the "count" column, largest first.
sortedMoviesWithNames = moviesWithNames.orderBy(
    func.col("count").desc()
)

<b>Top 10</b>

In [28]:
# Print the first 10 rows without truncating long cell values.
sortedMoviesWithNames.show(n=10, truncate=False)

+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows



In [29]:
# Stop the Spark session now that the results have been shown.
spark.stop()