In [1]:
%%writefile hello_flow.py

from metaflow import FlowSpec, step


class HelloFlow(FlowSpec):
    """
    A minimal linear flow with three steps: start -> hello -> end.
    """

    @step
    def start(self):
        """
        Entry point of the flow. Every flow needs a step called 'start', and
        it always runs first. Here it simply hands off to 'hello'.
        """
        self.next(self.hello)

    @step
    def hello(self):
        """
        Print the greeting, then continue to the 'end' step.
        """
        greeting = "Hello <TODO>."
        print(greeting)
        self.next(self.end)

    @step
    def end(self):
        """
        Final step of the flow. Every flow needs a step called 'end', and it
        always runs last. Here it announces that the flow has finished.
        """
        farewell = "HelloFlow is all done."
        print(farewell)


# Script entry point: instantiating the flow class is what exposes the
# command-line interface used by the cells below
# (`python hello_flow.py help`, `python hello_flow.py run`, ...).
if __name__ == "__main__":
    HelloFlow()

Overwriting hello_flow.py


In [2]:
!python hello_flow.py help

[35m[1mMetaflow 2.3.0[0m[35m[22m executing [0m[31m[1mHelloFlow[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:jovyan[0m[35m[22m[K[0m[35m[22m[0m
Usage: hello_flow.py [OPTIONS] COMMAND [ARGS]...

Options:
  --quiet / --not-quiet           Suppress unnecessary messages  [default:
                                  not-quiet]
  --metadata [local|service]      Metadata service type  [default: local]
  --environment [local|conda]     Execution environment type  [default: local]
  --datastore [local|s3]          Data backend type  [default: local]
  --datastore-root TEXT           Root path for datastore
  --package-suffixes TEXT         A comma-separated list of file suffixes to
                                  include in the code package.  [default:
                                  .py,.R,.RDS]
  --with TEXT                     Add a decorator to all steps. You can
                                  specify this option multiple times to attach
                            

In [4]:
!python hello_flow.py run --help

[35m[1mMetaflow 2.3.0[0m[35m[22m executing [0m[31m[1mHelloFlow[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:jovyan[0m[35m[22m[K[0m[35m[22m[0m
Usage: hello_flow.py run [OPTIONS]

  Run the workflow locally.

Options:
  --tag TEXT                Annotate this run with the given tag. You can
                            specify this option multiple times to attach
                            multiple tags in the run.
  --max-workers INTEGER     Maximum number of parallel processes.  [default:
                            16]
  --max-num-splits INTEGER  Maximum number of splits allowed in a foreach.
                            This is a safety check preventing bugs from
                            triggering thousands of steps inadvertently.
                            [default: 100]
  --max-log-size INTEGER    Maximum size of stdout and stderr captured in
                            megabytes. If a step outputs more than this to
                            stdout/stderr

In [5]:
!python hello_flow.py run

[35m[1mMetaflow 2.3.0[0m[35m[22m executing [0m[31m[1mHelloFlow[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:jovyan[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[35m2021-06-20 06:48:19.709 [0m[1mWorkflow starting (run-id 1624171699705581):[0m
[35m2021-06-20 06:48:19.715 [0m[32m[1624171699705581/start/1 (pid 565)] [0m[1mTask is starting.[0m
[35m2021-06-20 06:48:20.181 [0m[32m[1624171699705581/start/1 (pid 565)] [0m[1mTask finished successfully.[0m
[35m2021-06-20 06:48:20.190 [0m[32m[1624171699705581/hello/2 (pid 570)] [0m[1mTask is starting.[0m
[35m2021-06-20 06:48:20.629 [0m[32m[1624171699705581/hello/2 (pid 570)] [0m[22mHello <TODO>.[0m
[35m2021-06-20 06:48:20.690 [0m[32m[1624171699705581/hello/2 (pid 570)] [0m[1mTask finished successfully.[0m


In [12]:
%%writefile linear_flow.py
from metaflow import FlowSpec, step, current

class LinearFlow(FlowSpec):
    """
    Three steps in a row (start -> a -> end). Each step records its own name
    in the `logbook` attribute, and the last step checks that the names
    accumulated in execution order.
    """

    @step
    def start(self):
        """Seed the logbook with the name of the current step."""
        self.logbook = current.step_name
        self.next(self.a)

    @step
    def a(self):
        """Append this step's name to the logbook, separated by a dot."""
        self.logbook = ".".join((self.logbook, current.step_name))
        self.next(self.end)

    @step
    def end(self):
        """Append the final step's name and verify the complete trail."""
        trail = ".".join((self.logbook, current.step_name))
        self.logbook = trail
        # The logbook must list every step exactly once, in order.
        assert trail == 'start.a.end'

# Script entry point: instantiating the flow class is what lets the next
# cell execute the workflow with `python linear_flow.py run`.
if __name__ == '__main__':
    LinearFlow()

Overwriting linear_flow.py


In [13]:
!python linear_flow.py run

[35m[1mMetaflow 2.3.0[0m[35m[22m executing [0m[31m[1mLinearFlow[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:jovyan[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[35m2021-06-20 06:49:34.483 [0m[1mWorkflow starting (run-id 1624171774480104):[0m
[35m2021-06-20 06:49:34.488 [0m[32m[1624171774480104/start/1 (pid 606)] [0m[1mTask is starting.[0m
[35m2021-06-20 06:49:35.028 [0m[32m[1624171774480104/start/1 (pid 606)] [0m[1mTask finished successfully.[0m
[35m2021-06-20 06:49:35.039 [0m[32m[1624171774480104/a/2 (pid 611)] [0m[1mTask is starting.[0m
[35m2021-06-20 06:49:35.508 [0m[32m[1624171774480104/a/2 (pid 611)] [0m[1mTask finished successfully.[0m
[35m2021-06-20 06:49:35.518 [0m[32m[1624171774480104/end/3 (pid 616)] [0m[1mTask is starting.[0m
[35m2