Dagster tutorial
https://docs.dagster.io/tutorial

In [None]:
import requests
import csv
from dagster import pipeline, solid, execute_pipeline

"Solid" is dagster term for a single computational task in a pipeline. Preferred syntax is via the `@solid` function decorator.

Example just returns lines on a CSV file downloaded on the fly. `context` argument is provided by dagster when workflow executes; used here for logging.

In [None]:
@solid
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")

    return cereals

Likewise *pipeline*s are functions decorated with `@pipeline`.

Pipelines specify dependency DAG of solids; recall solids refer to jobs, not data (inputs/outputs).
Can execute sets of solids without running an entire pipeline (recovery). 

*IOManager*s are used to write output of a solid to persistent storage; encapsulates I/O & separated from pipeline logic.

In [None]:
@pipeline
def hello_cereal_pipeline():
    hello_cereal()

Run it:

In [None]:
result = execute_pipeline(hello_cereal_pipeline)

In [None]:
print(result)

Now connect two solids. First one doesn't do logging, so we don't need `context`. 

Pipeline just through function composition; apparently don't need to pass `context` explicitly (always assumed first arg?)

Diamond-pattern pipeline. Is dagster generating the actual DAG from the AST of the deocrated pipeline function? That'd be cool.

In [None]:
@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]

@solid
def find_highest_calorie_cereal(cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["calories"])
    )
    return sorted_cereals[-1]["name"]

@solid
def find_highest_protein_cereal(cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["protein"])
    )
    return sorted_cereals[-1]["name"]

@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")

@pipeline
def complex_pipeline():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )

In [None]:
result = execute_pipeline(complex_pipeline)

In [None]:
result

Run-time configuration for each solid is passed through context. (Keyed by solid name -- what if we want to call a solid multiple times?) Support for validation schemas.

Following specifies `url` from config.

In [None]:
@solid(config_schema={"url": str})
def download_csv(context):
    response = requests.get(context.solid_config["url"])
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]

@solid
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')

@pipeline
def configurable_pipeline():
    sort_by_calories(download_csv())

Config provided at runtime potentially through multiple methods:

In [None]:
run_config = {
    "solids": {
        "download_csv": {
            "config": {"url": "https://docs.dagster.io/assets/cereal.csv"}
        }
    }
}
result = execute_pipeline(configurable_pipeline, run_config=run_config)

In [None]:
import dataclasses as dc
import textwrap
import requests
import csv

import dagster as dg
import dagster_shell

@dc.dataclass
class ConfigClass():
    url: str
    sort_field: str

@dg.solid(
    output_defs=[dg.OutputDefinition(name="config_dc", dagster_type=ConfigClass)]
)
def parse_context(context):
    return ConfigClass(
        url=context.solid_config["url"],
        sort_field=context.solid_config["sort_field"]
    )
    
@dg.solid(
    input_defs=[dg.InputDefinition(name="config_dc", dagster_type=ConfigClass)],
    output_defs=[dg.OutputDefinition(name="cereals", dagster_type=dg.List)]
)
def download_csv(config_dc):
    response = requests.get(config_dc.url)
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]

@dg.solid(
    input_defs=[
        dg.InputDefinition(name="config_dc", dagster_type=ConfigClass),
        dg.InputDefinition(name="cereals", dagster_type=dg.List)
    ],
    output_defs=[dg.OutputDefinition(name="cereal_name", dagster_type=dg.String)],
)
def sort_by_calories(config_dc, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal[config_dc.sort_field])
    )
    return sorted_cereals[-1]["name"]

@dg.solid
def assemble_command(cereal_name: str):
    return textwrap.dedent(f"""
        echo "{cereal_name} is a cool thing."
    """)
@dg.composite_solid(
    input_defs=[dg.InputDefinition(name="cereal_name", dagster_type=dg.String)]
    # output_defs=[dg.OutputDefinition(str, "result")]
)
def template_shell(cereal_name):
    shell_command = assemble_command(cereal_name)
    result = dagster_shell.shell_solid(shell_command)

@dg.pipeline
def test_pipes():
    config_dc = parse_context()
    template_shell(sort_by_calories(config_dc, download_csv(config_dc)))
    
run_config = {
    "solids": {
        "parse_context": {
            "config": {
                "url": "https://docs.dagster.io/assets/cereal.csv",
                "sort_field": "calories"
            }
        }
    }
}
result = dg.execute_pipeline(test_pipes, run_config=run_config)

In [None]:
@dg.solid(config_schema={"n": int})
def add_n(context, number: int):
    return number + context.solid_config["n"]


@dg.solid(config_schema={"m": int})
def multiply_by_m(context, number: int):
    return number * context.solid_config["m"]


@dg.composite_solid(input_defs=[dg.InputDefinition("number", int)])
def add_n_times_m_solid(number):
    return multiply_by_m(add_n(number))

def config_mapping_fn(config):
    x = config["x"]
    return {"add_n": {"config": {"n": x}}, "multiply_by_m": {"config": {"m": x}}}


@composite_solid(
    config_fn=config_mapping_fn,
    config_schema={"x": int},
    input_defs=[InputDefinition("number", int)],
)
def add_x_multiply_by_x(number):
    return multiply_by_m(add_n(number))


@pipeline
def my_pipeline_config_mapping():
    add_x_multiply_by_x(return_one())

In [None]:
def decorator_factory_1(arg=1):
    def decorator(function):
        def wrapper(*args, **kwargs):
            return function(*args, **kwargs) + arg
        return wrapper
    return decorator

def decorator_factory_2(foo=1):
    def decorator(function):
        return decorator_factory_1(arg=foo)(function)
    return decorator

@decorator_factory_1(arg=60)
def g(x):
    return x+5

@decorator_factory_2(foo=60)
def h(x):
    return x+5

print(g(1))
print(h(1))

# NB: can't nest dynamic maps!

https://github.com/dagster-io/dagster/issues/4359 circa July
https://github.com/dagster-io/dagster/issues/4364

Other limitation: the map()ed function can only take one argument (dynamicOutput; may be a tuple, but that's hacky)

Single-stage map/collect (works):

In [1]:
import dagster as dg

@dg.solid(output_defs=[dg.DynamicOutputDefinition(name="i", dagster_type=int)])
def fanout_1(context):
    for i in [0, 10]:
        yield dg.DynamicOutput(i, mapping_key=f"a{i}", output_name="i")

@dg.solid(
    input_defs=[dg.InputDefinition(name="i", dagster_type=int)],
    output_defs=[dg.OutputDefinition(name="result", dagster_type=int)]
)
def do_stuff_1(context, i):
    context.log.info(f"test: {i} {context.get_mapping_key()}")
    return i+1

@dg.solid
def fan_in(context, sizes):
    return sum(sizes)

@dg.pipeline
def pipe_1():
    temp1 = fanout_1().map(do_stuff_1)
    fan_in(temp1.collect())

In [2]:
result = dg.execute_pipeline(pipe_1)

2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 - 872fae19-ec6b-4362-a8bf-8117ce111da7 - 49703 - PIPELINE_START - Started execution of pipeline "pipe_1".
2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 - 872fae19-ec6b-4362-a8bf-8117ce111da7 - 49703 - ENGINE_EVENT - Executing steps in process (pid: 49703)
2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 - 872fae19-ec6b-4362-a8bf-8117ce111da7 - 49703 - ENGINE_EVENT - Starting initialization of resources [io_manager].
2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 - 872fae19-ec6b-4362-a8bf-8117ce111da7 - 49703 - ENGINE_EVENT - Finished initialization of resources [io_manager].
2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 - 872fae19-ec6b-4362-a8bf-8117ce111da7 - 49703 - fanout_1 - LOGS_CAPTURED - Started capturing logs for solid: fanout_1.
2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 - 872fae19-ec6b-4362-a8bf-8117ce111da7 - 49703 - fanout_1 - STEP_START - Started execution of step "fanout_1".
2021-08-26 17:47:30 - dagster - DEBUG - pipe_1 

Try two stages. No good.

In [4]:
import dagster as dg

@dg.solid(output_defs=[dg.DynamicOutputDefinition(int, name="i")])
def fanout_1(context):
    for i in [0, 10]:
        yield dg.DynamicOutput(value=i, mapping_key=f"a{i}")

@dg.solid(input_defs=[dg.InputDefinition("i", int)])
def do_stuff_1(context, i):
    context.log.info(f"test: {i} {context.get_mapping_key()}")
    return i+1

@dg.solid(
    input_defs=[dg.InputDefinition("i", int)],
    output_defs=[dg.DynamicOutputDefinition(int, name="j")]
)
def fanout_2(context, i):
    for j in [5, 6]:
        yield dg.DynamicOutput(value=i+j, mapping_key=f"b{j}")

@dg.solid(
    input_defs=[dg.InputDefinition("i", int), dg.InputDefinition("j", int)]
)
def do_stuff_2(context, i,j):
    context.log.info(f"test: {i} {j} {context.get_mapping_key()}")
    return i+j

@dg.solid
def fan_in(context, sizes):
    return sum(sizes)

@dg.pipeline
def pipe_2():
    # NB map(), collect() defined on... OutputDefinitions? Solids?
    temp_is = fanout_1().map(do_stuff_1)
    for i in temp_is:
        temp_sum = fanout_2(i).map(do_stuff_2)
    fan_in(temp_sum.collect())

DagsterInvariantViolationError: Attempted to iterate over an InvokedSolidDynamicOutputWrapper. This object represents the dynamic output "result" from the solid "do_stuff_1". Use the "map" method on this object to create downstream dependencies that will be cloned for each DynamicOutput that is resolved at runtime.

In [3]:
result = dg.execute_pipeline(pipe_2)

2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 - bf9d634a-5d06-43fc-94e2-8c55e8e8587c - 63454 - PIPELINE_START - Started execution of pipeline "pipe_1".
2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 - bf9d634a-5d06-43fc-94e2-8c55e8e8587c - 63454 - ENGINE_EVENT - Executing steps in process (pid: 63454)
2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 - bf9d634a-5d06-43fc-94e2-8c55e8e8587c - 63454 - ENGINE_EVENT - Starting initialization of resources [io_manager].
2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 - bf9d634a-5d06-43fc-94e2-8c55e8e8587c - 63454 - ENGINE_EVENT - Finished initialization of resources [io_manager].
2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 - bf9d634a-5d06-43fc-94e2-8c55e8e8587c - 63454 - fanout_1 - LOGS_CAPTURED - Started capturing logs for solid: fanout_1.
2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 - bf9d634a-5d06-43fc-94e2-8c55e8e8587c - 63454 - fanout_1 - STEP_START - Started execution of step "fanout_1".
2021-08-10 12:37:09 - dagster - DEBUG - pipe_1 

In [None]:
import dataclasses as dc

@dc.dataclass
class DagsterIO():
    name: str = None
    foo: str = None

@dc.dataclass
class Bar(DagsterIO):
    was: str = None

    @property
    def foo(self):
        return self.name + "_bla"


baz = DagsterIO(name="asdf")
bar = Bar(name="asdf")
print(bar.foo)


# How about solids as method calls?

In [6]:
import dataclasses as dc
import dagster as dg

class A():
    @dg.solid(output_defs=[dg.DynamicOutputDefinition(name="b")])
    def fanout_1(self, context):
        for i in [0, 10]:
            yield dg.DynamicOutput(B(value=i), mapping_key=f"b{i}", output_name="b")

    @dg.solid
    def fan_in(self, context, sizes):
        return sum(sizes)

@dc.dataclass
class B():
    value: int

    @dg.solid
    def do_stuff_1(self, context):
        context.log.info(f"test: {self.value} {context.get_mapping_key()}")
        return self.value+1

@dg.solid(
    input_defs=[dg.InputDefinition(name="b")],
    output_defs=[dg.OutputDefinition(name="result", dagster_type=int)]
)
def method_wrap(context, b):
    return b.do_stuff_1(context)

@dg.pipeline
def pipe_1():
    a = A()
    temp1 = a.fanout_1().map(method_wrap)
    a.fan_in(temp1.collect())

DagsterInvalidDefinitionError: "context" is not a valid name in Dagster. It conflicts with a Dagster or python reserved keyword.

# debug subprocess

In [96]:
import os
import collections
import copy
import errno
import json
import signal
import subprocess
import textwrap

class CompletedProcess(subprocess.CompletedProcess):
    def __init__(self, *args, env_in=None, env_out=None, **kwargs):
        self.env_in = env_in
        self.env_out = env_out
        super(CompletedProcess, self).__init__(*args, **kwargs)

    @classmethod
    def from_subprocess_result(cls, completed_process, **kwargs):
        env_in = kwargs.get('env_in', None)
        env_out = kwargs.get('env_out', None)
        return cls(
            args=completed_process.args,
            returncode=completed_process.returncode,
            stdout=completed_process.stdout,
            stderr=completed_process.stderr,
            env_in=env_in, env_out=env_out
        )

# def _preexec_fn():
#     """setsid() is duplicated by start_new_session=True in constuctor, but other
#     signal restoration taken from dagster_shell.
#     """
#     # Restore default signal disposition and invoke setsid
#     for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
#         if hasattr(signal, sig):
#             signal.signal(getattr(signal, sig), signal.SIG_DFL)
#     os.setsid()

SHELL = '/bin/csh'
SHELL_FLAGS = [SHELL, '-f'] # no -c

_popen_kwargs = {
    'encoding':'utf-8', 
    'executable': SHELL, 
    'restore_signals': True,
    'start_new_session': True
}

def subproc_handler(func, cmd_or_args, log, log_name=None, **kwargs):
    retcode = 1
    try:
        log.info(f"Running command '{log_name}'.")
        return func(cmd_or_args, **kwargs)
    except subprocess.TimeoutExpired:
        log.error(f"Command '{log_name}' timed out (> {kwargs['timeout']} sec).")
        retcode = errno.ETIME
    except subprocess.CalledProcessError:
        log.error(f"Command '{log_name}' raised CalledProcessError.")
    except FileNotFoundError:
        log.error(f"Command '{log_name}' raised CalledProcessError.")

    env_in = kwargs.get('env', None)
    return CompletedProcess(
        args=cmd_or_args,
        returncode=retcode, stdout="", stderr="", env_in=env_in, env_out=env_in
    )

def _run_command(args_list, **kwargs):
    kwargs.update({'capture_output': True})
    kwargs.update(_popen_kwargs)
    env_in = kwargs.get('env', None)
    result = subprocess.run(args_list, **kwargs)
    return CompletedProcess.from_subprocess_result(
        result, env_in=env_in, env_out=env_in
    )

def run_command(args_list, *args, **kwargs):
    if 'log_name' not in kwargs:
        kwargs['log_name'] = args_list[0]
    if kwargs.get('update_env', False):
        raise ValueError('Need run_shell instead')
    return subproc_handler(_run_command, args_list, *args, **kwargs)

def _run_shell(commands, timeout=None, update_env=False, **kwargs):
    def _exception_handler(proc):
        # kill subprocess and any subsubprocesses it may have spawned
        # https://stackoverflow.com/a/36955420
        os.killpg(proc.pid, signal.SIGINT)
        proc.wait()
        # return proc.returncode

    assert isinstance(commands, str)
    kwargs.update({'shell': False})
    kwargs.update(_popen_kwargs)
    commands = textwrap.dedent(commands)
    if update_env:
        commands_in = commands + '\necho "\v";' \
            + '/usr/bin/env python -c "import os, json; print(json.dumps(dict(os.environ)))"'
    else:
        commands_in = commands

    with subprocess.Popen(
        SHELL_FLAGS,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        **kwargs
    ) as proc:
        try: 
            (stdout, stderr) = proc.communicate(
                input=commands_in, timeout=timeout # encode?
            )
            retcode = proc.returncode
            # stdout = stdout.decode('utf-8', 'backslashreplace')
            # stderr = stderr.decode('utf-8', 'backslashreplace')
            if update_env:
                stdout, env_out = stdout.split('\v')
                env_out = json.loads(env_out)
            else:
                env_out = kwargs.get('env', None)
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError, KeyboardInterrupt):
            _exception_handler(proc)
            raise

    return CompletedProcess(
        args=SHELL_FLAGS + [commands],
        returncode=retcode, stdout=stdout, stderr=stderr,
        env_in=kwargs.get('env', None), env_out=env_out
    )

def run_shell(commands, *args, **kwargs):
    if 'log_name' not in kwargs:
        if '\n' not in commands:
            kwargs['log_name'] = commands
        else:
            kwargs['log_name'] = '<multiple shell commands>'
    if not kwargs.get('shell', True):
        raise ValueError('Need run_command instead')
    return subproc_handler(_run_shell, commands, *args, **kwargs)

import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger('LOGGER_NAME')

In [92]:
with subprocess.Popen(
    ['/bin/csh', '-f'],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    encoding='utf-8'
) as proc:
    (stdout, stderr) = proc.communicate(input='ls -t')
    print(stdout)
    print(stderr)

dagster_tutorial.ipynb
mdtf_demo.ipynb
src
conda_env_dev.yml
__pycache__
test.py
README.md
LICENSE.txt
FRE




In [93]:
print(run_command(['ls', '-t'], logger, shell=False))
print(run_command(['ls', '-t'], logger, shell=True))
# print(run_command(['ls -t'], logger, shell=False))
print(run_command(['ls -t'], logger, shell=True))
print(run_shell('ls -t', logger, log_name='ls -t'))

INFO:LOGGER_NAME:Running command 'ls'.
CompletedProcess(args=['ls', '-t'], returncode=0, stdout='', stderr='')
INFO:LOGGER_NAME:Running command 'ls'.
CompletedProcess(args=['ls', '-t'], returncode=0, stdout='FRE\nLICENSE.txt\nREADME.md\n__pycache__\nconda_env_dev.yml\ndagster_tutorial.ipynb\nmdtf_demo.ipynb\nsrc\ntest.py\n', stderr='')
INFO:LOGGER_NAME:Running command 'ls -t'.
CompletedProcess(args=['ls -t'], returncode=0, stdout='dagster_tutorial.ipynb\nmdtf_demo.ipynb\nsrc\nconda_env_dev.yml\n__pycache__\ntest.py\nREADME.md\nLICENSE.txt\nFRE\n', stderr='')
INFO:LOGGER_NAME:Running command 'ls -t'.
CompletedProcess(args=['/bin/csh', '-f', 'ls -t'], returncode=0, stdout='dagster_tutorial.ipynb\nmdtf_demo.ipynb\nsrc\nconda_env_dev.yml\n__pycache__\ntest.py\nREADME.md\nLICENSE.txt\nFRE\n', stderr='')


In [95]:
print(run_shell('echo "foo: ${FOO}"', logger, env={'FOO':'BAR'}))
result_obj = run_shell('''
    echo "foo: ${FOO}"
    setenv FOO "BAZ"
    echo "foo2: ${FOO}"
''', logger, env={'FOO':'BAR'}, update_env=True)
print(result_obj)
print(result_obj.env_in.get('FOO', 'ERROR'))
print(result_obj.env_out.get('FOO', 'ERROR'))

INFO:LOGGER_NAME:Running command 'echo "foo: ${FOO}"'.
CompletedProcess(args=['/bin/csh', '-f', 'echo "foo: ${FOO}"'], returncode=0, stdout='foo: BAR\n', stderr='')
INFO:LOGGER_NAME:Running command '<multiple shell commands>'.
CompletedProcess(args=['/bin/csh', '-f', '\necho "foo: ${FOO}"\nsetenv FOO "BAZ"\necho "foo2: ${FOO}"\n'], returncode=0, stdout='foo: BAR\nfoo2: BAZ\n', stderr='')
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'args', 'check_returncode', 'env_in', 'env_out', 'from_subprocess_result', 'returncode', 'stderr', 'stdout']
BAR
BAZ


In [28]:
def f(*args):
    print(a,b)

def wrapper(*args, **kwargs):
    return f(*args, **kwargs)

new_params = \
    [inspect.Parameter(name=x, kind=inspect.Parameter.POSITIONAL_ONLY) \
        for x in ['a', 'b']]

print(new_params)

sig = inspect.signature(f)

print(sig)
f.__signature__ = sig.replace(parameters=new_params)

print(inspect.signature(f))
f(1,2)



[<Parameter "a">, <Parameter "b">]
(*args)
(a, b, /)


NameError: name 'a' is not defined

In [21]:
import inspect, functools

def add_pos_args(*new_pos_arg_names):
    """Programmatically set function signature; https://stackoverflow.com/a/33112180
    """
    def decorator(f):
        # @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        # Override signature
        new_params = \
            [inspect.Parameter(name=x, kind=inspect.Parameter.POSITIONAL_ONLY) \
                for x in new_pos_arg_names]
        sig = inspect.signature(f)
        sig = sig.replace(parameters=new_params)
        wrapper.__signature__ = sig

        return wrapper
    return decorator

@add_pos_args("a", "b")
def f():
    print(a,b)

In [8]:
f(1,2)

TypeError: f() takes 0 positional arguments but 4 were given

In [13]:
def f(a, b, c=3):
    pass
sig = inspect.signature(f)
sig.parameters

mappingproxy({'a': <Parameter "a">,
              'b': <Parameter "b">,
              'c': <Parameter "c=3">})

In [31]:

import dataclasses as dc

@dc.dataclass
class A():
    a:str

@dc.dataclass
class B():
    a:str

foo_str = "Hey {foo.a} == {bar.a}"
foo_str.format(foo=A(a=3), bar=B(a=5))

'Hey 3 == 5'