Skip to content

This package helps to reduce the amount of boilerplate code when creating Airflow DAGs from Python callables.

License

Notifications You must be signed in to change notification settings

michaelosthege/apache-airflow-flowitems

Repository files navigation

PyPI version pipeline coverage

apache-airflow-flowitems

This project helps to reduce the amount of boilerplate code when writing Airflow DAGs with lots of Python callables.

For example, consider the following DAG:

graph LR;
    all_ready-->get_city;
    get_city-->get_temperature;
    get_temperature-->summarize;
    get_runid-->summarize;
Loading

From the following Python callables, there are multiple ways to create the above DAG. But what if weather_reports.py is owned by another project, so we can't @task decorate it?

# file: weather_reports.py
def get_city():
    return "London"

def get_temperature(city):
    return {
        "London": 10,
        "Paris": 15,
    }


# file: airflow_weather.py
def get_runid(context, **kwargs):
    kwargs.setdefault("city", "London")
    dagrun = context["run_id"]
    return f"{city}_{dagrun}"

def summarize(runid, temperature):
    return {
        "report_id": runid,
        "text": f"The temperature in {city} is {temperature} °C.",
    }

With apache-airflow-flowitems, the DAG definition can be very concise:

from apache_airflow_flowitems import PythonItem
import weather_airflow as gluecode
import weather_reports as external

with airflow.DAG(...):
    t_ready = BashOperator(bash_command="...", task_id="all_ready")
    # Upstream dependencies can be passed as 👇 args
    t_city = PythonItem(external.get_city, t_ready)()
    # Function parameters must be passed as kwargs 👇
    t_temp = PythonItem(external.get_temperature, city=t_city)(task_id="temp_London")

    t_id = PythonItem(gluecode.get_runid)()
    t_summary = PythonItem(gluecode.summarize, runid=t_id, temperature=t_temp)

For comparison, the 2nd task manually wrapped in a @task decorator would look like this:

    t_temp = task(task_id="temp_London")(external.get_temperature)(city=t_city.output)
    t_ready >> t_temp

Okay, it's not that much shorter, but FlowItems can do more cool stuff:

  • Inputs can be XComArg or Operator, whereas the @task decorator only supports Operator
  • If the callable passed to a PythonItem has a context argument, the Airflow context dict will be passed. In contrast to @task, this means that the function may ALSO take **kwargs.

But in the end this is, of course, just a slightly different syntax for writing DAGs.

I'm open-sourcing this package mostly to make it easier for myself to use it in different projects.

Installation

pip install apache_airflow_flowitems

Contributing

First make sure that you can run the tests:

pip install -r requirements-dev.txt
pytest --cov=./apache_airflow_flowitems --cov-report xml --cov-report term-missing .

Also set up pre-commit for automated code style enforcement:

pip install pre-commit
pre-commit install

About

This package helps to reduce the amount of boilerplate code when creating Airflow DAGs from Python callables.

Topics

Resources

License

Stars

Watchers

Forks

Languages