# Data Collection and Organization for Movie Analytics
### [Udacity Data Engineering Nanodegree](https://www.udacity.com/course/data-engineer-nanodegree--nd027) Capstone Project

#### Project Summary
This project combines movie data, including user ratings, from different sources in order to build a data model that includes historical movie titles along with user ratings, genres, and tagged attributes for each movie. With this model, analysis can be performed to find the movies that are most popular with users and the attributes and genres those popular titles fall under. This data may be valuable to a studio or distributor that wants to map trends among the titles users enjoyed most.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope
The scope of this project is to take movie data, including user ratings, from different sources and combine their attributes for each individual title. This produces a more comprehensive data set than any single source can provide. The resulting data can then be used for intelligence reporting on the popularity of particular movies, genres, etc. The data will be read, organized, and analyzed using [Apache Spark](https://spark.apache.org/).

#### Describe and Gather Data 
The data comes from both [IMDB Datasets](https://www.imdb.com/interfaces/) and [MovieLens](https://grouplens.org/datasets/movielens/latest/). Both data sets are publicly available and together contain data for over 8 million titles, along with metadata on actors, crew, and user ratings. Descriptions of the individual fields are available at each link.

The IMDB Data Set consists of the following 7 tab-delimited files:

<ul>
    <li>title.basics.tsv.gz</li>
    <li>title.akas.tsv.gz</li>
    <li>title.crew.tsv.gz</li>
    <li>title.episode.tsv.gz</li>
    <li>title.principals.tsv.gz</li>
    <li>title.ratings.tsv.gz</li>
    <li>name.basics.tsv.gz</li>
</ul>

The MovieLens data consists of the following 6 .csv files:

<ul>
    <li>genome-scores.csv</li>
    <li>genome-tags.csv</li>
    <li>links.csv</li>
    <li>movies.csv</li>
    <li>ratings.csv</li>
    <li>tags.csv</li>
</ul>

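```python
import os

from pyspark.sql import SparkSession

# Build (or reuse) a Spark session. The hadoop-aws package provides the
# S3A filesystem connector used to read the s3a:// source paths below.
spark = SparkSession \
    .builder \
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.0.0"
    ) \
    .getOrCreate()
```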

### Step 2: Explore and Assess the Data
#### Explore the Data 

Let's take a look at each file to see how they are structured and identify how we can use them within the project.

#### IMDB Data Files

```python
IMDB_SOURCE_DIR = "s3a://moviepipeline/imdb/"

# The IMDB files are gzip-compressed, tab-separated, and have a header row.
imdb_read_options = {
    'sep': '\t',
    'header': True
}
```

`title.basics.tsv.gz`

```python
imdb_title_basics_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "title.basics.tsv.gz")
)
imdb_title_basics_df.show(10)
```

The `titleType` field shows that this data set contains more than just movies. Let's see which values that field holds:

```python
imdb_title_basics_df.select("titleType").distinct().collect()
```

`title.akas.tsv.gz`

```python
imdb_title_akas_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "title.akas.tsv.gz")
)
imdb_title_akas_df.show(10)
```

At first glance, it looks like this file contains alternate titles in different languages for worldwide regions. This is out of the scope of this project, so we will not use this file.

`name.basics.tsv.gz`

```python
imdb_name_basics_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "name.basics.tsv.gz")
)
imdb_name_basics_df.show(10)
```

This file contains the names and unique identifiers of actors, directors, and other people. We will use it to join people to the individual movie titles they worked on.

`title.principals.tsv.gz`

```python
imdb_title_principals_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "title.principals.tsv.gz")
)
imdb_title_principals_df.show(10)
```

After looking at the contents of this file, it is apparent that this is a lookup file used to join the people in `name.basics.tsv.gz` to their respective titles in `title.basics.tsv.gz` via the `tconst` and `nconst` keys. For example, the record linking `nconst` *nm0005690* as director of `tconst` title *tt0000001* looks like this:

```python
imdb_title_principals_df.where(
    "tconst = 'tt0000001' and nconst = 'nm0005690' and category = 'director'"
).first()
```

If we look up that `nconst` value in the `name` file, we can see who the director is for that title:

```python
imdb_name_basics_df.where("nconst = 'nm0005690'").first()
```
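
Conceptually, this lookup is a join on `nconst`. The plain-Python sketch below expresses it over small in-memory lists; the rows and the name *Example Director* are hypothetical placeholders, and the function name `crew_for_title` is illustrative. In the pipeline, the equivalent would presumably be a DataFrame join such as `imdb_title_principals_df.join(imdb_name_basics_df, on="nconst")`.

```python
from typing import Dict, List, Tuple


def crew_for_title(
    principals: List[Dict[str, str]],
    names: Dict[str, str],
    tconst: str,
) -> List[Tuple[str, str]]:
    """Return (category, primaryName) pairs for one title by joining on nconst."""
    return [
        # Fall back to a marker if the nconst has no matching name record.
        (row["category"], names.get(row["nconst"], "<unknown>"))
        for row in principals
        if row["tconst"] == tconst
    ]


# Hypothetical rows shaped like title.principals.tsv and name.basics.tsv.
principals = [
    {"tconst": "tt0000001", "nconst": "nm0005690", "category": "director"},
    {"tconst": "tt0000002", "nconst": "nm0000000", "category": "actor"},
]
names = {"nm0005690": "Example Director"}  # placeholder, not the real name

print(crew_for_title(principals, names, "tt0000001"))  # [('director', 'Example Director')]
```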

`title.crew.tsv.gz`

```python
imdb_title_crew_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "title.crew.tsv.gz")
)
imdb_title_crew_df.show(10)
```

According to the description on the [IMDB](https://www.imdb.com/interfaces/) site, this file contains the directors and writers for each title in the data set. The `principals` file appeared to contain this information as well, so let's do a quick spot check:

```python
imdb_title_crew_df.where("tconst='tt0000001'").first()
```

It looks like the director for title *tt0000001* is also linked in the `principals` file, which means that the `crew` file appears to have this data duplicated. For this reason, we will choose not to use this file.

`title.episode.tsv.gz`

```python
imdb_title_episode_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "title.episode.tsv.gz")
)
imdb_title_episode_df.show(10)
```

From a sample of the `episode` data file and the description on the [IMDB](https://www.imdb.com/interfaces/) site, it appears that the records in this file represent episodes of television shows. Since we are building a pipeline for movies only, we do not need this file.

`title.ratings.tsv.gz`

```python
imdb_title_ratings_df = spark.read.options(**imdb_read_options).csv(
    os.path.join(IMDB_SOURCE_DIR, "title.ratings.tsv.gz")
)
imdb_title_ratings_df.show(10)
```

This file contains the average user rating for each title, along with the number of votes the title has received. Since we are interested in user ratings for analytical purposes, we will include this file in our final data set.

#### MovieLens/GroupLens Data Files

`movies.csv`

```python
ML_SOURCE_DIR = "s3a://moviepipeline/grouplens/"
```

```python
ml_movies_df = spark.read.options(header=True).csv(os.path.join(ML_SOURCE_DIR, "movies.csv"))
ml_movies_df.show(5)
```

This is where each movie title is defined. The file violates First Normal Form by packing multiple values into single fields: the `title` field also contains the release year, and the `genres` field holds several pipe-delimited genres. This will need to be cleaned before the data can be used.
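
As a rough illustration of that cleanup, the plain-Python sketch below splits a MovieLens title such as *Toy Story (1995)* into a title and a year with a regular expression. It is not the pipeline's `clean_ml_movies_title` method; the function name `split_title_year` and the assumption that the year always appears as a trailing `(YYYY)` are illustrative. In Spark, the same pattern could likely be applied with `pyspark.sql.functions.regexp_extract`.

```python
import re
from typing import Optional, Tuple

# A lazily matched title followed by a trailing "(YYYY)" and optional whitespace.
_YEAR_SUFFIX = re.compile(r"^(.*?)\s*\((\d{4})\)\s*$")


def split_title_year(raw_title: str) -> Tuple[str, Optional[int]]:
    """Split 'Toy Story (1995)' into ('Toy Story', 1995).

    Titles without a trailing four-digit year come back unchanged with year None.
    Earlier parentheses (e.g. alternate titles) stay part of the title.
    """
    match = _YEAR_SUFFIX.match(raw_title)
    if match is None:
        return raw_title.strip(), None
    return match.group(1), int(match.group(2))


print(split_title_year("Toy Story (1995)"))  # ('Toy Story', 1995)
```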

`links.csv`

```python
ml_links_df = spark.read.options(header=True).csv(os.path.join(ML_SOURCE_DIR, "links.csv"))
ml_links_df.show(5)
```

This is great: here we have a link to the IMDB ID used in the IMDB data files. This allows us to join the two data sets much more directly than by matching titles. Let's do a spot check to make sure the IDs are correct (note that the IMDB IDs are prefixed with 'tt', while the MovieLens `imdbId` values are not):

```python
imdb_title_basics_df.where("tconst = 'tt0114709'").first()
```

```python
ml_links_df.where("imdbId = '0114709'").first()
```

```python
ml_movies_df.where("movieId = '1'").first()
```

ID number *0114709* pulled up the movie *Toy Story* in both data sets. We can be reasonably sure that the ID links between the data sets are correct.
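
The spot check works because both data sets share the numeric part of the IMDB ID. A minimal sketch of that normalization, assuming IDs are compared as integers so that leading zeros do not matter; the helper names are hypothetical and not taken from the pipeline:

```python
def imdb_key_to_int(key: str, prefix: str) -> int:
    """Strip an IMDB prefix ('tt' for titles, 'nm' for names) and parse the rest."""
    if not key.startswith(prefix):
        raise ValueError(f"expected a key starting with {prefix!r}, got {key!r}")
    return int(key[len(prefix):])


def movielens_imdb_id_to_int(imdb_id: str) -> int:
    """links.csv stores the IMDB id without a prefix, e.g. '0114709'."""
    return int(imdb_id)


# Toy Story: both representations normalize to the same integer.
print(imdb_key_to_int("tt0114709", "tt"), movielens_imdb_id_to_int("0114709"))  # 114709 114709
```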

`tags.csv`

```python
ml_tags_df = spark.read.options(header=True).csv(os.path.join(ML_SOURCE_DIR, "tags.csv"))
ml_tags_df.show(5)
```

This file contains tags that users added to movies. These may be useful in our model to allow analysis by tag.

`ratings.csv`

```python
ml_ratings_df = spark.read.options(header=True).csv(os.path.join(ML_SOURCE_DIR, "ratings.csv"))
ml_ratings_df.show(5)
```

Unlike the IMDB ratings, which are aggregated, this file contains individual ratings at the user level. We do not need that level of granularity for this project, so we will need to average the ratings per movie. It also looks like these ratings are on a 5-point scale, while the IMDB ratings are on a 10-point scale. Let's check that in the data:

```python
ml_ratings_df.select("rating").distinct().collect()
```

```python
imdb_title_ratings_df.select("averageRating").distinct().collect()
```

The data confirms the difference in scale, so we will need to account for it when combining the two sets of ratings.
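
Because the scales differ, the MovieLens ratings need to be aggregated per movie and rescaled before they can be compared with IMDB's averages. A minimal plain-Python sketch, assuming a linear rescale by a factor of 2 (which maps MovieLens's 0.5 to 5.0 half-star range onto 1.0 to 10.0); the function name is hypothetical. In Spark this would likely be a `groupBy("movieId")` with `avg` and `count` aggregations.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def aggregate_movielens_ratings(
    rows: Iterable[Tuple[int, float]],
) -> Dict[int, Tuple[float, int]]:
    """Collapse per-user (movieId, rating) rows into movieId -> (average, count).

    The average is multiplied by 2 so it sits on IMDB's 10-point scale.
    """
    totals: Dict[int, float] = defaultdict(float)
    counts: Dict[int, int] = defaultdict(int)
    for movie_id, rating in rows:
        totals[movie_id] += rating
        counts[movie_id] += 1
    return {m: (2 * totals[m] / counts[m], counts[m]) for m in totals}


# Hypothetical sample rows: movie 1 rated 4.0 and 5.0, movie 2 rated 3.5.
print(aggregate_movielens_ratings([(1, 4.0), (1, 5.0), (2, 3.5)]))
# {1: (9.0, 2), 2: (7.0, 1)}
```

Keeping the count alongside the average matters: it is what allows the MovieLens average to be weighted against IMDB's vote count later.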

`genome-scores.csv`

```python
ml_genome_scores_df = spark.read.options(header=True).csv(os.path.join(ML_SOURCE_DIR, "genome-scores.csv"))
ml_genome_scores_df.show(5)
```

It looks like this file joins movie IDs to tags (presumably those in the `genome-tags.csv` file, given that this is the `genome-scores.csv` file). Let's see what the `genome-tags.csv` file contains:

```python
ml_genome_tags_df = spark.read.options(header=True).csv(os.path.join(ML_SOURCE_DIR, "genome-tags.csv"))
ml_genome_tags_df.show(5)
```

It looks like this is the lookup file for the data in the `genome-scores` file. Reading the description on the [GroupLens](https://files.grouplens.org/datasets/movielens/ml-latest-README.html) site about the data, genome scores and tags are assigned by a Machine Learning algorithm, and more information can be found in [this article](http://files.grouplens.org/papers/tag_genome.pdf).

All of the data above are read into DataFrame objects in the `read_imdb_source_files` and `read_movielens_source_files` methods in the pipeline's code.

#### Cleaning Steps
Once the data is loaded, the following cleaning steps are taken before loading the data into the model:

1. Since we are only interested in movies, let's limit the `imdb_title_basics_df` DataFrame to only contain records where `titleType` is `movie`. This is done in the `clean_imdb_title_basics` method.

2. In the GroupLens `movies.csv` file, we saw that First Normal Form was violated by having the year in the same field as the title as well as the genres being pipe-delimited. Let's take care of those two issues by splitting the year and title data as well as turning the genres into an array. This is done in the `clean_ml_movies_title` and `clean_ml_movies_genres` methods.

3. We saw that in the IMDB files, ID numbers are prefixed with 'tt' (titles) or 'nm' (names). We want to remove those prefixes so that the values are numeric and match the IMDB ID references in the MovieLens data set. This is done in the `clean_imdb_title_ids` and `clean_imdb_name_ids` methods.

4. The genres of the IMDB `title.basics.tsv.gz` file also need to be split into an array like we did with the MovieLens genres. This is done in the `clean_imdb_genres` method.

5. When exploring the MovieLens ratings, we saw that each row of the file is an individual rating by a user. We do not need that level of granularity and would rather have the ratings aggregated per title. This is done in the `clean_movielens_ratings` method.
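
Cleaning steps 2 and 4 both turn a delimited genre string into an array. The plain-Python sketch below shows the idea, assuming IMDB separates genres with commas and writes `\N` for missing values, and that MovieLens separates them with pipes and uses `(no genres listed)` as a placeholder. The function names are hypothetical, and the real `clean_ml_movies_genres` and `clean_imdb_genres` methods may differ. In Spark, `pyspark.sql.functions.split` performs the split itself (its pattern is a regular expression, so a pipe must be escaped). The `merge_genres` helper hints at how both sources could later feed `moviegenre`.

```python
from typing import List, Optional

# Values that mean "no genre": IMDB writes \N for missing fields and
# MovieLens uses the literal "(no genres listed)".
EMPTY_GENRE_MARKERS = {"", "\\N", "(no genres listed)"}


def split_genres(raw: Optional[str], sep: str) -> List[str]:
    """Turn a delimited genre string into a list, dropping empty markers."""
    if raw is None or raw.strip() in EMPTY_GENRE_MARKERS:
        return []
    return [g.strip() for g in raw.split(sep) if g.strip()]


def merge_genres(imdb_genres: List[str], ml_genres: List[str]) -> List[str]:
    """Union of both sources' genres, de-duplicated, in first-seen order."""
    return list(dict.fromkeys(imdb_genres + ml_genres))


imdb = split_genres("Comedy,Drama", ",")         # IMDB: comma-separated
movielens = split_genres("Comedy|Romance", "|")  # MovieLens: pipe-separated
print(merge_genres(imdb, movielens))  # ['Comedy', 'Drama', 'Romance']
```

This sketch does not reconcile genre names that are spelled differently in the two sources; those would still appear as separate genres.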

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model that has been developed to organize the raw data is as follows:

![Data Model](modeldiagram.png)

Since the MovieLens data set is smaller than the IMDB data set and references the IMDB id, I decided to use the IMDB id as the primary identifier for movies. I also decided on lookup tables for the tag, crew and genre data in order to minimize duplication and simplify querying by these attributes. 

#### 3.2 Mapping Out Data Pipelines
The steps that are needed to pipeline the data into the chosen data model are:

<ul>
    <li>Create a movie DataFrame based on the data in imdb_title_basics_df.</li>
    <li>Aggregate the individual ratings in the ml_ratings_df DataFrame per movie and combine them with the IMDB rating to get a weighted average of the two. This will populate the average_rating field in the movie DataFrame.</li>
    <li>Collect the IMDB IDs and movie genres from both the imdb_title_basics_df DataFrame and the ml_movies_df DataFrame to populate moviegenre.</li>
    <li>Create a name DataFrame based on the imdb_name_basics_df DataFrame that includes the fields needed for the model.</li>
    <li>Create a castcrew DataFrame based on the imdb_title_principals_df DataFrame to map names to their respective categories for movie titles.</li>
    <li>Create usertag and genometag DataFrames using the MovieLens tag data.</li>
</ul>
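
The document does not spell out how the two ratings are weighted. One reasonable reading, sketched below purely as an assumption, is a vote-weighted mean: IMDB's `averageRating` weighted by its `numVotes`, combined with the MovieLens average (already rescaled to 10 points) weighted by its number of ratings, i.e. (IMDB average × IMDB votes + MovieLens average × MovieLens count) / total votes. The function name is hypothetical.

```python
from typing import Optional


def combined_average_rating(
    imdb_avg: Optional[float],
    imdb_votes: int,
    ml_avg_10: Optional[float],
    ml_count: int,
) -> Optional[float]:
    """Vote-weighted mean of two averages that are already on a 10-point scale.

    Returns None when neither source has ratings for the movie.
    """
    weighted_sum = 0.0
    total_votes = 0
    for avg, votes in ((imdb_avg, imdb_votes), (ml_avg_10, ml_count)):
        # Skip a source that has no ratings for this movie.
        if avg is not None and votes > 0:
            weighted_sum += avg * votes
            total_votes += votes
    return weighted_sum / total_votes if total_votes else None


print(combined_average_rating(8.0, 300, 9.0, 100))  # 8.25
print(combined_average_rating(None, 0, 7.0, 5))     # 7.0
```

Weighting by vote count keeps a handful of MovieLens ratings from swinging a rating that is backed by many IMDB votes, which a plain mean of the two averages would not do.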

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the Data Model

1. First, let's create the movie DataFrame using information from imdb_title_basics. This is handled in the `create_movie_df` method.

2. Next, we need to add ratings data from both the IMDB and MovieLens data sets and calculate the weighted average of the two. This is handled in the `populate_movie_ratings` and `calculate_movie_ratings` methods.

3. Now let's create the `moviegenre` table by combining the genres from both data sets. This is done in the `create_moviegenre` method.

4. Next, we will compile the IMDB `name` file into the `name` table of our data model. This is handled in the `create_name_df` method.

5. Next we will create the `castcrew` table from the `imdb_title_principals_df` DataFrame. This is done in the `create_castcrew_df` method.

6. Next, we will create the `usertag` table based on the MovieLens `tags.csv` file. This is done in the `create_usertag_df` method.

7. Lastly, we will create the `genometag` table based on the MovieLens `genome-tags.csv` and `genome-scores.csv` files. This is done in the `create_genometag_df` method.
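
The `genometag` step is essentially a three-way join: `genome-scores.csv` supplies `(movieId, tagId, relevance)`, `genome-tags.csv` maps `tagId` to the tag text, and `links.csv` maps `movieId` to the IMDB ID. A plain-Python sketch of that join, assuming inner-join semantics (scores for movies without an IMDB link are dropped), integer IMDB IDs, and a hypothetical function name:

```python
from typing import Dict, List, Tuple


def build_genometag_rows(
    scores: List[Tuple[int, int, float]],
    tags: Dict[int, str],
    links: Dict[int, str],
) -> List[Tuple[int, str, float]]:
    """Join (movieId, tagId, relevance) scores to tag text and IMDB ids.

    Rows whose movieId has no IMDB link, or whose tagId is unknown, are dropped,
    which mirrors an inner join.
    """
    rows = []
    for movie_id, tag_id, relevance in scores:
        if movie_id not in links or tag_id not in tags:
            continue
        imdb_id = int(links[movie_id])  # '0114709' -> 114709
        rows.append((imdb_id, tags[tag_id], relevance))
    return rows


# Hypothetical sample data: movie 99 has no IMDB link, so its score is dropped.
scores = [(1, 1, 0.25), (1, 2, 0.75), (99, 1, 0.5)]
tags = {1: "tag-a", 2: "tag-b"}
links = {1: "0114709"}
print(build_genometag_rows(scores, tags, links))
# [(114709, 'tag-a', 0.25), (114709, 'tag-b', 0.75)]
```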

#### 4.2 Repartition the Model

Next, we repartition each DataFrame to improve query performance on the cluster. The aim is to reduce shuffling of data between nodes when the tables are joined. Since the relationships between tables are defined by the `imdb_id` and `name_id` fields, we use Spark's `repartitionByRange` method on those fields. This is done in the `repartition_model` method.

#### 4.3 Data Quality Checks
The data quality checks we will run to check the integrity of our data after the pipeline has been run are as follows:
<ul>
    <li>Test that the movie table does not contain any blank or NULL ID values.</li>
    <li>Test that the movie table does not contain any duplicate imdb_id values.</li>
    <li>Test that the name table does not contain any blank or NULL ID values.</li>
    <li>Test that the name table does not contain any duplicate name_id values.</li>
    <li>Test that the source IMDB movie data contains the same number of records as the movie table.</li>
    <li>Test that the source IMDB name file contains the same number of records as the name table.</li>
</ul>

The following methods handle the above test cases:

    test_movie_null_key
    test_movie_unique_key
    test_name_null_key
    test_name_unique_key
    test_movie_count
    test_name_count
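
The checks themselves are simple to express. The sketch below mirrors the six test cases over plain Python rows rather than Spark DataFrames; the row format, helper names, and failure messages are hypothetical. With Spark, each check would typically reduce to a `count()` over a filtered or grouped DataFrame.

```python
from collections import Counter
from typing import Dict, List, Optional, Sequence

Row = Dict[str, Optional[str]]


def count_null_or_blank(rows: Sequence[Row], key: str) -> int:
    """Number of rows whose key is missing, NULL, or whitespace only."""
    return sum(1 for r in rows if r.get(key) is None or not str(r.get(key)).strip())


def duplicate_values(rows: Sequence[Row], key: str) -> List[str]:
    """Key values that appear on more than one row."""
    counts = Counter(r.get(key) for r in rows)
    return sorted(str(k) for k, n in counts.items() if k is not None and n > 1)


def run_quality_checks(
    movies: Sequence[Row],
    names: Sequence[Row],
    source_movie_count: int,
    source_name_count: int,
) -> List[str]:
    """Return a description of every failed check; an empty list means all passed."""
    failures: List[str] = []
    for table, rows, key, expected in (
        ("movie", movies, "imdb_id", source_movie_count),
        ("name", names, "name_id", source_name_count),
    ):
        if count_null_or_blank(rows, key):
            failures.append(f"{table}: NULL or blank {key}")
        if duplicate_values(rows, key):
            failures.append(f"{table}: duplicate {key}")
        if len(rows) != expected:
            failures.append(f"{table}: {len(rows)} rows, expected {expected}")
    return failures


# Hypothetical bad movie data: a duplicate ID, a blank ID, and a count mismatch.
movies = [{"imdb_id": "1"}, {"imdb_id": "1"}, {"imdb_id": " "}]
names = [{"name_id": "10"}]
print(run_quality_checks(movies, names, 2, 1))
# ['movie: NULL or blank imdb_id', 'movie: duplicate imdb_id', 'movie: 3 rows, expected 2']
```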

#### 4.4 Running the Pipeline on an EMR Cluster

To run the pipeline on an EMR cluster, first copy `datapipeline.py` to your cluster's main node:

    scp -i ./emrkeypair.pem ./datapipeline.py hadoop@ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:/home/hadoop/datapipeline.py

Then run the job with the `spark-submit` command:
    
    spark-submit /home/hadoop/datapipeline.py

Once the pipeline has run successfully, the model is written out as Parquet files under `s3://moviepipeline/output`.

#### 4.5 Data dictionary
Below is a description of our data model's tables, each table's corresponding fields, and the sources of each field.

`movie`
<ul>
<li><strong>imdb_id</strong> - The unique IMDB movie title identifier. This is defined in the IMDB <i>title.basics.tsv</i> file.</li>
<li><strong>title</strong> - The title of a movie. This comes from the <i>primaryTitle</i> field of the IMDB <i>title.basics.tsv</i> file.</li>
<li><strong>release_year</strong> - The year the movie was released. This comes from the <i>startYear</i> field of the IMDB <i>title.basics.tsv</i> file.</li>
<li><strong>length_minutes</strong> - The runtime length of the movie. This comes from the <i>runtimeMinutes</i> field of the IMDB <i>title.basics.tsv</i> file.</li>
<li><strong>is_adult</strong> - A boolean field to determine if the movie is an adult title or not. This comes from the <i>isAdult</i> field of the IMDB <i>title.basics.tsv</i> file.</li>
<li><strong>average_rating</strong> - The weighted average of user ratings. This is calculated using the data from IMDB's <i>title.ratings.tsv</i> file and from the MovieLens file <i>ratings.csv</i>.</li>    
</ul>

`moviegenre`
<ul>
<li><strong>imdb_id</strong> - The unique IMDB movie title identifier. This is defined in the IMDB <i>title.basics.tsv</i> file.</li>
<li><strong>genre</strong> - A genre (Action, Adventure, etc.) that is applied to the movie. There can be more than one genre per movie in this table. This data comes from both IMDB's <i>title.basics.tsv</i> file and the MovieLens <i>movies.csv</i> file.</li>  
</ul>

`name`
<ul>
<li><strong>name_id</strong> - The unique identifier for an individual. This comes from the <i>nconst</i> field of the IMDB <i>name.basics.tsv</i> file.</li>
<li><strong>name</strong> - The name (first and last combined) of the individual. This comes from the <i>primaryName</i> field of the IMDB <i>name.basics.tsv</i> file.</li>
<li><strong>birth_year</strong> - The year the individual was born. This comes from the <i>birthYear</i> field of the IMDB <i>name.basics.tsv</i> file.</li> 
<li><strong>death_year</strong> - The year that the individual died (if deceased). This field contains '\N' if the person is still alive or the value is missing. This comes from the <i>deathYear</i> field of the IMDB <i>name.basics.tsv</i> file.</li>
</ul>

`castcrew`
<ul>
<li><strong>name_id</strong> - The identifier for an individual. This comes from the <i>nconst</i> field in the IMDB <i>title.principals.tsv</i> file and is a relation to the <i>nconst</i> field of the IMDB <i>name.basics.tsv</i> file.</li>
<li><strong>imdb_id</strong> - The movie identifier. This comes from the <i>tconst</i> field of the IMDB <i>title.principals.tsv</i> file and is a relation to the <i>tconst</i> field of the <i>title.basics.tsv</i> file.</li>
<li><strong>category</strong> - The corresponding category that the related individual played as part of the related movie title. For example, <i>director</i>, <i>cinematographer</i>, <i>composer</i>, etc.</li>
</ul>

`usertag`
<ul>
    <li><strong>imdb_id</strong> - The unique ID for the movie title. This is joined to the <i>movie</i> table via the <i>movieId</i> field of the MovieLens <i>links.csv</i> file.</li>
    <li><strong>tag</strong> - The tag value a MovieLens user gave to the movie. For example, <i>epic</i>, <i>sci-fi</i>, etc. This comes from the <i>tag</i> field of the MovieLens <i>tags.csv</i> file.</li>
    <li><strong>timestamp</strong> - The time at which a user assigned the tag to the movie. This comes from the <i>timestamp</i> field of the MovieLens <i>tags.csv</i> file.</li>
</ul>

`genometag`
<ul>
    <li><strong>imdb_id</strong> - The unique ID for the movie title. This is joined to the <i>movie</i> table via the <i>movieId</i> field of the MovieLens <i>links.csv</i> file.</li>
    <li><strong>tag</strong> - The tag value assigned to the movie by the genome tagging algorithm. For example, <i>action</i>, <i>moody</i>, <i>spying</i>, etc. This comes from the <i>tag</i> field of the MovieLens <i>genome-tags.csv</i> file.</li>
    <li><strong>relevance</strong> - The relevance score that the MovieLens genome tagging algorithm gave the genome tag for the particular movie title. This comes from the <i>relevance</i> field of the MovieLens <i>genome-scores.csv</i> file.</li>
</ul>

#### 4.6 Sample Analysis
Below are sample queries that can be run on the resulting model for data analysis purposes.

1. Find the 100 most popular movies by average user rating for the release year 2017.

```python
OUTPUT_DIR = "s3a://moviepipeline/output/"
```

```python
movie_df = spark.read.parquet(os.path.join(OUTPUT_DIR, "movie.parquet"))
movie_df.createOrReplaceTempView("movie_df")

popular_movies = spark.sql(
    """
    SELECT
        imdb_id,
        title,
        release_year,
        average_rating
    FROM movie_df
    WHERE release_year = '2017'
    AND average_rating IS NOT NULL
    ORDER BY average_rating DESC
    LIMIT 100
    """
)
popular_movies.show(100, False)
```

2. Find the 20 most popular genres based on average user ratings for the release year 2017.

```python
moviegenre_df = spark.read.parquet(os.path.join(OUTPUT_DIR, "moviegenre.parquet"))
moviegenre_df.createOrReplaceTempView("moviegenre_df")

# Filter to 2017 releases so the query matches the stated question.
popular_genres = spark.sql(
    """
    SELECT
        mg.genre,
        ROUND(AVG(m.average_rating), 2) AS avg_rating
    FROM movie_df m
    INNER JOIN moviegenre_df mg ON (m.imdb_id = mg.imdb_id)
    WHERE m.release_year = '2017'
    AND m.average_rating IS NOT NULL
    GROUP BY mg.genre
    ORDER BY avg_rating DESC
    LIMIT 20
    """
)
popular_genres.show(20)
```

3. Explore the user tags for movies with ratings greater than or equal to 9.0.

```python
usertag_df = spark.read.parquet(os.path.join(OUTPUT_DIR, "usertag.parquet"))
usertag_df.createOrReplaceTempView("usertag_df")

popular_tags = spark.sql(
    """
    SELECT
        tag,
        COUNT(*) AS tag_count
    FROM usertag_df ut
    INNER JOIN movie_df m ON (ut.imdb_id = m.imdb_id)
    WHERE m.average_rating >= 9.0
    GROUP BY tag
    ORDER BY tag_count DESC
    """
)
popular_tags.show(100)
```

4. The scope of this project was to use a data set containing at least a million rows. We can see that there were over a million tags that were assigned by users to titles in the MovieLens data:

```python
tag_count = spark.sql(
    """
    SELECT COUNT(*) FROM usertag_df
    """
)
tag_count.show()
```

As well as close to 600,000 movie titles:

```python
movie_count = spark.sql(
    """
    SELECT COUNT(*) FROM movie_df
    """
)
movie_count.show()
```

And there are over 11 million names of cast and crew members:

```python
name_df = spark.read.parquet(os.path.join(OUTPUT_DIR, "name.parquet"))
name_df.createOrReplaceTempView("name_df")
name_count = spark.sql(
    """
    SELECT COUNT(*) FROM name_df
    """
)
name_count.show()
```

### Step 5: Project Write-Up
<strong>Clearly state the rationale for the choice of tools and technologies for the project.</strong>

For this project, Spark was chosen to build the pipeline. This is primarily because the Spark 'schema-on-read' ecosystem allows for flexible modeling, as well as the built-in functionality to cleanly parse, join, modify and combine data from multiple sources. This approach has advantages over using a traditional data warehouse due to less intensive setup, more flexibility to modify the schema, and also the ability to scale horizontally when the data grows. 

<strong>Propose how often the data should be updated and why.</strong>

Since new movies are usually released on a weekly basis, the movie titles of this pipeline should most likely be updated weekly. The user scores from MovieLens should be updated more often, perhaps on a daily basis, to keep track of trends for popular movies. 

<strong>Write a description of how you would approach the problem differently under the following scenarios:</strong>

<strong><i>The data was increased by 100x.</i></strong>

The pipeline was built to read entire files rather than incremental updates. If the data were to increase by 100 times, it would be much more prudent to incrementally update the data by timestamp and/or daily files rather than process the entire pipeline every time the source data is updated. If incremental updates were to be set up, we would also need to keep the IMDB and MovieLens individual ratings values (number of ratings and average rating) in the data model, in order to be able to recalculate the overall average rating when new ratings are available in the source data. 
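
To make the incremental approach concrete, the sketch below keeps the per-source totals for one movie so that the combined rating can be recomputed as new ratings arrive, without rereading the full history. It assumes a vote-weighted average and a rescaling of MovieLens ratings by a factor of 2 onto IMDB's 10-point scale; the class and its fields are hypothetical. Because IMDB publishes pre-aggregated values, the IMDB figures are replaced with each new snapshot rather than accumulated.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class MovieRatingState:
    """Running rating totals for one movie (hypothetical model fields)."""
    imdb_avg: Optional[float] = None  # latest IMDB averageRating snapshot (10-point scale)
    imdb_votes: int = 0               # latest IMDB numVotes snapshot
    ml_sum: float = 0.0               # sum of all MovieLens ratings seen so far (5-point scale)
    ml_count: int = 0                 # number of MovieLens ratings seen so far

    def add_movielens_ratings(self, ratings: Iterable[float]) -> None:
        # New individual ratings are accumulated, so old rows never need rereading.
        for rating in ratings:
            self.ml_sum += rating
            self.ml_count += 1

    def update_imdb(self, average_rating: float, num_votes: int) -> None:
        # IMDB ships aggregated values, so the newest snapshot replaces the old one.
        self.imdb_avg = average_rating
        self.imdb_votes = num_votes

    def combined_rating(self) -> Optional[float]:
        # MovieLens average * 2 * count simplifies to 2 * sum.
        weighted_sum = 2.0 * self.ml_sum
        total = self.ml_count
        if self.imdb_avg is not None and self.imdb_votes > 0:
            weighted_sum += self.imdb_avg * self.imdb_votes
            total += self.imdb_votes
        return weighted_sum / total if total else None


state = MovieRatingState()
state.update_imdb(8.0, 3)
state.add_movielens_ratings([5.0])
print(state.combined_rating())  # 8.5
```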
 
<strong><i>The data populates a dashboard that must be updated on a daily basis by 7am every day.</i></strong>
 
If the pipeline had to finish in time to populate a business dashboard by 7am every day, the best approach would be to turn the pipeline code from this notebook into a DAG in Airflow. The DAG could be scheduled to run overnight with an SLA requiring it to complete before 7am, and it could page the Data Engineering team if it missed the SLA or failed with an error.
 
<strong><i>The database needed to be accessed by 100+ people.</i></strong>
 
If the data needed to be accessed by over 100 people, it would be a good decision to export the pipeline's results into a data warehouse such as Amazon Redshift. A data warehouse is built to handle many concurrent connections, whether people are querying the data directly or it is powering the backend of a BI dashboard application.