In [26]:
# -*- coding: utf-8 -*-
#"""
#Created on Mon Sep  7 15:28:00 2020

#@author: Frank
#"""

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

In [28]:
# Build the movie ID -> title dictionary that is later broadcast
def loadMovieNames(path="C:/SparkCourse/ml-100k/u.ITEM"):
    """Parse a '|'-delimited movie file into a ``{movieID: title}`` dict.

    The first field of every record is read as the integer movie ID and the
    second field as the movie title; any further fields are ignored.

    Args:
        path: Location of the u.ITEM file. The default is the course path;
            CHANGE THIS (or pass your own path) to point at your copy.

    Returns:
        dict mapping each int movie ID to its title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a record's first field is not an integer.
    """
    movieNames = {}
    # The built-in open() handles the encoding directly, so codecs is not needed here.
    with open(path, "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            # Drop the line terminator so it can never end up inside a title.
            fields = line.rstrip("\r\n").split('|')
            # Blank lines (or lines without a delimiter) carry no ID/title pair.
            if len(fields) < 2:
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [29]:
# Obtain the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("PopularMovies")
    .getOrCreate()
)

In [30]:
# Read the movie titles and broadcast the resulting dictionary through the SparkContext
nameDict = spark.sparkContext.broadcast(value=loadMovieNames())

In [31]:
# Explicit column names and types applied when u.data is read
schema = StructType([
    StructField("userID", IntegerType(), nullable=True),
    StructField("movieID", IntegerType(), nullable=True),
    StructField("rating", IntegerType(), nullable=True),
    StructField("timestamp", LongType(), nullable=True),
])

In [32]:
# Read the tab-separated ratings file into a DataFrame using the explicit schema
moviesDF = (
    spark.read
    .option("sep", "\t")
    .schema(schema)
    .csv("file:///c:/SparkCourse/ml-100k/u.data")
)

# Count the rows per movieID and preview the first 20 without truncating columns
movieCounts = moviesDF.groupBy("movieID").count()
movieCounts.show(n=20, truncate=False)

+-------+-----+
|movieID|count|
+-------+-----+
|496    |231  |
|471    |221  |
|463    |71   |
|148    |128  |
|1342   |2    |
|833    |49   |
|1088   |13   |
|1591   |6    |
|1238   |8    |
|1580   |1    |
|1645   |1    |
|392    |68   |
|623    |39   |
|540    |43   |
|858    |3    |
|737    |59   |
|243    |132  |
|1025   |44   |
|1084   |21   |
|1127   |11   |
+-------+-----+
only showing top 20 rows



In [33]:
# Create a user-defined function to look up movie names from our broadcasted dictionary
def lookupName(movieID):
    """Return the title stored for ``movieID`` in the broadcast ``nameDict``.

    Args:
        movieID: Key to look up in ``nameDict.value``.

    Returns:
        The title string, or None when ``movieID`` has no entry (including a
        null movieID). Returning None instead of raising KeyError keeps a
        single unknown ID from failing the whole lookup.
    """
    return nameDict.value.get(movieID)

# Wrap the Python lookup as a Spark UDF; "string" spells out udf's default return type
lookupNameUDF = func.udf(f=lookupName, returnType="string")

In [34]:
# Attach a movieTitle column by applying the lookup UDF to each movieID
moviesWithNames = movieCounts.withColumn(
    colName="movieTitle",
    col=lookupNameUDF(func.col("movieID")),
)

In [35]:
# Order the rows by count, largest first
sortedMoviesWithNames = moviesWithNames.orderBy(func.col("count").desc())

In [36]:
# Display the first 20 rows of the sorted result without truncating the titles
sortedMoviesWithNames.show(n=20, truncate=False)

+-------+-----+--------------------------------+
|movieID|count|movieTitle                      |
+-------+-----+--------------------------------+
|50     |583  |Star Wars (1977)                |
|258    |509  |Contact (1997)                  |
|100    |508  |Fargo (1996)                    |
|181    |507  |Return of the Jedi (1983)       |
|294    |485  |Liar Liar (1997)                |
|286    |481  |English Patient, The (1996)     |
|288    |478  |Scream (1996)                   |
|1      |452  |Toy Story (1995)                |
|300    |431  |Air Force One (1997)            |
|121    |429  |Independence Day (ID4) (1996)   |
|174    |420  |Raiders of the Lost Ark (1981)  |
|127    |413  |Godfather, The (1972)           |
|56     |394  |Pulp Fiction (1994)             |
|7      |392  |Twelve Monkeys (1995)           |
|98     |390  |Silence of the Lambs, The (1991)|
|237    |384  |Jerry Maguire (1996)            |
|117    |378  |Rock, The (1996)                |
|172    |367  |Empir

In [37]:
# Stop the Spark session now that the notebook has finished using it
spark.stop()