# KFP SDK: Component I/O and passing data between pipeline components



In this notebook, we'll build an example that shows how data can be passed between pipeline steps.



## Setup

Before you run this notebook, ensure that your Google Cloud user account and project have been granted access to Managed Pipelines Experimental. To request access, fill out this [form](http://go/cloud-mlpipelines-signup) and let your account representative know that you have done so.

This notebook is intended to be run on either of the following:
* [AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). See the "AI Platform Notebooks" section in the Experimental [User Guide](https://docs.google.com/document/d/1JXtowHwppgyghnj1N1CT73hwD1caKtWkLcm2_0qGBoI/edit?usp=sharing) for more detail on creating a notebook server instance.
* [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb)

**To run this notebook on AI Platform Notebooks**, click on the **File** menu, then select "Download .ipynb".  Then, upload that notebook from your local machine to AI Platform Notebooks. (In the AI Platform Notebooks left panel, look for an icon of an arrow pointing up, to upload).


We'll first install some libraries and set up some variables.

Set `gcloud` to use your project.  **Edit the following cell before running it**.

In [None]:
PROJECT_ID = 'your-project-id'  # <---CHANGE THIS

In [None]:
!gcloud config set project {PROJECT_ID}

If you're running this notebook on Colab, authenticate with your user account:

In [None]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

-----------------

**If you're on AI Platform Notebooks**, authenticate with Google Cloud before running the next section, by running
```sh
gcloud auth login
```
**in the Terminal window** (which you can open via **File** > **New** in the menu). You only need to do this once per notebook instance.

### Install the KFP SDK and AI Platform Pipelines client library

For Managed Pipelines Experimental, you'll need to download a special version of the AI Platform client library.

In [None]:
!gsutil cp gs://cloud-aiplatform-pipelines/aiplatform_pipelines_client-0.1.0.caip20201109-py3-none-any.whl .

Then, install the libraries and restart the kernel.

In [None]:
if 'google.colab' in sys.modules:
  USER_FLAG = ''
else:
  USER_FLAG = '--user'

In [None]:
!python3 -m pip install {USER_FLAG} kfp==1.1.1 --upgrade
!python3 -m pip install {USER_FLAG} aiplatform_pipelines_client-0.1.0.caip20201109-py3-none-any.whl --upgrade

In [None]:
if 'google.colab' not in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

The KFP version should be 1.1.1.



In [None]:
# Check the KFP version
!python3 -c "import kfp; print('KFP version: {}'.format(kfp.__version__))"

### Set some variables

**Before you run the next cell**, **edit it** to set variables for your project.  See the "Before you begin" section of the User Guide for information on creating your API key.  For `BUCKET_NAME`, enter the name of a Cloud Storage (GCS) bucket in your project.  Don't include the `gs://` prefix.
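
Because a stray `gs://` prefix in `BUCKET_NAME` would yield a malformed `PIPELINE_ROOT`, a small guard can catch the mistake early. The sketch below is only an illustration: `build_pipeline_root` is a hypothetical helper, not part of the KFP SDK or the AI Platform client library, and it builds the same path format used in the next cell.

```python
def build_pipeline_root(bucket_name, user):
    """Hypothetical helper: build the pipeline root from a bare bucket name."""
    # Reject names that already carry the scheme, since the format string adds it.
    if bucket_name.startswith('gs://'):
        raise ValueError("BUCKET_NAME should not include the 'gs://' prefix")
    return 'gs://{}/pipeline_root/{}'.format(bucket_name, user)


# Placeholder values; substitute your own bucket and user name.
print(build_pipeline_root('your-bucket-name', 'your-user-name'))
```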

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Required Parameters
USER = 'your-user-name' # <---CHANGE THIS
BUCKET_NAME = 'your-bucket-name'  # <---CHANGE THIS
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)

PROJECT_ID = 'your-project-id'  # <---CHANGE THIS
REGION = 'us-central1'
API_KEY = 'your-api-key'  # <---CHANGE THIS

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

## Simple two-step pipeline with 'producer' and 'consumer' steps

Now we're ready to define and run a pipeline.  We'll build an example that shows how data can be passed between pipeline steps.


We'll first do some imports:

In [None]:
import time
from kfp.v2 import components
from kfp.v2 import compiler
from kfp.v2 import dsl

Next, we'll define the pipeline components. We'll do this via the `load_component_from_text` function, which expects a string in YAML syntax.

Both components use the `google/cloud-sdk` image as their base container image and run a series of shell commands.


### Producer component

The **Producer** component takes one input, an `inputValue` parameter (`input_text`, of type `String`), and produces two outputs: an `outputPath` parameter (`output_value`) and an `outputUri` artifact (`output_artifact`).

When the op runs, a Cloud Storage (GCS) path is generated automatically for `output_value`. Because the items that follow the script in the `command` list are passed to the shell as `$0`, `$1`, and `$2`, `input_text` arrives as `$0` and the `output_value` path as `$1`. Similarly, a GCS path is generated for `output_artifact` (`$2`).

When the component op runs, `set -x` traces each command to the log, and the `input_text` value, followed by a short descriptive suffix, is written via `gsutil cp` to each of those paths, making the outputs available for consumption by other components.
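
The way `sh -c` assigns the items after the script string to `$0`, `$1`, and `$2` can be checked locally. The following is a minimal sketch that assumes a POSIX `sh` is available on the machine; the bucket paths are hypothetical stand-ins for the values the pipeline backend would substitute for the placeholders.

```python
import subprocess

# The script only prints its positional arguments, separated by "|".
script = 'printf "%s|%s|%s" "$0" "$1" "$2"'

# Hypothetical stand-ins for the resolved placeholders:
# {inputValue: input_text}, {outputPath: output_value}, {outputUri: output_artifact}
args = [
    'Hello world',
    'gs://example-bucket/output_value',
    'gs://example-bucket/output_artifact',
]

# Equivalent to: sh -c '<script>' 'Hello world' 'gs://...' 'gs://...'
result = subprocess.run(
    ['sh', '-c', script, *args],
    capture_output=True, text=True, check=True)

fields = result.stdout.split('|')
print(fields)
```

The first item after the script becomes `$0`, not `$1`, which is why the input text is referenced as `$0` in the component definition.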

In [None]:
producer_op = components.load_component_from_text("""
name: Producer
inputs:
- {name: input_text, type: String, description: 'Represents an input parameter.'}
outputs:
- {name: output_value, type: String, description: 'Represents an output parameter.'}
- {name: output_artifact, description: 'Represents an output artifact.'}
implementation:
  container:
    image: google/cloud-sdk:latest
    command:
    - sh
    - -c
    - |
      set -e -x
      echo "$0, this is an output parameter" | gsutil cp - "$1"
      echo "$0, this is an output artifact" | gsutil cp - "$2"
    - {inputValue: input_text}
    - {outputPath: output_value}
    - {outputUri: output_artifact}
""")


### Consumer component

The **Consumer** component has two inputs: `input_value` and `input_artifact`. When the component op runs, it echoes the `input_value` string and prints the contents of `input_artifact` by running `gsutil cat` on its URI. We'll define our pipeline so that both inputs of the Consumer step are obtained from the outputs of the **Producer** step.

In [None]:
consumer_op = components.load_component_from_text("""
name: Consumer
inputs:
- {name: input_value, type: String, description: 'Represents an input parameter. It connects to an upstream output parameter.'}
- {name: input_artifact, description: 'Represents an input artifact. It connects to an upstream output artifact.'}
implementation:
  container:
    image: google/cloud-sdk:latest
    command:
    - sh
    - -c
    - |
      set -e -x
      echo "Read from an input parameter: " && echo "$0"
      echo "Read from an input artifact: " && gsutil cat "$1"
    - {inputValue: input_value}
    - {inputUri: input_artifact}
""")


## Define a pipeline using the components and submit a run

Next, we'll define a two-step pipeline that uses the producer and consumer ops. We set the Consumer op's `input_value` argument to the Producer op's `output_value` output, and its `input_artifact` argument to the Producer op's `output_artifact` output. Under the hood, the `input_value` string is obtained by automatically reading the file at the GCS path to which the Producer step wrote.
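
As a rough local analogy, and not a description of how Managed Pipelines is implemented, the difference between passing a parameter by value and passing an artifact by location can be simulated with temporary files. In this sketch, local files stand in for GCS objects, and the names `run_producer` and `run_consumer` are illustrative assumptions rather than SDK functions.

```python
import tempfile
from pathlib import Path


def run_producer(input_text, output_value_path, output_artifact_path):
    # Mirrors the Producer's shell commands: write one line to each output location.
    Path(output_value_path).write_text(
        '{}, this is an output parameter\n'.format(input_text))
    Path(output_artifact_path).write_text(
        '{}, this is an output artifact\n'.format(input_text))


def run_consumer(input_value, input_artifact_path):
    # The parameter arrives as a string; the artifact arrives as a location to read.
    return {
        'parameter': input_value,
        'artifact': Path(input_artifact_path).read_text(),
    }


with tempfile.TemporaryDirectory() as workdir:
    value_path = Path(workdir) / 'output_value'
    artifact_path = Path(workdir) / 'output_artifact'
    run_producer('Hello world', value_path, artifact_path)

    # Stand-in for the orchestration layer: the parameter file is read and its
    # contents are passed by value, while the artifact is passed by location.
    passed_value = value_path.read_text()
    seen = run_consumer(passed_value, artifact_path)

print(seen)
```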

In [None]:
@dsl.pipeline(name='simple-two-step-pipeline-{}-{}'.format(USER, str(int(time.time()))))
def two_step_pipeline(
    text = 'Hello world'
):
  producer = producer_op(input_text=text)
  consumer = consumer_op(input_value=producer.outputs['output_value'], 
                         input_artifact=producer.outputs['output_artifact'])


Compile the pipeline:

In [None]:
compiler.Compiler().compile(pipeline_func=two_step_pipeline, 
                            pipeline_root=PIPELINE_ROOT,
                            output_path='two_step_pipeline_job.json')

### Submit the pipeline job

Here, we'll create an API client using the API key you generated.

Then, we'll submit the pipeline job by passing the compiled spec to the `create_run_from_job_spec()` method. Note that we're passing a `parameter_values` dict that specifies the pipeline input parameters we want to use.
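
Before submitting, it can be useful to confirm that the compiled file exists and parses as JSON. The sketch below assumes only that the compiler wrote a JSON object to `two_step_pipeline_job.json`; it makes no assumption about which keys the spec contains, and `summarize_job_spec` is a hypothetical helper, not part of either library.

```python
import json
from pathlib import Path


def summarize_job_spec(path):
    """Hypothetical helper: return the sorted top-level keys of a JSON job spec."""
    spec = json.loads(Path(path).read_text())
    return sorted(spec)


job_spec_path = Path('two_step_pipeline_job.json')
if job_spec_path.exists():
    print(summarize_job_spec(job_spec_path))
else:
    print('Job spec not found; run the compile cell first.')
```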

In [None]:
from aiplatform.pipelines import client

api_client = client.Client(project_id=PROJECT_ID, region=REGION, api_key=API_KEY)

response = api_client.create_run_from_job_spec(
    job_spec_path='two_step_pipeline_job.json',
    # pipeline_root=PIPELINE_ROOT,  # optional: use to override the compile-time value
    parameter_values={'text': 'This is some input text'})

### Monitor the pipeline run in the Cloud Console

Once you've submitted the pipeline run, you can monitor it in the [Cloud Console](https://console.cloud.google.com/ai/platform/pipelines) under **AI Platform (Unified)** > **Pipelines**.

Click into the pipeline run to see the run graph (for our simple pipeline, this consists of two steps), and click on a step to view the job detail and the logs for that step.

<!-- <a href="https://storage.googleapis.com/amy-jo/images/kf-pls/producer_consumer.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/producer_consumer.png" width="50%"/></a> -->

## What next?

Next, try out some of the other notebooks:

- A [KFP intro notebook](https://colab.research.google.com/drive/1mrud9HjsVp5fToHwwNL0RotFtJCKtfZ1#scrollTo=feV62LXyW7cN).
- A KFP example that [shows building custom components for data processing and training](https://colab.research.google.com/drive/1CV5SgrhRp0bgJcFKGc0G5oWwGTHc7bqt?usp=sharing). It also shows how to pass typed artifact data between components, and how to specify required resources when defining a pipeline.
- A TFX notebook that [shows the canonical 'Chicago taxi' example](https://colab.research.google.com/drive/1dNLlm21F6f5_4aeIg-Zs_F1iGGRPEvhW), and how to use custom Python functions and custom containers.

-----------------------------
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.