# Onboarding Tutorial

This tutorial will take you through the core concepts of Flyte.

## Introduction

### Environment Setup

Follow the [setup instructions](./README.md#setup) in the README.

### Example 0: Flyte Basics

Let's take a look at the [first example](./workflows/example_00_intro.py).

In it, you'll see a simple pipeline that uses the penguins dataset to train a
penguin species classifier. You can run this workflow locally with:

```bash
python workflows/example_00_intro.py
```

#### Exercise: Understanding Workflows

Workflows are written in a domain-specific language (DSL) that builds an
execution graph, using tasks as the building blocks for more complex pipelines.
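To build intuition for why the variables inside a workflow body are not ordinary values, here is a toy, stdlib-only model of a graph-building DSL. It is not flytekit's implementation: `ToyGraph`, `Placeholder`, and the lambda "tasks" are hypothetical. Calling a task inside the workflow body records a node and returns a placeholder, and the graph is only executed afterwards.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Tuple


@dataclass
class Placeholder:
    """Stands in for a value that a node will only produce when the graph runs."""
    node_id: int


@dataclass
class ToyGraph:
    # Recorded calls, in the order the workflow body made them.
    nodes: List[Tuple[Callable[..., Any], tuple]] = field(default_factory=list)

    def task(self, fn: Callable[..., Any]) -> Callable[..., Placeholder]:
        def record(*args: Any) -> Placeholder:
            # Record the call instead of executing it, and hand back a placeholder.
            self.nodes.append((fn, args))
            return Placeholder(len(self.nodes) - 1)
        return record

    def run(self) -> List[Any]:
        results: List[Any] = []
        for fn, args in self.nodes:
            # Replace placeholders with the concrete outputs of upstream nodes.
            resolved = [results[a.node_id] if isinstance(a, Placeholder) else a for a in args]
            results.append(fn(*resolved))
        return results


graph = ToyGraph()
get_data = graph.task(lambda: [3.0, 1.0, 2.0])
train_model = graph.task(lambda data: sum(data) / len(data))

# "Workflow body": nothing is computed yet, only the graph is built.
data = get_data()
model = train_model(data)
print(type(data).__name__, type(model).__name__)  # Placeholder Placeholder

outputs = graph.run()
print(outputs[model.node_id])  # 2.0
```

Because nodes are recorded in call order, every placeholder refers to a node that has already run by the time it is resolved.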

Insert a debugging breakpoint `import pdb; pdb.set_trace()` right before the
return statement of the `training_workflow` function in the `example_00_intro.py`
script and rerun it. Take a look at the variables in `training_workflow`, like
`data` and `model`. What data type are they?

#### Running a Workflow

Use `pyflyte run` to run the workflow in a local Python runtime:

```bash
pyflyte run workflows/example_00_intro.py training_workflow \
    --hyperparameters '{"C": 0.01, "max_iter": 5000}'
```
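The `--hyperparameters` flag takes a JSON string that pyflyte parses into the workflow's dataclass-typed input. The stdlib sketch below imitates that conversion to show that the JSON keys must match the dataclass fields. It assumes `Hyperparameters` is a dataclass with the fields `C` and `max_iter` (inferred from the command above; check `example_00_intro.py` for the real definition), and `parse_dataclass_arg` is a hypothetical helper, not part of pyflyte.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class Hyperparameters:
    # Assumed fields, mirroring the keys passed on the command line above.
    C: float = 1.0
    max_iter: int = 100


def parse_dataclass_arg(cls, raw: str):
    """Turn a JSON CLI argument into an instance of the dataclass `cls`."""
    payload = json.loads(raw)
    known = {f.name for f in fields(cls)}
    unknown = set(payload) - known
    if unknown:
        # Reject typos like "max_iters" instead of silently ignoring them.
        raise ValueError(f"unknown fields for {cls.__name__}: {sorted(unknown)}")
    return cls(**payload)


hp = parse_dataclass_arg(Hyperparameters, '{"C": 0.01, "max_iter": 5000}')
print(hp)  # Hyperparameters(C=0.01, max_iter=5000)
```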

Then supply the `--config`, `--remote`, and `--image` flags to run it on a
remote Flyte cluster:

```bash
export IMAGE='ghcr.io/unionai-oss/union-cloud-templates:onboarding-latest'
pyflyte --config ~/.uctl/config.yaml \
    run --remote \
    --image $IMAGE \
    workflows/example_00_intro.py training_workflow \
    --hyperparameters '{"C": 0.01, "max_iter": 5000}'
```

#### Registering Your Workflow

Once you're happy with the state of your tasks and workflows, you can register
them by first packaging them up into a portable Flyte archive:

```bash
pyflyte --pkgs workflows package --image $IMAGE -f
```

This will create a `flyte-package.tgz` archive file that contains the serialized
tasks and workflows in this project. Then, you can register it with:

```bash
~/bin/uctl --config ~/.uctl/config.yaml register files \
    --project flytesnacks \
    --domain development \
    --archive flyte-package.tgz \
    --version v0
```

Now we can go over to https://sandbox.union.ai/console
(or http://localhost:30080/console if you're using a local Flyte cluster) to
check out the tasks and workflows we just registered.

```python
from workflows import example_00_intro
from workflows.example_00_intro import Hyperparameters
from workflows.utils import get_remote

# Launch the locally defined workflow on the remote cluster.
remote = get_remote()
execution = remote.execute_local_workflow(
    example_00_intro.training_workflow,
    inputs={
        "hyperparameters": Hyperparameters(C=0.1, max_iter=5000),
        "test_size": 0.2,
        "random_state": 11,
    }
)
print(remote.generate_console_url(execution))
```

```python
# Block until the execution reaches a terminal state.
execution = remote.wait(execution)
```

#### Scheduling Launchplans

Activate the schedule:

```python
from workflows.utils import get_remote

remote = get_remote()
lp_id = remote.fetch_launch_plan(name="scheduled_training_workflow").id
remote.client.update_launch_plan(lp_id, "ACTIVE")
print("activated scheduled_training_workflow")
```

Get the execution for the most recent scheduled run:

```python
# Keep only executions launched by the scheduled launch plan.
recent_executions = [
    execution
    for execution in remote.recent_executions()
    if execution.spec.launch_plan.name == "scheduled_training_workflow"
]

scheduled_execution = recent_executions[0] if recent_executions else None
print(scheduled_execution)
```

Now deactivate the schedule:

```python
remote.client.update_launch_plan(lp_id, "INACTIVE")
print("deactivated scheduled_training_workflow")
```

#### `pyflyte register`

Flyte supports rapid iteration during development through "fast registration"
with `pyflyte register`. This zips up all of the source code of your Flyte
application and bypasses the need to rebuild a Docker image.

```bash
pyflyte register --project onboarding --domain development --image $IMAGE workflows
```
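Conceptually, fast registration ships your source code as an archive to run on top of an existing image, instead of baking the code into a new image. The stdlib sketch below illustrates only the packaging half of that idea: it zips the `.py` files under a directory deterministically and derives a content digest that changes only when the code changes. It is not pyflyte's implementation, and `bundle_sources` is hypothetical.

```python
import hashlib
import io
import tempfile
import zipfile
from pathlib import Path
from typing import Tuple


def bundle_sources(root: Path) -> Tuple[bytes, str]:
    """Zip every .py file under `root`; return (zip bytes, short content digest)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for path in sorted(root.rglob("*.py")):
            # Fixed timestamp so identical sources always produce identical bytes.
            info = zipfile.ZipInfo(path.relative_to(root).as_posix(), date_time=(1980, 1, 1, 0, 0, 0))
            info.compress_type = zipfile.ZIP_DEFLATED
            archive.writestr(info, path.read_bytes())
    data = buffer.getvalue()
    return data, hashlib.sha256(data).hexdigest()[:16]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "wf.py").write_text("print('v1')\n")
    _, v1 = bundle_sources(root)
    _, v1_again = bundle_sources(root)
    (root / "wf.py").write_text("print('v2')\n")
    _, v2 = bundle_sources(root)

print(v1 == v1_again, v1 == v2)  # True False
```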

Now go back to the Flyte console and take a look at one of the workflows. You'll
see the fast-registered version under the **Recent Workflow Versions** panel.

## Scalability

### Example 1: Dynamic Workflows

Dynamic workflows let you build execution graphs on the fly, at runtime. For
example, you can write a `for` loop over a workflow input to implement a
grid-search model-tuning workflow.
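In plain Python, the body of such a dynamic tuning workflow behaves like the function below: it only runs once `hyperparam_grid` holds a concrete list, creates one training call per element, and then picks the best result. Flyte would turn each call into its own node; here they are ordinary function calls. `train_and_score` and its fake scoring rule are hypothetical stand-ins, and the `Hyperparameters` fields are assumed from the cell that follows.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Hyperparameters:
    C: float
    max_iter: int


def train_and_score(hp: Hyperparameters) -> float:
    # Hypothetical stand-in for a training task: a fake score that peaks at C = 0.1.
    return 1.0 - abs(hp.C - 0.1)


def tuning_workflow(hyperparam_grid: List[Hyperparameters]) -> Tuple[Hyperparameters, float]:
    # The number of training calls depends on the runtime length of the grid,
    # which is why a static graph cannot express this loop but a dynamic one can.
    scores = [train_and_score(hp) for hp in hyperparam_grid]
    best = max(range(len(scores)), key=scores.__getitem__)
    return hyperparam_grid[best], scores[best]


grid = [Hyperparameters(C=c, max_iter=5000) for c in (1.0, 0.1, 0.01, 0.001)]
best_hp, best_score = tuning_workflow(grid)
print(best_hp)  # Hyperparameters(C=0.1, max_iter=5000)
```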

```python
from workflows import example_01_dynamic
from workflows.example_00_intro import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_01_dynamic.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            Hyperparameters(C=1.0, max_iter=5000),
            Hyperparameters(C=0.1, max_iter=5000),
            Hyperparameters(C=0.01, max_iter=5000),
            Hyperparameters(C=0.001, max_iter=5000),
        ],
    }
)
print(remote.generate_console_url(execution))
```

### Example 2: Map Tasks

Map tasks support larger fan-outs of embarrassingly parallel computations than
dynamic workflows do.
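A map task applies one task to every element of a list, running the elements as independent, parallel executions. The toy version below uses a thread pool to show the shape of that computation. The `min_success_ratio` and `concurrency` parameter names are borrowed from flytekit's `map_task` options, but `toy_map` and `score` are hypothetical and only an illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def toy_map(
    fn: Callable[[T], R],
    inputs: List[T],
    min_success_ratio: float = 1.0,
    concurrency: int = 4,
) -> List[Optional[R]]:
    """Apply `fn` to every input in parallel; items whose call raised become None."""

    def safe_call(x: T) -> Tuple[bool, Optional[R]]:
        try:
            return True, fn(x)
        except Exception:
            return False, None

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        outcomes = list(pool.map(safe_call, inputs))  # preserves input order
    successes = sum(ok for ok, _ in outcomes)
    if inputs and successes / len(inputs) < min_success_ratio:
        raise RuntimeError(f"only {successes}/{len(inputs)} items succeeded")
    return [value for _, value in outcomes]


def score(c: float) -> float:
    if c <= 0:
        raise ValueError("C must be positive")
    return round(1.0 / c, 3)


print(toy_map(score, [1.0, 0.5, -1.0, 0.25], min_success_ratio=0.75))  # [1.0, 2.0, None, 4.0]
```

Tolerating a fraction of failed elements is useful for large fan-outs, where one bad input should not necessarily fail the whole map.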

```python
from workflows import example_02_map_task
from workflows.example_00_intro import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_02_map_task.tuning_workflow,
    inputs={
        "hyperparam_grid": [
            Hyperparameters(C=1.0, max_iter=1000),
            Hyperparameters(C=0.1, max_iter=1000),
            Hyperparameters(C=0.01, max_iter=1000),
            Hyperparameters(C=0.001, max_iter=1000),
        ],
    }
)
print(remote.generate_console_url(execution))
```

### Example 3: Plugins

Flyte has a plugin system that lets you integrate with a wide variety of
data and machine learning tools that help you scale, such as BigQuery,
PySpark, and Ray.

```python
from workflows import example_03_plugins
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_03_plugins.training_workflow,
    inputs={
        "n_epochs": 50,
        "hyperparameters": example_03_plugins.Hyperparameters(
            in_dim=4, hidden_dim=100, out_dim=3, learning_rate=0.03
        ),
    }
)
print(remote.generate_console_url(execution))
```

## Data Quality

### Example 4: Type System

The Flyte type system is responsible for a lot of Flyte's magic: Flyte uses
the regular Python type hints to automatically serialize outputs of tasks
and deserialize inputs of tasks from Flyte's native serialization format,
including handling the off-loading of tabular data like `pandas.DataFrame`
objects.

A nice consequence of this is that Flyte can also analyze the execution graph
at compile time and raise an error when, for example, a task's output type
doesn't match the input type of the downstream task that consumes it.
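Because every task declares its input and output types, a mismatch between a producer's output and a consumer's input can be detected from the annotations alone, before any task runs. The stdlib sketch below shows that idea with `typing.get_type_hints`. Flyte's actual check goes through its own type engine; `check_edge`, `get_data`, and `split_data` are simplified, hypothetical stand-ins, with `dict` and `list` used in place of `pd.DataFrame` so the example runs without pandas.

```python
from typing import Callable, get_type_hints


def check_edge(producer: Callable, consumer: Callable, param: str) -> None:
    """Raise TypeError if producer's return type doesn't match consumer's `param` type."""
    out_type = get_type_hints(producer)["return"]
    in_type = get_type_hints(consumer)[param]
    if out_type != in_type:
        raise TypeError(
            f"{producer.__name__} returns {out_type.__name__}, "
            f"but {consumer.__name__}({param}=...) expects {in_type.__name__}"
        )


def get_data() -> dict:
    return {"bill_length_mm": [39.1, 39.5]}


def split_data(data: list, test_size: float) -> tuple:
    cut = int(len(data) * (1 - test_size))
    return data[:cut], data[cut:]


try:
    check_edge(get_data, split_data, "data")  # checked before anything runs
except TypeError as err:
    print(err)  # get_data returns dict, but split_data(data=...) expects list
```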

Take a look at [example_04_type_system.py](./workflows/example_04_type_system.py).
Try changing the output signature of `get_data` from `pd.DataFrame` to `dict`,
and then fast register it:

```bash
pyflyte register --project flytesnacks --domain development --image $IMAGE workflows
```

What error do you see?

Then you can revert it to the correct implementation and run it again:

```python
from workflows import example_04_type_system
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_04_type_system.get_splits,
    inputs={"test_size": 0.2, "random_state": 123},
)
print(remote.generate_console_url(execution))
```

### Example 5: DataFrame Types

Pandera is a data validation tool for dataframe-like objects. In
[example_05_pandera_types.py](./workflows/example_05_pandera_types.py), we define
a pandera schema that validates the output of `get_data` as well as the DataFrame
input of `split_data` at runtime.
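Pandera expresses these rules declaratively as a schema attached to a type. The stdlib-only sketch below mimics the runtime behavior on a list of row dictionaries: each column has an expected type and a value check, and a violation raises before bad data reaches the next task. The column names, `PENGUIN_SCHEMA`, and `validate` are hypothetical and not pandera's API.

```python
from typing import Any, Callable, Dict, List, Tuple

# column name -> (expected Python type, value check); a hypothetical penguins-like schema
Schema = Dict[str, Tuple[type, Callable[[Any], bool]]]

PENGUIN_SCHEMA: Schema = {
    "species": (str, lambda v: v in {"Adelie", "Chinstrap", "Gentoo"}),
    "bill_length_mm": (float, lambda v: v > 0),
}


def validate(rows: List[Dict[str, Any]], schema: Schema) -> List[Dict[str, Any]]:
    """Return rows unchanged if every row satisfies the schema, else raise ValueError."""
    for i, row in enumerate(rows):
        missing = schema.keys() - row.keys()
        if missing:
            raise ValueError(f"row {i}: missing columns {sorted(missing)}")
        for column, (expected_type, check) in schema.items():
            value = row[column]
            if not isinstance(value, expected_type) or not check(value):
                raise ValueError(f"row {i}: invalid {column}={value!r}")
    return rows


good = [{"species": "Adelie", "bill_length_mm": 39.1}]
bad = [{"species": "Adelie", "bill_length_mm": -1.0}]
validate(good, PENGUIN_SCHEMA)
try:
    validate(bad, PENGUIN_SCHEMA)
except ValueError as err:
    print(err)  # row 0: invalid bill_length_mm=-1.0
```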

#### Exercise

- Uncomment line 49 in `example_05_pandera_types.py`.
- Fast register your workflows, then run the cell below. What error do you see?
- Bonus: comment out the offending line, fast register the workflows again, and
  re-run the cell. What do you see?

```python
from workflows import example_05_pandera_types
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_05_pandera_types.get_splits,
    inputs={"test_size": 0.2},
)
print(remote.generate_console_url(execution))
```

## Reproducibility

### Example 6: Reproducibility

Next, we'll learn about multiple levels of reproducibility:

- **Environment-level reproducibility**: As you can see in the
  [Dockerfile](./Dockerfile), we're containerizing our Flyte application to
  capture a snapshot of all the dependencies that your tasks and workflows rely on.
- **Code-level reproducibility**: In [example_06_reproducibility.py](./workflows/example_06_reproducibility.py)
  we take care of setting a random seed for our model. This is a common practice
  but an important one to remember!
- **Resource-level reproducibility**: Finally, as you've seen previously, we can
  declare the compute and memory requirements of our pipeline at the task level.
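The code-level point is easy to check in isolation: seeding a dedicated random number generator makes a "training" step return identical results across runs, whereas a different seed does not. A minimal stdlib sketch; `init_weights` is a hypothetical stand-in for model initialization, analogous to passing `random_state=42` in the example's `Hyperparameters`.

```python
import random
from typing import List


def init_weights(n: int, random_state: int) -> List[float]:
    # A dedicated RNG instance seeded per call, so results don't depend on global state.
    rng = random.Random(random_state)
    return [rng.gauss(0.0, 1.0) for _ in range(n)]


run_a = init_weights(3, random_state=42)
run_b = init_weights(3, random_state=42)
run_c = init_weights(3, random_state=7)
print(run_a == run_b, run_a == run_c)  # True False
```

Using a local `random.Random` instead of the module-level `random.seed` keeps other code that touches the global generator from changing the result.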

Combined with built-in versioning for all tasks, workflows, launch plans, and
executions, Flyte gives you the ability to roll back or forward to any version
of these entities. Flyte tasks and workflows behave like hermetically sealed
containers: given the same inputs, they are designed to produce the same output
(error or not).

```python
from workflows import example_06_reproducibility
from workflows.utils import get_remote

remote = get_remote()

# Launch the same workflow three times with identical inputs and seed.
for i in range(3):
    execution = remote.execute_local_workflow(
        example_06_reproducibility.training_workflow,
        inputs={
            "hyperparameters": example_06_reproducibility.Hyperparameters(
                penalty="l2", alpha=0.001, random_state=42,
            )
        },
    )
    print(f"Execution {i}: {remote.generate_console_url(execution)}")
```

## Recoverability

### Example 7: Caching

In [example_07_caching.py](./workflows/example_07_caching.py), we revisit the model-tuning use case using `@dynamic` workflows,
showing how caching can help reduce wasted compute.
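In flytekit, caching is opted into per task with `cache=True` plus a `cache_version` string (e.g. `@task(cache=True, cache_version="1.0")`), and a cached result is reused when the task is called again with the same inputs and version. The toy below reproduces that lookup logic in plain Python to show why re-running with identical inputs skips work and why bumping the version invalidates old entries. `ToyCache` and `train` are hypothetical, and Flyte's real cache key involves more than shown here (for example, the task's signature).

```python
import hashlib
import json
from typing import Any, Callable, Dict


class ToyCache:
    """Memoize task outputs keyed on task name, cache version, and serialized inputs."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}
        self.hits = 0

    def key(self, name: str, version: str, inputs: Dict[str, Any]) -> str:
        # sort_keys makes the key independent of keyword-argument order
        payload = json.dumps({"task": name, "version": version, "inputs": inputs}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def run(self, fn: Callable[..., Any], version: str, **inputs: Any) -> Any:
        k = self.key(fn.__name__, version, inputs)
        if k in self.store:
            self.hits += 1
            return self.store[k]
        result = fn(**inputs)
        self.store[k] = result
        return result


def train(alpha: float) -> float:
    return 1.0 / (1.0 + alpha)  # hypothetical stand-in for an expensive training task


cache = ToyCache()
for alpha in [10.0, 1.0, 0.1]:
    cache.run(train, "1.0", alpha=alpha)
for alpha in [10.0, 1.0, 0.1]:  # same inputs, same version: all cache hits
    cache.run(train, "1.0", alpha=alpha)
cache.run(train, "2.0", alpha=10.0)  # bumped version: recomputed
print(cache.hits, len(cache.store))  # 3 4
```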

```python
from workflows import example_07_caching
from workflows.example_06_reproducibility import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
hyperparam_grid = [
    Hyperparameters(alpha=alpha)
    for alpha in [10.0, 1.0, 0.1, 0.01, 0.001, 0.0001]
]
execution = remote.execute_local_workflow(
    example_07_caching.tuning_workflow,
    inputs={"hyperparam_grid": hyperparam_grid}
)
print(remote.generate_console_url(execution))
```

Once the execution has completed, re-run the workflow with the same inputs in the
cell below. In the Union Cloud console you'll see a cache icon (🔄) next to
the task execution status, indicating that the task's output was taken from the
cache.

```python
# Same inputs as before, so cached task outputs should be reused.
execution = remote.execute_local_workflow(
    example_07_caching.tuning_workflow,
    inputs={"hyperparam_grid": hyperparam_grid}
)
print(remote.generate_console_url(execution))
```

### Example 8: Recovering Failed Executions

In [example_08_recover_executions.py](./workflows/example_08_recover_executions.py), we see how Flyte
provides a mechanism for recovering from unexpected failures without re-running the
parts of an execution that already succeeded.
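Recovery works because Flyte records the outputs of every node that succeeded: a recovered execution reuses those outputs and only re-runs the nodes that failed or never ran. The plain-Python toy below models that bookkeeping with a dict of completed step outputs; `run_pipeline` and the simulated flaky `train` step are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple


def run_pipeline(steps: List[Tuple[str, Callable[[Any], Any]]], initial: Any, record: Dict[str, Any]) -> Any:
    """Run steps in order, saving each output in `record`; steps already in `record` are skipped."""
    value = initial
    for name, fn in steps:
        if name in record:
            value = record[name]  # recovered output from an earlier attempt
            continue
        value = fn(value)
        record[name] = value  # only reached if the step succeeded
    return value


calls: List[str] = []
failed_once = {"train": False}


def load(_: Any) -> List[int]:
    calls.append("load")
    return [1, 2, 3]


def train(data: List[int]) -> int:
    calls.append("train")
    if not failed_once["train"]:
        failed_once["train"] = True
        raise RuntimeError("simulated transient failure")
    return sum(data)


steps = [("load", load), ("train", train)]
record: Dict[str, Any] = {}
try:
    run_pipeline(steps, None, record)  # first attempt fails at "train"
except RuntimeError:
    pass
result = run_pipeline(steps, None, record)  # "recovery": "load" is not re-run
print(result, calls)  # 6 ['load', 'train', 'train']
```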

```python
from workflows import example_08_recover_executions
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_08_recover_executions.tuning_workflow,
    inputs={"alpha_grid": [100.0, 10.0, 1.0, 0.1, 0.01, 0.001, 0.0001, 0.00001]}
)
print(remote.generate_console_url(execution))
```

### Example 9: Checkpointing

In [example_09_checkpointing.py](./workflows/example_09_checkpointing.py), we
learn how to use intra-task checkpoints, which Flyte supports natively, to pick
up from where you left off in, e.g., a model training task.
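The core idea of an intra-task checkpoint is to persist progress periodically and, on a retry, load the latest checkpoint instead of starting over. The example file relies on Flyte's built-in checkpointing; the stdlib sketch below swaps that for a local JSON file so the resume logic can be run on its own. `train`, the checkpoint format, and the simulated crash are all hypothetical.

```python
import json
import os
import tempfile
from typing import List, Optional


def train(n_epochs: int, checkpoint_path: str, epochs_run: List[int], fail_at: Optional[int] = None) -> float:
    """Toy training loop that resumes from the last checkpoint written to `checkpoint_path`."""
    start_epoch, weight = 0, 0.0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)
        start_epoch, weight = state["epoch"], state["weight"]
    for epoch in range(start_epoch, n_epochs):
        if epoch == fail_at:
            raise RuntimeError(f"simulated crash at epoch {epoch}")
        weight += 0.5  # stand-in for one epoch of optimization
        epochs_run.append(epoch)
        # Persist progress after every epoch so a retry can pick up from here.
        with open(checkpoint_path, "w") as f:
            json.dump({"epoch": epoch + 1, "weight": weight}, f)
    return weight


with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "checkpoint.json")
    first_attempt: List[int] = []
    try:
        train(30, ckpt, first_attempt, fail_at=20)
    except RuntimeError:
        pass
    retry: List[int] = []
    final_weight = train(30, ckpt, retry)

print(len(first_attempt), len(retry), final_weight)  # 20 10 15.0
```

Checkpointing every epoch keeps lost work to at most one epoch; for expensive writes you might checkpoint less often.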

```python
from workflows import example_09_checkpointing
from workflows.example_06_reproducibility import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_09_checkpointing.training_workflow,
    inputs={
        "n_epochs": 30,
        "hyperparameters": Hyperparameters(penalty="l1", random_state=42),
    }
)
print(remote.generate_console_url(execution))
execution = remote.wait(execution)
```

## Auditability

### Example 10: Visualization with Flyte Decks

In [example_10_flyte_decks.py](./workflows/example_10_flyte_decks.py) we
create tasks that produce static HTML reports that help you understand the
inputs and outputs of your tasks.
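A Flyte deck is ultimately an HTML string attached to a task execution and shown in the console. The stdlib sketch below shows the kind of static report a task might produce, rendering rows as an HTML table with proper escaping. `table_report` is hypothetical and is used here in place of Flyte's built-in renderers.

```python
import html
from typing import Any, Dict, List


def table_report(title: str, rows: List[Dict[str, Any]]) -> str:
    """Render rows of a dataset as a static, self-contained HTML table."""
    columns = list(rows[0]) if rows else []
    head = "".join(f"<th>{html.escape(str(c))}</th>" for c in columns)
    body = "".join(
        "<tr>" + "".join(f"<td>{html.escape(str(row.get(c, '')))}</td>" for c in columns) + "</tr>"
        for row in rows
    )
    # Escaping keeps values like "<script>" from being interpreted as markup.
    return f"<h2>{html.escape(title)}</h2><table><tr>{head}</tr>{body}</table>"


report = table_report("penguins sample", [
    {"species": "Adelie", "bill_length_mm": 39.1},
    {"species": "Gentoo", "bill_length_mm": 46.1},
])
print(report.count("<tr>"))  # 3
```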

```python
from workflows import example_10_flyte_decks
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_10_flyte_decks.penguins_data_workflow,
    inputs={},
)
print(remote.generate_console_url(execution))
```

### Example 11: Extending Flyte Decks

Flyte decks can be easily extended to support arbitrary visualizations, as
we can see in [example_11_extend_flyte_decks.py](./workflows/example_11_extend_flyte_decks.py).
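In flytekit, a custom deck renderer is typically an object with a `to_html` method that takes a Python value and returns an HTML string. The class below follows that shape using only the standard library, rendering a histogram as inline SVG. `HistogramRenderer` and its parameters are hypothetical; see `example_11_extend_flyte_decks.py` for how a renderer is wired into a deck.

```python
from typing import List


class HistogramRenderer:
    """Hypothetical custom renderer: turns a non-empty list of numbers into an SVG bar chart."""

    def __init__(self, bins: int = 5, width: int = 300, height: int = 100) -> None:
        self.bins, self.width, self.height = bins, width, height

    def to_html(self, values: List[float]) -> str:
        lo, hi = min(values), max(values)
        span = (hi - lo) or 1.0  # avoid division by zero when all values are equal
        counts = [0] * self.bins
        for v in values:
            # Clamp the maximum value into the last bin.
            idx = min(int((v - lo) / span * self.bins), self.bins - 1)
            counts[idx] += 1
        bar_w = self.width / self.bins
        peak = max(counts)
        bars = "".join(
            f'<rect x="{i * bar_w:.1f}" y="{self.height - c / peak * self.height:.1f}" '
            f'width="{bar_w - 2:.1f}" height="{c / peak * self.height:.1f}"/>'
            for i, c in enumerate(counts)
        )
        return f'<svg width="{self.width}" height="{self.height}">{bars}</svg>'


html_out = HistogramRenderer(bins=4).to_html([1.0, 2.0, 2.5, 3.0, 4.0])
print(html_out.count("<rect"))  # 4
```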

#### Exercise

Come up with a visualization for one of the inputs or outputs of any of the tasks
in `example_11_extend_flyte_decks.py`, and create a custom Flyte deck for it.

```python
from workflows import example_11_extend_flyte_decks
from workflows.example_06_reproducibility import Hyperparameters
from workflows.utils import get_remote

remote = get_remote()
execution = remote.execute_local_workflow(
    example_11_extend_flyte_decks.training_workflow,
    inputs={
        "hyperparameters": Hyperparameters(
            penalty="l1", alpha=0.03, random_state=12345
        )
    },
)
print(remote.generate_console_url(execution))
```