# ETL Pipeline: Anime Recommendation Database
### Data Engineering Capstone Project

#### Project Summary
The goals of this project are to:

* Download the Anime Recommendation Database 2020 dataset from Kaggle and load the raw data into AWS S3.
* Build an ETL pipeline that extracts the data from S3, processes it with Spark, and loads it back into S3 as a set of dimensional tables.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

## Import Modules

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
import pyspark.sql.functions as f
from pyspark.sql.functions import col

import configparser

## Create Spark Session

In [2]:
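config_object = configparser.ConfigParser()
# Read the AWS credentials from the local config file and close the file afterwards
with open('dl.cfg') as cfg_file:
    config_object.read_file(cfg_file)
profile_info = config_object["aws_profile"]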

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .config("spark.hadoop.fs.s3a.path.style.access", True) \
    .config("spark.hadoop.fs.s3a.access.key", profile_info.get('aws_key_id')) \
    .config("spark.hadoop.fs.s3a.secret.key", profile_info.get('aws_secret_access_key')) \
    .getOrCreate()
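The session reads AWS credentials from a local `dl.cfg` file. Below is a minimal sketch of the layout the code above expects. It assumes a section named `aws_profile` containing the two keys the notebook reads. The values are placeholders, not real credentials, and the early check for missing keys is an addition for illustration.

```python
import configparser

# Hypothetical dl.cfg contents; only the section and key names come from the notebook.
SAMPLE_CFG = """
[aws_profile]
aws_key_id = EXAMPLE_KEY_ID
aws_secret_access_key = EXAMPLE_SECRET
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)
profile = config["aws_profile"]

# SectionProxy.get() returns None for a missing key, which would silently hand
# None to Spark as a credential, so fail early instead.
missing = [k for k in ("aws_key_id", "aws_secret_access_key") if not profile.get(k)]
if missing:
    raise KeyError(f"dl.cfg is missing: {missing}")

print(profile.get("aws_key_id"))
```

Because the file holds secrets, it is best kept out of version control.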

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we will transform two datasets and create two dimension tables, one for each dataset. The resulting tables are intended to serve as an analytical foundation and as input data for building a recommendation system. Spark will be used to process the data.

#### Describe and Gather Data 

### Source data:

https://www.kaggle.com/hernan4444/anime-recommendation-database-2020?select=anime.csv

#### About Files

* `animelist.csv`: List of every anime registered by each user, with the user's score, watching status, and number of episodes watched. The dataset contains about 109 million rows, 17,562 different anime, and 325,772 different users.

In [3]:
anime_list_path = "s3a://capstone-final-project-udacity/animelist.csv"
anime_list_df = spark.read.csv(anime_list_path, header=True)
anime_list_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- anime_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- watching_status: string (nullable = true)
 |-- watched_episodes: string (nullable = true)



In [4]:
anime_list_df.show(5, False)

+-------+--------+------+---------------+----------------+
|user_id|anime_id|rating|watching_status|watched_episodes|
+-------+--------+------+---------------+----------------+
|0      |67      |9     |1              |1               |
|0      |6702    |7     |1              |4               |
|0      |242     |10    |1              |4               |
|0      |4898    |0     |1              |1               |
|0      |21      |10    |1              |0               |
+-------+--------+------+---------------+----------------+
only showing top 5 rows



* `anime.csv`: General information about each anime, scraped from its main page and stats page.

In [5]:
anime_path = "s3a://capstone-final-project-udacity/anime.csv"
anime_df = spark.read.csv(anime_path, header=True)
anime_df.printSchema()

root
 |-- MAL_ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Japanese name: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Aired: string (nullable = true)
 |-- Premiered: string (nullable = true)
 |-- Producers: string (nullable = true)
 |-- Licensors: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Ranked: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Members: string (nullable = true)
 |-- Favorites: string (nullable = true)
 |-- Watching: string (nullable = true)
 |-- Completed: string (nullable = true)
 |-- On-Hold: string (nullable = true)
 |-- Dropped: string (nullable = true)
 |-- Plan to Watch: string (nullable = true)
 |

### Step 2: Clean the Data

#### Removing Unnecessary Columns

In [6]:
anime_df = anime_df.drop("Japanese name", "English name", "Score-1", "Score-2", "Score-3", "Score-4", "Score-5", "Score-6", "Score-7", "Score-8", "Score-9",
                         "Score-10", "Rating", "Premiered", "Aired", "Producers", "Studios", "Duration", "Licensors")
anime_df.printSchema()

root
 |-- MAL_ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Ranked: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Members: string (nullable = true)
 |-- Favorites: string (nullable = true)
 |-- Watching: string (nullable = true)
 |-- Completed: string (nullable = true)
 |-- On-Hold: string (nullable = true)
 |-- Dropped: string (nullable = true)
 |-- Plan to Watch: string (nullable = true)



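#### Convert the `Genres` column to an array type

In [7]:
# Split "Action, Adventure, ..." on a comma plus any following whitespace, so
# elements are stored as "Adventure" rather than " Adventure"
anime_df=anime_df.select("MAL_ID", "Name", "Score", "Type", "Episodes", "Source", "Ranked",
             "Popularity", "Members", "Favorites", "Watching", "Completed",
             "On-Hold", "Dropped", "Plan to Watch", f.split("Genres", r",\s*").alias("genre"))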

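`f.split` treats its second argument as a Java regular expression. Splitting on a bare `","` keeps the space that follows each comma in the source text, which produces the double space in values such as `[Action,  Adventu...` in the `show()` output. The plain-Python sketch below reproduces the same behavior and shows that the pattern `,\s*` also consumes the separator whitespace. Python and Java regex agree on this simple pattern.

```python
import re

genres = "Action, Adventure, Comedy, Drama, Sci-Fi, Space"

# Splitting on a bare comma keeps the leading space on every element after the first.
naive = genres.split(",")

# Splitting on a comma followed by optional whitespace yields clean genre names.
clean = re.split(r",\s*", genres)

print(naive[:2])  # ['Action', ' Adventure']
print(clean[:2])  # ['Action', 'Adventure']
```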
#### Drop adult anime

In [8]:
anime_df=anime_df.select("MAL_ID", "Name", "Score", "Type", "Episodes", "Source", "Ranked",
                         "Popularity", "Members", "Favorites", "Watching","Completed",
                         "On-Hold", "Dropped", "Plan to Watch", "genre", f.array_contains(anime_df.genre, "Hentai").alias("check"))

In [9]:
# Keep only rows whose genre array does not contain "Hentai", then drop the helper column
anime_df = anime_df.where(~f.col("check")).drop("check")
anime_df.show(5)

+------+--------------------+-----+-----+--------+--------+------+----------+-------+---------+--------+---------+-------+-------+-------------+--------------------+
|MAL_ID|                Name|Score| Type|Episodes|  Source|Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|               genre|
+------+--------------------+-----+-----+--------+--------+------+----------+-------+---------+--------+---------+-------+-------+-------------+--------------------+
|     1|        Cowboy Bebop| 8.78|   TV|      26|Original|  28.0|        39|1251960|    61971|  105808|   718161|  71513|  26678|       329800|[Action,  Adventu...|
|     5|Cowboy Bebop: Ten...| 8.39|Movie|       1|Original| 159.0|       518| 273145|     1174|    4143|   208333|   1935|    770|        57964|[Action,  Drama, ...|
|     6|              Trigun| 8.24|   TV|      26|   Manga| 266.0|       201| 558913|    12944|   29113|   343492|  25465|  13925|       146918|[Action,  Sci-Fi,...|
|   
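`array_contains` matches array elements exactly. If genres are split on a bare comma, a title whose list does not start with `Hentai` stores the value as `" Hentai"`, and that title slips through the filter. The plain-Python sketch below shows the difference. The records and titles are hypothetical; only the format of the `Genres` string mirrors `anime.csv`.

```python
import re

# Hypothetical records; only the "Genres" string format mirrors anime.csv.
records = [
    {"Name": "Title A", "Genres": "Action, Adventure"},
    {"Name": "Title B", "Genres": "Hentai, Romance"},
    {"Name": "Title C", "Genres": "Comedy, Hentai"},
]

def keep_naive(rec):
    # Mirrors array_contains on a comma-only split: exact element match.
    return "Hentai" not in rec["Genres"].split(",")

def keep_clean(rec):
    # Split on comma plus optional whitespace before checking membership.
    return "Hentai" not in re.split(r",\s*", rec["Genres"])

naive_kept = [r["Name"] for r in records if keep_naive(r)]
clean_kept = [r["Name"] for r in records if keep_clean(r)]
print(naive_kept)  # ['Title A', 'Title C']
print(clean_kept)  # ['Title A']
```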

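#### Generate a unique ID for each user

In [10]:
# Assign one new ID per distinct source user (not per row)
user_ids = anime_list_df.select("user_id").distinct() \
    .withColumn("new_user_id", f.monotonically_increasing_id())

In [11]:
# Give every rating row its user's new ID, replacing the original user_id
anime_list_df = anime_list_df.join(f.broadcast(user_ids), on="user_id") \
    .drop("user_id") \
    .withColumnRenamed("new_user_id", "user_id") \
    .select("anime_id", "rating", "watching_status", "watched_episodes", "user_id")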

In [12]:
anime_list_df.show(5)

+--------+------+---------------+----------------+-------+
|anime_id|rating|watching_status|watched_episodes|user_id|
+--------+------+---------------+----------------+-------+
|      67|     9|              1|               1|      0|
|    6702|     7|              1|               4|      1|
|     242|    10|              1|               4|      2|
|    4898|     0|              1|               1|      3|
|      21|    10|              1|               0|      4|
+--------+------+---------------+----------------+-------+
only showing top 5 rows
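`monotonically_increasing_id()` produces one value per row of the DataFrame it is applied to. If it is applied to the full rating list, each rating gets its own ID. In the `show()` output above, the five ratings that all belonged to source user 0 received IDs 0 to 4. If it is applied to the distinct users first and then joined back, each user gets one ID. The plain-Python sketch below shows the difference. The rows are hypothetical, and the dictionary-based ID scheme only illustrates the idea.

```python
rows = [
    {"user_id": 0, "anime_id": 67},
    {"user_id": 0, "anime_id": 6702},
    {"user_id": 0, "anime_id": 242},
    {"user_id": 1, "anime_id": 1},   # hypothetical second user
]

# Per-row IDs (what monotonically_increasing_id gives on the full list): one ID per rating.
per_row = [dict(r, new_id=i) for i, r in enumerate(rows)]

# Per-user IDs: map each distinct source user to one surrogate ID, reused on every row.
surrogate = {}
for r in rows:
    surrogate.setdefault(r["user_id"], len(surrogate))
per_user = [dict(r, new_id=surrogate[r["user_id"]]) for r in rows]

print([r["new_id"] for r in per_row])   # [0, 1, 2, 3]
print([r["new_id"] for r in per_user])  # [0, 0, 0, 1]
```

Note that Spark's IDs are unique and increasing but not necessarily consecutive across partitions.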



### Step 3: Define the Data Model

This data model lets the review team look up information about each anime and provides the data needed to build a recommendation system.

#### 3.1 Conceptual Data Model

##### The first dimension table contains every anime registered by each user, with the user's score, watching status, and number of episodes watched.

* `user_id`: Generated surrogate user ID.
* `anime_id`: ID of the anime. (e.g. 1)
* `rating`: Score from 1 to 10 given by the user; 0 if the user did not assign a score. (e.g. 10)
* `watching_status`: ID of the watching status of this anime in the user's list. (e.g. 2)
* `watched_episodes`: Number of episodes watched by the user. (e.g. 24)
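A `rating` of 0 means the user did not assign a score, so averages computed from this table should exclude zeros. Otherwise, unscored entries pull the mean down. The plain-Python sketch below uses the ratings of source user 0 from the `show()` output above. In Spark, the same idea would be a filter such as `where(col("rating") > 0)` before aggregating.

```python
ratings = [9, 7, 10, 0, 10]  # ratings of source user 0 from the show() output

naive_mean = sum(ratings) / len(ratings)
scored = [r for r in ratings if r > 0]  # 0 means "not scored", not a score of zero
scored_mean = sum(scored) / len(scored) if scored else None

print(naive_mean)   # 7.2
print(scored_mean)  # 9.0
```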

##### The second dimension table contains general information about every anime (17,562 different anime in the source data).

* `MAL_ID`: MyAnimeList ID of the anime. (e.g. 1)
* `Name`: Full name of the anime. (e.g. Cowboy Bebop)
* `Score`: Average score of the anime across all users in the MyAnimeList database. (e.g. 8.78)
* `Genre`: List of genres for this anime, split from the comma-separated `Genres` column. (e.g. Action, Adventure, Comedy, Drama, Sci-Fi, Space)
* `Type`: TV, Movie, OVA, etc. (e.g. TV)
* `Episodes`: Number of episodes. (e.g. 26)
* `Source`: Manga, Light novel, Book, etc. (e.g. Original)
* `Ranked`: Rank based on the score. (e.g. 28)
* `Popularity`: Rank based on the number of users who have added the anime to their list. (e.g. 39)
* `Members`: Number of community members in this anime's "group". (e.g. 1251960)
* `Favorites`: Number of users who have marked the anime as a favorite. (e.g. 61971)
* `Watching`: Number of users who are watching the anime. (e.g. 105808)
* `Completed`: Number of users who have completed the anime. (e.g. 718161)
* `On-Hold`: Number of users who have put the anime on hold. (e.g. 71513)
* `Dropped`: Number of users who have dropped the anime. (e.g. 26678)
* `Plan to Watch`: Number of users who plan to watch the anime. (e.g. 329800)

### 3.2 Mapping Out Data Pipelines

* Create the anime-by-user and anime dimension tables.
* Cast columns to the correct data types.
* Write the tables to S3 as Parquet files.
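The type casts in this pipeline depend on how Spark handles strings it cannot parse. With ANSI mode disabled (the default in Spark 3.x), a cast such as `col("Episodes").cast(IntegerType())` returns `null` for non-numeric text instead of failing the job. The plain-Python sketch below approximates that behavior. The `cast_int` and `cast_float` helpers are hypothetical, and the `"Unknown"` value is an assumed example of non-numeric input.

```python
def cast_int(value):
    """Approximate Spark's non-ANSI string-to-int cast: unparseable input -> None."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

def cast_float(value):
    """Approximate Spark's non-ANSI string-to-float cast: unparseable input -> None."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

raw = {"Score": "8.78", "Episodes": "Unknown", "Members": "1251960"}
typed = {
    "score": cast_float(raw["Score"]),
    "episodes": cast_int(raw["Episodes"]),
    "members": cast_int(raw["Members"]),
}
print(typed)  # {'score': 8.78, 'episodes': None, 'members': 1251960}
```

Because bad values turn into nulls silently, counting nulls in each cast column is a cheap way to spot unexpected input.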

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [13]:
anime_by_user_table=anime_list_df.select("user_id", "anime_id", "rating",
                                        "watching_status", "watched_episodes")

In [14]:
anime_by_user_table=anime_by_user_table.withColumn("rating",col("rating").cast(IntegerType())) \
    .withColumn("watched_episodes",col("watched_episodes").cast(IntegerType()))
anime_by_user_table.printSchema()

root
 |-- user_id: long (nullable = false)
 |-- anime_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- watching_status: string (nullable = true)
 |-- watched_episodes: integer (nullable = true)



In [18]:
anime_by_user_table.show(5)

+-------+--------+------+---------------+----------------+
|user_id|anime_id|rating|watching_status|watched_episodes|
+-------+--------+------+---------------+----------------+
|      0|      67|     9|              1|               1|
|      1|    6702|     7|              1|               4|
|      2|     242|    10|              1|               4|
|      3|    4898|     0|              1|               1|
|      4|      21|    10|              1|               0|
+-------+--------+------+---------------+----------------+
only showing top 5 rows



In [15]:
anime_table=anime_df.select("MAL_ID", "Name", "Score", "Type", "Episodes", "Source", "Ranked",
                          "Popularity", "Members", "Favorites", "Watching", "Completed", 
                          "On-Hold", "Dropped", "Plan to Watch", "genre") \
.withColumn("score", col("Score").cast(FloatType())) \
.withColumn("episodes", col("Episodes").cast(IntegerType())) \
.withColumn("members", col("Members").cast(IntegerType())) \
.withColumn("favorites", col("Favorites").cast(IntegerType())) \
.withColumn("watching", col("Watching").cast(IntegerType())) \
.withColumn("completed", col("Completed").cast(IntegerType())) \
.withColumn("on_hold", col("On-Hold").cast(IntegerType())) \
.withColumn("dropped", col("Dropped").cast(IntegerType())) \
.withColumn("plan_to_watch", col("Plan to Watch").cast(IntegerType())) \
.withColumnRenamed("MAL_ID", "anime_id") \
.withColumnRenamed("Name", "name_of_anime") \
.withColumnRenamed("Type", "type_of_anime") \
.withColumnRenamed("Source", "source") \
.withColumnRenamed("Ranked", "ranked") \
.withColumnRenamed("Popularity", "popularity")

In [16]:
anime_table=anime_table.drop("On-Hold", "Plan to Watch")
anime_table.printSchema()

root
 |-- anime_id: string (nullable = true)
 |-- name_of_anime: string (nullable = true)
 |-- score: float (nullable = true)
 |-- type_of_anime: string (nullable = true)
 |-- episodes: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- ranked: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- members: integer (nullable = true)
 |-- favorites: integer (nullable = true)
 |-- watching: integer (nullable = true)
 |-- completed: integer (nullable = true)
 |-- dropped: integer (nullable = true)
 |-- genre: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- on_hold: integer (nullable = true)
 |-- plan_to_watch: integer (nullable = true)



In [19]:
anime_table.show(5)

+--------+--------------------+-----+-------------+--------+--------+------+----------+-------+---------+--------+---------+-------+--------------------+-------+-------------+
|anime_id|       name_of_anime|score|type_of_anime|episodes|  source|ranked|popularity|members|favorites|watching|completed|dropped|               genre|on_hold|plan_to_watch|
+--------+--------------------+-----+-------------+--------+--------+------+----------+-------+---------+--------+---------+-------+--------------------+-------+-------------+
|       1|        Cowboy Bebop| 8.78|           TV|      26|Original|  28.0|        39|1251960|    61971|  105808|   718161|  26678|[Action,  Adventu...|  71513|       329800|
|       5|Cowboy Bebop: Ten...| 8.39|        Movie|       1|Original| 159.0|       518| 273145|     1174|    4143|   208333|    770|[Action,  Drama, ...|   1935|        57964|
|       6|              Trigun| 8.24|           TV|      26|   Manga| 266.0|       201| 558913|    12944|   29113|   343

#### 4.2 Write to AWS S3 Bucket in Parquet files

In [17]:
# Base output prefix; the trailing slash separates it from the table folders
output_data = "s3a://capstone-final-project-udacity/results/"

# mode("overwrite") replaces output left over from a previous run instead of failing
anime_by_user_table.write.mode("overwrite").parquet(output_data + "anime_by_user/" + "anime_by_user.parquet")
anime_table.write.mode("overwrite").parquet(output_data + "anime_list/" + "anime_list.parquet")
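Building an S3 location by plain string concatenation is easy to get wrong. Without a trailing slash on the base prefix, `"s3a://capstone-final-project-udacity/results" + "anime_by_user/"` yields `.../resultsanime_by_user/`. The sketch below shows a small helper that joins path segments with exactly one `/` between them. The name `s3_path` is hypothetical and not part of PySpark.

```python
def s3_path(base, *parts):
    """Join URI segments with exactly one '/' between them (hypothetical helper)."""
    segments = [base.rstrip("/")] + [p.strip("/") for p in parts]
    return "/".join(segments)

base = "s3a://capstone-final-project-udacity/results"

# Plain concatenation silently glues "results" and "anime_by_user" together.
print(base + "anime_by_user/" + "anime_by_user.parquet")
# s3a://capstone-final-project-udacity/resultsanime_by_user/anime_by_user.parquet

print(s3_path(base, "anime_by_user", "anime_by_user.parquet"))
# s3a://capstone-final-project-udacity/results/anime_by_user/anime_by_user.parquet
```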

#### 4.3 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. Useful checks include:
 * Integrity constraints on the tables (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they do the right thing
 * Source/count checks to ensure completeness

The checks below confirm that both tables were created and that each contains at least one record.

In [21]:
def table_exists(df):
    '''Print whether the dataframe object was created.'''
    if df is not None:
        print("data quality check passed")
    else:
        print("data quality check failed")

table_exists(anime_table)
table_exists(anime_by_user_table)

data quality check passed
data quality check passed


In [22]:
def quality_check(df, description):
    '''
    Check that a Spark dataframe contains at least one record.

    Input: Spark dataframe and a short description used in the printed message
    Output: prints the outcome and returns True if the check passed
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return result > 0

# Perform data quality check
quality_check(anime_table, "Animes table")
quality_check(anime_by_user_table, "Animes by user table")

Data quality check passed for Animes table with 16398 records
Data quality check passed for Animes by user table with 109224747 records


True
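The two checks above confirm that each table exists and is not empty. The integrity constraints listed at the start of this section, such as a unique key and valid values after type casting, can also be checked. The plain-Python sketch below runs two such checks on sample rows modeled on `anime_table`. The helper names and the last two rows are hypothetical. In Spark, the same ideas would map to comparing `count()` with `select("anime_id").distinct().count()` and to counting rows where a cast column `isNull()`.

```python
from collections import Counter

def check_unique(rows, key):
    """Return the key values that appear more than once."""
    counts = Counter(r[key] for r in rows)
    return sorted(k for k, n in counts.items() if n > 1)

def check_not_null(rows, column):
    """Return how many rows have a null (None) value in the column."""
    return sum(1 for r in rows if r[column] is None)

sample = [
    {"anime_id": "1", "score": 8.78},
    {"anime_id": "5", "score": 8.39},
    {"anime_id": "6", "score": None},   # hypothetical row whose score failed to cast
    {"anime_id": "6", "score": 8.24},   # hypothetical duplicate key
]

duplicates = check_unique(sample, "anime_id")
null_scores = check_not_null(sample, "score")
print("unique key check:", "passed" if not duplicates else f"failed, duplicates {duplicates}")
print("null check:", "passed" if null_scores == 0 else f"failed, {null_scores} null score(s)")
```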

#### 4.4 Data dictionary
The data dictionary below describes each field in the two output tables. Both tables are derived from the Kaggle Anime Recommendation Database 2020: the anime-by-user table from `animelist.csv` and the anime table from `anime.csv`.

### Anime by user table

* `user_id` (long): Generated surrogate user ID.
* `anime_id` (string): ID of the anime. (e.g. 1)
* `rating` (integer): Score from 1 to 10 given by the user; 0 if the user did not assign a score. (e.g. 10)
* `watching_status` (string): ID of the watching status of this anime in the user's list. (e.g. 2)
* `watched_episodes` (integer): Number of episodes watched by the user. (e.g. 24)

### Anime table

* `anime_id` (string): MyAnimeList ID of the anime (source column `MAL_ID`). (e.g. 1)
* `name_of_anime` (string): Full name of the anime (source column `Name`). (e.g. Cowboy Bebop)
* `score` (float): Average score of the anime across all users in the MyAnimeList database. (e.g. 8.78)
* `genre` (array of strings): Genres of the anime, split from the comma-separated `Genres` column. (e.g. Action, Adventure, Comedy, Drama, Sci-Fi, Space)
* `type_of_anime` (string): TV, Movie, OVA, etc. (e.g. TV)
* `episodes` (integer): Number of episodes. (e.g. 26)
* `source` (string): Manga, Light novel, Book, etc. (e.g. Original)
* `ranked` (string): Rank based on the score. (e.g. 28.0)
* `popularity` (string): Rank based on the number of users who have added the anime to their list. (e.g. 39)
* `members` (integer): Number of community members in this anime's "group". (e.g. 1251960)
* `favorites` (integer): Number of users who have marked the anime as a favorite. (e.g. 61971)
* `watching` (integer): Number of users who are watching the anime. (e.g. 105808)
* `completed` (integer): Number of users who have completed the anime. (e.g. 718161)
* `on_hold` (integer): Number of users who have put the anime on hold. (e.g. 71513)
* `dropped` (integer): Number of users who have dropped the anime. (e.g. 26678)
* `plan_to_watch` (integer): Number of users who plan to watch the anime. (e.g. 329800)

### Step 5: Complete Project Write Up

##### Clearly state the rationale for the choice of tools and technologies for the project.
* Spark was chosen because it processes large datasets in parallel across a cluster; the user list alone has about 109 million rows.

##### Propose how often the data should be updated and why.
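* The data should be updated monthly. The source is a periodic snapshot scraped from MyAnimeList rather than a live feed, and aggregate statistics such as scores and member counts change gradually, so more frequent runs would mostly reprocess unchanged data.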

#### Write a description of how you would approach the problem differently under the following scenarios:

##### The data was increased by 100x.
* Run the Spark job on an AWS EMR cluster, adding worker nodes as the data grows, and keep using S3 for storage.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
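* Schedule the pipeline with Apache Airflow.
* Configure task retries and email alerts on failure.
* Run the DAG daily, starting early enough to finish before 7am, with data quality checks as tasks.
* If a check fails, email the operators, keep the dashboard on the last good data, and inspect the DAG logs to find the cause.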

##### The database needed to be accessed by 100+ people.
* Keep the Parquet files in S3 and grant the users read-only access to the bucket, for example through IAM policies.