# Week 2 Workflow Orchestration

Note: DTC DE Zoomcamp is using Mage as its orchestrator for the 2024 class cohort.

## What is workflow orchestration?
A large part of data engineering is **extracting**, **transforming**, and **loading** data between sources.

```mermaid
---
title: Data engineering lifecycle
---
flowchart LR
    A[Data generation] --> B
    subgraph "Storage"
    B[Ingestion] --> C[Transformation] --> D[Serving]
    end
    D --> E(Analytics)
    D --> F(Machine Learning)
    D --> G(Reverse ETL)
```

**Orchestration** is the process of managing the dependencies between these steps, facilitated through automation.

The data **orchestrator** manages scheduling, triggering, monitoring, and resource allocation.
- Every **workflow** consists of steps or tasks that must run in a particular order.
- Workflows are typically represented as DAGs (directed acyclic graphs).
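To make the DAG idea concrete, here is a minimal sketch using Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+). The task names are hypothetical, and a real orchestrator also handles scheduling, retries, and parallelism, which this sketch leaves out. It only shows the core rule: a task runs after all of its upstream tasks.

```python
from graphlib import TopologicalSorter

# Hypothetical ETL workflow: each key maps a task to the set of tasks it depends on.
workflow = {
    "ingest": set(),
    "clean": {"ingest"},
    "aggregate": {"clean"},
    "train_model": {"clean"},
    "publish_dashboard": {"aggregate"},
}


def execution_order(dag: dict[str, set[str]]) -> list[str]:
    """Return an order in which every task comes after all of its upstream tasks.

    Raises graphlib.CycleError if the graph contains a cycle (i.e. is not a DAG).
    """
    return list(TopologicalSorter(dag).static_order())


print(execution_order(workflow))
```

Independent branches (here `aggregate` and `train_model`) may appear in either order, which is exactly the freedom an orchestrator uses to run them in parallel.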

A good orchestrator handles workflow management, automation, error handling and recovery, monitoring and alerting, resource optimization, observability and debugging, and compliance and auditing.

## Intro to Mage
Mage is an open-source pipeline tool for orchestrating, transforming, and integrating data.

```mermaid
---
title: Mage Workflow
---
flowchart LR
    A[Projects] --> B[Pipelines] --> C[Blocks]
    C --> D[Load]
    C --> E[Transform]
    C --> F[Export]
```

- A **project** contains the code for all pipelines, blocks, and other assets (the "repo" of your orchestrator).

- A **pipeline** is a workflow that executes some data operation (extracting, transforming, loading), also called a DAG. Each pipeline is represented by a YAML file in the "pipelines" folder of your project.

- A **block** is a chunk of code (SQL, Python, or R) that can be executed independently or within a pipeline. Together, blocks form DAGs called pipelines. A block won't start running in a pipeline until all its upstream dependencies are met.
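A rough mental model of how a pipeline chains blocks: each block is a plain function, and the output of an upstream block becomes the input of the block downstream of it. The sketch below is a hypothetical, dependency-free imitation of that idea for a simple linear pipeline; `run_pipeline` and the block functions are illustrative names, not Mage APIs, and real Mage pipelines can also branch.

```python
from typing import Any, Callable


def load() -> list[dict[str, Any]]:
    # Stand-in for a data loader block: returns a few made-up taxi trips.
    return [
        {"trip_id": 1, "passenger_count": 2},
        {"trip_id": 2, "passenger_count": 0},
        {"trip_id": 3, "passenger_count": 1},
    ]


def transform(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Stand-in for a transformer block: drops trips with no passengers.
    return [r for r in rows if r["passenger_count"] > 0]


def export(rows: list[dict[str, Any]]) -> int:
    # Stand-in for a data exporter block: "writes" the rows and reports how many.
    return len(rows)


def run_pipeline(first: Callable[[], Any], *downstream: Callable[[Any], Any]) -> Any:
    """Run a linear chain of blocks, feeding each block's output to the next one."""
    result = first()
    for block in downstream:
        result = block(result)
    return result


print(run_pipeline(load, transform, export))  # prints 2
```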

## Configuring Mage
1. `git clone` the repo at https://github.com/mage-ai/mage-zoomcamp and `cd` into the cloned directory.
2. Pull the latest Mage image with `docker pull mageai/mageai:latest`.
3. Run `docker compose build` to build the project's images.
4. Run `docker compose up` to start the containers.
5. Navigate to [localhost:6789](http://localhost:6789) to enter the Mage environment.

## Configuring Postgres to work with Mage

Add a Postgres service under `services:` in `docker-compose.yml`:
```yaml
postgres:
    image: postgres:15
    restart: on-failure
    container_name: ${PROJECT_NAME}-postgres
    env_file:
        - .env
    environment:
        POSTGRES_DB: ${POSTGRES_DBNAME}
        POSTGRES_USER: ${POSTGRES_USER}
        POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
        - "${POSTGRES_PORT}:5432"
```

In Mage, connections are managed in `io_config.yaml`. Connections are grouped into profiles, and the default profile is named `default`. To add another profile, such as `dev`, add a new top-level key:
```yaml
dev:
    POSTGRES_CONNECT_TIMEOUT: 10
    POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
    POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
    POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
    POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
    POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
    POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"
```

Note that variables are injected into `io_config.yaml` with Jinja template tags such as `{{ env_var('POSTGRES_USER') }}`, which reads the value from an environment variable.
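To see what the `env_var` tag does, the stdlib-only sketch below imitates just that substitution with a regular expression. It is a simplified stand-in for illustration, not Mage's actual Jinja renderer; `render_env_vars` is a hypothetical helper, and the example value `root` is made up.

```python
import os
import re

# Matches {{ env_var('NAME') }} or {{ env_var("NAME") }}, with optional spaces.
ENV_VAR_TAG = re.compile(r"""\{\{\s*env_var\(\s*['"]([A-Za-z_][A-Za-z0-9_]*)['"]\s*\)\s*\}\}""")


def render_env_vars(text: str) -> str:
    """Replace each env_var tag with the variable's value (empty string if unset)."""
    return ENV_VAR_TAG.sub(lambda m: os.environ.get(m.group(1), ""), text)


os.environ["POSTGRES_USER"] = "root"  # example value for illustration only
config_line = "POSTGRES_USER: \"{{ env_var('POSTGRES_USER') }}\""
print(render_env_vars(config_line))  # prints POSTGRES_USER: "root"
```

Keeping credentials in environment variables (loaded from `.env` by Docker Compose) means `io_config.yaml` itself never contains secrets.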

Next, test the Postgres connection: create a SQL Data Loader block, set its connection to PostgreSQL and its profile to `dev`, and check `Use raw SQL`. Then run a simple `SELECT 1` query against the database to confirm that it connects.


## Loading data from API to Postgres

#### Load the data
1. Create a new batch pipeline named `api_to_postgres`.
2. Create a Data Loader block with an API Template named `load_api_data`.
3. Update the block with:
```Python
import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_api(*args, **kwargs):
    url = '<url to gzipped taxi CSV>'  # placeholder: link to the .csv.gz file

    # declare data types ahead of import so pandas doesn't have to infer them
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # columns to parse as datetimes while reading
    parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    return pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)
```
4. Run the block.
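Why declare `pd.Int64Dtype()` instead of plain `int`? NumPy integer columns cannot hold missing values, so pandas upcasts an integer column with gaps to `float64`. The sketch below reads a tiny in-memory CSV (made-up sample rows, not the real taxi file) with and without the nullable dtype to show the difference; it assumes a reasonably recent pandas.

```python
import io

import pandas as pd

# Two made-up trips; the second has no passenger_count recorded.
sample_csv = io.StringIO(
    "VendorID,passenger_count,tpep_pickup_datetime\n"
    "1,1,2021-01-01 00:30:10\n"
    "2,,2021-01-01 00:51:20\n"
)


def read_trips(dtypes: dict) -> pd.DataFrame:
    sample_csv.seek(0)  # rewind so the same buffer can be read more than once
    return pd.read_csv(sample_csv, dtype=dtypes, parse_dates=["tpep_pickup_datetime"])


without_nullable = read_trips({"VendorID": int})
with_nullable = read_trips({"VendorID": pd.Int64Dtype(), "passenger_count": pd.Int64Dtype()})

print(without_nullable["passenger_count"].dtype)  # float64: the gap forces a float column
print(with_nullable["passenger_count"].dtype)     # Int64: integers kept, gap stored as <NA>
print(with_nullable["tpep_pickup_datetime"].dtype)
```

Keeping integer IDs and counts as integers also avoids surprises later, for example a `passenger_count` of `1.0` being exported to Postgres as a floating-point column.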

#### Transform the data
1. Create a Transformer block named `transform_taxi_data`.
2. Update the block with:
```Python
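if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
    # report how many rows are about to be dropped
    print(f"Preprocessing: rows with zero passengers: {data['passenger_count'].isin([0]).sum()}")

    # keep only rides with at least one passenger
    return data[data['passenger_count'] > 0]


@test
def test_output(output, *args, **kwargs):
    assert output['passenger_count'].isin([0]).sum() == 0, 'There are rides with zero passengers.'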
```

#### Export the data
1. Create a Data Exporter block using a Postgres template named `taxi_data_to_postgres`.
2. Update the code block:
```Python
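from os import path

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from mage_ai.settings.repo import get_repo_path
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'ny_taxi'
    table_name = 'yellow_cab_data'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'  # the Postgres profile added to io_config.yaml

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # don't write the DataFrame index as a column
            if_exists='replace',  # overwrite the table if it already exists
        )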
```
3. Run the block.

## Configuring GCP in Mage
Set up storage in Google Cloud:
1. Create a new Cloud Storage bucket `<unique bucket name>` and check "Enforce public access prevention on this bucket" to keep the bucket private.
2. Create a GCP service account for Mage, create a new JSON key for it, and copy the key file into the local project directory.
3. Then, in Mage's `io_config.yaml`, set the key path under the `default` profile: `GOOGLE_SERVICE_ACC_KEY_FILEPATH: "<path to credentials .json key file>"`.
4. Test the connection by creating a SQL Data Loader block that uses the BigQuery connection and the `default` profile, and running `SELECT 1`.

## Loading data from API to GCP
1. Create a new batch pipeline and add the `load_api_data` block and `transform_taxi_data` block from the previous steps.
2. Create a Data Exporter block with the Google Cloud Storage template named `taxi_to_gcs_parquet`.
3. Update the block with:
```Python
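from os import path

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from mage_ai.settings.repo import get_repo_path
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'  # profile holding GOOGLE_SERVICE_ACC_KEY_FILEPATH

    bucket_name = '<gcs bucket name>'
    object_key = '<gcs object key>'  # file name within the bucket, e.g. ending in .parquet

    GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )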
```
4. Run the block and all upstream blocks.

This writes the data to one large parquet file. For large datasets, it's advisable to split the data into **partitioned** parquet files instead. Datasets are often partitioned by a data dimension or by time period.

To write partitioned parquet files, create a `Data Exporter` block named `taxi_to_gcs_partitioned` from the generic template, attach it to the `Transformer` block, and update the block with:
```Python
import os

import pyarrow as pa
import pyarrow.parquet as pq
from pandas import DataFrame
from pyarrow import fs

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# GcsFileSystem's default credential lookup reads this variable
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '<path to creds.json>'
project_id = '<gcs project id>'
bucket_name = '<gcs bucket name>'
object_key = 'ny_taxi_data.parquet'
table_name = 'ny_taxi_data'
root_path = f'{bucket_name}/{table_name}'  # partitions are written under this prefix


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    # create a date column from tpep_pickup_datetime to partition on
    df['tpep_pickup_date'] = df['tpep_pickup_datetime'].dt.date

    table = pa.Table.from_pandas(df)

    gcs = fs.GcsFileSystem()

    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['tpep_pickup_date'],  # one folder per pickup date
        filesystem=gcs,
    )
```
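`pq.write_to_dataset` with `partition_cols` writes one directory per distinct partition value, using Hive-style `column=value` names (e.g. `<bucket>/ny_taxi_data/tpep_pickup_date=2021-01-01/...`). The pandas-only sketch below previews which directories a DataFrame would produce and how many rows would land in each, without touching GCS. `preview_partitions` is a hypothetical helper, and the bucket name and sample rows are made up.

```python
import pandas as pd


def preview_partitions(df: pd.DataFrame, root_path: str, partition_col: str) -> dict[str, int]:
    """Map each Hive-style partition directory to the number of rows it would hold."""
    counts = df.groupby(partition_col).size()
    return {f"{root_path}/{partition_col}={value}": int(n) for value, n in counts.items()}


trips = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime([
        "2021-01-01 00:30:10", "2021-01-01 08:12:00", "2021-01-02 14:05:33",
    ]),
    "passenger_count": [1, 2, 1],
})
# Same derived column as in the exporter block above.
trips["tpep_pickup_date"] = trips["tpep_pickup_datetime"].dt.date

for directory, rows in preview_partitions(trips, "my-bucket/ny_taxi_data", "tpep_pickup_date").items():
    print(directory, rows)
```

A query engine that understands Hive partitioning can then skip whole directories when filtering on `tpep_pickup_date`, which is the main payoff of partitioning.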

## Loading data from GCS to BigQuery
The next step is to take the data stored in parquet files and write it to an OLAP database, BigQuery, for further analytics.

1. Create a batch pipeline named `gcs_to_bigquery`.
2. Create a `Data Loader` block with the Google Cloud Storage template named `load_taxi_gcs`, fill in the `bucket_name` and `object_key`, and run the block to load the data.
3. Create a `Transformer` block named `transform_staged_data` where we will standardize the column names with:
```Python
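if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(data, *args, **kwargs):
    # e.g. "Passenger Count" -> "passenger_count"
    data.columns = data.columns.str.replace(' ', '_').str.lower()

    return data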
```
4. Create a `Data Exporter` block with the SQL template named `write_taxi_to_bigquery`. Use a BigQuery connection with the `default` profile, and set the schema to `ny_taxi` and the table to `yellow_cab_data`. To export data with SQL in Mage, we just select from the output of the upstream block, which is referenced as `{{ df_1 }}`:
```SQL
SELECT * FROM {{ df_1 }}
```
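To see exactly what the column-renaming transformer in step 3 does, here is the same pandas string chain applied to a few sample headers (hypothetical names chosen to show the effect). Note that it only replaces spaces and lowercases; it does not split CamelCase, so `VendorID` becomes `vendorid` rather than `vendor_id`.

```python
import pandas as pd


def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Same logic as the transform block: spaces -> underscores, then lowercase.
    df.columns = df.columns.str.replace(' ', '_').str.lower()
    return df


sample = pd.DataFrame(columns=['VendorID', 'Passenger Count', 'tpep_pickup_datetime'])
print(list(standardize_columns(sample).columns))
# prints ['vendorid', 'passenger_count', 'tpep_pickup_datetime']
```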

## Scheduling in Mage
In Mage, scheduling when and how often pipelines run is done with **Triggers**. Triggers can run on a **Schedule**, in response to an **Event**, or via an **API** call.

For this example, we will use a Schedule type Trigger. Click `Schedule` to open a new Trigger; name the trigger `gcs_to_bigquery_schedule`, set the frequency to `daily`, and set a start date. A Mage Schedule Trigger also allows the use of more complex schedules with a [Cron expression](https://docs.oracle.com/cd/E12058_01/doc/doc.1014/e12030/cron_expressions.htm). 

Save the changes to the Trigger and click `Enable trigger`. Now the Trigger is active and ready to run!

## Parameterized Execution
We can pass variables or parameters through entire DAGs in Mage in a couple of ways. The first option is to read them from `**kwargs`, which Mage passes into every block. For example:

```Python
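from pandas import DataFrame
if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_taxi_data_to_gcp_parameter(df: DataFrame, **kwargs) -> None:
    # variables passed to the block (e.g. execution_date) are read from **kwargs
    now = kwargs.get('execution_date')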
```
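A common use of `execution_date` is to give each run its own date-stamped object, so reruns and backfills don't overwrite each other's output. The helper below is a hypothetical sketch of that idea; the `ny_taxi` prefix and `daily_trips.parquet` file name are made up, and in a real block `kwargs` would be supplied by Mage rather than passed by hand.

```python
from datetime import datetime
from typing import Any


def build_object_key(prefix: str, **kwargs: Any) -> str:
    """Build an object key such as 'ny_taxi/2024/01/15/daily_trips.parquet'.

    Falls back to the current time when no execution_date is supplied,
    e.g. when calling the function outside a pipeline.
    """
    now: datetime = kwargs.get('execution_date') or datetime.now()
    return f"{prefix}/{now.strftime('%Y/%m/%d')}/daily_trips.parquet"


print(build_object_key('ny_taxi', execution_date=datetime(2024, 1, 15, 6, 0)))
# prints ny_taxi/2024/01/15/daily_trips.parquet
```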

Another option is to set global pipeline variables in the pipeline editor, or runtime variables on the Triggers page of the GUI.

## Backfills
**Backfills** allow us to run a pipeline multiple times with different parameters. This is usually used to populate historical or lost data, especially when pipelines depend on date and time windows. For example, if a pipeline that collects daily timestamped data goes down for a couple of days, we can use backfills to rerun the pipeline for those outage days to *backfill* the missing data.

Navigate to the Backfills tab on the left navigation bar in the Mage GUI and `Create a new backfill`.

From here, we can configure the Backfill to run our pipeline a specific number of times based on a start date, an end date, and an interval.
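Conceptually, a date-based backfill expands the start date, end date, and interval into one run per window, each with its own `execution_date`. The stdlib sketch below (a hypothetical `backfill_dates` helper, not Mage's implementation) lists the execution dates a daily backfill over a three-day outage would cover. It treats the end date as inclusive; Mage's own boundary handling may differ.

```python
from datetime import datetime, timedelta


def backfill_dates(start: datetime, end: datetime, interval: timedelta) -> list[datetime]:
    """Return one execution date per interval from start up to and including end."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


for run_date in backfill_dates(datetime(2024, 1, 10), datetime(2024, 1, 12), timedelta(days=1)):
    print(run_date.date())
# prints 2024-01-10, 2024-01-11 and 2024-01-12, one per line
```

Combined with an `execution_date`-based object key, each backfilled run writes to its own location instead of overwriting the latest output.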

