# Building a Pipeline

## Overview

This tutorial describes how to create a component for Kubeflow Pipelines and how to combine components into a pipeline. For an easier start, experiment with [the Kubeflow Pipelines samples](https://www.kubeflow.org/docs/pipelines/tutorials/build-pipeline/).


## Before you begin

1. Run the following command to install the Kubeflow Pipelines SDK.

In [None]:
!pip install kfp --upgrade

2. Import the `kfp` and `kfp.components` packages.

In [None]:
import kfp
import kfp.components

3. Create an instance of the [kfp.Client class](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html#kfp.Client).

In [None]:
# If you run this command in a Jupyter notebook running on Kubeflow, you can
# omit the host parameter.
# client = kfp.Client()
client = kfp.Client(host='<your-kubeflow-pipelines-host-name>')

## Understanding pipelines

### Component

A pipeline component is an implementation of a pipeline task. A component represents a step in the workflow. Each component takes zero or more inputs and may produce zero or more outputs. A component consists of an interface (inputs/outputs), the implementation (a Docker container image and command-line arguments) and metadata (name, description).

### Task

A pipeline task is an instance of a component. In KFP DSL, a task is a [`ContainerOp`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp) object.

### Pipeline

A pipeline is a description of a machine learning (ML) workflow, including all of the components of the workflow and how they work together. The pipeline includes the definition of the inputs (parameters) required to run the pipeline and the inputs and outputs of each component.
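
The relationship between these three terms can be sketched in plain Python. This is only an illustrative model, not the KFP data model: the names `ToyComponent`, `ToyTask`, and `ToyOutputRef`, the image names, and the example URL are all hypothetical. It shows a component as interface plus implementation plus metadata, a task as a component with arguments bound to its inputs, and a dependency arising when one task's output is passed as another task's input.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class ToyComponent:
    """Illustrative stand-in for a component (not a KFP class)."""
    name: str                  # metadata
    inputs: Tuple[str, ...]    # interface
    outputs: Tuple[str, ...]   # interface
    image: str                 # implementation: container image
    command: Tuple[str, ...]   # implementation: command line


@dataclass(frozen=True)
class ToyOutputRef:
    """Reference to an output that a task will produce when it runs."""
    producer: "ToyTask"
    name: str


@dataclass
class ToyTask:
    """An instance of a component with arguments bound to its inputs."""
    component: ToyComponent
    arguments: Dict[str, object]

    def output(self, name: str) -> ToyOutputRef:
        if name not in self.component.outputs:
            raise KeyError(name)
        return ToyOutputRef(producer=self, name=name)

    def upstream(self) -> List[str]:
        # A task depends on every task whose output it consumes.
        return [value.producer.component.name
                for value in self.arguments.values()
                if isinstance(value, ToyOutputRef)]


# Two hypothetical components and a two-task "pipeline" built from them.
download = ToyComponent(
    name='download', inputs=('url',), outputs=('data',),
    image='example/downloader', command=('download.sh',))
merge = ToyComponent(
    name='merge', inputs=('file',), outputs=('output_csv',),
    image='python:3.7', command=('python', 'merge.py'))

download_task = ToyTask(download, {'url': 'https://example.com/files.tar.gz'})
merge_task = ToyTask(merge, {'file': download_task.output('data')})

print(merge_task.upstream())
```

In this sketch the dependency is never declared explicitly. It follows from the data flow, which is the same idea the pipeline function later in this tutorial relies on.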

## Designing your pipeline

When designing your pipeline, think about how to split an ML workflow into pipeline steps. In general, you should design your components with composability in mind. Think about the upstream and downstream components: what formats each component should consume as inputs from upstream components, and what formats it should use for its output data so that downstream components can consume it.

You can learn more about how to build components and reusable components from the [components in the KFP GitHub repo](https://github.com/kubeflow/pipelines/tree/master/components).

### Code sample

In the following example, we have a Python function that downloads a tarball (`.tar.gz` file) containing multiple `.csv` files from a public website and merges these `.csv` files into a single file.

In [None]:
def download_and_merge_csv(url: str, output_csv: str):
  import glob
  import pandas as pd
  import tarfile
  import urllib.request

  with urllib.request.urlopen(url) as res:
    tarfile.open(fileobj=res, mode="r|gz").extractall('data')
  df = pd.concat(
      [pd.read_csv(csv_file, header=None) 
       for csv_file in glob.glob('data/*.csv')])
  df.to_csv(output_csv, index=False, header=False)

We can test the function using the sample code and data shown below. 

In [None]:
download_and_merge_csv(
    url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz', 
    output_csv='merged_data.csv')

!head merged_data.csv
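
If network access is unavailable, a small local fixture can stand in for the public tarball. The following is a minimal sketch using only the standard library. The helper name `make_csv_tarball`, the file names, and the sample rows are made up for illustration. It writes two tiny CSV files, packs them at the root of a `.tar.gz` archive, and derives a `file://` URL for it.

```python
import csv
import pathlib
import tarfile
import tempfile


def make_csv_tarball(work_dir: str) -> pathlib.Path:
    """Write two tiny CSV files and pack them into a .tar.gz archive."""
    work = pathlib.Path(work_dir)
    # Hypothetical sample rows, loosely shaped like the iris data.
    rows_by_file = {
        'part-1.csv': [[5.1, 3.5, 'setosa'], [4.9, 3.0, 'setosa']],
        'part-2.csv': [[6.3, 3.3, 'virginica']],
    }
    archive_path = work / 'sample-csv-files.tar.gz'
    with tarfile.open(str(archive_path), mode='w:gz') as archive:
        for file_name, rows in rows_by_file.items():
            csv_path = work / file_name
            with open(csv_path, 'w', newline='') as f:
                csv.writer(f).writerows(rows)
            # Store members at the archive root so that, once extracted into
            # 'data', they match the 'data/*.csv' glob used by the function.
            archive.add(str(csv_path), arcname=file_name)
    return archive_path


with tempfile.TemporaryDirectory() as tmp:
    archive_path = make_csv_tarball(tmp)
    local_url = archive_path.as_uri()  # a file:// URL for the archive
    with tarfile.open(str(archive_path), mode='r:gz') as archive:
        member_names = sorted(archive.getnames())

print(member_names)
```

Passing such a `file://` URL as `url` to `download_and_merge_csv` should exercise the same code path offline, assuming `urllib.request.urlopen` handles the `file` scheme in your environment. The call has to happen while the temporary directory still exists, that is, inside the `with` block.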

Next, we want to run this script in the Kubeflow Pipelines system.

Although we could keep it as a single pipeline step, for demonstration purposes we refactor the function by stripping out the web download logic and reusing the existing [Web Download component](https://github.com/kubeflow/pipelines/blob/master/components/web/Download/component.yaml) from the Kubeflow Pipelines GitHub repo.

In addition to removing the download logic, we annotate the function parameters with [`kfp.components.InputPath`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=inputpath#kfp.components.InputPath) and [`kfp.components.OutputPath`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=outputpath#kfp.components.OutputPath). The `InputPath` annotation tells the system to pass the function the path of the input data file rather than the data itself. This matches the upstream [Web Download component](https://github.com/kubeflow/pipelines/blob/master/components/web/Download/component.yaml), which writes its output to a file.
Similarly, the `OutputPath` annotation tells the system to pass the function a path to which it should write its output file, rather than expecting the function to return the file content.

Learn more about component data passing (TODO: link)

In [None]:
def merge_csv(file_path: kfp.components.InputPath('Tarball'),
              output_csv: kfp.components.OutputPath('CSV')):
  import glob
  import pandas as pd
  import tarfile

  tarfile.open(name=file_path, mode="r|gz").extractall('data')
  df = pd.concat(
      [pd.read_csv(csv_file, header=None) 
       for csv_file in glob.glob('data/*.csv')])
  df.to_csv(output_csv, index=False, header=False)
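
With path-based annotations, the function body only ever sees two ordinary file paths: one to read from and one to write to. The following sketch illustrates that contract without KFP or pandas. `merge_csv_stdlib` is a hypothetical standard-library stand-in for `merge_csv`, and the archive contents are invented for the example.

```python
import csv
import glob
import io
import os
import tarfile
import tempfile


def merge_csv_stdlib(file_path: str, output_csv: str) -> None:
    """Hypothetical stdlib-only stand-in for merge_csv: paths in, path out."""
    with tempfile.TemporaryDirectory() as extract_dir:
        with tarfile.open(name=file_path, mode='r:gz') as archive:
            archive.extractall(extract_dir)
        with open(output_csv, 'w', newline='') as out:
            writer = csv.writer(out)
            # Sort the file names so the merged row order is deterministic.
            pattern = os.path.join(extract_dir, '*.csv')
            for csv_file in sorted(glob.glob(pattern)):
                with open(csv_file, newline='') as f:
                    writer.writerows(csv.reader(f))


def add_text_member(archive: tarfile.TarFile, name: str, text: str) -> None:
    """Add an in-memory text file to an open tar archive."""
    data = text.encode('utf-8')
    info = tarfile.TarInfo(name=name)
    info.size = len(data)
    archive.addfile(info, io.BytesIO(data))


with tempfile.TemporaryDirectory() as tmp:
    # In a pipeline, the system would choose these two paths for the function.
    input_path = os.path.join(tmp, 'input.tar.gz')
    output_path = os.path.join(tmp, 'merged.csv')
    with tarfile.open(input_path, mode='w:gz') as archive:
        add_text_member(archive, 'a.csv', '1,2\n3,4\n')
        add_text_member(archive, 'b.csv', '5,6\n')

    merge_csv_stdlib(file_path=input_path, output_csv=output_path)

    with open(output_path, newline='') as f:
        merged_rows = list(csv.reader(f))

print(merged_rows)
```

Because the function depends only on the paths it is given, it can be tested locally in the same way before it is converted into a component.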

We then call [`kfp.components.create_component_from_func()`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html#kfp.components.create_component_from_func) to convert the function into a pipeline component.
[`kfp.components.create_component_from_func()`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html#kfp.components.create_component_from_func) returns a factory function that you can use to create a [`kfp.dsl.ContainerOp`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp) class instance -- a pipeline task.
Optionally, as demonstrated in the code below, you can save the [component specification](https://www.kubeflow.org/docs/pipelines/reference/component-spec/) to a reusable YAML file that can be loaded later.

In [None]:
merge_csv_op = kfp.components.create_component_from_func(
    func=merge_csv,
    output_component_file='component.yaml', # This is optional. It saves the component spec for future use.
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'])

## Building your pipeline

We build a pipeline that uses the above component and the [Web Download component](https://github.com/kubeflow/pipelines/blob/master/components/web/Download/component.yaml). 

We can load the [Web Download component](https://github.com/kubeflow/pipelines/blob/master/components/web/Download/component.yaml) from a URL using [`kfp.components.load_component_from_url()`](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=load_component_from_url#kfp.components.load_component_from_url). The function returns a factory function, similar to `merge_csv_op`, that we can use to instantiate a pipeline task.


In [None]:
web_downloader_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/web/Download/component.yaml')

Next, we define a pipeline function that uses these factory functions.
A pipeline function defines a collection of tasks and any dependencies among them.
In the example below, our pipeline contains two tasks: `web_downloader_task` and `merge_csv_task`.

`web_downloader_task` gets its input value, the URL of the tarball, from the pipeline function's `url` parameter (a pipeline parameter). Its output is passed as the input to `merge_csv_task`.


In [None]:
# Define a pipeline and create a task from each component:
def my_pipeline(url):
  web_downloader_task = web_downloader_op(url=url)
  # The input is named `file` rather than `file_path` because KFP strips the
  # `_path` suffix from parameters annotated with InputPath or OutputPath.
  merge_csv_task = merge_csv_op(file=web_downloader_task.outputs['data'])
  # The outputs of the merge_csv_task can be referenced using the
  # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']

Learn more about advanced features: 
- [DSL Recursion](https://www.kubeflow.org/docs/pipelines/sdk/dsl-recursion/)
- [Data passing](#) (TODO: link)
- [Manipulate Kubernetes Resources as Part of a Pipeline](https://www.kubeflow.org/docs/pipelines/sdk/manipulate-resources/) (Experimental)

## Compiling and running your pipeline

After defining the pipeline in Python as described above, you can compile the pipeline to a workflow YAML spec before you submit it to the Kubeflow Pipelines service.


In [None]:
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')

We can submit the compiled workflow specification `pipeline.yaml` using the SDK client we created earlier.


In [None]:
client.create_run_from_pipeline_package(
    pipeline_file='pipeline.yaml',
    arguments={
        'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
    })

Alternatively, you can upload `pipeline.yaml` through the Kubeflow Pipelines UI. See the guide to [getting started with the UI](https://www.kubeflow.org/docs/pipelines/pipelines-quickstart).