# Capstone Project Part 2 - Data Quality, Orchestration and Visualization

For the second part of the capstone project, you will further develop the data pipeline. You will integrate data quality checks and orchestration to improve the robustness of the architecture and include data visualization and analytical views to showcase the insights you can generate with the data pipeline.  

# Table of Contents

- [ 1 - Introduction](#1)
- [ 2 - Deployment of the Previous Architecture](#2)
- [ 3 - Data Quality with AWS Glue](#3)
  - [ 3.1 - Configuring the Rule Sets](#3.1)
  - [ 3.2 - Creating Materialized Views with *dbt*](#3.2)
- [ 4 - Orchestration with Apache Airflow](#4)
  - [ 4.1 - Accessing Apache Airflow](#4.1)
  - [ 4.2 - DAG for Songs Data in RDS Source](#4.2)
  - [ 4.3 - DAG for Users and Sessions Data from API Source](#4.3)
- [ 5 - Data Visualization with Apache Superset](#5)

<a name='1'></a>
## 1 - Introduction

DeFtunes is a new company in the music industry, offering a subscription-based app for streaming songs. Recently, they have expanded their services to include digital song purchases. With this new retail feature, DeFtunes requires a data pipeline to extract purchase data from their new API and operational database, enrich and model this data, and ultimately deliver a comprehensive data model for the Data Analysis team to review and gain insights from. You and your team have already developed an initial version of the pipeline; now there are some new requirements to improve it.

The new requirements for this project are:

![Capstone_Diagram_Part2](images/Capstone-diagram2.png)

1. The pipeline should allow for incremental ingestion of new data from the data sources.
2. The pipeline should run daily using data orchestration (you will use Airflow).
3. Data quality checks should be implemented to verify the quality of newly ingested and cleansed data.
4. Analytical views should be added on top of the star schema data model.
5. A dashboard should be added to the architecture, to allow the visualization of the analytical views and insights (you will use Apache Superset).

<a name='2'></a>
## 2 - Deployment of the Previous Architecture

You will recreate the data architecture from the first part of the capstone. As a refresher, these are its elements:

![Capstone_Diagram](images/Capstone-diagram.png)

1. **Data Sources**:
   1. *DeFtunes API*: Contains the `users` and `sessions` endpoints, used to gather information about sales, parameterized by a start and an end date.
   2. *DeFtunes Operational RDS*: Contains the `songs` table, with all the information related to the songs available on the platform.
2. **Extract Jobs**: Three AWS Glue jobs in charge of extracting information from the data sources into the `landing zone`. A new argument, `ingest_date`, has been added to the jobs to support the incremental load of new data.
3. **Transform Jobs**: Two AWS Glue jobs in charge of transforming the raw information coming from the `landing zone` and saving the cleansed data into Apache Iceberg tables in the `transformation zone`.
4. **Redshift Spectrum + Glue Catalog**: Enables Redshift to query the Apache Iceberg tables in the `transformation zone`.
5. **Redshift**: Data warehouse solution used as the `serving layer`; the data modeling is done using *dbt*.

To deploy this infrastructure again, you have been provided with some Terraform files. 

2.1. Go to the AWS console and search for **CloudFormation**. Click on the stack whose name is an alphanumeric ID and then open the **Outputs** tab. Find the key `APIEndpoint` and copy the corresponding **Value**.

Open the Glue file `terraform/modules/extract_job/glue.tf` and replace the `<API_ENDPOINT>` placeholders with the API Endpoint value (in two places).

Take a look at the ingestion date, which is in the format `YYYY-MM-DD`. The jobs run each month to extract the data from the previous month, starting with the data from January 2020. Therefore, the ingestion date needs to be the first day of the following month, `"2020-02-01"`. Replace the placeholders `<INGEST_DATE_YYYY-MM-DD>` with the ingestion date `"2020-02-01"` (in three places).

Save changes.
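The relationship between the ingestion date and the month being extracted can be sketched in Python. `extraction_window` is a hypothetical helper, not part of the provided jobs; it assumes, as described above, that each run covers the whole previous calendar month:

```python
from datetime import date, timedelta


def extraction_window(ingest_date: str) -> tuple[date, date]:
    """Return (first_day, last_day) of the month preceding ingest_date.

    Hypothetical helper: assumes ingest_date is the first day of a month
    in YYYY-MM-DD format and that a run extracts the previous full month.
    """
    ingest = date.fromisoformat(ingest_date)
    if ingest.day != 1:
        raise ValueError("ingest_date is expected to be the first day of a month")
    last_day = ingest - timedelta(days=1)  # last day of the previous month
    first_day = last_day.replace(day=1)    # first day of the previous month
    return first_day, last_day


print(extraction_window("2020-02-01"))
# → (datetime.date(2020, 1, 1), datetime.date(2020, 1, 31))
```

Subtracting one day from the first of the month avoids having to know how many days the previous month has, so leap years are handled automatically.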

2.2. Open the Glue file `terraform/modules/transform_job/glue.tf` and replace the placeholders `<INGEST_DATE_YYYY-MM-DD>` with the same ingestion date in the format `YYYY-MM-DD` (in two places). Save changes.

2.3. Now, navigate to the VSCode terminal and enter the `terraform` folder. You will apply the terraform plan module by module.

*Note*:<span style="color:red"> All terminal commands in this lab should be run in the VSCode terminal, not in Jupyter, as running them in Jupyter may cause issues.</span>

```bash
cd terraform
terraform init
terraform plan
terraform apply -target=module.extract_job
terraform apply -target=module.transform_job
terraform apply -target=module.serving
```
*Note*: Remember that the command `terraform apply` will prompt you to reply `yes`.

2.4. Perform an initial run of the Glue jobs listed in the Terraform outputs. First, run the three jobs whose names are in the following output variables: `glue_api_users_extract_job`, `glue_sessions_users_extract_job` and `glue_rds_extract_job`. Use the following command in the VSCode terminal to start each job by name (enter the names without the double quotes):

```bash
aws glue start-job-run --job-name <JOB-NAME> | jq -r '.JobRunId'
```

The command prints the job run ID. Use it to track the status of each job with this command:

```bash
aws glue get-job-run --job-name <JOB-NAME> --run-id <JobRunID> --output text --query "JobRun.JobRunState"
```

Wait until the statuses of all three jobs change to `SUCCEEDED` (each job should take around 3 minutes).
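As an alternative to running the two CLI commands by hand, the start-and-poll loop can be scripted with the boto3 Glue client. This is a minimal sketch, assuming boto3 and AWS credentials are available in your environment; `run_glue_job_and_wait` is a hypothetical helper, and the client is passed in as a parameter so the function can be tried without AWS:

```python
import time

# Job run states that mean the run has finished, successfully or not.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def run_glue_job_and_wait(glue_client, job_name, poll_seconds=30, sleep=time.sleep):
    """Start a Glue job run and block until it reaches a terminal state.

    glue_client is expected to be a boto3 Glue client, e.g. boto3.client("glue").
    Returns the final JobRunState string.
    """
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
        state = response["JobRun"]["JobRunState"]
        print(f"{job_name} ({run_id}): {state}")
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
```

For example, `run_glue_job_and_wait(boto3.client("glue"), "<JOB-NAME>")` starts one job and waits for it. Calling it in a loop runs the jobs one after another, which takes longer than starting all three from the CLI and polling them in parallel.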

2.5. Now run the two jobs whose names are in the following outputs: `glue_json_transformation_job` and `glue_songs_transformation_job`. Follow the same steps.

2.6. Verify that the data is available by connecting to the Redshift cluster. Complete the connection details by replacing the placeholder `<REDSHIFT_ENDPOINT>` with the value of the `RedshiftClusterEndpoint` key, then run the code:

```python
%load_ext sql
```

```python
REDSHIFTDBHOST = '<REDSHIFT_ENDPOINT>'
REDSHIFTDBPORT = 5439
REDSHIFTDBNAME = 'dev'
REDSHIFTDBUSER = 'defaultuser'
REDSHIFTDBPASSWORD = 'Defaultuserpwrd1234+'

redshift_connection_url = f'postgresql+psycopg2://{REDSHIFTDBUSER}:{REDSHIFTDBPASSWORD}@{REDSHIFTDBHOST}:{REDSHIFTDBPORT}/{REDSHIFTDBNAME}'
%sql {redshift_connection_url}
```
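The connection URL above embeds the password directly. If a password contains characters that are reserved in URLs (such as `@`, `:` or `/`), it can break URL parsing; SQLAlchemy decodes percent-encoded credentials when it parses the URL, so percent-encoding them first is a safer habit. A minimal sketch using only the standard library; `build_redshift_url` is a hypothetical helper:

```python
from urllib.parse import quote


def build_redshift_url(user, password, host, port, dbname):
    """Build a SQLAlchemy-style URL, percent-encoding the credentials."""
    # safe="" makes quote() encode every reserved character, including "/" and "+".
    return (
        f"postgresql+psycopg2://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{dbname}"
    )


print(build_redshift_url("defaultuser", "Defaultuserpwrd1234+", "<REDSHIFT_ENDPOINT>", 5439, "dev"))
# → postgresql+psycopg2://defaultuser:Defaultuserpwrd1234%2B@<REDSHIFT_ENDPOINT>:5439/dev
```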

2.7. Verify the existing schemas; among them should be `deftunes_transform` and `deftunes_serving`.

```python
%sql SHOW SCHEMAS FROM DATABASE dev
```

If everything was set up correctly, there should be three tables in the `deftunes_transform` schema in Redshift. Run the following cell:

```python
%sql SHOW TABLES FROM SCHEMA dev.deftunes_transform
```

<a name='3'></a>
## 3 - Data Quality with AWS Glue

To perform the data quality checks, you will use AWS Glue Data Quality, which integrates well with tables in the Glue Data Catalog, AWS Glue ETL jobs and Airflow. You will use Data Quality for the Data Catalog on top of the `transform` layer. This requires three things: the tables must already exist, a rule set must define the quality checks to perform on each table, and an IAM role must be in place to run the rule set evaluation.

Most of the configuration has already been set up in Terraform under the `data_quality` module; you only have to define the rule set for each table and apply the corresponding module. Quality checks are defined using the `Data Quality Definition Language (DQDL)`. These are some examples:

- **ColumnDataType**: Checks if a column is compliant with a datatype.
  - *Syntax*: `ColumnDataType <COL_NAME> = <EXPECTED_TYPE>`
  - *Example*: `ColumnDataType "colA" = "INTEGER"`
- **ColumnExists**: Checks if columns exist in a dataset.
  - *Syntax*: `ColumnExists <COL_NAME>`
  - *Example*: `ColumnExists "Middle_Name"`
- **ColumnLength**: Checks if the length of data is consistent.
  - *Syntax*: `ColumnLength <COL_NAME> <EXPRESSION>`
  - *Example:* `ColumnLength "Postal_Code" = 5`
- **ColumnValues**: Runs an expression against the values in a column.
  - *Syntax*: `ColumnValues <COL_NAME> <EXPRESSION>`
  - *Example*: `ColumnValues "Country" in [ "US", "CA", "UK", NULL, EMPTY, WHITESPACES_ONLY ]`
- **Completeness**: Checks the percentage of complete (non-null) values in a column.
  - *Syntax*: `Completeness <COL_NAME> <EXPRESSION>`
  - *Example*: `Completeness "First_Name" > 0.95`
- **IsComplete**: Checks whether all of the values in a column are complete (non-null).
  - *Syntax*: `IsComplete <COL_NAME>`
  - *Example*: `IsComplete "email"` OR `IsComplete "Email" where "Customer_ID between 1 and 50"`
- **IsPrimaryKey**: Checks whether a column contains a primary key, that is, whether all of its values are unique and complete (non-null).
  - *Syntax*: `IsPrimaryKey <COL_NAME>`
  - *Example*: `IsPrimaryKey "Customer_ID"`
- **IsUnique**: Checks whether all of the values in a column are unique, and returns a boolean value.
  - *Syntax*: `IsUnique <COL_NAME>`
  - *Example*: `IsUnique "email"`
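To build intuition for what a few of these rules evaluate, here is a small pure-Python emulation over a list of row dictionaries. It only illustrates the semantics described above; it is not how AWS Glue Data Quality evaluates rules, and the helper names and sample rows are hypothetical:

```python
def is_complete(rows, column):
    """IsComplete: every row has a non-null value in the column."""
    return all(row.get(column) is not None for row in rows)


def completeness(rows, column):
    """Completeness: fraction of rows with a non-null value (assumes rows is non-empty)."""
    return sum(row.get(column) is not None for row in rows) / len(rows)


def is_unique(rows, column):
    """IsUnique: no value appears more than once in the column."""
    values = [row.get(column) for row in rows]
    return len(values) == len(set(values))


def is_primary_key(rows, column):
    """IsPrimaryKey: the column is both complete and unique."""
    return is_complete(rows, column) and is_unique(rows, column)


# Hypothetical sample data.
rows = [
    {"user_id": "a", "email": "a@example.com"},
    {"user_id": "b", "email": None},
    {"user_id": "c", "email": "c@example.com"},
    {"user_id": "d", "email": "c@example.com"},
]

print(is_primary_key(rows, "user_id"))  # → True
print(is_complete(rows, "email"), completeness(rows, "email"))  # → False 0.75
print(is_unique(rows, "email"))  # → False
```

On this sample, `Completeness "email" > 0.7` would pass while `IsComplete "email"` would fail, which is why `Completeness` is the more tolerant choice when a few missing values are acceptable.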

<a name='3.1'></a>
### 3.1 - Configuring the Rule Sets

This is an example of how to create a rule set for a table in the Data Catalog with Terraform. In the `ruleset` argument you pass a DQDL string of the form `Rules = [ ... ]` containing the rules separated by commas, and in the `target_table` argument you point to the target table in the Glue Catalog:

```hcl
resource "aws_glue_data_quality_ruleset" "example" {
  name    = "example"
  ruleset = "Rules = [ IsComplete \"user_id\", IsComplete \"session_id\"]"
  target_table {
    database_name = aws_glue_catalog_database.example.name
    table_name    = aws_glue_catalog_table.example.name
  }
}
```

3.1.1. Open the Glue file `terraform/modules/data_quality/glue.tf`.

In the resource `"aws_glue_data_quality_ruleset" "sessions_dq_ruleset"`, replace the placeholder `<RULESET_HERE>` with:

```bash
IsComplete \"user_id\", IsComplete \"session_id\", ColumnLength \"user_id\" = 36, ColumnLength \"session_id\" = 36, IsComplete \"song_id\", ColumnValues \"price\" <= 2
```

In the resource `"aws_glue_data_quality_ruleset" "users_dq_ruleset"`, replace the same placeholder with:

```bash
IsComplete \"user_id\", Uniqueness \"user_id\" > 0.95, IsComplete \"user_lastname\", IsComplete \"user_name\", IsComplete \"user_since\"
```

Save changes.

3.1.2. Run the Terraform module `data_quality` with the following command:

```bash
terraform apply -target=module.data_quality
```
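If you want to try one of these rule sets before wiring it into Airflow, an evaluation run can also be started with the boto3 Glue client. A minimal sketch, assuming boto3 is configured; the database, table and ruleset names and the role ARN are placeholders you would take from your own Terraform configuration and outputs, and `evaluate_ruleset` is a hypothetical helper:

```python
import time

# Evaluation run statuses that mean the run has finished.
FINISHED = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def evaluate_ruleset(glue_client, database, table, ruleset_name, role_arn,
                     poll_seconds=30, sleep=time.sleep):
    """Run one Glue Data Quality ruleset against a catalog table and wait for it.

    glue_client is expected to be boto3.client("glue"). Returns the final run
    status and the result IDs, which can be passed to get_data_quality_result.
    """
    run_id = glue_client.start_data_quality_ruleset_evaluation_run(
        DataSource={"GlueTable": {"DatabaseName": database, "TableName": table}},
        Role=role_arn,
        RulesetNames=[ruleset_name],
    )["RunId"]
    while True:
        run = glue_client.get_data_quality_ruleset_evaluation_run(RunId=run_id)
        if run["Status"] in FINISHED:
            return run["Status"], run.get("ResultIds", [])
        sleep(poll_seconds)
```

In this lab, the Airflow DAGs in Section 4 perform this evaluation for you through the `GlueDataQualityRuleSetEvaluationRunOperator`, so running it manually is optional.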

<a name='3.2'></a>
### 3.2 - Creating Materialized Views with *dbt*

Before you configure and create the orchestration and the dashboard, you need to create some views to answer a few business questions. You will use **dbt** to perform the required data modeling, with the models materialized as views. For now, to test the data visualization, the only required views are aggregations of total sales per artist and year, and per country, year and month. You will create these views on top of the current star schema in the serving layer; luckily, `dbt` has already been set up to run alongside the Redshift cluster for modeling.

3.2.1. In the `./dbt_modelling/models` folder, create a new subfolder called `bi_views`. Inside the new subfolder, create a `schema.yml` file to define the schema for the views:

```yaml
version: 2

models:
  - name: sales_per_country_vw
    description: "Sales per country view"
    columns:
      - name: session_month
      - name: session_year
      - name: country_code
      - name: total_sales
```

3.2.2. Create the file `sales_per_artist_vw.sql` in the same folder and copy the following `SELECT` statement for the model:

```sql
SELECT
date_part('year', fs.session_start_time) AS session_year,
da.artist_name,
SUM(fs.price) AS total_sales
FROM {{var("target_schema")}}.fact_session fs
LEFT JOIN {{var("target_schema")}}.dim_artists da
ON fs.artist_id = da.artist_id
GROUP BY 1,2
```

Save changes.

3.2.3. Create the file `sales_per_country_vw.sql` in the same folder and copy the following `SELECT` statement for the model:

```sql
SELECT
date_part('month', fs.session_start_time) AS session_month,
date_part('year', fs.session_start_time) AS session_year,
du.country_code,
SUM(fs.price) AS total_sales
FROM {{var("target_schema")}}.fact_session fs
LEFT JOIN {{var("target_schema")}}.dim_users du
ON fs.user_id = du.user_id
GROUP BY 1,2,3
```

Save changes.
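To see what `sales_per_country_vw` computes, the same grouping can be mimicked in plain Python on a few made-up rows (the sample sessions and users below are hypothetical). Note how the `LEFT JOIN` keeps sessions whose `user_id` has no match in `dim_users`; their sales are grouped under a null `country_code` instead of being dropped:

```python
from collections import defaultdict
from datetime import datetime
from decimal import Decimal

# Hypothetical sample rows standing in for fact_session and dim_users.
fact_session = [
    {"user_id": "u1", "session_start_time": datetime(2020, 1, 5), "price": Decimal("0.99")},
    {"user_id": "u2", "session_start_time": datetime(2020, 1, 9), "price": Decimal("1.29")},
    {"user_id": "u1", "session_start_time": datetime(2020, 2, 2), "price": Decimal("0.99")},
    {"user_id": "u9", "session_start_time": datetime(2020, 2, 3), "price": Decimal("1.99")},
]
dim_users = {"u1": "US", "u2": "US"}  # user_id -> country_code


def sales_per_country(sessions, users):
    """Group by (month, year, country_code) and sum price, like the dbt model."""
    totals = defaultdict(Decimal)
    for session in sessions:
        start = session["session_start_time"]
        # LEFT JOIN: an unmatched user_id yields country_code None, the row is kept.
        key = (start.month, start.year, users.get(session["user_id"]))
        totals[key] += session["price"]
    return dict(totals)


print(sales_per_country(fact_session, dim_users))
```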

3.2.4. Open the file `./dbt_modelling/dbt_project.yml`. Check the following variables in the file, which simplify the definition of each view's model:

```yaml
vars:
  source_schema: deftunes_transform
  target_schema: deftunes_serving
```

3.2.5. Add the `bi_views` model to the `models` section:

```yaml
    bi_views:
      +materialized: view
      +schema: bi_views
```

Save changes.
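The `+schema: bi_views` setting does not replace the target schema. By default, dbt's built-in `generate_schema_name` macro appends the custom schema to the schema configured in the connection profile, joined by an underscore. The Python below mirrors that default logic; the profile schema `deftunes` used in the example is an assumption, inferred from the `deftunes_bi_views` schema selected in Superset in Section 5:

```python
def generate_schema_name(target_schema, custom_schema_name=None):
    """Mirror dbt's default generate_schema_name macro.

    Without a custom schema, models land in the profile's target schema;
    with one, dbt builds "<target_schema>_<custom_schema>".
    """
    if custom_schema_name is None:
        return target_schema
    return f"{target_schema}_{custom_schema_name.strip()}"


print(generate_schema_name("deftunes", "bi_views"))  # → deftunes_bi_views
```

If you ever need the views in a schema named exactly `bi_views`, dbt lets you override this macro in your own project.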

3.2.6. Go to the CloudFormation Outputs tab and copy the value of the key `DBTBucket`. Replace the placeholders `<DBTBucket>` in the commands below with it (in two places). Run the commands in the VSCode terminal to copy the new models and `dbt_project.yml` into the S3 bucket. The Airflow pipelines point to this bucket when they run *dbt* as the final step of their processes.

*Note:* Before you execute the following commands, make sure that there is no folder named `.ipynb_checkpoints` inside the `dbt_modeling` folder or any of its subfolders. This folder appears when you modify a file using the Jupyter UI. Delete any such checkpoint folders in your local environment, and only then execute the commands.

```bash
aws s3 cp $HOME/workspace/dbt_modeling/models/bi_views s3://<DBTBucket>/dbt_project/dbt_modeling/models/bi_views --recursive

aws s3 cp $HOME/workspace/dbt_modeling/dbt_project.yml s3://<DBTBucket>/dbt_project/dbt_modeling/dbt_project.yml
```
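The checkpoint cleanup mentioned in the note can also be scripted. A minimal sketch with the Python standard library; `remove_ipynb_checkpoints` is a hypothetical helper, and you would run it before the `aws s3 cp` commands, for example with `remove_ipynb_checkpoints(Path.home() / "workspace" / "dbt_modeling")`, matching the local path used in those commands:

```python
import shutil
from pathlib import Path


def remove_ipynb_checkpoints(root):
    """Delete every .ipynb_checkpoints folder under root and return the removed paths."""
    removed = []
    # Collect and sort the matches first: a parent folder sorts before anything
    # nested inside it, so nested matches are already gone and get skipped.
    for path in sorted(Path(root).rglob(".ipynb_checkpoints")):
        if path.is_dir():
            shutil.rmtree(path)
            removed.append(path)
    return removed
```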

<a name='4'></a>
## 4 - Orchestration with Apache Airflow

<a name='4.1'></a>
### 4.1 - Accessing Apache Airflow

4.1.1. In the CloudFormation Outputs tab, search for `AirflowDNS`; copy the value and paste it into another browser tab. This is the Apache Airflow environment that you can use to develop and run your DAGs. On the login page, use `airflow` for both the username and the password. You will see that there is already one DAG named `deftunes_songs_pipeline_dag`; this is an example DAG that shows the usage of several types of operators: [the `GlueJobOperator`](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/glue/index.html#airflow.providers.amazon.aws.operators.glue.GlueJobOperator), [the `GlueDataQualityRuleSetEvaluationRunOperator`](https://airflow.apache.org/docs/apache-airflow-providers-amazon/8.26.0/_api/airflow/providers/amazon/aws/operators/glue/index.html#airflow.providers.amazon.aws.operators.glue.GlueDataQualityRuleSetEvaluationRunOperator), and [`DockerOperator`](https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html#airflow.providers.docker.operators.docker.DockerOperator).

*Note:* If your Apache Airflow environment has any issues, you can always restart it by running the following bash script:

```bash
bash ./scripts/restart_airflow.sh
```

The script finishes once the service is healthy, which should take less than 3 minutes.

<a name='4.2'></a>
### 4.2 - DAG for Songs Data in RDS Source

4.2.1. Open the file `./dags/deftunes_songs_pipeline.py`. This file corresponds to the `deftunes_songs_pipeline_dag` that is already deployed in Airflow and has the task dependencies shown in the image:

![songs_dag](./images/deftunes_songs_dag.png)

4.2.2. The deployed DAG is incomplete. Replace the following placeholders with the values of the Terraform outputs:

- `<DATA-LAKE-BUCKET>` with the value of `data_lake_bucket`.
- `<SCRIPTS-BUCKET>` with the value of `scripts_bucket`.
- `<GLUE-EXECUTION-ROLE>` with the value of `glue_role_arn`, full ARN.

Go through the comments in the file to understand the tasks.

4.2.3. Now update and execute this DAG. Replace the placeholder `<DAGS-BUCKET>` with the value of the key `DagsBucket` in the CloudFormation Outputs and run the command in the VSCode terminal:

```bash
aws s3 cp $HOME/workspace/dags/deftunes_songs_pipeline.py s3://<DAGS-BUCKET>/dags/deftunes_songs_pipeline.py
```

4.2.4. Then, go to the Airflow UI. Press the toggle button to unpause the DAG; it should start running automatically. Click on each task and inspect its logs to understand what it is doing. You will see logs similar to the following ones:

- **Glue jobs**. Logs from the Glue jobs will look similar to the following image:

![glue_job_task](./images/glue_job_task_output.png)

You can see that the `GlueJobOperator` continuously requests the status of the current Glue job run until it is `SUCCEEDED` or `FAILED`.

- **Data quality checks**. Logs from the data quality check tasks look similar to:

![data_quality_task](./images/data_quality_task_output.png)

The `GlueDataQualityRuleSetEvaluationRunOperator` shows some metrics regarding the quality rules that you created, together with a `PASS` or `FAIL` result for each rule, depending on whether the data satisfies it.

- **dbt with DockerOperator**. The logs from the `DockerOperator` that runs `dbt` will look like the following image:
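The rule outcomes in these logs come from the results that Glue stores for each evaluation run. If you want to inspect them outside Airflow, the boto3 Glue client's `get_data_quality_result` returns them in a `RuleResults` list, where each entry has a `Result` of `PASS`, `FAIL` or `ERROR`. A minimal sketch that summarizes such a response; `summarize_rule_results` is a hypothetical helper and the sample dictionary is made up:

```python
from collections import Counter


def summarize_rule_results(result):
    """Summarize a get_data_quality_result response.

    Returns a dict counting each outcome ("PASS", "FAIL", "ERROR") and a list of
    (rule description, evaluation message) pairs for every rule that did not pass.
    """
    rule_results = result.get("RuleResults", [])
    counts = Counter(rule["Result"] for rule in rule_results)
    not_passed = [
        (rule.get("Description", rule["Name"]), rule.get("EvaluationMessage", ""))
        for rule in rule_results
        if rule["Result"] != "PASS"
    ]
    return dict(counts), not_passed


# Made-up response shaped like the output of glue_client.get_data_quality_result(...).
sample = {
    "RuleResults": [
        {"Name": "Rule_1", "Description": 'IsComplete "user_id"', "Result": "PASS"},
        {"Name": "Rule_2", "Description": 'Uniqueness "user_id" > 0.95', "Result": "FAIL",
         "EvaluationMessage": "Value: 0.9 does not meet the constraint requirement!"},
    ]
}
print(summarize_rule_results(sample))
```

With a real result, you would pass `boto3.client("glue").get_data_quality_result(ResultId="<RESULT-ID>")` instead of the sample.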

![dbt_task](./images/dbt_task_output.png)

The output is quite similar to the `dbt` output that you have seen in previous labs, showing each table in the schema that you are creating and the status of the execution. This DAG should perform a backfill covering 2 months.

<a name='4.3'></a>
### 4.3 - DAG for Users and Sessions Data from API Source

Now you need to create the DAG to orchestrate the extraction, transformation, quality checks and creation of the star schema for the data obtained from the two API endpoints: `users` and `sessions`. This is the diagram:

![deftunes_api_dag](./images/deftunes_api_dag.png)

4.3.1. Open the file `./dags/deftunes_api_pipeline.py`. Replace the following placeholders with the values of the Terraform outputs:

- `<DATA-LAKE-BUCKET>` with the value of `data_lake_bucket`.
- `<SCRIPTS-BUCKET>` with the value of `scripts_bucket`.
- `<GLUE-EXECUTION-ROLE>` with the value of `glue_role_arn`, full ARN (in two places).

Also, replace `<API-ENDPOINT>` placeholder with the value of the key `APIEndpoint` in CloudFormation Outputs.

Go through the comments in the file to understand the tasks.

4.3.2. Replace the placeholder `<DAGS-BUCKET>` with the value of the key `DagsBucket` in the CloudFormation Outputs and run the command in the VSCode terminal:

```bash
aws s3 cp $HOME/workspace/dags/deftunes_api_pipeline.py s3://<DAGS-BUCKET>/dags/deftunes_api_pipeline.py
```

Now, go to your Airflow UI and refresh it, or wait until you see the new DAG named `deftunes_api_pipeline_dag`. Click on the toggle button to unpause it; it should start executing automatically. The DAG performs a backfill, extracting data starting from the first available date, and should cover 3 months.

<a name='5'></a>
## 5 - Data Visualization with Apache Superset

Finally, to incorporate data visualization into the current data architecture, an EC2 instance has been set up for you to work with Apache Superset; its URL is among the CloudFormation outputs.

5.1. Access the Superset UI using the URL provided in the CloudFormation Outputs. You should see a login screen like this:

![superset_login](images/superset_ui.png)

Log in using the following credentials:

* `user`: admin
* `password`: admin

5.2. Now, configure the Redshift connection. Click on the `Settings` dropdown menu in the top right and, under the Data section, select `Data Connections`. Click on the `+ Database` button in the top right; a new menu should appear to configure the new connection:

![superset_conf](images/superset_conf.png)

Use the `Choose database...` dropdown menu; the first option should be **Amazon Redshift**. Select it and click Next. Scroll down to the link that says `Connect this database with a SQLAlchemy URI string instead`, click it and then enter the string printed by the following cell:

```python
print(redshift_connection_url)
```

5.3. You can test the connection first, or just click on the CONNECT button to create it. After that, select the Datasets tab in the top header menu; you will be directed to the following page:

![dataset_ui](images/datasets_ui.png)

Click on the `+ DATASET` button in the top right. On the new screen, select the connection that you just configured for Amazon Redshift, then the business views schema `deftunes_bi_views`, and finally one of the views.

![dataset_menu](images/dataset_menu.png)

Then click on the **CREATE DATASET AND CREATE CHART** button; you will be directed to a new page to create a chart based on the dataset. Once you are done with the chart, click the **Save** button in the top right; it will ask you to give the chart a name and then save it. Create a chart for each view, then create a new dashboard in the *Dashboards* section of the top navigation header, using the `+ Dashboard` button. Enter a name for your dashboard (in the top left part), drag and drop the charts you created earlier onto the dashboard canvas, resize and arrange them as desired, and finally click **Save** to save your dashboard layout.


During the second and final part of the capstone, you enhanced the existing data architecture for DeFtunes' new business operation. You implemented data quality checks to systematically evaluate the cleansed data, added orchestration among the various pipeline components, and incorporated data visualization and materialized views for end users. You have successfully experimented with a set of data engineering tools to develop a comprehensive data project. In the future, you will be required to work with similar tools. It is essential to understand the underlying principles and requirements of each tool, and, in due time, incorporate them into your skill set as a data engineer. 

Congratulations on completing the capstone project!