mharrisb1/daglib

⚗️ Daglib - Lightweight DAG composition framework

Daglib is a lightweight, embeddable library for parallel task execution. It turns pure Python functions into executable task graphs.

Installation

Core

pip install daglib

With visualizations enabled

pip install 'daglib[graphviz]'  # static visualizations
# or
pip install 'daglib[ipycytoscape]'  # interactive visualizations

Create your first DAG

import daglib

dag = daglib.Dag()


@dag.task()
def task_1a():
    return "Hello"


@dag.task()
def task_1b():
    return "world!"


@dag.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"


dag.run()
'Hello, world!'
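
In the example above, the parameters of `task_2` are named after the tasks whose results it consumes (`task_1a` and `task_1b`), which suggests that dependencies are wired by name. This README does not show daglib's internals, so the following is only a minimal standalone sketch of that idea using the standard library (`inspect` and `graphlib`, Python 3.9+). The `TinyDag` class is hypothetical, is not daglib's implementation, and runs tasks sequentially rather than in parallel.

```python
import inspect
from graphlib import TopologicalSorter


class TinyDag:
    """Hypothetical stand-in for illustration; not daglib's API or implementation."""

    def __init__(self):
        self.tasks = {}  # task name -> function

    def task(self):
        # Decorator factory that registers a function under its own name.
        def register(fn):
            self.tasks[fn.__name__] = fn
            return fn

        return register

    def dependencies(self):
        # Assumption: a task depends on every task named in its parameter list.
        return {
            name: list(inspect.signature(fn).parameters)
            for name, fn in self.tasks.items()
        }

    def run(self):
        results = {}
        # static_order() yields each task only after all of its dependencies.
        order = list(TopologicalSorter(self.dependencies()).static_order())
        for name in order:
            fn = self.tasks[name]
            args = [results[dep] for dep in inspect.signature(fn).parameters]
            results[name] = fn(*args)
        # With a single final task, that task is last in the order.
        return results[order[-1]]


tiny = TinyDag()


@tiny.task()
def task_1a():
    return "Hello"


@tiny.task()
def task_1b():
    return "world!"


@tiny.task()
def task_2(task_1a, task_1b):
    return f"{task_1a}, {task_1b}"


print(tiny.run())  # prints: Hello, world!
```

Resolving dependencies from parameter names keeps task functions free of framework calls, at the cost of requiring task names to be unique valid identifiers.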

Beyond the "Hello, world!" example

For a more involved example, we will create a small pipeline that takes data from four source tables and creates a single reporting table. The data is driver-level information from the current 2022 Formula 1 season. The output will be a pivot table for team-level metrics.

Source Tables

  1. Team - The team each driver races for
  2. Points - Each driver's current total of Drivers' World Championship points for the season
  3. Wins - Each driver's current number of wins for the season
  4. Podiums - The number of times each driver has finished in the top 3 this season

import pandas as pd
import daglib

# Ignore. Used to render the DataFrame correctly in the README
pd.set_option("display.notebook_repr_html", False)

dag = daglib.Dag()


@dag.task()
def team():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        team=["Red Bull", "Ferrari", "Mercedes", "Red Bull", "Ferrari", "Mercedes"],
    )).set_index("driver")


@dag.task()
def points():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        points=[258, 178, 146, 173, 156, 158]
    )).set_index("driver")


@dag.task()
def wins():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        wins=[8, 3, 0, 1, 1, 0]
    )).set_index("driver")


@dag.task()
def podiums():
    return pd.DataFrame(dict(
        driver=["Max", "Charles", "Lewis", "Sergio", "Carlos", "George"],
        podiums=[10, 5, 6, 6, 6, 5]
    )).set_index("driver")


@dag.task()
def driver_metrics(team, points, wins, podiums):
    return team.join(points).join(wins).join(podiums)


@dag.task()
def team_metrics(driver_metrics):
    return driver_metrics.groupby("team").sum().sort_values("points", ascending=False)


dag.run()
          points  wins  podiums
team
Red Bull     431     9       16
Ferrari      334     4       11
Mercedes     304     0       11
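
The four source tasks (`team`, `points`, `wins`, `podiums`) take no inputs, so they do not depend on one another and could run concurrently before `driver_metrics`. As a rough sketch of how such a graph splits into stages, the snippet below uses the standard library's `graphlib` rather than daglib itself. The `deps` mapping is written by hand from the function signatures above, and the staging shown is an illustration, not a description of daglib's scheduler.

```python
from graphlib import TopologicalSorter

# Task name -> names of the tasks it consumes (copied from the signatures above).
deps = {
    "team": [],
    "points": [],
    "wins": [],
    "podiums": [],
    "driver_metrics": ["team", "points", "wins", "podiums"],
    "team_metrics": ["driver_metrics"],
}

sorter = TopologicalSorter(deps)
sorter.prepare()

stages = []
while sorter.is_active():
    ready = sorter.get_ready()  # tasks whose dependencies have all finished
    stages.append(sorted(ready))
    sorter.done(*ready)  # mark the whole stage as complete

for number, stage in enumerate(stages, start=1):
    print(f"stage {number}: {stage}")
# stage 1: ['podiums', 'points', 'team', 'wins']
# stage 2: ['driver_metrics']
# stage 3: ['team_metrics']
```

Every task within a stage is independent of the others in that stage, which is what makes parallel execution safe.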

Task Graph Visualization

The DAG defined above produces the following task graph:

[Image: task graph]
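
For readers without the rendered image, the same structure can be written out as Graphviz DOT text. The sketch below builds the DOT source by hand from the dependencies in the example. It does not use daglib's own visualization support, whose API is not shown in this README, and `to_dot` is a hypothetical helper.

```python
# Task name -> names of the tasks it consumes (copied from the example above).
deps = {
    "team": [],
    "points": [],
    "wins": [],
    "podiums": [],
    "driver_metrics": ["team", "points", "wins", "podiums"],
    "team_metrics": ["driver_metrics"],
}


def to_dot(deps):
    """Return Graphviz DOT source with one edge per dependency (hypothetical helper)."""
    lines = ["digraph tasks {"]
    for task, upstream in deps.items():
        lines.append(f'    "{task}";')
        for dep in upstream:
            # The edge points from the producing task to the consuming task.
            lines.append(f'    "{dep}" -> "{task}";')
    lines.append("}")
    return "\n".join(lines)


dot_source = to_dot(deps)
print(dot_source)
```

The printed text can be pasted into any Graphviz renderer to draw the four source tasks feeding `driver_metrics`, which in turn feeds `team_metrics`.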