# workflows

Complex machine learning applications often require multi-stage pipelines (e.g., data loading, transforming, training, testing, iterating). **Workflows** in Spell allow you to manage these pipelines as a sequence of Spell runs, and are a lightweight alternative to tools like [Airflow](https://airflow.apache.org/) and [Luigi](https://github.com/spotify/luigi) for managing your model training pipelines.

Workflows can be launched using either the Spell CLI or the Spell Python API. In this tutorial we demonstrate both approaches by example.

## understanding workflows

Every workflow consists of one *master run* and one or more *worker runs*. The master run is responsible for control flow: deciding which worker runs to execute, and when. The worker runs then do all of the actual work.

Our demo workflow consists of three steps:

1. downloading the dataset (a Project Gutenberg copy of _War and Peace_) and saving it to disk.
2. mounting that text corpus into a run, training the neural network on it, and saving the model to disk.
3. mounting the saved model into yet another run, sampling it for an interesting result, and streaming that output to logs.

To accomplish this, we will need one master run and three worker runs, arranged as follows:

![](https://i.imgur.com/W5Ugs0S.png)

For this simple example we will execute the steps consecutively, conditioning the start of each worker run in the workflow on the success of its predecessor. More complex workflows may require more complicated control flow.
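The sequential control flow described above, where each step starts only if its predecessor succeeded, can be sketched in plain Python without any Spell calls. In this minimal sketch, `run_sequentially` and the `pipeline` steps are hypothetical: each step is an ordinary function returning `True` on success, whereas in a real workflow script each step would launch a worker run and wait for its final state.

```python
from typing import Callable, List, Tuple


def run_sequentially(steps: List[Tuple[str, Callable[[], bool]]]) -> List[str]:
    """Run each named step in order, stopping at the first failure.

    Returns the names of the steps that succeeded. Raises RuntimeError
    naming the failed step, so later steps are never started.
    """
    completed: List[str] = []
    for name, step in steps:
        if not step():
            # Later steps depend on this step's output, so do not start them.
            raise RuntimeError(
                f"step {name!r} failed after {len(completed)} successful step(s)"
            )
        completed.append(name)
    return completed


# Hypothetical stand-ins for the three worker runs in the demo workflow.
pipeline = [
    ("download", lambda: True),
    ("train", lambda: True),
    ("sample", lambda: True),
]
print(run_sequentially(pipeline))  # ['download', 'train', 'sample']
```

More complex workflows would replace the simple loop with whatever branching or retry logic the pipeline needs.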

While the instance type of the worker runs is configurable, the master run always executes on the basic `cpu` instance type. Try to keep any computationally intensive logic isolated to the workers!

## understanding the workflow script

In order to execute a workflow, we need to define a workflow script. The **workflow script** is what gets executed on the master run: a Python script using the Spell Python API to define worker jobs and the control flow logic surrounding them.

Here is a dead-simple workflow script. Don't worry if you don't understand all of it right away; we'll walk through it step by step.

```python
# simple.py
import spell.client
client = spell.client.from_environment()

print(client.active_workflow)

r1 = client.runs.new(command="echo 'Hello World!' > foo.txt")
r1.wait_status(*client.runs.FINAL)
r1.refresh()
if r1.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r1.id}")

r2 = client.runs.new(
    command="cat /mnt/foo.txt",
    attached_resources={f"runs/{r1.id}/foo.txt": "/mnt/foo.txt"}
)
r2.wait_status(*client.runs.FINAL)
r2.refresh()
if r2.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r2.id}")

print("Finished workflow!")
```

Let's walk through this script step-by-step:

```python
import spell.client
client = spell.client.from_environment()
```

This initializes the client object. If you are not familiar with our Python API, check out the [Python API Reference](http://spell.run/docs/python) to learn more.


```python
print(client.active_workflow)
```

`client.active_workflow` tells you which workflow the script is currently executing in. If the script is not running inside a workflow, it is `None`.

```python
r1 = client.runs.new(command="echo 'Hello World!' > foo.txt")
```

This line launches a new run that writes a file containing `Hello World!` to disk. The file is automatically saved to SpellFS when the run finishes.

```python
r1.wait_status(*client.runs.FINAL)
r1.refresh()
if r1.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r1.id}")
```

We can only proceed to the next stage of the workflow when the first stage completes successfully. This next bit of code is a control flow block that achieves this.

Every run transitions through a sequence of states as part of its execution: `machine_requested`, `running`, `pushing`, and so on. Runs eventually transition to a so-called **final state**: the state that the run is assigned at the end of its execution. There are four different possible final states, the most important of which is `COMPLETE`. A run which terminates in the `COMPLETE` state is one which has successfully run all of its code and pushed all of its outputs to SpellFS.

The `wait_status` method blocks execution until the run API reports that the run has reached one of the given states, here any final state. We then `refresh` the information on the run object (this has to be done manually because it requires a network round trip) and check whether the `r1.status` field reports that the run is `COMPLETE`. We proceed with the rest of the script only if it is. If the run instead reached a failing final state (`FAILED`, `STOPPED`, or `INTERRUPTED`), we raise an error.
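To make the wait-then-check pattern concrete, here is a self-contained sketch of what waiting for a final state amounts to conceptually: poll a status until it is one of the final states, then decide whether to continue. `poll_until_final`, `check_complete`, and the lowercase state strings are assumptions for illustration; this is not how the Spell client implements `wait_status`.

```python
import time
from typing import Callable, Iterable

# Assumed state names, mirroring the four final states described above.
COMPLETE = "complete"
FINAL_STATES = {COMPLETE, "failed", "stopped", "interrupted"}


def poll_until_final(get_status: Callable[[], str],
                     final_states: Iterable[str] = FINAL_STATES,
                     poll_interval: float = 0.0,
                     timeout: float = 60.0) -> str:
    """Call get_status() repeatedly until it returns a final state."""
    targets = set(final_states)
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in targets:
            return status
        if time.monotonic() > deadline:
            raise TimeoutError(f"still {status!r} after {timeout}s")
        time.sleep(poll_interval)


def check_complete(run_id: int, status: str) -> None:
    """Mirror the workflow script: only continue if the run completed."""
    if status != COMPLETE:
        raise OSError(f"failed at run {run_id} (final state: {status!r})")


# Hypothetical run that moves through a few intermediate states.
states = iter(["machine_requested", "running", "pushing", "complete"])
final_state = poll_until_final(lambda: next(states))
check_complete(1, final_state)  # passes silently
print(final_state)  # complete
```

Separating "has it finished?" from "did it succeed?" is the same split the script makes between `wait_status` and the `status` check.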

```python
r2 = client.runs.new(
    command="cat /mnt/foo.txt",
    attached_resources={f"runs/{r1.id}/foo.txt": "/mnt/foo.txt"}
)
r2.wait_status(*client.runs.FINAL)
r2.refresh()
if r2.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r2.id}")
```

The next code block creates another Spell run. This time instead of writing `Hello World!` to disk, we mount the `foo.txt` file we created in `r1` into the run. We then `cat` it (print it out to `stdout`), which will cause it to show up in the run logs.
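The `attached_resources` argument maps a SpellFS path of the form `runs/<run id>/<path>` to the location where it should be mounted in the new run. When chaining many runs, a small helper keeps these keys consistent. `mount_from_run` below is a hypothetical convenience function, not part of the Spell API; it only formats the dictionary entry.

```python
from typing import Dict


def mount_from_run(run_id: int, path: str, mount_point: str) -> Dict[str, str]:
    """Build an attached_resources entry that mounts `path` from a finished run.

    Hypothetical helper: it formats the "runs/<id>/<path>" key used in the
    examples above and does not contact Spell.
    """
    path = path.strip("/")
    return {f"runs/{run_id}/{path}": mount_point}


# Same shape as the mapping passed for r2 above, for a hypothetical run id 42.
resources = mount_from_run(42, "foo.txt", "/mnt/foo.txt")
print(resources)  # {'runs/42/foo.txt': '/mnt/foo.txt'}
```

Because the helper returns a plain dictionary, several mounts can be merged with `{**a, **b}` before being passed as `attached_resources`.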

## executing the workflow script

You can execute the workflow script using the Spell CLI:

```bash
spell workflow "python simple.py"
```

```
✨ Preparing uncommitted changes…
Enumerating objects: 9, done.
Counting objects: 100% (9/9), done.
Delta compression using up to 12 threads
Compressing objects: 100% (5/5), done.
Writing objects: 100% (5/5), 649 bytes | 649.00 KiB/s, done.
Total 5 (delta 4), reused 0 (delta 0)
To git.spell.run:aleksey/e6cee8710721a8ef6f3d2924713ac7d351c972ca.git
 * [new branch]      HEAD -> br_9beb42bead69bba7ca10038c6207ac35601c371b
💫 Casting workflow #14…
✨ Following workflow at run 350.
✨ Stop viewing logs with ^C
✨ Building… done
✨ Run is running
✨ Machine_Requested… done
```

We can verify that this workflow executed successfully by checking the run logs of the last worker run:

```bash
spell logs 352
```

```
✨ Machine_Requested… done
✨ Building… done
✨ Mounting… done
✨ Run is running
Hello World!
✨ Saving… done
✨ Pushing… done
🎉 Total run time: 11.525986s
🎉 Run 352 complete
```

## a more complex example

As with any run, the code environment in a worker run can be initialized from a GitHub repository using the `--github-url` flag.

However, with more complex pipelines it is sometimes useful to make the exact model code a runtime variable. To support this use case, `spell workflow` can sync one or more local `git` repositories into the master run using the `--repo` flag, and worker runs launched through the Python API can then initialize their code environment from one of those repositories by referring to its label.

The following example demonstrates how this feature works. This workflow downloads a copy of _War and Peace_ from Project Gutenberg in a first run, trains a character-level RNN on this data in a second run, and then samples some text from the model in a third and final run. Note the `commit_label` argument to `client.runs.new`: it tells the run to initialize its code environment from the repository registered under the label `char-rnn`. It is up to you to supply a repository with this label when launching the workflow.

```python
# workflow.py
import spell.client
client = spell.client.from_environment()

# create the first run to download the dataset (War and Peace, by Leo Tolstoy)
# if desired, replace data_url with the URL of another plain-text file to train on
data_url = "https://www.gutenberg.org/files/2600/2600-0.txt"
r1 = client.runs.new(
    command=f"wget -O input.txt {data_url}"
)
print(f"waiting for run {r1.id} to complete")
r1.wait_status(*client.runs.FINAL)
r1.refresh()
if r1.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r1.id}")

# create the second run to train char-RNN on the dataset
data_dir = "/data"
r2 = client.runs.new(
    machine_type="t4",
    command=f"python train.py --data_dir={data_dir}",
    attached_resources={
        f"runs/{r1.id}/input.txt": f"{data_dir}/input.txt"
    },
    commit_label="char-rnn",
)
print(f"waiting for run {r2.id} to complete")
r2.wait_status(*client.runs.FINAL)
r2.refresh()
if r2.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r2.id}")

# create the third run that samples the model to generate some text;
# it mounts the save/ directory written by the training run (r2)
r3 = client.runs.new(
    machine_type="t4",
    command="python sample.py",
    attached_resources={f"runs/{r2.id}/save": "save"},
    commit_label="char-rnn",
)
print(f"waiting for run {r3.id} to complete")
r3.wait_status(*client.runs.FINAL)
r3.refresh()
if r3.status != client.runs.COMPLETE:
    raise OSError(f"failed at run {r3.id}")
```
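The workflow script repeats the same launch, wait, and check sequence three times. One way to reduce that repetition is a small helper that takes the client plus the keyword arguments for `client.runs.new`. This is a sketch, assuming only the calls and constants the script already uses (`runs.new`, `wait_status`, `refresh`, `status`, `runs.FINAL`, `runs.COMPLETE`); to keep it runnable outside Spell, it is exercised here against hypothetical `FakeRun`/`FakeRuns` stand-ins rather than a real client.

```python
from types import SimpleNamespace


def run_and_check(client, **run_kwargs):
    """Launch a run, block until it reaches a final state, and require COMPLETE.

    Uses only the calls from the workflow script above; returns the finished
    run so later stages can reference its id in attached_resources.
    """
    run = client.runs.new(**run_kwargs)
    print(f"waiting for run {run.id} to complete")
    run.wait_status(*client.runs.FINAL)
    run.refresh()
    if run.status != client.runs.COMPLETE:
        raise OSError(f"failed at run {run.id}")
    return run


# Hypothetical stand-ins for the Spell client, for trying the helper locally.
class FakeRun:
    def __init__(self, run_id, outcome):
        self.id, self.status, self._outcome = run_id, "running", outcome

    def wait_status(self, *states):
        pass  # a real run would block here until it reaches one of `states`

    def refresh(self):
        self.status = self._outcome


class FakeRuns:
    FINAL = ("complete", "failed", "stopped", "interrupted")
    COMPLETE = "complete"

    def __init__(self, outcomes):
        self._outcomes = iter(outcomes)
        self._next_id = 1
        self.calls = []  # records the kwargs of every runs.new call

    def new(self, **kwargs):
        self.calls.append(kwargs)
        run = FakeRun(self._next_id, next(self._outcomes))
        self._next_id += 1
        return run


client = SimpleNamespace(runs=FakeRuns(["complete", "complete"]))
r1 = run_and_check(client, command="echo download")
r2 = run_and_check(
    client,
    command="python train.py --data_dir=/data",
    attached_resources={f"runs/{r1.id}/input.txt": "/data/input.txt"},
)
print(r2.id)  # 2
```

In a real workflow script, `client` would come from `spell.client.from_environment()`, and each of the three stages becomes a single `run_and_check` call.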


To run this workflow we will need the following model code:

```bash
git clone https://github.com/sherjilozair/char-rnn-tensorflow.git
```

```
Cloning into 'char-rnn-tensorflow'...
remote: Enumerating objects: 404, done.
remote: Total 404 (delta 0), reused 0 (delta 0), pack-reused 404
Receiving objects: 100% (404/404), 508.45 KiB | 1.19 MiB/s, done.
Resolving deltas: 100% (238/238), done.
```


Finally, when we launch the workflow, we use the `--repo` flag to register the local `char-rnn-tensorflow/` checkout under the label `char-rnn`:

```bash
spell workflow \
    --repo char-rnn=char-rnn-tensorflow/ \
    "python workflow.py"
```

```
✨ Syncing repo char-rnn-tensorflow/.
Everything up-to-date
✨ Preparing uncommitted changes…
Enumerating objects: 11, done.
Counting objects: 100% (11/11), done.
Delta compression using up to 12 threads
Compressing objects: 100% (6/6), done.
Writing objects: 100% (6/6), 2.41 KiB | 2.41 MiB/s, done.
Total 6 (delta 5), reused 0 (delta 0)
To git.spell.run:aleksey/e6cee8710721a8ef6f3d2924713ac7d351c972ca.git
 * [new branch]      HEAD -> br_b1dfb5675ed1f875f975838304d4cfc546e207db
💫 Casting workflow #15…
✨ Following workflow at run 353.
✨ Stop viewing logs with ^C
⭐ Building… Retrieving code^C
✨ Use 'spell logs 353' to view logs again
```

Here's sample output generated by the finished model, taken from the final run of an earlier execution of this workflow (run 338):

```bash
spell logs 338 | tail -n 20
```

```
2020-05-05 15:35:54.000600: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcublas.so.10.0
 of Moscupe, who foll and since hoer. We are and still turned
merely as the corner that argument in lors for so a quality that.
Welllaration wehe return, raisements of
such a Frenchmen inspecting for them tallow me with the same correct actions,
fellows and well—or watching, in animation, gay others.





CHAPTER XVIII
 Yasova and givein offers, and man—do not restraining the woode, they pause they seemed to many apply as he left cordier in
which Now did not be week wiplocking France 
✨ Run is saving
✨ Run is pushing
Saving build environment for future runs
🎉 Total run time: 22.285643s
🎉 Run 338 complete
Scanning for modified or new files from the run
```


Not bad for just one hour of model training on a single book!

## further reference

The `with-metrics` and `video-generation-workflow` folders in this repository contain even more code samples.