# Understanding **Flyte** concepts and creating a simple **logistic regression** workflow

In [None]:
#!pip install flytekit

In [None]:
# Basic flyte code with task & workflows concepts


import flytekit as fk

@fk.task
def my_task(x: int) -> int:
  return x * 2

@fk.task
def my_another_task(val: int) -> int:
  return val + 3

@fk.workflow
def my_workflow(x: int) -> int:
  y = my_task(x=x)
  ans = my_another_task(val=y)
  # ans2 = ans + 3  => This is wrong.
  # Inside a workflow, ans is a Promise (a placeholder for a task output), not an int,
  # so plain arithmetic with the int 3 is not possible here; do such work inside a task.
  return ans



In [None]:
ans = my_workflow(x=9)
ans

21

**Note**:
1. Tasks in Flyte are strongly typed.
2. When calling tasks or workflows, only keyword arguments (`**kwargs`) are allowed.
3. A workflow is a directed acyclic graph (**DAG**) of units of work encapsulated by nodes.
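
The first two notes can be illustrated without Flyte at all. The sketch below is a toy decorator in plain Python, not flytekit's implementation; the names `typed_kwargs_only` and `double` are hypothetical and exist only for this illustration. It rejects positional arguments and checks each keyword argument against the function's type annotation.

```python
import inspect
from functools import wraps


def typed_kwargs_only(fn):
    """Toy decorator (not flytekit): keyword arguments only, checked against annotations."""
    annotations = {
        name: param.annotation
        for name, param in inspect.signature(fn).parameters.items()
    }

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Reject positional arguments outright.
        if args:
            raise TypeError(f"{fn.__name__}: only keyword arguments are allowed")
        for name, value in kwargs.items():
            expected = annotations.get(name)
            if expected is None:
                raise TypeError(f"{fn.__name__}: unexpected argument {name!r}")
            # Compare the runtime value with the declared annotation.
            if not isinstance(value, expected):
                raise TypeError(
                    f"{fn.__name__}: {name!r} must be {expected.__name__}, "
                    f"got {type(value).__name__}"
                )
        return fn(**kwargs)

    return wrapper


@typed_kwargs_only
def double(x: int) -> int:
    return x * 2


print(double(x=4))  # 8
```

Calling `double(4)` or `double(x="4")` raises a `TypeError` in this sketch, which mirrors the kind of early failure the notes above describe.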

# Nodes
A node represents a unit of **execution** or work within a workflow. Ordinarily, a node encapsulates an **instance** of a task, but it can also contain an entire subworkflow or trigger an external workflow.

# Workflow structure is flexible because:

1. Nodes can be executed in **parallel** if they are not dependent.

2. The same task definition can be **re-used** within a different workflow.

3. A single workflow can contain any **combination** of task types.

4. A workflow can contain a **single** functional node.

5. A workflow can contain **multiple** nodes in all sorts of arrangements.

6. A workflow can **launch** other workflows.
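
To make the first point concrete, here is a minimal sketch using Python's standard `graphlib` module rather than Flyte itself. The node names and the `parallel_batches` helper are hypothetical. It groups the nodes of a small DAG into batches, where every node in a batch has all its dependencies satisfied and could therefore run in parallel.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow graph: each node maps to the set of nodes it depends on.
dag = {
    "load_data": set(),
    "load_config": set(),
    "train": {"load_data", "load_config"},
    "evaluate": {"train"},
}


def parallel_batches(graph):
    """Group nodes into batches whose members do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        # Every node returned here has all of its dependencies completed.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = parallel_batches(dag)
print(batches)  # [['load_config', 'load_data'], ['train'], ['evaluate']]
```

The two loading nodes share a batch because neither depends on the other, while `train` and `evaluate` must wait for their upstream nodes.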


Flyte workflows are defined in **protobuf** (a serialization format), and the flytekit SDK facilitates writing them. Users can define workflows as a collection of nodes.

In [None]:
# Create a custom container image with Flyte's ImageSpec instead of writing a Dockerfile
custom_image = fk.ImageSpec(name='image_first', registry='localhost:30080', packages=['scikit-learn', 'numpy'])

In [None]:
custom_image

ImageSpec(name='image_first', python_version=None, builder='envd', source_root=None, env=None, registry='localhost:30080', packages=['scikit-learn', 'numpy'], requirements=None, apt_packages=None, cuda=None, cudnn=None, base_image=None, platform='linux/amd64', pip_index=None, registry_config=None)

In [None]:
%%sh
# Run the workflow locally as a plain Python script
python /content/main.py

Getting /tmp/flyte-h9_ymy3w/raw/2d3f7094a0d93f5fdb7f26f0c183245a/0d2321b540108c3796be0f90ac62a60e.joblib to /tmp/flyte-h9_ymy3w/sandbox/local_flytekit/d8aa1687858217a4870ba91061859ef2
ans:  LogisticRegression(C=0.3, max_iter=2600)


In [None]:
%%sh
# Run the workflow locally with pyflyte for debugging
pyflyte run /content/main.py training_workflow --hyperparameters '{"C":0.2}'

Running Execution on local.
Getting /tmp/flyte-lhfet_g4/raw/0076b47399a09ee55cfc33ad988a6b29/cf05e0c997986fab1b9366f31b44353e.joblib to /tmp/flyte-lhfet_g4/sandbox/local_flytekit/6e75bd0a868f4e6b7977b289304e9da7
LogisticRegression(C=0.2, max_iter=2500)


In [None]:
%%sh
# Run the workflow on an actual Flyte cluster.
# Before running it, make sure the required environment variables are set.
pyflyte run --remote /content/main.py training_workflow --hyperparameters '{"C":0.2}'

# **Flyte Console**
1. It shows the run details of each task in a workflow.
2. It shows the dependency graph between the tasks in a workflow, along with their inputs and outputs.
3. It shows the time each task took to complete.
4. Each workflow can be relaunched as another version to reproduce results or to experiment.

In [None]:
# FlyteRemote
# To run the tasks & workflows programmatically as a microservice

In [None]:
import utils
import main

remote = utils.getRemote()

# Create a workflow execution from the local workflow and run it
workflow_execution = remote.execute_local_workflow(
    main.training_workflow,
    inputs={
        "hyperparameters": main.Hyperparameters(C=0.3, max_iter=2400)
    },
)


In [None]:
# Generate the Flyte console URL for the workflow execution
flyte_console = remote.generate_console_url(workflow_execution)


In [None]:
# Wait for the execution and sync the inputs and outputs of every node so they show up in the console
synced_execution = remote.wait(execution=workflow_execution, sync_nodes=True)

# Launch Plans
Launch plans help execute workflows. A workflow can be associated with multiple launch plans and launch plan versions, but an **individual** launch plan is **always** associated with a **single**, specific workflow.


Launch plans contain a set of **bound** workflow inputs that are passed as **arguments** to create an execution.
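
As a rough analogy in plain Python (this is not the Flyte API), binding inputs ahead of time resembles `functools.partial`. In the sketch below, `training_workflow`, `nightly_plan`, and `quick_plan` are hypothetical stand-ins: one function plays the workflow, and each partial plays a launch plan that fixes some of its inputs.

```python
from functools import partial


def training_workflow(*, C: float, max_iter: int) -> str:
    # Hypothetical stand-in for a workflow: it only describes the model it would train.
    return f"LogisticRegression(C={C}, max_iter={max_iter})"


# Two "launch plans" for the same workflow, each with different bound inputs.
nightly_plan = partial(training_workflow, C=0.3, max_iter=2400)
quick_plan = partial(training_workflow, max_iter=100)

print(nightly_plan())     # LogisticRegression(C=0.3, max_iter=2400)
print(quick_plan(C=1.0))  # LogisticRegression(C=1.0, max_iter=100)
```

Several partials can wrap the same function, but each partial wraps exactly one, which matches the one-workflow-per-launch-plan relationship described above.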


Each launch plan can optionally define a **single schedule** (which can be easily disabled by disabling the launch plan) as well as optional notifications.

**Note**: Every workflow comes with a **default** launch plan that has the same name as the workflow. Users rarely need to interact with it directly.

# **Scheduling Launch Plans in Flyte**


During **workflow registration**, Flyte validates the workflow structure and saves the workflow. The registration process updates the workflow graph. A compiled workflow will always have a start and end node injected into the workflow graph.

**Note**: Registered workflows are **immutable**, i.e., an instance of a workflow defined by a specific {Project, Domain, Name, Version} combination can’t be updated.
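
The immutability rule can be pictured as a write-once store keyed by that four-part identifier. The class below is a hypothetical in-memory sketch, not how Flyte stores workflows; `WorkflowRegistry` and the example project and domain names are made up for illustration.

```python
class WorkflowRegistry:
    """Toy write-once registry keyed by (project, domain, name, version)."""

    def __init__(self):
        self._store = {}

    def register(self, project, domain, name, version, spec):
        key = (project, domain, name, version)
        # An existing key can never be overwritten; a change needs a new version.
        if key in self._store:
            raise ValueError(f"{key} is already registered and cannot be updated")
        self._store[key] = spec
        return key


registry = WorkflowRegistry()
registry.register("demo_project", "development", "training_workflow", "v1", "graph A")

try:
    registry.register("demo_project", "development", "training_workflow", "v1", "graph B")
except ValueError as err:
    print(err)

# Registering the changed workflow under a new version succeeds.
registry.register("demo_project", "development", "training_workflow", "v2", "graph B")
```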

In [None]:
# Workflows can be run automatically using schedules associated with launch plans.
"""
Only one launch plan version for a given {Project, Domain, Name} combination can
be active, which means only one schedule can be active for a launch plan.

Creating a new schedule creates a new version of the launch plan,
because a schedule cannot be edited.
"""

In [None]:
# FixedRate schedule LaunchPlan for the workflow
from datetime import timedelta

from flytekit import FixedRate, LaunchPlan

fixed_rate_lp_days = LaunchPlan.get_or_create(
    name="Logistic_Regression_workflow_v1",
    workflow=main.training_workflow,
    # run every day
    schedule=FixedRate(duration=timedelta(days=1)),
    # use default inputs
    default_inputs={"hyperparameters": main.Hyperparameters(C=0.3, max_iter=2400)},
)

In [None]:
# Cron schedule LaunchPlan for the workflow
from flytekit import CronSchedule, LaunchPlan

cron_schedule_workflow = LaunchPlan.get_or_create(
    name="Logistic_Regression_workflow_v2",
    workflow=main.training_workflow,
    # run every 2 minutes
    schedule=CronSchedule(schedule="*/2 * * * *"),
    # use default inputs
    default_inputs={"hyperparameters": main.Hyperparameters(C=0.3, max_iter=2400)},
)
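
The two schedule styles differ in how run times are derived. The sketch below uses only the standard library and is an illustration of the idea, not Flyte's scheduler: the helper names and the start time are hypothetical, and the real scheduler's kick-off times may differ. A fixed rate repeats at a constant interval from a starting point, while a cron minute field of the form `*/2` matches wall-clock minutes divisible by 2.

```python
from datetime import datetime, timedelta


def fixed_rate_times(start, duration, count):
    """Run times spaced `duration` apart, counted from `start`."""
    return [start + i * duration for i in range(1, count + 1)]


def every_n_minutes_times(start, step, count):
    """Next `count` whole minutes after `start` matching a cron minute field */step."""
    t = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    matches = []
    while len(matches) < count:
        if t.minute % step == 0:
            matches.append(t)
        t += timedelta(minutes=1)
    return matches


start = datetime(2024, 1, 1, 9, 1, 30)  # assumed starting point for the example

daily = fixed_rate_times(start, timedelta(days=1), 2)
every_two = every_n_minutes_times(start, 2, 3)

print([t.isoformat() for t in daily])
# ['2024-01-02T09:01:30', '2024-01-03T09:01:30']
print([t.strftime("%H:%M") for t in every_two])
# ['09:02', '09:04', '09:06']
```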