# Kubeflow Pipeline SDK Demo

This notebook is based on the [Kubeflow documentation](https://www.kubeflow.org/docs/components/pipelines/v1/sdk/build-pipeline/).
We will build a pipeline from Python function-based components with the KFP SDK v1 and compile it to a YAML specification.

First, install the KFP SDK package.

In [1]:
!pip install kfp==1.8.22 -q

In [2]:
import kfp
import kfp.components as comp

The next cell defines `merge_csv`, the function that becomes a pipeline step. Its arguments are annotated with `kfp.components.InputPath` and `kfp.components.OutputPath`. These annotations tell Kubeflow Pipelines to pass in the path to the gzipped tar file and to provide a path where the function stores the merged CSV file.

In [3]:
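def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
  # Imports live inside the function: the component runs only this
  # function's source, so it cannot see the notebook's imports.
  import glob
  import pandas as pd
  import tarfile

  # Unpack the gzipped tarball, then concatenate every CSV it contains.
  with tarfile.open(name=file_path, mode="r|gz") as archive:
    archive.extractall('data')
  df = pd.concat(
      [pd.read_csv(csv_file, header=None)
       for csv_file in glob.glob('data/*.csv')])
  df.to_csv(output_csv, index=False, header=False)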
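To make the path-in, path-out contract concrete, the following sketch simulates locally what the pipeline backend does for an `InputPath`/`OutputPath` function: it materializes the input as a file, chooses an output location, and passes both paths in. `merge_csv_local` is a hypothetical standard-library stand-in for `merge_csv` (it uses `csv` instead of pandas so it runs without extra packages), and the sample rows are made up for illustration.

```python
import csv
import glob
import os
import tarfile
import tempfile


def merge_csv_local(file_path: str, output_csv: str) -> None:
    """Stdlib stand-in for merge_csv: read a tarball path, write a CSV path."""
    extract_dir = tempfile.mkdtemp()
    with tarfile.open(name=file_path, mode="r|gz") as archive:
        archive.extractall(extract_dir)
    with open(output_csv, "w", newline="") as out:
        writer = csv.writer(out)
        # Sort the file names so the merged row order is deterministic.
        for csv_file in sorted(glob.glob(os.path.join(extract_dir, "*.csv"))):
            with open(csv_file, newline="") as src:
                writer.writerows(csv.reader(src))


# Build a small gzipped tarball containing two CSV files (made-up data).
workdir = tempfile.mkdtemp()
for name, rows in [("a.csv", [[1, 2], [3, 4]]), ("b.csv", [[5, 6]])]:
    with open(os.path.join(workdir, name), "w", newline="") as f:
        csv.writer(f).writerows(rows)

tarball_path = os.path.join(workdir, "input.tar.gz")
with tarfile.open(tarball_path, "w:gz") as archive:
    for name in ("a.csv", "b.csv"):
        archive.add(os.path.join(workdir, name), arcname=name)

# Play the role of the backend: hand the function an input and an output path.
merged_path = os.path.join(workdir, "merged.csv")
merge_csv_local(tarball_path, merged_path)

with open(merged_path, newline="") as f:
    merged_rows = list(csv.reader(f))
print(merged_rows)
```

The function never returns data; it only reads from one path and writes to another, which is what lets the pipeline system move the files between steps on its behalf.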

Use `kfp.components.create_component_from_func` to return a factory function that you can use to create pipeline steps. This example also specifies the base container image to run this function in, the path to save the component specification to, and a list of PyPI packages that need to be installed in the container at runtime.

In [4]:
import os

# Make sure the output directory exists before the component spec is written.
os.makedirs('tmp', exist_ok=True)

create_step_merge_csv = kfp.components.create_component_from_func(
    func=merge_csv,
    output_component_file='tmp/component.yaml', # This is optional. It saves the component spec for future use.
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4']
)

Use `kfp.components.load_component_from_url` to load the component specification YAML for any components that you are reusing in this pipeline.


Define your pipeline as a Python function.

Your pipeline function’s arguments define your pipeline’s parameters. Use pipeline parameters to experiment with different hyperparameters, such as the learning rate used to train a model, or pass run-level inputs, such as the path to an input file, into a pipeline run.

Use the factory functions created by `kfp.components.create_component_from_func` and `kfp.components.load_component_from_url` to create your pipeline’s tasks. The inputs to the component factory functions can be pipeline parameters, the outputs of other tasks, or constant values. In this case, `web_downloader_task` uses the `url` pipeline parameter, and `merge_csv_task` uses the `data` output of `web_downloader_task`.



In [5]:
web_downloader_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml')

# Define a pipeline and create a task from a component:
def my_pipeline(url):
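  web_downloader_task = web_downloader_op(url=url)
  # The input is named `file`, not `file_path`: KFP strips the `_path`
  # suffix from parameters annotated with InputPath or OutputPath.
  merge_csv_task = create_step_merge_csv(file=web_downloader_task.outputs['data'])
  # The outputs of the merge_csv_task can be referenced using the
  # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']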
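The keyword `file` in the pipeline above differs from the function parameter `file_path` because the SDK drops a trailing `_path` from path-annotated parameter names. The sketch below is a simplified illustration of that naming rule only; `io_name_for` is a hypothetical helper, not the SDK's actual implementation, and it ignores any other rules the SDK may apply.

```python
def io_name_for(parameter_name: str) -> str:
    """Approximate the input/output name derived from a path-annotated parameter."""
    suffix = "_path"
    if parameter_name.endswith(suffix):
        return parameter_name[: -len(suffix)]
    return parameter_name


# The two parameters of merge_csv:
print(io_name_for("file_path"))   # name used when wiring the input
print(io_name_for("output_csv"))  # name used as the key in task.outputs
```

This is why the task is created with `file=...` and its result is read from `merge_csv_task.outputs['output_csv']`.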

Run the following cell to compile your pipeline and save it as `tmp/pipeline.yaml`.

In [6]:
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='tmp/pipeline.yaml')

Upload `tmp/pipeline.yaml` through the Kubeflow Pipelines UI.
Then start a run (under an experiment) and set the `url` parameter to this example archive of CSV files: `https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz`

-----

Alternative method: if you have already set up a client, you can compile and run the pipeline directly from the SDK, without uploading the YAML file through the UI:

In [None]:
# client = kfp.Client() # change arguments accordingly
# client.create_run_from_pipeline_func(
#     my_pipeline,
#     arguments={
#         'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
#     })
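If you would rather upload the compiled package itself and then start a run from it, a minimal sketch with the KFP SDK v1 client might look like the following. The pipeline, experiment, and job names and the host in the usage comment are hypothetical placeholders; adjust them and the client arguments to your deployment. The `kfp` import sits inside the function so that the cell can be defined without a reachable cluster.

```python
def upload_and_run(host: str, package_path: str, url: str):
    """Upload a compiled pipeline package and start one run of it."""
    import kfp  # imported lazily; calling this function requires kfp 1.8.x

    client = kfp.Client(host=host)
    pipeline = client.upload_pipeline(
        pipeline_package_path=package_path,
        pipeline_name="merge-csv-demo",  # hypothetical name
    )
    experiment = client.create_experiment(name="sdk-demo")  # hypothetical name
    return client.run_pipeline(
        experiment_id=experiment.id,
        job_name="merge-csv-run",  # hypothetical name
        pipeline_id=pipeline.id,
        params={"url": url},
    )


# Example call, left commented out like the cell above:
# upload_and_run(
#     host="http://localhost:8080",  # hypothetical endpoint
#     package_path="tmp/pipeline.yaml",
#     url="https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz",
# )
```

Unlike `create_run_from_pipeline_func`, this route leaves a named pipeline registered on the server, so later runs can be started from the UI without recompiling.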