# Kubeflow Introduction

> __Kubeflow is an abstraction layer over `k8s` which enables easier `ML`-oriented workflows__

Compared to `k8s`, it:
- is more Python-centric
- uses other cluster-specific projects like [`istio`](https://github.com/istio/istio)
- leverages `Service`s out of the box and is all about micro-services defined with `Pipelines` (described later)

## What can I use it for?

- Experimentation and hyperparameter tuning via [`katib`](https://www.kubeflow.org/docs/components/katib/) (much faster thanks to parallelization across the cluster)
- Deploying and managing complex ML tasks at scale ([`kfserving`](https://www.kubeflow.org/docs/components/kfserving/))
- Monitoring our application (or easily adding the components needed for it)
- `CI/CD` is not officially supported, although it can be mixed quite easily with popular solutions like `GH-Actions`, `Jenkins` or similar



## Components

> `Kubeflow` provides various components for common tasks during a typical `ML` lifecycle

- [Central Dashboard](https://www.kubeflow.org/docs/components/central-dash/) - visualize `pipeline`s, resource usage and other relevant statistics with easy to use `UI`
- [Notebook Servers](https://www.kubeflow.org/docs/components/notebooks/) - `jupyter notebook`s provided as a means for experimentation, data visualization or exploration
- [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/) - high level DAG which allows us to specify necessary steps for deployment/training/tuning/CI etc.
- [KFServing](https://www.kubeflow.org/docs/components/kfserving/) - similar idea to `tensorflow`'s `Serving`
- [Katib](https://www.kubeflow.org/docs/components/katib/) - hyperparameter tuning
- [Training Operators](https://www.kubeflow.org/docs/components/training/) - operators used to `train` models with specific frameworks (__mostly `DL` oriented__) on common downstream tasks
- [Multi-Tenancy](https://www.kubeflow.org/docs/components/multi-tenancy/) - IAM and isolation, security related and sharing resources across teams

# Dependencies

Before `Kubeflow` itself, we should know which other dependencies (apart from `kubernetes`) are required (or possible).

> Only brief explanations of each component are presented; for more information refer to their respective documentation

In this cell, __required dependencies__ are presented. The next cell shows an example of an __external dependency__ one could use.

## istio

> __Only necessary `3rd` party addition__

It is automatically included with `kubeflow` during installation.

The following `steps` outline the need for `istio`:
1. Applications are built using distributed microservices
2. Each microservice is an instance of a `Service` with its own `API` and a way to communicate with it
3. __`service mesh` - network of such services working together__
4. __It is getting progressively harder to manage such stack__

This is where `istio` comes in. Once added, it:
- adds observability to our services (e.g. gathering metrics)
- provides `load-balancing`, failure recovery and metrics __via simple `.yaml` configuration__
- secures and validates the services taking part in the mesh (e.g. `TLS` encryption)
- controls traffic between services via `proxies`

Services before `istio`:

![](./images/istio-before.svg)

Services after the `istio` mesh was applied:

![](./images/istio-after.svg)

### `istio` in `kubeflow`

- Applying appropriate policies
- Attaching the identity of a `user` for other services we communicate with

The image below shows how a new `Notebook Server` is created:

![](./images/istio-in-kubeflow.svg)

## Elyra

> A set of AI-centric extensions to JupyterLab Notebooks.

Its features range from workflow enhancements for Data Scientists, e.g.:
- `LSP` (Language Server Protocol) integration for `jupyterlab` (__renaming, finding by reference and other features known from `IDE`s__)
- Notebooks navigation via `TOC`
- [Code snippets](https://elyra.readthedocs.io/en/latest/getting_started/overview.html#reusable-code-snippets)

to more advanced features, namely:
- [Visual Pipeline Editor](https://elyra.readthedocs.io/en/latest/getting_started/overview.html#ai-pipelines-visual-editor) - __connect your `notebooks` and `python` scripts to define a workflow__
- One can execute these pipelines locally, via `kubeflow pipelines` (hey there!) or via `apache airflow`
- Running `jupyter notebook`s as batch jobs

`Visual` pipelines and related features are leveraged by `kubeflow` to provide a better dev/deploy/visualization experience

![](./images/elyra-notebook-pipeline.png)

> __See the [FOSS repository](https://github.com/elyra-ai/elyra) for more information__

# Kubeflow Architecture Overview

You might feel overwhelmed looking at this image at first, __but don't worry, we will explain what is going on in more and more detail as we proceed__

![](./images/kubeflow-pipelines-architecture.png)

We already know a few things, namely:
- `k8s` specific part of Orchestration System
- Controllers (driving provided `.yaml` settings from __current__ to __desired__ state)
- `k8s` `Node`s which do the "heavy lifting" our application requires

Other elements of this image, briefly (going from top to bottom):
- Python SDK - user-defined `pipeline` written with the `dsl`. __This `dsl` is transpiled to `k8s`-readable `.yaml` files__
- __Pipeline web server__ - gathers data from `Service`s with the help of [`istio`](https://istio.io/)'s service mesh
- __Pipeline `Service`__ - takes the transpiled `config` and calls the `k8s` API server to create the resources needed to run the pipeline
- __Pipeline `Persistence Agent`__ - watches the `k8s` resources created by the `Pipeline Service`:
    - __records which `containers` were executed (and their `inputs` and `outputs`)__
    - These can be either:
        - `container` parameters
        - `URI`s of data artifacts
- __Orchestration controllers__ - control state and deploy `workload resources` according to the defined DAG:
    - `Argo` is the core controller (see [here](https://github.com/argoproj/argo-workflows/)); __allows us to schedule complicated pipelines with dependencies__
- __Artifact Storage__ - stores relevant data:
    - `metadata` - stored in a `mysql` database backed by a `PersistentVolume`
    - `artifacts` - stored in [`minio`](https://min.io/) (backed by a `PersistentVolume`) or cloud storage (__used for fast `r/w` access__)
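The "current to desired state" idea behind the controllers mentioned above can be shown with a toy reconciliation step: compare what should exist with what does exist and derive the actions needed to converge. `reconcile` and the resource names are hypothetical; real controllers watch the `k8s` API rather than plain dictionaries.

```python
def reconcile(desired: dict, current: dict) -> list:
    """Toy controller step (no real k8s calls): compare desired vs current
    replica counts per resource and return the actions needed to converge."""
    actions = []
    for name in sorted(desired.keys() | current.keys()):
        want, have = desired.get(name, 0), current.get(name, 0)
        if want > have:
            actions.append(f"create {want - have} x {name}")
        elif want < have:
            actions.append(f"delete {have - want} x {name}")
    return actions


desired_state = {"ml-pipeline": 1, "minio": 1, "mysql": 1}
current_state = {"ml-pipeline": 0, "minio": 1, "old-job": 2}
for action in reconcile(desired_state, current_state):
    print(action)
```

A real controller runs such a step repeatedly, so the cluster keeps converging even when something drifts.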

# Kubeflow Pipelines

> Platform in `kubeflow`, __defining end-to-end lifecycle of `ML` project__ and helping with re-using components

One can do it using:
- `SDK` (`python` in our case) - defines the pipeline
- `UI` - visualizing created pipelines easily
- `k8s` based engine which transpiles `kubeflow` format to `k8s` specific one (yes, `.yaml` files)

## Concepts

Working with `Pipeline`s involves the following concepts, which we will go through first:
- `pipeline`
- `component`
- `graph`
- `experiment`
- `run` and `recurring run`
- `run trigger`
- `step`
- `output artifact`

Alongside each, we will see how to define it using Python's SDK

## Installation

First, start a `minikube` cluster; as we saw previously, this can be done with `minikube start` (__you should still use it for local development__).

> `kubeflow-pipelines` can be installed as a standalone via `k8s` `kustomize`

We can do it directly from `github` with the commands below:

In [None]:
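# `!export` would only affect a throwaway subshell; `%env` sets the variable for all following `!` commands
%env PIPELINE_VERSION=1.6.0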
!kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
!kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
!kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

You might need to wait a little bit for appropriate `POD`s to start.

To see them, one can check available `POD`s and their status in `kubeflow` namespace:

In [None]:
!kubectl get pods --namespace kubeflow

Once all of the above are `Running` (__some restarts might happen, don't worry about them__), run the following command to forward port `80` of the `kubeflow` UI `Service` (`svc/ml-pipeline-ui`) to `localhost:8080`:

In [None]:
!kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

We can now open `localhost:8080` in our web browser to see the UI.

It should look similar to this:

![](./images/kubeflow-ui.png)

In order to delete `kubeflow-pipelines` we can use the following:

In [None]:
# Uncomment the lines below to uninstall (PIPELINE_VERSION must be set in the notebook's environment)
# %env PIPELINE_VERSION=1.6.0

# !kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
# !kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"

## Installing `SDK`

> `kubeflow`'s `SDK` is provided as `PyPI` package and named `kfp`

It uses the `kubernetes` Python `SDK` under the hood and also works indirectly, by transpiling the pipeline graph to `.yaml` files (as we will see)

As usual, use `pip` to install it (__you should do it within `AiCore`'s `conda` environment__):

In [None]:
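# Pin to the 1.x SDK: this notebook uses v1 APIs (e.g. `create_component_from_func`, `ContainerOp`)
!pip install --upgrade "kfp<2"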

## `SDK` packages

A high-level overview of the functionality provided after installation:
- __[`kfp.compiler`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.compiler.html) - classes and methods for compiling the `dsl` to `.yaml`__:
    - [`kfp.compiler.Compiler`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.compiler.html#module-kfp.compiler) - compiles `pipeline` functions to `yaml` workflows

Schematic example code using it (__we will see all parts together later__):

In [None]:
import kfp
from kfp.compiler import Compiler


@kfp.dsl.pipeline(
  name='name',
  description='description'
)
def my_pipeline(a: int = 1, b: str = "default value"):
  ...  # pipeline steps (component tasks) go here

Compiler().compile(my_pipeline, 'path/to/workflow.yaml')


- [`kfp.components`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.components.html) - classes and methods for interacting with pipeline components
- [`kfp.dsl`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html) - contains the domain-specific language that you can use to define and interact with pipelines and components:
    - includes `Pipeline` definition (as seen above)
    - __core package of `SDK` we will use the most often__
- [`kfp.client`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.client.html) - client libraries for [Kubeflow Pipelines API](https://www.kubeflow.org/docs/components/pipelines/reference/api/kubeflow-pipeline-api-spec/) and allows us to:
    - create experiments
    - run pipelines
    - upload pipelines
- [`kfp.cli.diagnose_me`](https://github.com/kubeflow/pipelines/tree/master/sdk/python/kfp/cli/diagnose_me) - tools to debug the environment interactively, returning various metadata useful for debugging the setup.

## `kfp` in command line

After installation, `kfp` is also available as a command-line tool:
- `kfp pipeline <COMMAND>` - manage `pipeline`s; commands include:
    - `get` - gets detailed information about a Kubeflow pipeline from your Kubeflow Pipelines cluster
    - `list` - lists the pipelines that have been uploaded to your Kubeflow Pipelines cluster
    - `upload` - uploads a pipeline to your Kubeflow Pipelines cluster
- `kfp run <COMMAND>` - manage `kubeflow`'s runs:
    - `get` - displays the details of a pipeline run
    - `list` - lists recent pipeline runs
    - `submit` - submits a pipeline run

For example:

In [2]:
!kfp pipeline --help

Usage: kfp pipeline [OPTIONS] COMMAND [ARGS]...

  Manage pipeline resources

Options:
  --help  Show this message and exit.

Commands:
  delete          Delete an uploaded KFP pipeline
  get             Get detailed information about an uploaded KFP pipeline
  list            List uploaded KFP pipelines
  list-versions   List versions of an uploaded KFP pipeline
  upload          Upload a KFP pipeline
  upload-version  Upload a version of the KFP pipeline


## Component

> __Self-contained piece of code performing one step in `pipeline`__

![](./images/kubeflow-graph.png)

In the above image, `Xgboost train` is an example of such a component.

> It is analogous to a __larger function__ performing __one semantically meaningful step__

It also has (just like `functions`):
- name
- parameters/arguments
- return values
- body (code)

> __Each component must be packaged as `Docker` image as they are standalone units of execution__

### Component code

Such `python` code has two parts:
- `client` - talks to endpoints to submit jobs, e.g. submitting a `spark` `job`
- `runtime` - the actual code, e.g. creating a `pyspark.sql.DataFrame` from a `SparkSession`.

The convention is to keep the component's code within a `package` named after it, for example:
- `/component` - the package holding the component's modules
- `/component/component.py` - `client` code

### Component definition

- `metadata` - name, description etc.
- `interface` - input/output specification (name, type, description, default value, etc)
- `implementation` - actual code, __also defines how to get `outputs` out of it__

> __The component is defined in `python`, hence we don't have to worry about transpiling it to the `k8s`-readable `.yaml` definition above__

> ### In most cases, __make sure to check `Challenges.Mandatory.Components` for an alternative!__

# Python function-based components

> __Python function-based components allow us to define any component SOLELY in `python`__

This approach removes the extra steps of `component` definition, namely:
- Defining `Docker` image file
- Defining `.yaml` with component's definition

__The above will be generated automatically from the `python` code__; additional benefits include:
- More readable as `component` is defined like function
- Quicker to develop (and only `python` knowledge is necessary)

On the other hand, __due to automation__:
- Harder to customize (__although might be done after transpilation was finished by `kfp`__)
- For more complicated use cases (e.g. `Docker` image with custom dependencies) we need to create `Dockerfile` anyway (as a base for automation)

> Check `Challenges.Mandatory.Components` for a direct way of creating a `component`! __THIS KNOWLEDGE IS MANDATORY AND WILL BE VERIFIED!__

Before moving on, let's create a [`kfp.Client`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html) instance which will be used from now on:

In [3]:
import kfp

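# We are using default values (no `host` given): `kfp` then tries to reach the
# Kubeflow Pipelines API via your current `kube` config (or the in-cluster config).
# With the UI port-forwarded as above, `kfp.Client(host="http://localhost:8080")` is an alternative.
client = kfp.Client()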

## Standalone Functions

Before we start, a few things to `note` about functions __which differ largely from `Python`'s standard practices__:

> __It should not use any code declared outside the function__

Something like this is prohibited:

In [None]:
x = 12

def foo():
    return x
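Why is this prohibited? The generated component `.yaml` (shown later for `add`) embeds only the function's own source, so a module-level name like `x` does not exist inside the container. The toy simulation below mimics that by executing the function's source in an empty namespace; `run_in_fresh_namespace` is a hypothetical helper, not part of `kfp`.

```python
# Source of the prohibited example above, as it would travel to the container:
# only the function itself, without the module-level `x = 12`.
FUNCTION_SOURCE = '''
def foo():
    return x
'''


def run_in_fresh_namespace(source: str, func_name: str):
    """Execute `source` in an empty namespace (a stand-in for the container's
    program file) and call `func_name` without arguments."""
    namespace: dict = {}
    exec(source, namespace)
    return namespace[func_name]()


x = 12  # exists in the notebook, but not in the "container"

try:
    run_in_fresh_namespace(FUNCTION_SOURCE, "foo")
except NameError as error:
    print(f"Fails outside the notebook: {error}")
```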

> __`import` statements must be placed WITHIN the function__

There is no apparent "best practice", but we suggest `import`ing every necessary dependency at the top of your function and separating the imports from the rest of the code with comment blocks, e.g.:

In [4]:
# We can't do that anymore!
# import numpy as np


def bar():
    ###########################################################################
    #
    #                               IMPORTS
    #
    ###########################################################################

    import numpy as np
    import this  # Well, now I'm not so sure

    ###########################################################################
    #
    #                                 SRC
    #
    ###########################################################################

    return np.array([1])

> __Helper functions must also be defined within the function__
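Putting the rules together, a standalone function keeps its imports and its helpers inside its own body. `normalize` below is a hypothetical example, not a component from this notebook.

```python
def normalize(values: list) -> list:
    """Standalone component body: imports and helpers live inside it."""
    # IMPORTS
    import statistics

    # HELPERS (defined inside, so they are shipped together with the body)
    def _safe_stdev(data):
        # Avoid division by zero for constant inputs
        return statistics.pstdev(data) or 1.0

    # SRC
    mean = statistics.mean(values)
    stdev = _safe_stdev(values)
    return [(value - mean) / stdev for value in values]


print(normalize([1.0, 3.0]))  # [-1.0, 1.0]
```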

### Drawbacks

Drawbacks of this approach should be pretty visible by now (for larger/more complicated functions which cannot sensibly be split into separate microservices):
- Long and semi-readable code (good practices can only take us so far)

The alternative might be better in the following cases:
- __Longer `Service` doing a lot of heavy lifting__
- Optimization while preserving code readability (disk `I/O` is expensive, hence we should minimize it)
- __There is an option to `cache` results__, but this still does not solve the readability problems

On the other hand:
- __One has to create a standalone program and define its `.yaml` and `Dockerfile`__

## Simple function-based `component`

Example below outlines the necessary steps:

1. __Create `standalone` function__ (make it `pure`, without any side-effects)

Please note that:
- We should use Python's `typing` annotations for `type` inference
- __We will later see the repercussions when returning multiple values__

In [5]:
def add(a: float, b: float) -> float:
  '''Calculates sum of two arguments'''
  return a + b

2. __Create [`kfp.dsl.ContainerOp`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.ContainerOp) using `kfp.components.create_component_from_func`__

Please note that:
- A factory function is created (which we can use in multiple different ways)
- __This `op` should be used within a `Pipeline` (described in more detail later)__
- The `.yaml` component definition is created automagically for us (and saved to disk when `output_component_file` is given)

In [7]:
add_op = kfp.components.create_component_from_func(
    add, output_component_file="add_component.yaml"
)

3. __Create `Pipeline` that runs our `op`(s)__

Please note `comments` within the code:

In [8]:
@kfp.dsl.pipeline(
  name='Addition pipeline',
  description='An example pipeline that performs addition calculations.'
)
def add_pipeline(
  a='1',
  b='7',
):
  # Passes a pipeline parameter and a constant value to the `add_op` factory
  # function.
  first_add_task = add_op(a, 4)
  # Passes an output reference from `first_add_task` and a pipeline parameter
  # to the `add_op` factory function. For operations with a single return
  # value, the output reference can be accessed as `task.output` or
  # `task.outputs['output_name']`.
  second_add_task = add_op(first_add_task.output, b)

4. __Create and `run` pipeline from function__

> __Please run port forwarding of `kubeflow` as described previously to see the `run`__

In [10]:
# Specify argument values for your pipeline run.
arguments = {'a': '7', 'b': '8'}

# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)

RunPipelineResult(run_id=85870662-5aa5-4c56-a73e-3a343237565c)

You should have your run within the `UI`, __take a moment to explore relevant info in different `UI` sections!__

![](./images/example-add-run.png)

## Using packages in functions

> In order to use custom `package`s, one has to install them within the `Docker` environment

There are a few ways to achieve that, ordered from "best" to "worst":
1. Packages contained within the `Docker` image - __choose an appropriate base image for your `microservice`__ to avoid headaches; __use the `base_image` argument of `kfp.components.create_component_from_func`__
2. Packages installed on top of the `Docker` image; __useful when the image has most of the `packages` and only a few additional ones are required__; __use the `packages_to_install` argument__
3. Packages installed using `subprocess` (from your `function` code); __discouraged, only for local packages, and SHOULD BE AVOIDED anyway__

For example:

In [None]:
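# Illustrative example: wraps the `add` function defined earlier
custom_add_op = kfp.components.create_component_from_func(
    add,
    output_component_file="custom_add_component.yaml",  # optional
    base_image="tensorflow/tensorflow:1.11.0-py3",  # image providing most dependencies
    packages_to_install=["torchdata==0.2.0", "torchlayers==0.1.1"],  # extras installed on top
)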

### Additional info about `create_component_from_func`

- Default image: `python:3.7`
- __For larger dependencies, create a custom `Docker` base image and deploy it__, because:
    - the component runs much faster (no need to download packages)
    - it is less error-prone (less likely to be OS-dependent)

## Data

> __`inputs`, `outputs` and data passing in `kubeflow`__

In general:
- `inputs` are passed as `CLI` arguments to the `Docker` container within the `POD`
- `outputs` are returned as files

Parameters are also passed in different ways:
- basic types (e.g. `float`, `int`, short `str`) __are passed by value__
- parameters passed by file include things like:
    - `csv` files
    - images
    - datasets
    - __anything larger__
- __Larger `parameters` are stored in the specified `PersistentVolume`!__
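A rough sketch of how a component receives its inputs and hands back its output, loosely modelled on the `argparse`-based wrapper `kfp` generates (shown in the next section). `component_main` and the `--output-path` flag are hypothetical names chosen for illustration, not the exact code `kfp` emits.

```python
import argparse
import os
import tempfile


def add(a: float, b: float) -> float:
    return a + b


def component_main(argv):
    """Hypothetical entrypoint: inputs arrive as CLI arguments,
    the output is serialized to text and written to a file."""
    parser = argparse.ArgumentParser(prog="Add")
    parser.add_argument("--a", type=float, required=True)
    parser.add_argument("--b", type=float, required=True)
    parser.add_argument("--output-path", required=True)  # where the output is collected
    args = parser.parse_args(argv)

    result = add(args.a, args.b)
    os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
    with open(args.output_path, "w") as f:
        f.write(str(result))


with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "outputs", "Output")
    component_main(["--a", "1", "--b", "7", "--output-path", out])
    with open(out) as f:
        print(f.read())  # 8.0
```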

## Inferring `types`

> `kubeflow`'s components created from functions can infer types __via Python's typing features__

Let's look at the generated `.yaml` of the `add` component:

In [11]:
!cat ./add_component.yaml

name: Add
description: Calculates sum of two arguments
inputs:
- {name: a, type: Float}
- {name: b, type: Float}
outputs:
- {name: Output, type: Float}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def add(a, b):
        '''Calculates sum of two arguments'''
        return a + b

      def _serialize_float(float_value: float) -> str:
          if isinstance(float_value, str):
              return float_value
          if not isinstance(float_value, (float, int)):
              raise TypeError('Value "{}" has type "{}" instead of float.'.format(str(float_value), str(type(float_value))))
          return str(float_value)

      import argparse
      _parser = argparse.ArgumentParser(prog='Add', description='Calculates sum of two arguments')
      _parser.add_argument("--a", dest="a", type=floa

At the same time, we can see how much code was generated automatically for us by `kubeflow`, including, amongst other things:
- argument parsing via the `argparse` module
- outputting values __and serializing them to the desired type__
- saving data within the `POD`'s storage (`PersistentVolume`)
- the whole `.yaml` structure

> Above code is not meant to be readable as it's automatically generated!

So, in this case, `type`s were inferred based on __function signature__. 

> __If we don't annotate the function's parameters, it is assumed that we are passing `str` values!__
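To make the inference rule concrete, here is a simplified, hypothetical `infer_inputs` helper that reads a function signature with `inspect`. The mapping (`float` to `Float`, `int` to `Integer`, `str` to `String`, `bool` to `Boolean`) is an assumption modelled on the `type: Float` entries in the generated YAML above; the real logic lives inside `kfp`.

```python
import inspect

# Assumed mapping from Python annotations to kfp v1 type names
_TYPE_NAMES = {float: "Float", int: "Integer", str: "String", bool: "Boolean"}


def infer_inputs(func):
    """Hypothetical helper: list (name, type) pairs like the generated
    `inputs:` section; unannotated parameters get no type."""
    inputs = []
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is inspect.Parameter.empty:
            inputs.append((name, None))  # no type info: value arrives as a plain string
        else:
            inputs.append((name, _TYPE_NAMES.get(param.annotation, str(param.annotation))))
    return inputs


def add(a: float, b: float) -> float:
    return a + b


def add_untyped(a, b):
    return a + b


print(infer_inputs(add))          # [('a', 'Float'), ('b', 'Float')]
print(infer_inputs(add_untyped))  # [('a', None), ('b', None)]
print(add_untyped("1", "7"))      # 17 (string concatenation, not addition)
```

The last line shows why annotations matter: without them, values arriving as strings are concatenated instead of added.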

This function returns one value though, what if we wanted to return multiple values?

> __Use `NamedTuple` from `typing` module to decorate the function appropriately!__

Let's see a more complicated example:

In [None]:
from typing import NamedTuple

def multiple_return_values_example(a: float, b: float) -> NamedTuple(
  'ExampleOutputs',
  [
    ('sum', float),
    ('product', float),
    ('mlpipeline_ui_metadata', 'UI_metadata'),
    ('mlpipeline_metrics', 'Metrics')
  ]):
  """Example function that demonstrates how to return multiple values."""
  sum_value = a + b
  product_value = a * b

  # Export a sample tensorboard
  metadata = {
    'outputs' : [{
      'type': 'tensorboard',
      'source': 'gs://ml-pipeline-dataset/tensorboard-train',
    }]
  }

  # Export two metrics
  metrics = {
    'metrics': [
      {
        'name': 'sum',
        'numberValue':  float(sum_value),
      },{
        'name': 'product',
        'numberValue':  float(product_value),
      }
    ]
  }

  from collections import namedtuple
  example_output = namedtuple(
      'ExampleOutputs',
      ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
  return example_output(sum_value, product_value, metadata, metrics)


This one returns `metadata` for the `UI` and `metrics`.

> Special `str` type names (`'UI_metadata'` and `'Metrics'`) can be used for these two outputs __so that they are readable by the `UI` and as `metrics` respectively!__

We also have to return a `namedtuple`, __and this is the only way to forward multiple outputs!__

> Please note, these values __will be saved to `disk` anyway__
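Since function-based components are plain Python, the multiple-output pattern can be checked locally before building a pipeline. The sketch uses a hypothetical, smaller `sum_and_product` function; within a pipeline each field would be referenced as `task.outputs['sum']` and `task.outputs['product']`.

```python
from typing import NamedTuple


def sum_and_product(a: float, b: float) -> NamedTuple(
    "Outputs", [("sum", float), ("product", float)]
):
    """Returns two values; each field becomes a separate named output."""
    from collections import namedtuple  # imported inside, as required

    outputs = namedtuple("Outputs", ["sum", "product"])
    return outputs(a + b, a * b)


result = sum_and_product(3.0, 4.0)
print(result.sum, result.product)  # 7.0 12.0
# The return annotation carries the output names:
print(sum_and_product.__annotations__["return"]._fields)  # ('sum', 'product')
```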

### Caching

A little sidenote:

> ### `kubeflow` provides caching out of the box

How does it work?

- If a `component` was run previously __with the same arguments__, it __will not run again!__
- Instead, its outputs from the `PersistentVolume` of choice will be forwarded to the next `component` within the `pipeline`

One can disable this feature or __force recalculation after some period of time__. See [here](https://www.kubeflow.org/docs/components/pipelines/caching/) and [here](https://www.kubeflow.org/docs/components/pipelines/caching-v2/) for `V2` SDK.
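The idea behind caching can be illustrated with a toy cache keyed on a hash of the component specification and its arguments. `ToyStepCache` is hypothetical and deliberately simplified; it is not how `kubeflow` implements caching.

```python
import hashlib
import json


class ToyStepCache:
    """Toy illustration: results keyed on (component spec, arguments)."""

    def __init__(self):
        self._store = {}
        self.executions = 0

    @staticmethod
    def key(component_spec: str, arguments: dict) -> str:
        payload = json.dumps({"spec": component_spec, "args": arguments}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def run(self, component_spec: str, func, **arguments):
        cache_key = self.key(component_spec, arguments)
        if cache_key not in self._store:  # cache miss: actually execute the step
            self.executions += 1
            self._store[cache_key] = func(**arguments)
        return self._store[cache_key]  # cache hit: reuse the stored output


cache = ToyStepCache()
cache.run("add:v1", lambda a, b: a + b, a=1, b=7)
cache.run("add:v1", lambda a, b: a + b, a=1, b=7)  # same spec and arguments: skipped
cache.run("add:v1", lambda a, b: a + b, a=2, b=7)  # different arguments: runs again
print(cache.executions)  # 2
```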

## Passing parameters by file

> __In most cases `parameters` will be files (e.g. dataset) that we would like to operate on__

This raises an obvious issue, namely:

> What if we save `data` within the function and __return `None` implicitly__?

In this case, __there is no way to infer WHAT IS ACTUALLY RETURNED from the function's signature!__
(There is a way, but it is not compliant with static checkers like `mypy`.)

> ### We can use special `kfp.components` types __to mark what is returned by the function__

First, let's see a simple example:

In [None]:
import kfp.components as comp


def split_text_lines(
    source_path: comp.InputPath(str),
    odd_lines_path: comp.OutputPath(str),
    even_lines_path: comp.OutputPath(str),
):
    """Splits a text file into two files, with even lines going to one file
    and odd lines to the other."""

    with open(source_path, "r") as reader:
        with open(odd_lines_path, "w") as odd_writer:
            with open(even_lines_path, "w") as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)

This is pretty self-explanatory, but:
- You can find other `Input` definitions (e.g. binary) [here](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html#kfp.components.InputBinaryFile)
- You can find other `Output` definitions (e.g. binary) [here](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html#kfp.components.OutputBinaryFile)
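Because the component is just a function over file paths, its logic can be unit tested locally without a cluster. The sketch below copies the same loop but takes plain `str` paths (in `kfp`, `comp.InputPath`/`comp.OutputPath` supply these paths), so it runs without `kfp` installed; the test file content is made up.

```python
import os
import tempfile


def split_text_lines(source_path: str, odd_lines_path: str, even_lines_path: str):
    """Same logic as the component above, with plain path arguments."""
    with open(source_path, "r") as reader:
        with open(odd_lines_path, "w") as odd_writer:
            with open(even_lines_path, "w") as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)


def test_split_text_lines():
    with tempfile.TemporaryDirectory() as tmp:
        source = os.path.join(tmp, "source.txt")
        odd = os.path.join(tmp, "odd.txt")
        even = os.path.join(tmp, "even.txt")
        with open(source, "w") as f:
            f.write("one\ntwo\nthree\nfour\nfive\n")
        split_text_lines(source, odd, even)
        with open(odd) as f:
            assert f.read() == "one\nthree\nfive\n"
        with open(even) as f:
            assert f.read() == "two\nfour\n"


test_split_text_lines()
print("split_text_lines: OK")
```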

## Components Best Practices

> Below is a compressed set of best practices (one can find full list [here](https://www.kubeflow.org/docs/components/pipelines/sdk/best-practices/))

- Components __should use local files unless that is not possible__ (e.g. Cloud ML Engine and BigQuery require Cloud Storage staging paths)
- Aforementioned __pure components__ (i.e. no side effects and nothing modified without "telling `kubeflow`")
- __Mix and match programming languages__ - DO NOT STICK TO `python` IF IT IS NOT BENEFICIAL! __How?__:
    - Create a `Docker` image containing a `golang` app with its own `input`s and `output`s
    - Define the `.yaml` directly
    - __Use inter-language formats for data exchange__ (e.g. `JSON`, `CSV`, `ProtoBuf` etc.)
    - One can do small pre-processing for different languages with `shell` scripts (__SMALL ONES!__)
- __One output == one file__
- __Do not pollute with temporary data__. Why? __It might/will be preserved by the `PersistentVolume`__
- __STAY LOCAL FOR AS LONG AS POSSIBLE__ - it's always easier and less stressful, e.g. unit testing a specific component
- __Test in isolation__ - single container; if not possible, run `minikube` or similar before going with `kubeadm` and a full-blown cluster (way easier to debug)

### Feedback from AiCore

- __Do not use function-based components for more complicated workflows__; standalone components let you use packages and `python` best practices more freely
- __Do not "overkill"__: keep the services within "reasonable size", for example:
    - `microservice` to transform data instead of:
        - `microservice` to load data
        - `microservice` to perform one operation on data
        - `microservice` to rotate image...
    - __Why?__ I/O is expensive and we should try to minimize it if possible

# Pipeline

> __Description of an ML `workflow`, including all of the `component`s in the `workflow` and how they combine in the form of a `graph`__

Usually, its `component`s are our code packaged in `Docker` images with some `inputs` and `outputs` (as we will later see)

Looking at a part of such a pipeline, one can notice that:
- Some of the components can easily run in parallel (in different `POD`s scheduled on some `Node`s)
- Others directly depend on the previous steps (similar to `Airflow`)
- Data is shared via `artifacts` __as `POD`s do not share data directly__

Such a `graph` is defined via the `SDK` (`python`) and a `dsl` (or rather pseudo-dsl) created specifically for this task.

> We will see how to create the whole structure in `python` after all of the concepts are described
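The dependency structure described above can be sketched with Python's standard `graphlib` (Python 3.9+). The step names are hypothetical, and this only illustrates which steps could run in parallel; it is not how `Argo` actually schedules `POD`s.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each step maps to the set of steps it depends on
pipeline_graph = {
    "download": set(),
    "merge_csv": {"download"},
    "train_xgboost": {"merge_csv"},
    "train_linear": {"merge_csv"},
    "evaluate": {"train_xgboost", "train_linear"},
}


def parallel_batches(graph):
    """Group steps into batches; steps within a batch do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


for index, batch in enumerate(parallel_batches(pipeline_graph)):
    print(index, batch)
```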

## Defining `Pipeline`

Now, let's see how one can define a `Pipeline` using `kfp`. Before that, let's define the `component`s:
- The first one takes a `.tar.gz` file and returns a merged CSV
- Another one __is not even defined by us__ and downloads a resource from a `url`
- The last one will be the `add` component, just to showcase `conditional` flow in the `dsl`

Before we do that, just to brush up `python`, let's make `kfp.components.create_component_from_func` a __configurable decorator__

> `kfp.components.create_component_from_func` can be used as a decorator __but only with default arguments__

In [None]:
import functools

import kfp


# Decorator factory: forwards its arguments to `create_component_from_func`
@functools.wraps(kfp.components.create_component_from_func)
def our_component_from_func(*args, **kwargs):
    def wrapper(function):
        return kfp.components.create_component_from_func(function, *args, **kwargs)

    return wrapper

In [None]:
import kfp.components as comp


@our_component_from_func(
    output_component_file="component.yaml",  # This is optional. It saves the component spec for future use.
    base_image="python:3.7",
    packages_to_install=["pandas==1.1.4"],
)
def merge_csv(file_path: comp.InputPath("Tarball"), output_csv: comp.OutputPath("CSV")):
    import glob
    import tarfile

    import pandas as pd

    tarfile.open(name=file_path, mode="r|gz").extractall("data")
    df = pd.concat(
        [pd.read_csv(csv_file, header=None) for csv_file in glob.glob("data/*.csv")]
    )
    df.to_csv(output_csv, index=False, header=False)
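A decorator that takes arguments needs one extra level of nesting: `@factory(...)` first calls `factory`, which returns the actual decorator that receives the function. The sketch below shows this without `kfp` by using a hypothetical `fake_create_component_from_func` stand-in that only records its configuration.

```python
from types import SimpleNamespace


def fake_create_component_from_func(func, output_component_file=None,
                                    base_image=None, packages_to_install=None):
    """Hypothetical stand-in for `kfp.components.create_component_from_func`:
    it only records how it was called, so this sketch runs without kfp."""
    return SimpleNamespace(
        func=func,
        output_component_file=output_component_file,
        base_image=base_image or "python:3.7",
        packages_to_install=list(packages_to_install or []),
    )


def configurable_component(**options):
    """Decorator factory: `@configurable_component(...)` forwards `options`."""
    def wrapper(function):
        return fake_create_component_from_func(function, **options)
    return wrapper


@configurable_component(base_image="python:3.7", packages_to_install=["pandas==1.1.4"])
def add(a: float, b: float) -> float:
    return a + b


print(add.base_image, add.packages_to_install)  # python:3.7 ['pandas==1.1.4']
print(add.func(1.0, 7.0))  # 8.0
```

After decoration, the name `add` refers to whatever the factory returned (here a record, in `kfp` a component factory), not to the original function.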

We can __reuse components__ simply by providing the `URI` of a resource containing the `.yaml` specification:

In [None]:
web_downloader_op = kfp.components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/web/Download/component.yaml"
)

With everything in place we can define `pipeline`.

In order to do that:
1. Define the `pipeline` function
2. Decorate it with `dsl.pipeline` and provide the necessary information there
3. Either pass it to `client.create_run_from_pipeline_func` as seen before __OR__ compile it and run it from the `UI`

Let's see all of that together

In [None]:
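from kfp import dsl


# Define a pipeline and create tasks from components.
# We don't have to specify parameter types.
@dsl.pipeline(name="Example pipeline", description="Shows basics of pipelines")
def my_pipeline(url, run_add: bool):
    web_downloader_task = web_downloader_op(url=url)
    # `merge_csv` was turned into a component factory by the decorator above;
    # its `file_path` parameter is exposed as the `file` input (the `_path` suffix is dropped)
    merge_csv_task = merge_csv(file=web_downloader_task.outputs["data"])
    # Only `if` (no `else`); `Condition` expects a comparison,
    # hence the explicit `== True` (pipeline parameters overload `==`)
    with dsl.Condition(run_add == True):
        add_task = add_op(1, 4)
    # The outputs of merge_csv_task can be referenced using the
    # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']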

And the first option to run it (non-interactively):

In [None]:
client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        "url": "https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz",
        "run_add": False,
    },
)

The other option is to compile it and run it interactively via the `UI` (`pipelines` tab):

In [None]:
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml',
)

# Challenges

## Mandatory

### Components

- __Check how to create a custom `Component` WITHOUT a `python function`__ (e.g. as a standalone program with a custom `Dockerfile` and `yaml` specification) [here](https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/). This approach allows for:
    - Easier customization of `Dockerfile` (although one can almost always deploy `image` beforehand and use it from within `python`)
    - Custom `.yaml` file - this one is harder to obtain with `python` functions (e.g. dependent values on `envvars` etc.)
    - __Make sure you have read and understood this part!__
- Check what is required to use `recursion` with `kubeflow`'s `dsl` [here](https://www.kubeflow.org/docs/components/pipelines/sdk/dsl-recursion/)

    
### Integrations

- Check out [`sidecar injection`](https://istio.io/latest/blog/2019/data-plane-setup/) pattern (used by `istio`). What is it, why is it useful?
- Check out [`kubeflow-kale`](https://github.com/kubeflow-kale/kale) as an even simpler way to define `kubeflow` pipelines.

### DSL

- Go through the [`kfp.dsl`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html) documentation. What other methods could you use to define the `pipeline`? Check out [parallel loop](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ParallelFor)

### Kubernetes

- Check how to directly manipulate `k8s` objects [here](https://www.kubeflow.org/docs/components/pipelines/sdk/manipulate-resources/). __It should not be done in general__ and `k8s` objects are better provided more "statically", but it is worth knowing one can do this from `kubeflow`.

### Metrics

- Check how to create `metrics` as a part of `pipeline` or as a `component` [here](https://www.kubeflow.org/docs/components/pipelines/sdk/pipelines-metrics/). Which option (`pipeline` or `component`) should be preferred? 


## Additional

### Components

- Check out [static type checking](https://www.kubeflow.org/docs/components/pipelines/sdk/static-type-checking/). How could one utilize it for increased robustness of the pipeline?

### Visualization

- Check visualization within `Kubeflow UI` [here](https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/). __You might want to use a different way for `visualization`s though, as `kubeflow`'s target is a little different__

### Integrations

- Check out [`min.io`](https://min.io/) for a better understanding of cloud-native, `k8s`-first data storage
- Read about basics of [`argoproj`](https://github.com/argoproj/argo-workflows/) which is used by `kubeflow` to orchestrate workflows
- `feast` feature store can be used together with `kubeflow`. Check [relevant documentation](https://www.kubeflow.org/docs/external-add-ons/feature-store/overview/) __and use it after `feast` was presented by `AiCore` team!__
- Check out integrated [tools for serving](https://www.kubeflow.org/docs/external-add-ons/serving/) (includes `NVidia`'s `Triton` and `bento` project)
- Check out [`kubeflow fairing`](https://www.kubeflow.org/docs/external-add-ons/fairing/fairing-overview/) for improved hybrid cloud experience with `ML` (also allows to run/debug our runs locally). In addition, one can easily open an `endpoint` with deployed `model`, hence __worth checking out!__