<div align="center">
  <img src="https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/readme/flyte_and_lf.png" alt="Flyte and LF AI & Data Logo" width="250">
</div>

<h1 align="center">
  Flyte School
</h1>

<h3 align="center">
  Developing and Productionizing Data and ML Pipelines
</h3>

This module shows you how to prepare pipelines for both development and production.

### Learning Objectives

Learn how to:
- Simplify the pipeline development lifecycle
- Build custom images without using a Dockerfile
- Explore different methods to register Flyte tasks and workflows
- Make data and ML pipelines production-ready
- Understand how projects and domains facilitate team collaboration and the transition from development to production

### Outline

* A brief introduction to Flyte tasks and workflows
* How to simplify local development
* How to simplify iteration on a Flyte cluster
* How to make pipelines production-ready
* How to transition pipelines from development to production

## A brief introduction to Flyte tasks and workflows

A task is Flyte's core building block, designed for type safety, statelessness, and reproducibility.

A workflow is a domain-specific language (DSL) for building an
execution graph, with tasks as the building blocks for more complex pipelines.

<image src="static/flyte_tasks.png" width="400px">
<image src="static/flyte_workflows.png" width="500px">

In [42]:
from flytekit import task, workflow


@task
def error(x: list[float], y: list[float]) -> list[float]:
    return [xi - yi for xi, yi in zip(x, y)]

@task
def squares(x: list[float]) -> list[float]:
    out = [xi ** 2 for xi in x]
    return out

@task
def sum_task(x: list[float]) -> float:
    return sum(x)

@workflow
def sum_of_squares(x: list[float], y: list[float]) -> float:
    errors = error(x=x, y=y)
    squared = squares(x=errors)
    return sum_task(x=squared)


sum_of_squares(x=[1.0, 2.0, 4.0], y=[1.0, 3.0, 6.0])

5.0

In [43]:
%%sh
python workflows/example_intro.py

Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-u_2errsg/raw/88f3a2e4787dfb21b8c8b205fd9c6841/b1526e5124d082047b3fc78c6ffb0516.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-u_2errsg/sandbox/local_flytekit/c40d05a5ca992f4b672217b5fe8d37a1
Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-u_2errsg/raw/88f3a2e4787dfb21b8c8b205fd9c6841/b1526e5124d082047b3fc78c6ffb0516.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-u_2errsg/sandbox/local_flytekit/aec5d0a7d2781dfe916c7966b04ee7e1
Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-u_2errsg/raw/88f3a2e4787dfb21b8c8b205fd9c6841/b1526e5124d082047b3fc78c6ffb0516.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-u_2errsg/sandbox/local_flytekit/4b2409087cb79040bb08b2d5f9f9a7f7
DefaultNamedTupleOutput(o0=LogisticRegression(C=0.1, max_iter=5000), o1=0.9926739926739927, o2=0.9855072463768116)


### Summary

A task exhibits the following characteristics:

* Versioned (typically aligned with the git sha)
* Strong interface (annotated inputs and outputs)
* Independently executable

Tasks and workflows are both written as Python functions, but it’s important to distinguish between them.

A task’s body executes at run time on a Kubernetes cluster, in a query engine like BigQuery, or on hosted services like AWS Batch or SageMaker.

In contrast, a workflow’s body doesn’t perform computations; it only structures tasks. It executes at registration time, when the workflow is registered.
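The distinction can be illustrated with a toy tracer in plain Python. This is a minimal sketch, not how flytekit is implemented: `toy_task`, `Node`, `build_graph`, and `run_node` are hypothetical names, and unlike Flyte the sketch binds concrete inputs while tracing. It shows that calling the workflow body only records which task feeds which, while the task bodies run later.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Node:
    """Placeholder for a task output that has not been computed yet."""
    fn: Callable[..., Any]
    kwargs: Dict[str, Any]


@dataclass
class Graph:
    nodes: List[Node] = field(default_factory=list)


_recording: Optional[Graph] = None  # set only while a workflow body is traced
calls_executed: List[str] = []      # names of task bodies that actually ran


def toy_task(fn):
    """Hypothetical stand-in for @task: record a node while tracing, else run."""
    def wrapper(**kwargs):
        if _recording is not None:
            node = Node(fn, kwargs)
            _recording.nodes.append(node)
            return node
        calls_executed.append(fn.__name__)
        return fn(**kwargs)
    return wrapper


def build_graph(workflow_fn, **inputs):
    """Run the workflow body once to collect nodes ("registration time")."""
    global _recording
    _recording = Graph()
    try:
        return _recording, workflow_fn(**inputs)
    finally:
        _recording = None


def run_node(node: Node) -> Any:
    """Resolve upstream nodes first, then run this task body ("run time")."""
    resolved = {
        key: run_node(value) if isinstance(value, Node) else value
        for key, value in node.kwargs.items()
    }
    calls_executed.append(node.fn.__name__)
    return node.fn(**resolved)


@toy_task
def error(x, y):
    return [xi - yi for xi, yi in zip(x, y)]


@toy_task
def squares(x):
    return [xi ** 2 for xi in x]


@toy_task
def sum_task(x):
    return sum(x)


def sum_of_squares(x, y):
    return sum_task(x=squares(x=error(x=x, y=y)))


graph, output_node = build_graph(sum_of_squares, x=[1.0, 2.0, 4.0], y=[1.0, 3.0, 6.0])
names = [node.fn.__name__ for node in graph.nodes]
ran_during_build = list(calls_executed)  # stays empty: nothing was computed yet
result = run_node(output_node)
```

Tracing yields three nodes in dependency order without running any task body; only `run_node` performs the arithmetic.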

### Resources

* [Imperative workflows](https://docs.flyte.org/en/latest/flytesnacks/examples/basics/imperative_workflow.html)
* [Launch plans](https://docs.flyte.org/en/latest/flytesnacks/examples/basics/launch_plan.html)
* [Map tasks](https://docs.flyte.org/en/latest/flytesnacks/examples/advanced_composition/map_task.html)
* [Dynamic workflows](https://docs.flyte.org/en/latest/flytesnacks/examples/advanced_composition/dynamic_workflow.html)

## How to simplify local development

Local development is a crucial step before promoting your data and ML pipelines to production. 
It speeds up debugging, allows rapid experimentation, and, consequently, 
gives you confidence when you want to productionize your pipelines.

### What is local development in Flyte?

It means executing tasks and workflows locally, right in your terminal, as a simple starting point.

For instance, you should be able to run your code as a Python script:

In [44]:
%%sh
python workflows/example_intro.py

Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-bkoe1hji/raw/9cb7ab55eca975638904271bfc5b8e70/f28ad795bc68f234eb6d6fc25dc5039b.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-bkoe1hji/sandbox/local_flytekit/d1157c4618273542345d12e60fe384fe
Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-bkoe1hji/raw/9cb7ab55eca975638904271bfc5b8e70/f28ad795bc68f234eb6d6fc25dc5039b.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-bkoe1hji/sandbox/local_flytekit/cab07a4c1fe8a0603c83f32f357cb665
Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-bkoe1hji/raw/9cb7ab55eca975638904271bfc5b8e70/f28ad795bc68f234eb6d6fc25dc5039b.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-bkoe1hji/sandbox/local_flytekit/37a501172ba804d405eddae5974a5b79
DefaultNamedTupleOutput(o0=LogisticRegression(C=0.1, max_iter=5000), o1=0.9926739926739927, o2=0.9855072463768116)


This should essentially be your first step in testing and debugging.

However, with `python`, you don't get to leverage all the benefits offered by Flyte, such as type checking.

That's when you'll want to transition to Flyte-native commands like `pyflyte run`.

`pyflyte run` is the simplest command to test your tasks and workflows locally.

In [45]:
%%sh
pyflyte run \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'

Running Execution on local.


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-uz38psdz/raw/d04e96a8eefbc6dd93b7e0097ef5c1eb/8319bff46d831ddba69b41a5c52e3cbb.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-uz38psdz/sandbox/local_flytekit/4d0e2756ec68f16bd10d0135abd09568
Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-uz38psdz/raw/d04e96a8eefbc6dd93b7e0097ef5c1eb/8319bff46d831ddba69b41a5c52e3cbb.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-uz38psdz/sandbox/local_flytekit/aa412e45b66881444c79b516506abff7
Getting /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-uz38psdz/raw/d04e96a8eefbc6dd93b7e0097ef5c1eb/8319bff46d831ddba69b41a5c52e3cbb.joblib to /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-uz38psdz/sandbox/local_flytekit/b005207dd1d98c6b854ecc5b3a687308
DefaultNamedTupleOutput(o0=LogisticRegression(C=0.01, max_iter=2500), o1=0.9743589743589743, o2=0.9855072463768116)


In [46]:
%%sh
pyflyte run \
    workflows/example_intro.py get_data

Running Execution on local.
       species  bill_length_mm  bill_depth_mm  flipper_length_mm  body_mass_g
0       Adelie            39.1           18.7              181.0       3750.0
1       Adelie            39.5           17.4              186.0       3800.0
2       Adelie            40.3           18.0              195.0       3250.0
4       Adelie            36.7           19.3              193.0       3450.0
5       Adelie            39.3           20.6              190.0       3650.0
..         ...             ...            ...                ...          ...
339  Chinstrap            55.8           19.8              207.0       4000.0
340  Chinstrap            43.5           18.1              202.0       3400.0
341  Chinstrap            49.6           18.2              193.0       3775.0
342  Chinstrap            50.8           19.0              210.0       4100.0
343  Chinstrap            50.2           18.7              198.0       3775.0

[342 rows x 5 columns]


In [34]:
%%sh
pyflyte run --help

 Usage: pyflyte run [OPTIONS] COMMAND [ARGS]...

 This command can execute either a workflow or a task from the command line,
 allowing for fully self-contained scripts. Tasks and workflows can be imported
 from other files.
 Note: This command is compatible with regular Python packages, but not with
 namespace packages. When determining the root of your project, it identifies
 the first folder without an ``__init__.py`` file.

╭─ Options ────────────────────────────────────────────────────────────────────╮

No-brainer: You need to install the required dependencies locally before running your code.

### Local caching

By default, Flyte caches task outputs on your local machine, so rerunning a cache-enabled task with the same inputs returns the stored result instead of recomputing it.

### [Bonus] Agents

An agent is a long-running, stateless service that can be used to execute tasks. 
It reduces the overhead of creating a pod for each task and enables interactions with external services.

Whenever a task needs to interact with an external service and you want to test it locally, you can use agents.

You can run an agent-backed task locally just like any other Flyte task or workflow, using the `pyflyte run` command.

## How to simplify iteration on a Flyte cluster

Running code locally, on your terminal, is beneficial for catching code-related bugs early on.

To benefit from quick iterations, access to GPUs, data lineage, and everything else Flyte has to offer, 
you need to run your code on a local Flyte cluster.

Running on a local cluster closely mirrors how your code will run on a production Flyte cluster.

The good thing is, you just need to add the `--remote` flag to your `pyflyte run` command:

In [7]:
import os
from pathlib import Path

os.environ["FLYTECTL_CONFIG"] = str(Path.home() / ".flyte/config-sandbox.yaml")
os.environ["IMAGE"] = "ghcr.io/unionai-oss/flyte-school:00-intro-latest"

In [47]:
%%sh
pyflyte run --remote \
    --image $IMAGE \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'

Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f87d48e97282c4e1bb88 to see execution in the console.


### Decoding the power of `--remote`!

* Every task in your workflow runs in a Kubernetes pod
* Data lineage
* Recover a partially successful run
* Your workflow and tasks are versioned
* Executions are immutable
* Type checking
* Timeline view
* Parallelism

### What's `--image`?

The workflow we're using requires libraries beyond `flytekit` and its dependencies.
The `--image` option supplies the Docker image that Flyte uses to spin up the containers
in which the workflow code runs.

If you want to avoid the hassle of writing a Dockerfile, you can define the image in Python with `ImageSpec`.

```python
from flytekit import ImageSpec, task

custom_image = ImageSpec(
    name="dev-and-prod",
    registry="localhost:30000",
    packages=["palmerpenguins", "scikit-learn"],
)

@task(container_image=custom_image)
def t1() -> ...:
    ...
```

In [48]:
%%sh
envd context create --name flyte-sandbox --builder tcp --builder-address localhost:30003 --use
pyflyte run --remote \
    workflows/example_intro_with_imagespec.py training_workflow \
    --hyperparameters '{"C": 0.01}'

error: context "flyte-sandbox" already exists


Running Execution on Remote.
Image localhost:30000/dev-and-prod:vWfB_Tn_U7YWRVvdChB_fA.. found. Skip building.
Image localhost:30000/dev-and-prod:vWfB_Tn_U7YWRVvdChB_fA.. found. Skip building.
Image localhost:30000/dev-and-prod:vWfB_Tn_U7YWRVvdChB_fA.. found. Skip building.
Image localhost:30000/dev-and-prod:vWfB_Tn_U7YWRVvdChB_fA.. found. Skip building.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f68f2b5c46c1a436c92a to see execution in the console.


### Caching

Caching is especially useful during quick iterations while developing pipelines. 
Nobody wants to spend extra time and resources rerunning an execution that has already succeeded! 
Bump `cache_version` when a task's logic changes to invalidate previously cached results.

```python
from flytekit import task

@task(cache=True, cache_version="1.0")
def t1() -> ...:
    ...
```

### Decks

Decks allow you to gain visibility into your tasks. You can use Flyte Decks to render visualizations.

You can render:
* Dataframe
* Markdown
* Box plot
* Image
* Table
* Source code
* Anything HTML!
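Since a deck ultimately displays HTML, the "Anything HTML!" item can be sketched without Flyte at all. The helper below, `rows_to_html_table`, is a hypothetical name, and the `<unknown>` row is made up to show escaping. In a flytekit task, a string like this is what you would hand to a deck; that call is deliberately left out so the example runs without a Flyte installation.

```python
import html
from typing import Sequence


def rows_to_html_table(headers: Sequence[str], rows: Sequence[Sequence[object]]) -> str:
    """Render tabular data as an HTML table, escaping every cell."""
    head = "".join(f"<th>{html.escape(str(h))}</th>" for h in headers)
    body = "".join(
        "<tr>" + "".join(f"<td>{html.escape(str(cell))}</td>" for cell in row) + "</tr>"
        for row in rows
    )
    return f"<table><thead><tr>{head}</tr></thead><tbody>{body}</tbody></table>"


table_html = rows_to_html_table(
    ["species", "bill_length_mm"],
    [["Adelie", 39.1], ["Chinstrap", 55.8], ["<unknown>", 0.0]],
)
```

Escaping the cells keeps stray markup in the data from breaking the rendered page.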

In [49]:
%%sh
pyflyte run --remote \
    workflows/example_deck.py pca_plot

Running Execution on Remote.
Image localhost:30000/flyte-decks-example:zBKgwt8ut0I5M17TSnD__Q.. found. Skip building.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/fb6fa9583115e4f1bb6b to see execution in the console.


Overall, Decks help you debug faster on a Flyte cluster 
and let you quickly see how your workflow is faring.

### Registration

`pyflyte run` incorporates packaging, registering, and launching a workflow into a single command.
It is designed to be a quick and easy iteration tool to get started with Flyte or test small self-contained scripts. 

`pyflyte run` leverages fast registration. How? When a script is executed using `pyflyte run`, 
the script is bundled up and uploaded to FlyteAdmin. 
When the task is executed in the backend, this zipped file is extracted and utilized.

Fast registration is useful when you already have a container image hosted in your container 
registry of choice, and you change your workflow/task code without any alterations to 
your system-level or Python dependencies.
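The bundle-and-extract idea behind fast registration can be sketched with the standard library. This is an illustration only, not the actual upload protocol: `bundle_sources`, `source_version`, and `unpack` are hypothetical helpers, the upload step is omitted, and deriving a version from file contents is an assumption made for the example.

```python
import hashlib
import io
import tarfile
import tempfile
from pathlib import Path


def bundle_sources(root: Path) -> bytes:
    """Pack every .py file under `root` into an in-memory .tar.gz archive."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for path in sorted(root.rglob("*.py")):
            tar.add(path, arcname=path.relative_to(root).as_posix())
    return buffer.getvalue()


def source_version(root: Path) -> str:
    """Derive a short version string from file names and contents."""
    digest = hashlib.sha256()
    for path in sorted(root.rglob("*.py")):
        digest.update(path.relative_to(root).as_posix().encode())
        digest.update(path.read_bytes())
    return digest.hexdigest()[:12]


def unpack(archive: bytes, dest: Path) -> None:
    """Extract regular files from the archive, as a container might at start-up."""
    with tarfile.open(fileobj=io.BytesIO(archive), mode="r:gz") as tar:
        for member in tar.getmembers():
            if not member.isfile():
                continue
            target = dest / member.name
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(tar.extractfile(member).read())


with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    src_root, dst_root = Path(src), Path(dst)
    (src_root / "workflows").mkdir()
    (src_root / "workflows" / "example.py").write_text("print('hello')\n")

    archive = bundle_sources(src_root)       # what would be uploaded
    version = source_version(src_root)       # changes only when the code changes
    version_again = source_version(src_root)

    unpack(archive, dst_root)                # what the running task would see
    round_trip = (dst_root / "workflows" / "example.py").read_text()
```

Because only source files travel in the archive, the container image itself can stay untouched between iterations.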

In contrast, regular registration expects the user to embed the user-defined workflows and all 
their dependencies into a Docker image. This method ensures that the workflows are fully containerized, 
making both the system- and Python-level dependencies, along with your workflow source code, immutable.

<image src="static/flyte-registration-patterns.png" width="800px">

`pyflyte package` and `flytectl register` are appropriate if you’re:

* Working with multiple Flyte clusters, since this approach produces a portable package
* Deploying workflows to a production context
* Testing your Flyte workflows in your CI/CD infrastructure

When iterating on your workflows, fast registration is the go-to option.

## How to make pipelines production-ready

Your pipeline's working fine, and now you want to productionize it.

### What's productionizing a pipeline?

You need to ensure that your pipeline is now ready to handle real-world scenarios. 
From the perspective of data and ML pipelines, this can mean any of the following:

* Ensuring your pipelines have the necessary resources
* Receiving notifications when there's a workflow failure
* Setting a cadence at which your workflow has to run
* Establishing a logging system
* Providing access to GPUs
* Adding secrets

### Task resources

You can specify the following attributes on a `Resources` object:

* cpu
* mem
* gpu

```python
from flytekit import Resources, task

@task(requests=Resources(cpu="1", mem="100Mi"), limits=Resources(cpu="2", mem="150Mi"))
def t1() -> ...:
    ...
```

A task can possibly be allocated more resources than it requests, but never more than its limit. 
Requests are treated as hints to schedule tasks on nodes with available resources, whereas limits are hard constraints.

[Resource](https://docs.flyte.org/en/latest/flytesnacks/examples/productionizing/customizing_resources.html)
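The difference between requests and limits can be sketched in plain Python, using the quantities from the snippet above. This is a simplified model rather than Kubernetes' actual scheduler: `parse_cpu`, `parse_mem`, `fits_on_node`, and `within_limit` are hypothetical helpers, only a few unit suffixes are handled, and the node capacities are made up.

```python
_MEM_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30}


def parse_cpu(quantity: str) -> float:
    """Convert a CPU quantity such as '500m' or '2' into cores."""
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000
    return float(quantity)


def parse_mem(quantity: str) -> int:
    """Convert a memory quantity such as '100Mi' into bytes."""
    for suffix, factor in _MEM_UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)


def fits_on_node(request_cpu: str, request_mem: str, free_cpu: float, free_mem: int) -> bool:
    """Scheduling looks at requests: the node must have at least that much free."""
    return parse_cpu(request_cpu) <= free_cpu and parse_mem(request_mem) <= free_mem


def within_limit(used_mem: int, limit_mem: str) -> bool:
    """Limits are hard caps: usage beyond the limit is not allowed."""
    return used_mem <= parse_mem(limit_mem)


# requests=Resources(cpu="1", mem="100Mi"), limits=Resources(cpu="2", mem="150Mi")
schedulable = fits_on_node("1", "100Mi", free_cpu=1.5, free_mem=parse_mem("120Mi"))
too_small = fits_on_node("1", "100Mi", free_cpu=0.5, free_mem=parse_mem("120Mi"))
usage_ok = within_limit(parse_mem("140Mi"), "150Mi")    # above request, below limit
usage_over = within_limit(parse_mem("160Mi"), "150Mi")  # exceeds the hard cap
```

A task using 140Mi is fine even though it requested only 100Mi, but 160Mi crosses the 150Mi limit.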

### Notifications

If a workflow fails in production, you want to be notified of the failure, right? Notifications can be sent through:

* Email
* PagerDuty
* Slack

```python
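from flytekit import Email, LaunchPlan, Slack, WorkflowExecutionPhase

# int_doubler_wf is assumed to be a workflow defined elsewhere
wacky_int_doubler_lp = LaunchPlan.get_or_create(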
    name="wacky_int_doubler",
    workflow=int_doubler_wf,
    default_inputs={"a": 4},
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=["me@example.com", "you@example.com"],
        ),
        Email(
            phases=[WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=["myboss@example.com"],
        ),
        Slack(
            phases=[
                WorkflowExecutionPhase.SUCCEEDED,
                WorkflowExecutionPhase.ABORTED,
                WorkflowExecutionPhase.TIMED_OUT,
            ],
            recipients_email=["myteam@slack.com"],
        ),
    ],
)
```

You can use SendGrid mail servers to set up notifications.

[Resource](https://docs.flyte.org/en/latest/flytesnacks/examples/productionizing/lp_notifications.html)

### Scheduling

Launch plans can be set to run automatically on a schedule using the Flyte native scheduler. 

```python
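from flytekit import CronSchedule, LaunchPlan

# training_workflow and Hyperparameters are assumed to be defined elsewhere in the module
training_launchplan = LaunchPlan.get_or_create(
    training_workflow,
    name="scheduled_training_workflow",
    # run every 2 minutes
    schedule=CronSchedule(schedule="*/2 * * * *"),
    # use default inputs
    default_inputs={"hyperparameters": Hyperparameters(C=0.1, max_iter=1000)},
)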
```

[Resource](https://docs.flyte.org/en/latest/flytesnacks/examples/productionizing/lp_schedules.html)
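To see what a cron expression like `*/2 * * * *` means in practice, the sketch below computes upcoming fire times with the standard library. It is not Flyte's scheduler: `next_runs` is a hypothetical helper that only understands a step in the minute field, and the start time is arbitrary.

```python
from datetime import datetime, timedelta
from typing import List


def next_runs(step_minutes: int, after: datetime, count: int) -> List[datetime]:
    """Return the next `count` fire times for '*/step_minutes * * * *'."""
    # Cron fires on whole minutes, so start at the first full minute after `after`.
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs: List[datetime] = []
    while len(runs) < count:
        # '*/N' in the minute field matches minutes 0, N, 2N, ... within each hour.
        if candidate.minute % step_minutes == 0:
            runs.append(candidate)
        candidate += timedelta(minutes=1)
    return runs


runs = next_runs(2, after=datetime(2024, 1, 1, 10, 3), count=3)
run_times = [run.strftime("%H:%M") for run in runs]
```

Starting from 10:03, the schedule fires at 10:04, 10:06, and 10:08, which is worth keeping in mind before activating a launch plan with such a short interval.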

In [50]:
%%sh
pyflyte register workflows/example_intro_with_launchplan.py --activate-launchplans

Running pyflyte register from /Users/samhitaalla/Desktop/flyte-school/01-dev-and-prod with images ImageConfig(default_image=Image(name='default', fqn='cr.flyte.org/flyteorg/flytekit', tag='py3.11-1.10.2'), images=[Image(name='default', fqn='cr.flyte.org/flyteorg/flytekit', tag='py3.11-1.10.2')]) and image destination folder /root on 1 package(s) ('/Users/samhitaalla/Desktop/flyte-school/01-dev-and-prod/workflows/example_intro_with_launchplan.py',)
Registering against localhost:30080
Detected Root /Users/samhitaalla/Desktop/flyte-school/01-dev-and-prod, using this to create deployable package...
No output path provided, using a temporary directory at /var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/tmpqvz46m_a instead
Computed version is N2-qz-ke0H0l5vypGngJ7w==
Loading packages ['workflows.example_intro_with_launchplan'] under source root /Users/samhitaalla/Desktop/flyte-school/01-dev-and-prod
Successfully serialized 7 flyte objects
[✔] Registration workflows.example_intro_with_launchp

In [51]:
%%sh
pyflyte launchplan scheduled_training_workflow --deactivate

 Launchplan was set to INACTIVE: scheduled_training_workflow:N2-qz-ke0H0l5vypGngJ7w==

### Logging

To debug your workflows in production, you want to access logs from your tasks as they run. 
These logs differ from the core Flyte platform logs: they are specific to an execution.

Examples of log aggregators include cloud-hosted solutions such as 
AWS CloudWatch, GCP Stackdriver, Splunk, and Datadog.

[Resource](https://docs.flyte.org/en/latest/flytesnacks/examples/productionizing/configure_logging_links.html)

### GPU

Most ML training jobs require a GPU, 
and you want to make sure you assign the right one to your training job.

```python
from flytekit import Resources, task
from flytekit.extras.accelerators import T4


@task(
    limits=Resources(gpu="1"),
    accelerator=T4,
)
def my_task() -> None:
    ...
```

### Secrets

You don't want to expose the secrets that your pipelines require, 
especially in production deployments.

```python
from typing import Tuple

import flytekit
from flytekit import Secret, task


# SECRET_GROUP and SECRET_NAME are assumed to be defined elsewhere
@task(
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_NAME,
            # mount the secret as a file so get_secrets_file() below can locate it
            mount_requirement=Secret.MountType.FILE,
        )
    ]
)
def secret_file_task() -> Tuple[str, str]:
    secret_manager = flytekit.current_context().secrets

    # get the path of the file the secret is mounted at
    f = secret_manager.get_secrets_file(SECRET_GROUP, SECRET_NAME)

    # read the secret value itself
    secret_val = secret_manager.get(SECRET_GROUP, SECRET_NAME)

    # return the filename and the secret value
    return f, secret_val
```

You can configure a secret management system like AWS Secrets Manager or Vault to inject secrets.

That's all it takes to make your pipelines production-ready!

## How to transition pipelines from development to production

Domains allow you to transition pipelines between different environments, 
providing an abstraction to isolate resources and feature configuration for each deployment environment.

Examples of domains include development, staging, and production. 
You can specify lower resource limits 
on the development and staging domains than on production domains.

You can also use domains to disable launch plans and schedules on development and staging domains, 
as these features are typically meant for production deployments.

In [40]:
%%sh
pyflyte run --remote \
    --domain development \
    --image $IMAGE \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'

Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f0282a8e34cb54471994 to see execution in the console.


In [52]:
%%sh
pyflyte run --remote \
    --domain production \
    --image $IMAGE \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'

Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/production/executions/f6955ec8aa9c545739bb to see execution in the console.


A project is a collection of tasks and workflows 
that you can associate with a team or an individual. 
Flyte allows users to set resource limits and automatically provides basic reports and dashboards for each project.

In [15]:
%%sh
pyflyte run --remote \
    --project flytesnacks \
    --domain production \
    --image $IMAGE \
    workflows/example_intro.py training_workflow \
    --hyperparameters '{"C": 0.01}'

Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/production/executions/f1d9c6aae08da4bcfbf8 to see execution in the console.


You can update cluster resources by adding the required quota to a YAML file as follows:

```yaml
attributes:
    projectQuotaCpu: "50"
    projectQuotaMemory: "100Gi"
domain: production
project: flytesnacks
```

Then run `flytectl update cluster-resource-attribute --attrFile cra.yaml`.

A workflow that's production-ready can also run in the development domain. 
All the features we've seen are especially useful when running in production but should work either way!

You can also reference tasks and workflows from other projects and domains.

```python
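from typing import List

from flytekit import reference_task
from flytekit.types.file import FlyteFile


@reference_task(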
    project="flytesnacks",
    domain="development",
    name="data_types_and_io.file.normalize_columns",
    version="{{ registration.version }}",
)
def normalize_columns(
    csv_url: FlyteFile,
    column_names: List[str],
    columns_to_normalize: List[str],
    output_location: str,
) -> FlyteFile:
    ...
```

## Conclusion

Congrats!

You've made it through to the end of this module! To recap, we've:

- Briefly explored the basic constructs of Flyte: tasks and workflows
- Understood how you can develop pipelines locally
- Recognized how easy it is to run workflows on a Flyte cluster
- Understood which features to leverage to make your pipelines production-ready
- Learned how to transition pipelines from development to production

### Resources

<a href="https://docs.flyte.org/en/latest/">
    <img src="https://img.shields.io/badge/Flyte-Docs-purple?style=for-the-badge" alt="Flyte Docs" />
</a>
<a href="https://slack.flyte.org">
    <img src="https://img.shields.io/badge/Slack-Chat-pink?style=for-the-badge&logo=slack" alt="Flyte Slack" />
</a>
<a href="https://github.com/flyteorg/flyte">
    <img src="https://img.shields.io/badge/Flyte-Github%20Repo-black?style=for-the-badge&logo=github" alt="Flyte Repo" />
</a>