Attempt to run a Kubeflow Pipeline locally.

Following the instructions here: https://www.kubeflow.org/docs/components/pipelines/sdk-v2/build-pipeline/

In [4]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)
import glob
import pandas as pd
import tarfile
import urllib.request

In [5]:
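def download_and_merge_csv(url: str, output_csv: str):
  """Download a .tar.gz of CSV files from `url` and merge them into `output_csv`."""
  # Stream the archive straight from the HTTP response and unpack it into ./data
  with urllib.request.urlopen(url) as res:
    tarfile.open(fileobj=res, mode="r|gz").extractall('data')
  # Concatenate every extracted CSV (the files have no header row)
  df = pd.concat(
      [pd.read_csv(csv_file, header=None)
       for csv_file in glob.glob('data/*.csv')])
  df.to_csv(output_csv, index=False, header=False)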

In [6]:
download_and_merge_csv(
    url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz', 
    output_csv='merged_data.csv')

In [7]:
!head merged_data.csv

6.3,3.3,6.0,2.5,virginica
5.8,2.7,5.1,1.9,virginica
7.1,3.0,5.9,2.1,virginica
6.3,2.9,5.6,1.8,virginica
6.5,3.0,5.8,2.2,virginica
7.6,3.0,6.6,2.1,virginica
4.9,2.5,4.5,1.7,virginica
7.3,2.9,6.3,1.8,virginica
6.7,2.5,5.8,1.8,virginica
7.2,3.6,6.1,2.5,virginica
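
The extract-and-merge logic can also be checked offline, without downloading anything. The sketch below is only an illustration: `make_archive` and `merge_csv_stream` are hypothetical helpers (not part of the tutorial), the two sample rows are made up for the test, and the standard-library `csv` module stands in for pandas so the snippet has no dependencies. It uses the same streaming `r|gz` mode as the function above.

```python
import csv
import glob
import io
import os
import tarfile
import tempfile


def make_archive(files):
    """Build an in-memory .tar.gz from a {name: text} mapping (hypothetical helper)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    buf.seek(0)  # rewind so the archive can be read back as a stream
    return buf


def merge_csv_stream(fileobj, workdir):
    """Extract a gzip tar stream into workdir and return all CSV rows.

    Files are read in sorted order so the result is deterministic.
    """
    with tarfile.open(fileobj=fileobj, mode="r|gz") as tar:
        tar.extractall(workdir)
    rows = []
    for path in sorted(glob.glob(os.path.join(workdir, "*.csv"))):
        with open(path, newline="") as f:
            rows.extend(csv.reader(f))
    return rows


# Two tiny header-less CSV files, shaped like the iris data above
archive = make_archive({
    "a.csv": "5.1,3.5,1.4,0.2,setosa\n",
    "b.csv": "6.3,3.3,6.0,2.5,virginica\n",
})
with tempfile.TemporaryDirectory() as tmp:
    merged = merge_csv_stream(archive, tmp)
```

Unlike the notebook function, this version sorts the file names before merging; `glob.glob` alone does not guarantee any order, so the row order of `merged_data.csv` may differ between runs or machines.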


## Kubeflow Pipeline

Below, the download-and-merge function above is rebuilt as a two-step pipeline: one step downloads the archive, and a second step extracts and merges the CSV files.

In [13]:
@component(
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component.yaml'
)
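def merge_csv(tar_data: Input[Artifact], output_csv: Output[Dataset]):
  # Imports live inside the function because the component body runs
  # in its own container, not in this notebook's namespace.
  import glob
  import pandas as pd
  import tarfile

  # tar_data.path / output_csv.path are the local file paths of the artifacts
  tarfile.open(name=tar_data.path, mode="r|gz").extractall('data')
  df = pd.concat(
      [pd.read_csv(csv_file, header=None)
       for csv_file in glob.glob('data/*.csv')])
  df.to_csv(output_csv.path, index=False, header=False)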

In [10]:
web_downloader_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component-sdk-v2.yaml')

In [14]:
# Define a pipeline and create a task from a component:
@dsl.pipeline(
    name='my-pipeline',
    # You can optionally specify your own pipeline_root
    # pipeline_root='gs://my-pipeline-root/example-pipeline',
)
def my_pipeline(url: str):
  web_downloader_task = web_downloader_op(url=url)
  merge_csv_task = merge_csv(tar_data=web_downloader_task.outputs['data'])
  # The outputs of the merge_csv_task can be referenced using the
  # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']

In [15]:
# Compile the pipeline to pipeline.yaml
kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')

Yay! Managed to run the compiled `pipeline.yaml` file in the Kubeflow Pipelines Web UI!

Testing option 2 as well: running the pipeline with the Kubeflow Pipelines SDK client. This also works as expected, yay!
The pipeline submitted from this notebook appeared in the Web UI and completed successfully.

In [16]:
client = kfp.Client() # change arguments accordingly

In [17]:
client.create_run_from_pipeline_func(
    my_pipeline,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
    # You can optionally override your pipeline_root when submitting the run too:
    # pipeline_root='gs://my-pipeline-root/example-pipeline',
    arguments={
        'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
    })


RunPipelineResult(run_id=d2876641-85a2-44fd-9f47-a5c2edbb89dc)
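
Instead of checking the Web UI, the final status could also be read from the notebook. The helper below is a minimal sketch, not something from the tutorial: `wait_and_report` is a hypothetical name, and it assumes the client behaves like the kfp 1.x `kfp.Client`, whose `wait_for_run_completion(run_id, timeout)` returns a run detail object with a `.run.status` field. If your SDK version differs, the attribute names may need adjusting.

```python
def wait_and_report(client, run_id, timeout_s=600):
    """Block until the run finishes (or timeout_s elapses) and return its status.

    Assumption: `client` offers wait_for_run_completion(run_id, timeout), as
    kfp.Client does in kfp 1.x, and the result exposes `.run.status`.
    """
    detail = client.wait_for_run_completion(run_id, timeout_s)
    return detail.run.status
```

With the objects above, this could be called as `wait_and_report(client, run_id)`, where `run_id` is the ID shown in the `RunPipelineResult` output.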