# 03.1 Introduction to orchestration with Dagster

## The Dagster orchestration model


Dagster is a data orchestration framework that manages the flow of data and computation across pipelines (called jobs) made up of reusable components (either assets or ops).

Unlike schedulers such as Airflow, which treat every node of the graph as a task, Dagster has different types of nodes that describe how data is produced and consumed. [Source](https://docs.dagster.io/getting-started/concepts)

The main types we are going to be using in this lesson are:

- [Assets](https://docs.dagster.io/guides/build/assets): An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. An asset definition is a description, in code, of an asset that should exist and how to produce and update that asset. 

- [Resources](https://docs.dagster.io/guides/build/external-resources): Dagster resources are objects used by Dagster assets and ops that provide access to external systems, databases, or services. For example, a simple ETL (Extract, Transform, Load) pipeline might fetch data from an API, ingest it into a database, and update a dashboard. The API client, the database connection, and the dashboard service could each be modeled as a resource.

- [IO Managers](https://docs.dagster.io/guides/build/io-managers): I/O managers in Dagster allow you to keep the code for data processing separate from the code for reading and writing data. They are a special type of resource.

- [Jobs](https://docs.dagster.io/guides/build/jobs): Jobs are the main unit of execution and monitoring in Dagster. They allow you to execute a portion of a graph of asset definitions or ops based on a schedule or an external trigger.

- [Schedules](https://docs.dagster.io/guides/automate/schedules): Schedules enable automated execution of jobs at specified intervals. These intervals can range from common frequencies like hourly, daily, or weekly to more complex patterns defined using cron expressions.

## Project structure

The Dagster project that we are going to build during this course is located at `aw-dwh/projects/dagster/adventureworks-orchestration`. You can find more information about how to create a new project in the [official documentation](https://docs.dagster.io/guides/build/projects/creating-a-new-project).

The project is structured as follows:

```
adventureworks_orchestration/
┣ assets/ <- define the assets here
┣ resources/ <- define the resources here
┣ __init__.py
┣ constants.py
┣ definitions.py <- load definitions from assets, resources, jobs, etc.
┗ jobs.py <- define the jobs here
```

## Defining resources and IO managers

As stated before, Dagster resources are an abstraction over the external services used to construct assets, while IO managers handle how those assets are read from and written to external storage.

In this case, since we are automating the code from the previous lesson, we only need a Spark session with access to S3.

Luckily for us, Dagster ships a handful of pre-made resources, including a PySpark one, so we only need to configure it to access our S3-compatible storage (MinIO).


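```python
# definitions.py

import os

import dagster as dg
from dagster_pyspark import pyspark_resource


@dg.definitions
def defs():
    # PySpark session that reaches MinIO through the s3a:// connector.
    # Spark only forwards keys prefixed with "spark.hadoop." to Hadoop.
    configured_pyspark = pyspark_resource.configured(
        {
            "spark_conf": {
                "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
                "spark.hadoop.fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID"),
                "spark.hadoop.fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY"),
                "spark.hadoop.fs.s3a.endpoint": os.getenv("AWS_ENDPOINT"),
                "spark.hadoop.fs.s3a.endpoint.region": os.getenv("AWS_REGION"),
                # MinIO is usually accessed with path-style URLs (endpoint/bucket/key)
                "spark.hadoop.fs.s3a.path.style.access": "true",
            }
        }
    )

    return dg.Definitions(
        # other code...
        resources={
            "spark_s3_rsc": configured_pyspark,
        },
    )
```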
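Because `os.getenv` silently returns `None` for unset variables, a misconfigured environment only shows up once Spark tries to reach MinIO. One way to fail earlier is to build the `spark_conf` dictionary in a small helper and validate it first. This is a sketch under assumptions: `build_s3a_spark_conf` is a hypothetical helper (not part of Dagster or Spark), and it assumes the same environment variable names used above.

```python
from typing import Dict, Mapping


def build_s3a_spark_conf(env: Mapping[str, str]) -> Dict[str, str]:
    """Hypothetical helper: Spark settings for an S3-compatible store such as MinIO.

    Spark only copies keys prefixed with "spark.hadoop." into the Hadoop
    configuration that the s3a:// connector reads.
    """
    required = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_ENDPOINT"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail fast instead of starting Spark with None credentials
        raise ValueError("Missing environment variables: " + ", ".join(missing))

    conf = {
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.access.key": env["AWS_ACCESS_KEY_ID"],
        "spark.hadoop.fs.s3a.secret.key": env["AWS_SECRET_ACCESS_KEY"],
        "spark.hadoop.fs.s3a.endpoint": env["AWS_ENDPOINT"],
        # MinIO is usually addressed path-style (endpoint/bucket/key)
        "spark.hadoop.fs.s3a.path.style.access": "true",
    }
    # The region is optional for MinIO, so only set it when it is provided
    if env.get("AWS_REGION"):
        conf["spark.hadoop.fs.s3a.endpoint.region"] = env["AWS_REGION"]
    return conf
```

It could then be passed as `pyspark_resource.configured({"spark_conf": build_s3a_spark_conf(os.environ)})`.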

Then we can use this configured PySpark session to handle the IO of our assets. In this case, we are going to save our assets as CSV files so we can easily inspect them.

To define our custom IO manager, we can create a `csv_io_manager.py` module inside the `resources` folder:

```python
# resources/csv_io_manager.py

import os
from typing import Union

import dagster as dg
from dagster_pyspark.resources import PySparkResource
from pyspark.sql import DataFrame


class PartitionedCsvIOManager(dg.ConfigurableIOManager):
    """Base IO manager that stores each asset as CSV under `_base_path`."""

    # The PySpark resource is injected by Dagster (see definitions.py)
    pyspark: dg.ResourceDependency[PySparkResource]

    @property
    def _base_path(self) -> str:
        # Subclasses decide where the files live (S3 bucket, local folder, ...)
        raise NotImplementedError()

    def handle_output(self, context: dg.OutputContext, obj: DataFrame):
        path = self._get_path(context)

        # code to save the asset to external storage here...

        # record where the asset was written so it shows up in the Dagster UI
        context.add_output_metadata({"path": path})

    def load_input(self, context: dg.InputContext) -> DataFrame:
        path = self._get_path(context)

        # code to read the asset from external storage here...

    def _get_path(self, context: Union[dg.InputContext, dg.OutputContext]) -> str:
        # e.g. asset key labs/source/members -> <base path>/labs/source/members
        return os.path.join(self._base_path, *context.asset_key.parts)


class S3PartitionedCsvIOManager(PartitionedCsvIOManager):
    """Stores assets in the given bucket through the s3a:// connector."""

    s3_bucket: str

    @property
    def _base_path(self) -> str:
        return "s3a://" + self.s3_bucket
```
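The point of an IO manager is the split between compute and storage: the asset code returns a value and never touches paths. The framework-free sketch below mirrors the same `handle_output` / `load_input` / `_get_path` shape on lists of dictionaries using the standard `csv` module, which makes the round trip easy to test locally. It is not Dagster's API, and the class name `LocalCsvStore` is hypothetical.

```python
import csv
import os
from typing import Dict, List, Sequence


class LocalCsvStore:
    """Framework-free sketch of the IO-manager idea (not Dagster's API).

    Asset code hands rows to handle_output and gets them back from load_input;
    only this class knows where and how they are stored.
    """

    def __init__(self, base_path: str):
        self.base_path = base_path

    def _get_path(self, asset_key: Sequence[str]) -> str:
        # Base path plus one segment per asset-key part, like _get_path above.
        # A ".csv" suffix is added because this sketch writes a single file.
        return os.path.join(self.base_path, *asset_key) + ".csv"

    def handle_output(self, asset_key: Sequence[str], rows: List[Dict[str, str]]) -> str:
        if not rows:
            raise ValueError("nothing to write")
        path = self._get_path(asset_key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
        # A Dagster IO manager would attach this path as output metadata
        return path

    def load_input(self, asset_key: Sequence[str]) -> List[Dict[str, str]]:
        with open(self._get_path(asset_key), newline="") as f:
            return list(csv.DictReader(f))
```

In the Spark-based manager, one possible way to fill the placeholders is `obj.write.csv(path, header=True, mode="overwrite")` in `handle_output` and `return self.pyspark.spark_session.read.csv(path, header=True, inferSchema=True)` in `load_input`. Unlike this sketch, Spark writes a directory of part files at `path` rather than a single file.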


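We then register this IO manager in our definitions as well, passing it the configured PySpark resource:

```python
# definitions.py

import dagster as dg

from .resources.csv_io_manager import S3PartitionedCsvIOManager

#...

@dg.definitions
def defs():
    #... (configured_pyspark is created as shown above)

    return dg.Definitions(
        # other code...
        resources={
            "spark_s3_rsc": configured_pyspark,
            "s3_test_io_manager": S3PartitionedCsvIOManager(
                pyspark=configured_pyspark,
                s3_bucket="test",
            ),
        },
    )
```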

## Defining the assets

Assets in Dagster represent data in persistent storage. In the last lesson we started with local files, uploaded them to storage, then transformed them and created reports from the transformed data.


So the dependencies between our assets should look something like this:

[Source] -> [Transformed] -> [Reports]
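Dagster derives the execution order from these dependencies. The same idea can be illustrated with Python's standard `graphlib` module: each asset lists its upstream assets, and a topological sort yields an order in which every asset comes after its inputs. This is only an illustration, not how Dagster works internally, and the keys are illustrative (`labs/reports/members_report` is a hypothetical report asset).

```python
from graphlib import TopologicalSorter

# asset -> set of upstream assets it reads from (illustrative keys)
deps = {
    "labs/source/members": set(),
    "labs/transformed/members": {"labs/source/members"},
    "labs/reports/members_report": {"labs/transformed/members"},
}

# static_order() lists every asset after all of its dependencies
order = list(TopologicalSorter(deps).static_order())
print(order)
```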

Let's start with the source assets. In this lesson we create a `sample` folder inside the `assets` folder to group all the assets for this lesson, which is a recommended practice. Note that the folder must be a Python package (it needs an `__init__.py` file).
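The asset code that follows imports `ASSET_GROUP_LABS` from the project-level `constants.py` and the `ASSET_PREFIX_*` values from a `constants.py` inside `assets/sample`. Their actual values are not shown in this lesson. A minimal sketch consistent with the `labs/source/{file}` naming used for the source assets might look like this; the `"transformed"` and `"reports"` values are assumptions.

```python
# adventureworks_orchestration/constants.py (assumed value)
ASSET_GROUP_LABS = "labs"

# adventureworks_orchestration/assets/sample/constants.py (assumed values)
ASSET_PREFIX_SOURCE = "source"
ASSET_PREFIX_TRANSFORMED = "transformed"
ASSET_PREFIX_REPORTS = "reports"

# Asset keys are lists of parts; joined with "/" they give the displayed name
source_members_key = [ASSET_GROUP_LABS, ASSET_PREFIX_SOURCE, "members"]
print("/".join(source_members_key))  # labs/source/members
```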

### Our first assets

```python
# assets/sample/assets.py

import dagster as dg
from adventureworks_orchestration.constants import ASSET_GROUP_LABS
from .constants import ASSET_PREFIX_SOURCE, ASSET_PREFIX_TRANSFORMED, ASSET_PREFIX_REPORTS
from dagster_pyspark import PySparkResource
from pyspark.sql import DataFrame


files_path = "/home/iceberg/notebooks/data/bookings"
files = ["bookings", "facilities", "members"]


@dg.multi_asset(
    outs={
        file: dg.AssetOut(
            io_manager_key="s3_test_io_manager",
            key=[ASSET_GROUP_LABS, ASSET_PREFIX_SOURCE, file],
            group_name=ASSET_GROUP_LABS,
            is_required=False,
        )
        for file in files
    },
    can_subset=True,
)
def bookings_source_files(context: dg.AssetExecutionContext, spark_s3_rsc: PySparkResource):
    spark = spark_s3_rsc.spark_session

    has_errors = False

    # only the outputs selected for this run (all of them, or a subset)
    for file in context.selected_output_names:
        try:
            # read the file from local storage using PySpark
            # (one option: CSV with a header row, letting Spark infer column types)
            df = spark.read.csv(f"{files_path}/{file}.csv", header=True, inferSchema=True)

            context.log.info(f"Finished reading file {file}.csv")

            # hand the DataFrame to the IO manager configured for this output
            yield dg.Output(
                output_name=file,
                value=df,
            )

        except Exception:
            context.log.exception(f"Couldn't materialize the asset {file}")
            has_errors = True

    if has_errors:
        raise Exception("Errors while materializing the assets")
```
    

Let's break down the code.

The `multi_asset` decorator indicates that the function `bookings_source_files` produces multiple assets.

The `outs` parameter is a dictionary that specifies the properties of each asset the function produces. In this case all assets are saved as CSV files, so they share the same `io_manager_key`, which refers to the custom CSV IO manager implemented earlier. The `key` of each asset is a list of parts, here `ASSET_GROUP_LABS`, `ASSET_PREFIX_SOURCE` and the file name, which keeps asset names consistent with the format `labs/source/{file}`. The `group_name` parameter groups assets, which comes in handy when defining jobs later. `is_required=False` tells Dagster that the function may skip this output, so it should not raise an error when the asset is not returned.

We also pass `can_subset=True`, which for a multi-asset means that the function can materialize only a subset of its assets (the ones listed in `context.selected_output_names`).

In the body of the function we get the Spark session from the PySpark resource defined earlier. Then, for each asset we need to materialize, we read the corresponding file from local storage. Once the file is read into a DataFrame, we yield it wrapped in an `Output`. When Dagster receives the `Output` object, it passes it to the configured IO manager, which saves the asset to MinIO.
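The loop in `bookings_source_files` follows a "keep going, fail at the end" pattern: each selected file is read independently, successful ones are yielded straight away, and a single exception is raised after the loop if anything failed. Here is a plain-Python sketch of that control flow, with a hypothetical `read` callback standing in for `spark.read.csv` and tuples standing in for `dg.Output` objects.

```python
from typing import Callable, Iterable, Iterator, List, Tuple


def materialize_selected(
    selected: Iterable[str],
    read: Callable[[str], object],
) -> Iterator[Tuple[str, object]]:
    """Yield (output_name, value) per selected file; raise at the end on failures."""
    failed: List[str] = []
    for name in selected:
        try:
            value = read(name)
        except Exception:
            # remember the failure and move on to the next file
            failed.append(name)
            continue
        yield name, value  # stands in for dg.Output(output_name=name, value=value)
    if failed:
        raise RuntimeError("Errors while materializing: " + ", ".join(failed))
```

Since each `Output` is handed to the IO manager as it is yielded, files that load correctly should still be stored even when another file fails, while the run as a whole is reported as failed.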

### Dependencies between assets

We can declare dependencies between assets using the `ins` parameter of the `asset` decorator. We only need to specify the key of the upstream asset, and the IO manager loads it from storage for us:

```python
# assets/sample/assets.py

@dg.asset(
    key=[ASSET_GROUP_LABS, ASSET_PREFIX_TRANSFORMED, "members"],
    group_name=ASSET_GROUP_LABS,
    ins={
        # the "members" parameter receives the upstream source asset
        "members": dg.AssetIn(key=[ASSET_GROUP_LABS, ASSET_PREFIX_SOURCE, "members"])
    },
    io_manager_key="s3_test_io_manager",
)
def bookings_transformed_members(context: dg.AssetExecutionContext, members: DataFrame):

    # transform the members dataframe ...
    result = members  # placeholder: replace with the actual transformation

    return dg.Output(value=result)
```
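Conceptually, `ins` maps a function parameter name to an upstream asset key: Dagster loads each key through the IO manager's `load_input` and passes the result as the keyword argument of the same name. The sketch below mimics that wiring in plain Python; `run_with_inputs`, `count_members` and the in-memory `storage` dictionary are hypothetical stand-ins, not Dagster internals.

```python
from typing import Any, Callable, Dict, List


def run_with_inputs(
    fn: Callable[..., Any],
    ins: Dict[str, List[str]],
    load: Callable[[List[str]], Any],
) -> Any:
    """Call fn with each upstream asset loaded under its parameter name."""
    # in Dagster, loading is the IO manager's load_input for that upstream key
    kwargs = {param: load(key) for param, key in ins.items()}
    return fn(**kwargs)


# in-memory "storage" keyed by asset-key tuples
storage = {("labs", "source", "members"): [{"id": "1"}, {"id": "2"}]}


def count_members(members):
    return len(members)


result = run_with_inputs(
    count_members,
    {"members": ["labs", "source", "members"]},
    lambda key: storage[tuple(key)],
)
print(result)  # 2
```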

### Registering the assets

We need to register our assets in the definitions file:

```python
# definitions.py

import dagster as dg
from .assets import sample
#...

@dg.definitions
def defs():
    #...

    sample_assets = dg.load_assets_from_package_module(sample)  # load assets from the module

    return dg.Definitions(
        assets=[*sample_assets],
        resources={
            "spark_s3_rsc": configured_pyspark,
            "s3_test_io_manager": S3PartitionedCsvIOManager(
                pyspark=configured_pyspark,
                s3_bucket="test",
            ),
        },
    )
```

## Defining a job

Defining a job is straightforward: you only need a name and a selection of the assets you want to materialize. In this case we select all the assets in the `ASSET_GROUP_LABS` group defined at the beginning.

```python
# jobs.py

import dagster as dg
from adventureworks_orchestration.constants import ASSET_GROUP_LABS

# materialize every asset whose group_name is ASSET_GROUP_LABS
sample_job = dg.define_asset_job(
    "sample_job",
    selection=dg.AssetSelection.groups(ASSET_GROUP_LABS),
)

jobs = [sample_job]
```
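A group selection is essentially a filter over the registered assets by their `group_name`. The plain-Python sketch below (the `AssetInfo` class, the `select_group` function and the `other/source/orders` asset are hypothetical, not Dagster's implementation) shows why giving every asset `group_name=ASSET_GROUP_LABS` is enough to include all of them in `sample_job`, while assets in other groups, such as Dagster's `default` group, are left out.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class AssetInfo:
    key: str    # e.g. "labs/source/members"
    group: str  # the asset's group_name


def select_group(assets: List[AssetInfo], group: str) -> List[str]:
    # keep only the assets assigned to the requested group
    return [asset.key for asset in assets if asset.group == group]


registered = [
    AssetInfo("labs/source/members", "labs"),
    AssetInfo("labs/transformed/members", "labs"),
    AssetInfo("other/source/orders", "default"),
]
print(select_group(registered, "labs"))
```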

Then we register these jobs in our definitions file:

```python
# definitions.py

from .jobs import jobs

#...

@dg.definitions
def defs():
    #...

    return dg.Definitions(
        assets=[*sample_assets],
        jobs=jobs,
        resources={
            "spark_s3_rsc": configured_pyspark,
            "s3_test_io_manager": S3PartitionedCsvIOManager(
                pyspark=configured_pyspark,
                s3_bucket="test",
            ),
        },
    )
```

## Scheduling

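Creating a schedule is straightforward with the `ScheduleDefinition` class: you only need to pass the job you want to schedule and the cron expression that should trigger it. Then register the schedule in your definitions file as well, through the `schedules` argument of `Definitions`. New schedules are stopped by default, so you also need to turn them on in the UI or pass `default_status=dg.DefaultScheduleStatus.RUNNING`.

```python
# e.g. in jobs.py, next to sample_job
import dagster as dg

# run sample_job every five minutes
five_minutes_schedule = dg.ScheduleDefinition(
    job=sample_job,
    cron_schedule="*/5 * * * *",
)
# then in definitions.py: dg.Definitions(..., schedules=[five_minutes_schedule])
```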
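The expression `*/5 * * * *` fires whenever the minute is a multiple of five, every hour of every day. The helper below is a hypothetical sketch (not Dagster's or any cron library's API) that computes the next matching time for `*/n` in the minute field, which can help when reasoning about when the schedule will next run.

```python
from datetime import datetime, timedelta


def next_every_n_minutes(after: datetime, n: int = 5) -> datetime:
    """Next time strictly after `after` matching the cron expression '*/n * * * *'.

    Only valid when n divides 60, so the pattern repeats cleanly every hour.
    """
    if n <= 0 or 60 % n != 0:
        raise ValueError("n must be a positive divisor of 60")
    base = after.replace(second=0, microsecond=0)
    # jump forward to the next minute that is a multiple of n
    return base + timedelta(minutes=n - base.minute % n)
```

Keep in mind that Dagster evaluates schedules in UTC unless you set `execution_timezone` on the `ScheduleDefinition`.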

## Using Dagster server to run your pipeline

If your Dagster code loads without errors, you should see it in the "Definitions" tab of the Dagster UI with a `Loaded` status:

![](./imgs/dagster/definitions.png)

From the UI you can also see your assets and their dependency lineage in the "Assets" tab:

![](./imgs/dagster/assets.png)

![](./imgs/dagster/lineage.png)

You can also control the jobs from the "Jobs" tab:

![](./imgs/dagster/jobs.png)