In [8]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [9]:
# from pyspark import SparkContext, SparkConf
conf=SparkConf().setMaster("local").setAppName("Most Popular Movies")
sc=SparkContext(conf=conf)

In [10]:
sqlContext=SQLContext(sc)

In [11]:
sc

<pyspark.context.SparkContext at 0x7fbea65a3e50>

In [12]:
sqlContext

<pyspark.sql.context.SQLContext at 0x7fbea65b5f50>

In [13]:
# reading the csv file and saving it to dataframe using sqlContext
ratings_df = sqlContext.read.csv("hdfs://nameservice1/user/edureka_294428/ratings.csv", header=True)
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [14]:
# BY default everything is stored as string, we need to save them into
# appropriate schema types as later we might have to do the operations 
# accordingly
ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [15]:
from pyspark.sql.types import *
columns = [StructField("userId", IntegerType(), True),
        StructField("movieId", IntegerType(), True),
        StructField("rating", DoubleType(), True),
        StructField("timestamp", LongType(), True) ]

In [16]:
# Now reading the file again but with schema this time
ratings_df = sqlContext.read.csv("hdfs://nameservice1/user/edureka_294428/ratings.csv", header=True, 
                                 #inferSchema=True)
                                 schema=StructType(columns))
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [17]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)



In [18]:
# we might have to refer to this ratings_df lots of time in our 
# analysis, why not persist it
ratings_df.persist()

DataFrame[userId: int, movieId: int, rating: double, timestamp: bigint]

In [19]:
# It makes sense to group the data by movie id and then get the number 
# of movies present there

from pyspark.sql.functions import *
movie_counts = ratings_df.groupBy("movieid").count()
# sort descending
movie_counts = movie_counts.sort(desc("count"))
movie_counts.show(10)

+-------+-----+
|movieid|count|
+-------+-----+
|    356|  341|
|    296|  324|
|    318|  311|
|    593|  304|
|    260|  291|
|    480|  274|
|   2571|  259|
|      1|  247|
|    527|  244|
|    589|  237|
+-------+-----+
only showing top 10 rows



In [20]:
# fetching average rating by movie_id
avg_ratings = ratings_df.groupBy("movieid").agg( {"rating":"avg"} )

avg_ratings = avg_ratings.withColumnRenamed( "avg(rating)", "avg_rating" )
avg_ratings = avg_ratings.sort( desc( "avg_rating" ) )
avg_ratings.show( 10 )

+-------+----------+
|movieid|avg_rating|
+-------+----------+
|   5427|       5.0|
|  65037|       5.0|
|    961|       5.0|
|  32525|       5.0|
|     53|       5.0|
|   6598|       5.0|
|  91690|       5.0|
|  32460|       5.0|
|  61250|       5.0|
|   5071|       5.0|
+-------+----------+
only showing top 10 rows



In [21]:
import pandas as pd
import numpy as np
from IPython.display import display, HTML

CSS = """
.output {
    flex-direction: column;
}
"""

HTML('<style>{}</style>'.format(CSS))

In [22]:
display(movie_counts.show(3))
display(avg_ratings.show(3))

+-------+-----+
|movieid|count|
+-------+-----+
|    356|  341|
|    296|  324|
|    318|  311|
+-------+-----+
only showing top 3 rows



None

+-------+----------+
|movieid|avg_rating|
+-------+----------+
|   5071|       5.0|
|  91690|       5.0|
|  32460|       5.0|
+-------+----------+
only showing top 3 rows



None

In [23]:
# joining these two dataframes as they can provide both count and avg_ratings for movies
avg_ratings_count = avg_ratings.join( movie_counts,
                                   avg_ratings.movieid == movie_counts.movieid ,
                                   'inner' ).drop(movie_counts.movieid)
avg_ratings_count.show(5)

+----------+-------+-----+
|avg_rating|movieId|count|
+----------+-------+-----+
|       5.0|  32460|    1|
|       5.0|  91690|    1|
|       5.0|   5071|    1|
|       5.0|  32525|    2|
|       5.0|  61250|    1|
+----------+-------+-----+
only showing top 5 rows



In [24]:
# To avoid any potential bias in the system , considering only records having count greater than 
# 50

avg_ratings_count = avg_ratings_count.filter( avg_ratings_count["count"] > 50 )
# sort by rating first and then by count
avg_ratings_count = avg_ratings_count.sort( desc( "avg_rating" ) , desc( "count") )
avg_ratings_count.show( 10 )

+------------------+-------+-----+
|        avg_rating|movieId|count|
+------------------+-------+-----+
|            4.4875|    858|  200|
| 4.487138263665595|    318|  311|
| 4.387096774193548|    913|   62|
| 4.385185185185185|   1221|  135|
| 4.370646766169155|     50|  201|
|4.3355263157894735|   1252|   76|
| 4.315217391304348|    904|   92|
| 4.304054054054054|   1203|   74|
|  4.30327868852459|    527|  244|
| 4.297101449275362|   6016|   69|
+------------------+-------+-----+
only showing top 10 rows



In [25]:
# Lets rounds the avg_rating to 2 decimal places, it doesn't looks good
avg_ratings_count = avg_ratings_count.withColumn( "avg_rating",round( avg_ratings_count["avg_rating"]
                                    , 2 ) )
avg_ratings_count = avg_ratings_count.sort( desc( "avg_rating" ) , desc( "count") )
avg_ratings_count.show( 10 )

+----------+-------+-----+
|avg_rating|movieId|count|
+----------+-------+-----+
|      4.49|    318|  311|
|      4.49|    858|  200|
|      4.39|   1221|  135|
|      4.39|    913|   62|
|      4.37|     50|  201|
|      4.34|   1252|   76|
|      4.32|    904|   92|
|       4.3|    527|  244|
|       4.3|   1203|   74|
|       4.3|   6016|   69|
+----------+-------+-----+
only showing top 10 rows



In [26]:
movies_df = sqlContext.read.csv('hdfs://nameservice1/user/edureka_294428/movies.csv', header=True, inferSchema=True)
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [27]:
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [28]:
top_rated_movies = avg_ratings_count.limit(10).join( movies_df,avg_ratings_count.movieId == movies_df.movieId,
                "inner")
top_rated_movies.show()

+----------+-------+-----+-------+--------------------+--------------------+
|avg_rating|movieId|count|movieId|               title|              genres|
+----------+-------+-----+-------+--------------------+--------------------+
|      4.49|    318|  311|    318|Shawshank Redempt...|         Crime|Drama|
|      4.49|    858|  200|    858|Godfather, The (1...|         Crime|Drama|
|      4.39|   1221|  135|   1221|Godfather: Part I...|         Crime|Drama|
|      4.39|    913|   62|    913|Maltese Falcon, T...|   Film-Noir|Mystery|
|      4.37|     50|  201|     50|Usual Suspects, T...|Crime|Mystery|Thr...|
|      4.34|   1252|   76|   1252|    Chinatown (1974)|Crime|Film-Noir|M...|
|      4.32|    904|   92|    904|  Rear Window (1954)|    Mystery|Thriller|
|       4.3|    527|  244|    527|Schindler's List ...|           Drama|War|
|       4.3|   1203|   74|   1203| 12 Angry Men (1957)|               Drama|
|       4.3|   6016|   69|   6016|City of God (Cida...|Action|Adventure|...|

In [29]:
# OK , we need only movie id , along with the title

top_rated_movies = top_rated_movies.select("title")
top_rated_movies.show()

+--------------------+
|               title|
+--------------------+
|Shawshank Redempt...|
|Godfather, The (1...|
|Godfather: Part I...|
|Maltese Falcon, T...|
|Usual Suspects, T...|
|    Chinatown (1974)|
|  Rear Window (1954)|
|Schindler's List ...|
| 12 Angry Men (1957)|
|City of God (Cida...|
+--------------------+

