In [1]:
import pyspark

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext()
sc

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("Analyzing Movies Dataset") \
        .getOrCreate()

#### Load the dataset

Both datasets have been sourced from https://www.kaggle.com/rounakbanik/the-movies-dataset. <br />
The first file (ratings.csv) contains 26 million ratings from 270,000 users on all the 45,000 movies but here we are considering only 2 million ratings for this demo. <br />
The second file (movies_metadata.csv) includes information about budget, revenue, release dates, languages, production countries etc. for each movie.

In [5]:
movie_ratings = spark.read\
               .format("csv")\
               .option("sep", ",")\
               .option("header", "true")\
               .load("../datasets/ratings.csv")

In [6]:
movie_ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [7]:
movie_metadata = spark.read\
                         .format("csv")\
                         .option("sep", ",")\
                         .option("header", "true")\
                         .load("../datasets/movies_metadata.csv")

In [8]:
movie_metadata.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [9]:
movie_ratings.count(),movie_metadata.count()

(1999999, 45572)

#### Distinct number of Id

In [10]:
movie_metadata.select('id')\
                 .distinct()\
                 .count()

45470

#### Drop columns which are not necessary for our analysis

In [11]:
movie_metadata = movie_metadata.drop(
                        'belongs_to_collection', 
                        'genres',
                        'homepage',
                        'imdb_id',
                        'overview',
                        'poster_path',
                        'production_companies',
                        'production_countries',
                        'spoken_languages',
                        'tagline')
movie_metadata

DataFrame[adult: string, budget: string, id: string, original_language: string, original_title: string, popularity: string, release_date: string, revenue: string, runtime: string, status: string, title: string, video: string, vote_average: string, vote_count: string]

#### View the schema of the dataframes
Though many numeric columns are currently formatted as strings, we'll adjust their types when we need to later on

In [12]:
movie_metadata.printSchema()

root
 |-- adult: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)



#### Drop the invalid data from the dataframes

In [13]:
movie_metadata = movie_metadata.dropna()
movie_ratings = movie_ratings.dropna()

#### The size of movie_metadata has reduced

In [14]:
movie_ratings.count(),movie_metadata.count()

(1999999, 43492)

#### Cast the numeric columns as float
These are currently strings

In [15]:
for col_name in ["budget", "revenue", "vote_average", "popularity"]:
    movie_metadata = movie_metadata.withColumn(col_name, 
                                               movie_metadata[col_name].cast("float"))
    
movie_metadata.printSchema()

root
 |-- adult: string (nullable = true)
 |-- budget: float (nullable = true)
 |-- id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: float (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: string (nullable = true)



#### We are interested in positive budget and revenue values

In [16]:
movie_metadata = movie_metadata[(movie_metadata.budget > 0) \
                                & (movie_metadata.revenue > 0)]

In [17]:
movie_metadata.count()

4981

In [18]:
movie_metadata.select("budget", "revenue").show()

+---------+------------+
|   budget|     revenue|
+---------+------------+
|    3.0E7|3.73554048E8|
|    6.5E7|2.62797248E8|
|    6.0E7|1.87436816E8|
|    3.5E7| 6.4350172E7|
|    5.8E7|3.52194048E8|
|    6.2E7|1.07879496E8|
|    4.4E7| 1.3681765E7|
|    9.8E7| 1.0017322E7|
|    5.2E7|1.16112376E8|
|   1.65E7|      1.35E8|
|4000000.0|   4300000.0|
|    3.0E7|2.12385536E8|
|    6.0E7| 3.5431112E7|
|    5.0E7| 3.0303072E7|
|3600000.0|      4.98E7|
|    1.2E7|      2.74E7|
|   2.95E7|    1.6884E8|
|    3.0E7|2.54134912E8|
|    1.1E7| 3.9363636E7|
|    1.8E7| 1.2219592E8|
+---------+------------+
only showing top 20 rows



In [19]:
movie_metadata = movie_metadata.withColumn("profit",
                                           (movie_metadata["revenue"] - movie_metadata["budget"]))

In [20]:
movie_metadata.select("profit").sort(movie_metadata["profit"].desc())\
                               .show()

+------------+
|      profit|
+------------+
|2.55096525E9|
|1.82322355E9|
|1.64503424E9|
|1.36352883E9|
|1.31624934E9|
|1.29955789E9|
|1.21699994E9|
|1.12540365E9|
|1.12421901E9|
| 1.1028864E9|
|1.08273101E9|
|1.02488896E9|
|   1.01544E9|
|  9.887648E8|
| 9.4006336E8|
| 9.2874694E8|
| 9.0856102E8|
| 9.0330445E8|
| 8.9476186E8|
| 8.7378419E8|
+------------+
only showing top 20 rows



In [21]:
movie_metadata.printSchema()

root
 |-- adult: string (nullable = true)
 |-- budget: float (nullable = true)
 |-- id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: float (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- profit: float (nullable = true)



In [22]:
from pyspark.sql.functions import udf

#### We are spliting the date column and from here we will take only year and save it in another column.

In [23]:
year_extract_udf = udf(lambda date: date.split('-')[0])

movie_metadata = movie_metadata.withColumn(
                    "release_year",
                    year_extract_udf(movie_metadata.release_date)
)

#### Drop the date column

In [24]:
movie_metadata =movie_metadata.drop("release_date")

In [25]:
movie_metadata.columns

['adult',
 'budget',
 'id',
 'original_language',
 'original_title',
 'popularity',
 'revenue',
 'runtime',
 'status',
 'title',
 'video',
 'vote_average',
 'vote_count',
 'profit',
 'release_year']

#### Filter the dataset to movies released since 1995 

In [26]:
movies_1995 = movie_metadata.filter(movie_metadata.release_year >= 1995)

In [27]:
movies_1995.count()

3674

#### Check the number of distinct languages in this filtered dataset

In [28]:
movies_1995.select(movies_1995.original_language)\
       .distinct()\
       .count()

38

#### Check average popularity and profits for movies in each language

In [29]:
movies_1995 = movies_1995.groupBy("original_language")\
                         .agg({"popularity": "avg",
                               "profit": "avg"
                              })

In [30]:
movies_1995 =movies_1995.withColumnRenamed("avg(popularity)", "popularity")\
                        .withColumnRenamed("avg(profit)", "profit")

#### How popular and profitable are movies in each language?
* English, German, Spanish films are popular and profitable
* Vietnamese films are neither profitable nor popular
* Romanian films are mildly popular but unprofitable

In [31]:
movies_1995.show()

+-----------------+-------------------+--------------------+
|original_language|         popularity|              profit|
+-----------------+-------------------+--------------------+
|               en| 11.186577659240745| 7.346751861813618E7|
|               vi| 0.0445609986782074|           -661000.0|
|               nb|  6.097683906555176|            659678.0|
|               ro|  3.927061289548874|           -909612.5|
|               ur|0.16696199774742126|            938650.0|
|               pl|  5.014954090118408|          5572534.75|
|               cn|  6.368948727846146|       1.644614425E7|
|               pt|  4.732584699988365|   4892234.833333333|
|               ko|  6.297175228595734|1.9510993916666668E7|
|               mr| 0.4464539885520935|           2900000.0|
|               sr| 0.6386486490567526|  -91267.33333333333|
|               tr| 3.1493526299794516|   5093333.333333333|
|               de|  7.280842576708112| 2.185576342857143E7|
|               is| 2.86

#### Define your own metric to calculate how "good" a movie is
We scale the popularity and profitabiltiy values

In [32]:
weight_popularity = 10
weight_profit=0.000001

In [33]:
movies = movies_1995.withColumn("score",
                                    (movies_1995.popularity * weight_popularity + \
                                     movies_1995.profit * weight_profit))

In [34]:
movies.show()

+-----------------+-------------------+--------------------+--------------------+
|original_language|         popularity|              profit|               score|
+-----------------+-------------------+--------------------+--------------------+
|               en| 11.186577659240745| 7.346751861813618E7|  185.33329521054364|
|               vi| 0.0445609986782074|           -661000.0|-0.21539001321792595|
|               nb|  6.097683906555176|            659678.0|   61.63651706555176|
|               ro|  3.927061289548874|           -909612.5|   38.36100039548874|
|               ur|0.16696199774742126|            938650.0|  2.6082699774742126|
|               pl|  5.014954090118408|          5572534.75|  55.722075651184085|
|               cn|  6.368948727846146|       1.644614425E7|   80.13563152846146|
|               pt|  4.732584699988365|   4892234.833333333|  52.218081833216985|
|               ko|  6.297175228595734|1.9510993916666668E7|     82.482746202624|
|               

#### We are interested in movies with a score greater than 50
English, Japanese and Chinese movies top our list

In [35]:
movies = movies.filter(movies.score > 50)\
                   .sort(movies.score.desc())
    
movies.show(15)

+-----------------+------------------+--------------------+------------------+
|original_language|        popularity|              profit|             score|
+-----------------+------------------+--------------------+------------------+
|               en|11.186577659240745| 7.346751861813618E7|185.33329521054364|
|               ja|  9.34331934928894|       4.904474252E7|142.47793601288942|
|               zh| 6.719967931509018|      7.2416313125E7|139.61599244009017|
|               es|   8.4552802835192|1.4337702971428571E7| 98.89050580662057|
|               th|  8.77933645248413|        1.08011215E7|  98.5944860248413|
|               de| 7.280842576708112| 2.185576342857143E7| 94.66418919565254|
|               da|  7.78316912651062|        1.64826483E7|  94.3143395651062|
|               it|  7.91468448638916|       1.385168335E7|  92.9985282138916|
|               id| 8.445611794789633|  192323.66666666666|   84.648441614563|
|               fr| 7.134752727933784|1.260257453731

In [36]:
from pyspark.sql.functions import broadcast

In [37]:
movie_ratings.count(),movie_metadata.count()

(1999999, 4981)

### Broadcast & Join

* Broadcast the smaller dataframe so it is available on all cluster machines
* The data should be small enough so it is held in memory
* All nodes in the cluster distribute the data as fast as they can so overall computation is faster

In [38]:
movies_joined = movie_ratings.select('movieId',
                                     'rating')\
                             .join(broadcast(movie_metadata),
                                   movie_ratings.movieId == movie_metadata.id, 'inner')

In [39]:
movies_joined.columns

['movieId',
 'rating',
 'adult',
 'budget',
 'id',
 'original_language',
 'original_title',
 'popularity',
 'revenue',
 'runtime',
 'status',
 'title',
 'video',
 'vote_average',
 'vote_count',
 'profit',
 'release_year']

In [40]:
movies_joined = movies_joined.drop(
                                     'adult',
                                     'id',
                                     'budget',
                                     'original_title',
                                     'popularity',
                                     'revenue',
                                     'runtime',
                                     'status',
                                     'video',
                                     'vote_average',
                                     'vote_count',
                                     'profit',
                                     'release_year')
movies_joined

DataFrame[movieId: string, rating: string, original_language: string, title: string]

In [41]:
movies_joined.show(truncate = False)

+-------+------+-----------------+------------------------------------------+
|movieId|rating|original_language|title                                     |
+-------+------+-----------------+------------------------------------------+
|858    |5.0   |en               |Sleepless in Seattle                      |
|1246   |5.0   |en               |Rocky Balboa                              |
|5      |3.0   |en               |Four Rooms                                |
|25     |3.0   |en               |Jarhead                                   |
|58     |3.0   |en               |Pirates of the Caribbean: Dead Man's Chest|
|64     |4.0   |es               |Talk to Her                               |
|79     |4.0   |zh               |Hero                                      |
|141    |3.0   |en               |Donnie Darko                              |
|339    |5.0   |en               |Night on Earth                            |
|377    |4.0   |en               |A Nightmare on Elm Street     

In [42]:
movies_joined.groupBy(["movieID", "title"])\
             .agg({"rating": "avg"}).show()

+-------+------------------+------------------+
|movieID|             title|       avg(rating)|
+-------+------------------+------------------+
|   1619|The Way of the Gun|3.4446952595936793|
|   3062|       42nd Street|3.9110576923076925|
|    433|      Mary Poppins|2.8247422680412373|
|     90| Beverly Hills Cop|               3.0|
|    580| Jaws: The Revenge|              3.32|
|    414|    Batman Forever|2.5705128205128207|
|     55|     Amores perros|3.1587301587301586|
|   3309|    Mildred Pierce|3.2142857142857144|
|   2334|            Taxi 3|3.0944444444444446|
|    179|   The Interpreter|2.7252747252747254|
|   5902|  A Bridge Too Far|3.8642045454545455|
|    613|          Downfall| 3.646511627906977|
|   2082|         Halloween|2.8169533169533167|
|  26142|        The Doctor|3.5833333333333335|
|   8998|Friends with Money|3.9722222222222223|
|   2161|  Fantastic Voyage|3.5329608938547485|
|   3536|             U-571|3.5042735042735043|
|   8852|Prince of Darkness|3.2142857142

In [43]:
movies_joined.select("rating").describe().show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            377149|
|   mean|3.4989009648706477|
| stddev|1.0625113046920454|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



### Accumulators
* Shared variables which are updated by processes running across multiple nodes

In [44]:
terrible_count = spark.sparkContext.accumulator(0)
subpar_count = spark.sparkContext.accumulator(0)
average_count = spark.sparkContext.accumulator(0)
good_count = spark.sparkContext.accumulator(0)

#### Get movie count by ratings

In [45]:
def count_movie_by_rating(row):
    rating = float(row.rating)
    
    if (rating <= 2.0 ):
        terrible_count.add(1)
    elif (rating <= 3.0 and rating > 2.0 ):
        subpar_count.add(1)
    elif (rating <= 4.0 and rating > 3.0 ):
        average_count.add(1)
    elif (rating > 4.0) :
        good_count.add(1)

In [46]:
movies_joined.foreach(lambda x: count_movie_by_rating(x))

In [47]:
print("Terrible movies: ", terrible_count.value)
print("Sub-par movies: ", subpar_count.value)
print("Average movies: ", average_count.value)
print("Good movies: ", good_count.value)

Terrible movies:  51857
Sub-par movies:  107724
Average movies:  137547
Good movies:  80021


### Save the dataframe to a file

#### Save to a CSV file
Saves the data into a directory. You could also give a path in HDFS

In [48]:
movies_1995.write\
           .option("header","true")\
           .csv("movies_1995_csv")

#### Coalesce data into one file
Data from all partitions will coalesce into a single list

In [49]:
movies_1995.coalesce(1)\
           .write\
           .option("header","true")\
           .csv("movies_1995_coalesce_csv")

#### Save data as a JSON file

In [50]:
movies_1995.coalesce(1)\
           .write\
           .json("movie_overall_json")