# Task: Top 10 Most Rated Movie

This is a two step process. First we use u.data to determine how often particular movie id is rated. Second we use output of the first step and join it to u.item dataset in order to get movie titles. Our final output will contain following three fields:
1. Movie ID
2. Movie Title
3. Number of times it was rated

Below is some information about data
1. u.data
    * This file contains actual movie rating data. 
    * Field separator: tab
    * Fields: user id, movie id, rating (from 1 to 5) and timestamp
2. u.item
    * this file contains information about movies.
    * Field separator: | (pipe)
    * Fields: movie id | movie title | release date | video release date | IMDb URL | unknown | Action | Adventure | Animation | Children's | Comedy | Crime | Documentary | Drama | Fantasy | Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi | Thriller | War | Western 


In [108]:
def dataStrToTuple(inString, delimiter="\t"):
    """
    Splits inString using delimiter and returns a tuple of integers. 
    It assumes that all the input tokens are integer
    """
    tokens = [int(y) for y in inString.split(delimiter)]
    return tuple(tokens)

# load dataset
data = sc.textFile("data/meetup/movielens/u.data", 5)

# convert input record to tuple
rdd1 = data.map(dataStrToTuple)  

In [110]:
# extract movie id as the first element so that we can use it as a key. 
# Value can be left null/empty as in the next stage we only need to count number of records per key
rdd2 = rdd1.map(lambda x: (x[1],1)) 
# print rdd2.take(5)
# type(rdd2)

In [111]:
# group by movie id and count number of elements
cnts = rdd2.countByKey()
# print "".join(["{0},{1}\n".format(k, rdd3[k]) for k in rdd3.keys()[0:5]])
# type(cnts)

In [112]:
# countByKey returns a collection object 
for row in sorted(cnts.items(), key=lambda x: x[1], reverse=True)[0:10]:
    print row

(50, 583)
(258, 509)
(100, 508)
(181, 507)
(294, 485)
(286, 481)
(288, 478)
(1, 452)
(300, 431)
(121, 429)


##Challenge: Distributed Sorting
countByKey returned a collection object that we sorted in memory. How would you modify the above program so that we use spark's distributed computing engine to extract top 10 most rated movie 

In [114]:
rdd4 = rdd2.reduceByKey(lambda x, y: x + y)
print rdd4.takeOrdered(10, key=lambda x: -1 * x[1])

[(50, 583), (258, 509), (100, 508), (181, 507), (294, 485), (286, 481), (288, 478), (1, 452), (300, 431), (121, 429)]


##Challenge: Multikey Sorting
Above takeOrdered uses only value (i.e. number of times a movie was rated) to extract top 10 movies. However if there are lot of movies that were rated by the same number of people then its possible that different analyst will return different list for top 10 movies. Can you modify the above code to sort movies both by value and movie id. 

In [115]:
print rdd4.takeOrdered(10, key=lambda x: (-1 * x[1], x[0]))

[(50, 583), (258, 509), (100, 508), (181, 507), (294, 485), (286, 481), (288, 478), (1, 452), (300, 431), (121, 429)]


##Add Movie Title 
To make sense of the top 10 most rated movie now lets add movie title. Movie information is stored in u.item file. 

In [116]:
def extractIdTitleFromItem(x):
    tokens = x.split("|")
    return (int(tokens[0]), tokens[1])

items = sc.textFile("data/meetup/movielens/u.item", 5) \
            .map(extractIdTitleFromItem)             
        
print items.take(5)


[(1, u'Toy Story (1995)'), (2, u'GoldenEye (1995)'), (3, u'Four Rooms (1995)'), (4, u'Get Shorty (1995)'), (5, u'Copycat (1995)')]


In [117]:
# Join data and items 
ranked = rdd2.reduceByKey(lambda x, y: x + y)\
        .join(items)\
        .takeOrdered(10, key=lambda x: (-1 * x[1][0], x[1][1]))
for row in ranked:
    print "{0}. {1} -> {2}".format(row[0], row[1][1], row[1][0])

50. Star Wars (1977) -> 583
258. Contact (1997) -> 509
100. Fargo (1996) -> 508
181. Return of the Jedi (1983) -> 507
294. Liar Liar (1997) -> 485
286. English Patient, The (1996) -> 481
288. Scream (1996) -> 478
1. Toy Story (1995) -> 452
300. Air Force One (1997) -> 431
121. Independence Day (ID4) (1996) -> 429


# Cleanup 

Currently the code is very frazile and we have to make sure that we are using correct indexes at each position. Let's use SQLContext to define tables and then ue


In [104]:
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

##Construct Rating Tables by Attaching Schema to the loaded dataset

In [105]:
# Construct Rating Schema
schema = StructType([
                StructField("user", IntegerType(), True),
                StructField("movie", IntegerType(), True),
                StructField("rating", IntegerType(), True),
                StructField("timestamp", IntegerType(), True)
            ])

# Apply the schema to the RDD.
schemaRating = sqlContext.createDataFrame(rdd1, schema)

# Register the DataFrame as a table.
schemaRating.registerTempTable("rating")
type(schemaRating)

pyspark.sql.dataframe.DataFrame

##Construct Item Schema and Register as Temporary Table

In [106]:
# Construct Movie Schema
schema = StructType([
            StructField('movie', IntegerType(), True)
            , StructField('title', StringType(), True)
            ]
        )

# Apply the schema to the RDD.
schemaItem = sqlContext.createDataFrame(items, schema)

# Register the DataFrame as a table.
schemaItem.registerTempTable("item")


##Use SQLContext to operate on temporary tables 

In [53]:
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("""
    SELECT A.movie, B.title, A.cnt
    FROM
    (
        SELECT movie, count(*) as cnt
        FROM rating 
        GROUP BY movie
        ORDER BY cnt DESC
        LIMIT 10
    ) A
    JOIN item B
    ON (A.movie = B.movie) 
""")
for row in results.collect():
    print row

Row(movie=1, title=u'Toy Story (1995)', cnt=452)
Row(movie=50, title=u'Star Wars (1977)', cnt=583)
Row(movie=100, title=u'Fargo (1996)', cnt=508)
Row(movie=121, title=u'Independence Day (ID4) (1996)', cnt=429)
Row(movie=181, title=u'Return of the Jedi (1983)', cnt=507)
Row(movie=258, title=u'Contact (1997)', cnt=509)
Row(movie=286, title=u'English Patient, The (1996)', cnt=481)
Row(movie=288, title=u'Scream (1996)', cnt=478)
Row(movie=294, title=u'Liar Liar (1997)', cnt=485)
Row(movie=300, title=u'Air Force One (1997)', cnt=431)


## Alternative To SQL: Using DataFrame Operations

In [None]:
# DataFrame Operations: https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame
from pyspark.sql.functions import desc
rdd2 = schemaRating.groupBy('movie').count().orderBy(desc("count")).limit(10)