In [1]:
import findspark
# Locate the local Spark installation so the pyspark imports below resolve.
findspark.init()
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)
# Reuse an already-running session if there is one; otherwise start 'findPmovie'.
spark = (
    SparkSession.builder
    .appName('findPmovie')
    .getOrCreate()
)

In [2]:
# Column layout of the tab-separated u.data file, in file order.
# Every field is declared nullable (third StructField argument is True).
schema = StructType([
    StructField(columnName, columnType, True)
    for columnName, columnType in [
        ("userID", IntegerType()),
        ("movieID", IntegerType()),
        ("rating", IntegerType()),
        ("timestamp", LongType()),
    ]
])

In [3]:
# Read the ratings file with the explicit schema above (no header, tab separator).
movieDF = spark.read.csv("../ml-100k/u.data", schema=schema, sep='\t')

In [4]:
# Preview the parsed ratings: first 20 rows, long cell values truncated.
movieDF.show(n=20, truncate=True)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
|   298|    474|     4|884182806|
|   115|    265|     2|881171488|
|   253|    465|     5|891628467|
|   305|    451|     3|886324817|
|     6|     86|     3|883603013|
|    62|    257|     2|879372434|
|   286|   1014|     5|879781125|
|   200|    222|     5|876042340|
|   210|     40|     3|891035994|
|   224|     29|     3|888104457|
|   303|    785|     3|879485318|
|   122|    387|     5|879270459|
|   194|    274|     2|879539794|
|   291|   1042|     4|874834944|
|   234|   1184|     2|892079237|
+------+-------+------+---------+
only showing top 20 rows



In [5]:
# Number of ratings per movie, most-rated first.
topMovieIDs = (
    movieDF
    .groupBy(func.col("movieID"))
    .count()
    .sort(func.col("count").desc())
)

In [6]:
# Display the 100 most-rated movie IDs.
topMovieIDs.show(n=100, truncate=True)

+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
|    174|  420|
|    127|  413|
|     56|  394|
|      7|  392|
|     98|  390|
|    237|  384|
|    117|  378|
|    172|  367|
|    222|  365|
|    204|  350|
|    313|  350|
|    405|  344|
|     79|  336|
|    210|  331|
|    151|  326|
|    173|  324|
|     69|  321|
|    748|  316|
|    168|  316|
|    269|  315|
|    257|  303|
|    195|  301|
|    423|  300|
|      9|  299|
|    318|  298|
|    276|  298|
|    302|  297|
|     22|  297|
|    328|  295|
|     96|  295|
|    118|  293|
|     15|  293|
|     25|  293|
|    183|  291|
|    216|  290|
|    176|  284|
|     64|  283|
|    202|  280|
|    234|  280|
|     28|  276|
|    191|  276|
|     89|  275|
|    111|  272|
|    275|  268|
|     12|  267|
|    742|  267|
|    357|  264|
|     82|  261|
|    289|  259|
|    135

In [7]:
import codecs
def loadMovieNames(path="../ml-100k/u.item"):
    """Build a ``{movieID: title}`` dict from the MovieLens ``u.item`` file.

    Each line is ``|``-separated: the first field is the integer movie ID and
    the second is the title. The file is decoded as ISO-8859-1 with
    ``errors='ignore'``, exactly as before.

    :param path: location of ``u.item``. Defaults to the path this notebook
        has always used; pass a different path instead of editing the code.
    :returns: dict mapping int movie ID -> title string.
    :raises ValueError: if a non-blank line's first field is not an integer.
    :raises IndexError: if a non-blank line contains no ``|`` separator.
    """
    movieNames = {}
    # Built-in open() handles decoding directly; codecs.open() is the legacy
    # spelling that open() supersedes.
    with open(path, "r", encoding="ISO-8859-1", errors="ignore") as f:
        for line in f:
            if not line.strip():
                # Skip blank lines (e.g. a trailing newline at EOF) instead of
                # crashing on int('').
                continue
            # Only the first two fields are needed; stop splitting after them.
            fields = line.split("|", 2)
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [8]:
# Broadcast the movieID -> title dict once so UDF tasks can read it via nameDict.value.
nameDict = spark.sparkContext.broadcast(value=loadMovieNames())

In [10]:
# Per-movie rating counts, unsorted; ordering is applied after titles are attached.
movieCounts = movieDF.groupBy(func.col('movieID')).count()

In [11]:
def lookupName(movieID, names=None):
    """Return the title for ``movieID``, or ``None`` if it is not known.

    This is the body of ``lookupNameUDF``. It reads titles from the broadcast
    ``nameDict`` unless an explicit mapping is supplied. An unknown or NULL
    movie ID yields ``None``, which Spark stores as NULL in the string
    column. Previously such an ID raised ``KeyError`` and failed the task.

    :param movieID: value from the ``movieID`` column (may be ``None``).
    :param names: optional ``{movieID: title}`` mapping used instead of
        ``nameDict.value``; lets the lookup be exercised without Spark.
    :returns: the title string, or ``None``.
    """
    # Resolve the broadcast lazily so nameDict is only touched when needed.
    lookup = nameDict.value if names is None else names
    return lookup.get(movieID)

In [12]:
# Wrap lookupName as a Spark UDF whose result column is typed as a string.
lookupNameUDF = func.udf(lookupName, returnType="string")

In [13]:
# Attach each movie's title by applying the lookup UDF to the movieID column.
moviesWithNames = movieCounts.withColumn(
    "movieTitle",
    lookupNameUDF("movieID"),
)

In [14]:
# Most-rated movies first.
sortedMoviesWithNames = moviesWithNames.sort(func.col("count").desc())

In [15]:
# Display the ten most-rated movies with their (truncated) titles.
sortedMoviesWithNames.show(n=10, truncate=True)

+-------+-----+--------------------+
|movieID|count|          movieTitle|
+-------+-----+--------------------+
|     50|  583|    Star Wars (1977)|
|    258|  509|      Contact (1997)|
|    100|  508|        Fargo (1996)|
|    181|  507|Return of the Jed...|
|    294|  485|    Liar Liar (1997)|
|    286|  481|English Patient, ...|
|    288|  478|       Scream (1996)|
|      1|  452|    Toy Story (1995)|
|    300|  431|Air Force One (1997)|
|    121|  429|Independence Day ...|
+-------+-----+--------------------+
only showing top 10 rows

