# Managed Pipelines Experimental: Custom containers and resource specs

This notebook shows how to build and use custom containers for Pipeline components. It also shows how to pass typed artifact data between components, and how to specify required resources when defining a pipeline.

This example uses one of the TensorFlow Datasets, in particular the [Large Movie Review Dataset](https://www.tensorflow.org/datasets/catalog/imdb_reviews#imdb_reviewssubwords8k), for a binary sentiment classification task: predicting whether a movie review is negative or positive. 

## Setup

Before you run this notebook, ensure that your Google Cloud user account and project have been granted access to the Managed Pipelines Experimental. To request access, fill out this [form](http://go/cloud-mlpipelines-signup) and let your account representative know that you have requested it.

This notebook is intended to be run on either of the following:
* [AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). See the "AI Platform Notebooks" section in the Experimental [User Guide](https://docs.google.com/document/d/1JXtowHwppgyghnj1N1CT73hwD1caKtWkLcm2_0qGBoI/edit?usp=sharing) for more detail on creating a notebook server instance.
* [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb)

**To run this notebook on AI Platform Notebooks**, click on the **File** menu, then select "Download .ipynb".  Then, upload that notebook from your local machine to AI Platform Notebooks. (In the AI Platform Notebooks left panel, look for an icon of an arrow pointing up, to upload).

We'll first install some libraries and set up some variables.


Set your project ID. **Edit the following cell before running it**.

In [None]:
PROJECT_ID = 'your-project-id'  # <---CHANGE THIS

Set `gcloud` to use your project.

In [None]:
!gcloud config set project {PROJECT_ID}

If you're running this notebook on Colab, authenticate with your user account:

In [None]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

-----------------

**If you're on AI Platform Notebooks**, authenticate with Google Cloud before running the next section, by running
```sh
gcloud auth login
```
**in the Terminal window** (which you can open via **File** > **New** in the menu).  You only need to do this once per notebook instance.

### Install the KFP SDK and AI Platform Pipelines client library

For Managed Pipelines Experimental, you'll need to download a special version of the AI Platform client library.

Then, install the libraries and restart the kernel. If you see a permissions error for the Metadata libraries, make sure you've run the `gcloud auth login` command as indicated above.

In [None]:
!gsutil cp gs://cloud-aiplatform-pipelines/releases/20210304/aiplatform_pipelines_client-0.1.0.caip20210304-py3-none-any.whl .
# Get the Metadata SDK to query the produced metadata.
!gsutil cp gs://cloud-aiplatform-metadata/sdk/google-cloud-aiplatform-metadata-0.0.1.tar.gz .

In [None]:
if 'google.colab' in sys.modules:
  USER_FLAG = ''
else:
  USER_FLAG = '--user'

Install the libraries:

In [None]:
!python3 -m pip install {USER_FLAG} kfp==1.4 google-cloud-aiplatform-metadata-0.0.1.tar.gz aiplatform_pipelines_client-0.1.0.caip20210304-py3-none-any.whl --upgrade


In [None]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

The KFP version should be >= 1.4.



In [None]:
# Check the KFP version
!python3 -c "import kfp; print('KFP version: {}'.format(kfp.__version__))"
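
Comparing version strings directly can mislead (as strings, `'1.10'` sorts before `'1.4'`), so a numeric comparison is safer when checking the `>= 1.4` requirement. The helper below is a minimal sketch and not part of the KFP SDK: `version_at_least` is a hypothetical name, and it only looks at the leading numeric components of the version string.

```python
import re


def version_at_least(version, minimum):
  """Returns True if the leading numeric parts of `version` are >= `minimum`.

  `minimum` is a tuple of ints, e.g. (1, 4). Suffixes such as 'rc1' are ignored.
  """
  match = re.match(r'\d+(?:\.\d+)*', version)
  if not match:
    raise ValueError('Unrecognized version string: {!r}'.format(version))
  parts = tuple(int(p) for p in match.group(0).split('.'))
  # Pad the shorter tuple with zeros so (1, 4) compares equal to (1, 4, 0).
  width = max(len(parts), len(minimum))
  parts += (0,) * (width - len(parts))
  minimum = tuple(minimum) + (0,) * (width - len(minimum))
  return parts >= minimum


print(version_at_least('1.4.0', (1, 4)))   # True
print(version_at_least('1.10.2', (1, 4)))  # True
print(version_at_least('1.3.9', (1, 4)))   # False
```

In the notebook, you could pass `kfp.__version__` as the first argument.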

If you're on Colab, re-authorize after the kernel restart. **Edit the following cell for your project ID before running it.**

In [None]:
import sys
if 'google.colab' in sys.modules:
  PROJECT_ID = 'your-project-id'  # <---CHANGE THIS
  !gcloud config set project {PROJECT_ID}
  from google.colab import auth
  auth.authenticate_user()
  USER_FLAG = ''

### Set some variables

**Before you run the next cell**, **edit it** to set variables for your project.  See the "Before you begin" section of the User Guide for information on creating your API key.  For `BUCKET_NAME`, enter the name of a Cloud Storage (GCS) bucket in your project.  Don't include the `gs://` prefix.

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Required Parameters
USER = 'YOUR_USER_NAME' # <---CHANGE THIS
BUCKET_NAME = 'YOUR_BUCKET_NAME'  # <---CHANGE THIS
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)

PROJECT_ID = 'YOUR_PROJECT_ID'  # <---CHANGE THIS
REGION = 'us-central1'
API_KEY = 'YOUR_API_KEY'  # <---CHANGE THIS

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
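
A common mistake here is to paste the bucket name with its `gs://` prefix, which would produce a malformed `PIPELINE_ROOT`. The following is a minimal sketch of a guard against that; `build_pipeline_root` is a hypothetical helper introduced only for illustration, and it produces the same URI layout as the cell above.

```python
def build_pipeline_root(bucket_name, user):
  """Builds the pipeline root URI, rejecting a bucket name that has a scheme."""
  if '://' in bucket_name:
    raise ValueError(
        'BUCKET_NAME should not include a scheme such as gs://, got {!r}'
        .format(bucket_name))
  # Strip stray slashes so the URI has exactly one separator per segment.
  return 'gs://{}/pipeline_root/{}'.format(bucket_name.strip('/'), user)


print(build_pipeline_root('my-bucket', 'alice'))
# gs://my-bucket/pipeline_root/alice
```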

## Build custom container components


We'll first build the two components that we'll use in our pipeline. The first component generates train and test data, and the second component consumes that data to train a model (to predict movie review sentiment).

These components are based on custom Docker container images that we'll build and upload to the Google Container Registry, using Cloud Build.

### Container 1: Generate examples

First, we'll define and write out the `generate_examples.py` code.  It generates train and test set files from the [IMDB review data](https://www.tensorflow.org/datasets/catalog/imdb_reviews#imdb_reviewssubwords8k), in `TFRecord` format.

In [None]:
!mkdir -p generate

In [None]:
%%writefile generate/generate_examples.py

import argparse
import json
import os

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds


def _serialize_example(example, label):
  example_value = tf.io.serialize_tensor(example).numpy()
  label_value = tf.io.serialize_tensor(label).numpy()
  feature = {
      'examples':
          tf.train.Feature(
              bytes_list=tf.train.BytesList(value=[example_value])),
      'labels':
          tf.train.Feature(bytes_list=tf.train.BytesList(value=[label_value])),
  }
  return tf.train.Example(features=tf.train.Features(
      feature=feature)).SerializeToString()


def _tf_serialize_example(example, label):
  serialized_tensor = tf.py_function(_serialize_example, (example, label),
                                     tf.string)
  return tf.reshape(serialized_tensor, ())


def generate_examples(training_data_uri, test_data_uri, config_file_uri):
  (train_data, test_data), info = tfds.load(
      # Use the version pre-encoded with an ~8k vocabulary.
      'imdb_reviews/subwords8k',
      # Return the train/test datasets as a tuple.
      split=(tfds.Split.TRAIN, tfds.Split.TEST),
      # Return (example, label) pairs from the dataset (instead of a dictionary).
      as_supervised=True,
      with_info=True)

  serialized_train_examples = train_data.map(_tf_serialize_example)
  serialized_test_examples = test_data.map(_tf_serialize_example)

  filename = os.path.join(training_data_uri, "train.tfrecord")
  writer = tf.data.experimental.TFRecordWriter(filename)
  writer.write(serialized_train_examples)

  filename = os.path.join(test_data_uri, "test.tfrecord")
  writer = tf.data.experimental.TFRecordWriter(filename)
  writer.write(serialized_test_examples)

  encoder = info.features['text'].encoder
  config = {
      'vocab_size': encoder.vocab_size,
  }
  config_file = os.path.join(config_file_uri, "config")
  with tf.io.gfile.GFile(config_file, 'w') as f:
    f.write(json.dumps(config))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--training_data_uri', type=str)
  parser.add_argument('--test_data_uri', type=str)
  parser.add_argument('--config_file_uri', type=str)

  args = parser.parse_args()
  generate_examples(args.training_data_uri, args.test_data_uri,
                    args.config_file_uri)

Next, we'll create a Dockerfile that builds a container to run `generate_examples.py`. We are using a Google [Deep Learning Container](https://cloud.google.com/ai-platform/deep-learning-containers) image as our base, since the image already includes most of what we need. 
You may use your own image as the base image instead. Note that we're also installing the `tensorflow_datasets` library.

In [None]:
%%writefile generate/Dockerfile

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest
WORKDIR /pipeline
COPY generate_examples.py generate_examples.py
RUN pip install tensorflow_datasets
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

We'll use [Cloud Build](https://cloud.google.com/cloud-build/docs) to build the container image and write it to [GCR](https://cloud.google.com/container-registry).

In [None]:
!gcloud builds submit --tag gcr.io/{PROJECT_ID}/custom-container-generate:{USER} generate
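
Because `USER` is used as the image tag, it has to be a valid Docker tag: letters, digits, underscores, periods, and dashes only, not starting with a period or dash, and at most 128 characters. The sketch below checks this before building the image URI. `image_uri` is a hypothetical helper, and the project and user values are placeholders.

```python
import re

# Docker tag rule: first char is a letter, digit or underscore; then up to
# 127 more characters that may also include '.' and '-'.
_TAG_RE = re.compile(r'[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}')


def image_uri(project_id, image_name, tag):
  """Returns gcr.io/<project>/<image>:<tag>, validating the tag first."""
  if not _TAG_RE.fullmatch(tag):
    raise ValueError('Not a valid Docker image tag: {!r}'.format(tag))
  return 'gcr.io/{}/{}:{}'.format(project_id, image_name, tag)


print(image_uri('my-project', 'custom-container-generate', 'alice'))
# gcr.io/my-project/custom-container-generate:alice
```

A value such as `alice@example.com` would be rejected, so choose a `USER` value without characters like `@`.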

### Container 2: Train Examples

Next, we'll do the same for the 'Train Examples' custom container. We'll first write out a `train_examples.py` file, then build a container that runs it. This script takes as input training and test data in `TFRecord` format and trains a Keras binary classification model to predict review sentiment. When training has finished, it writes out model and metrics information.

In [None]:
!mkdir -p train

In [None]:
%%writefile train/train_examples.py

import argparse
import json
import os

import numpy as np
import tensorflow as tf


def _parse_example(record):
  f = {
      'examples': tf.io.FixedLenFeature((), tf.string, default_value=''),
      'labels': tf.io.FixedLenFeature((), tf.string, default_value='')
  }
  return tf.io.parse_single_example(record, f)


def _to_tensor(record):
  examples = tf.io.parse_tensor(record['examples'], tf.int64)
  labels = tf.io.parse_tensor(record['labels'], tf.int64)
  return (examples, labels)


def train_examples(training_data_uri, test_data_uri, config_file_uri,
                   output_model_uri, output_metrics_uri):
  train_examples = tf.data.TFRecordDataset(
      [os.path.join(training_data_uri, 'train.tfrecord')])
  test_examples = tf.data.TFRecordDataset(
      [os.path.join(test_data_uri, 'test.tfrecord')])

  train_batches = train_examples.map(_parse_example).map(_to_tensor)
  test_batches = test_examples.map(_parse_example).map(_to_tensor)

  with tf.io.gfile.GFile(os.path.join(config_file_uri, 'config')) as f:
    config = json.loads(f.read())

  model = tf.keras.Sequential([
      tf.keras.layers.Embedding(config['vocab_size'], 16),
      tf.keras.layers.GlobalAveragePooling1D(),
      tf.keras.layers.Dense(1, activation='sigmoid')
  ])

  model.summary()

  model.compile(
      optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

  train_batches = train_batches.shuffle(1000).padded_batch(
      32, (tf.TensorShape([None]), tf.TensorShape([])))

  test_batches = test_batches.padded_batch(
      32, (tf.TensorShape([None]), tf.TensorShape([])))

  history = model.fit(
      train_batches,
      epochs=10,
      validation_data=test_batches,
      validation_steps=30)

  loss, accuracy = model.evaluate(test_batches)

  metrics = {
      'loss': str(loss),
      'accuracy': str(accuracy),
  }

  model_json = model.to_json()
  with tf.io.gfile.GFile(os.path.join(output_model_uri, 'model.json'),
                         'w') as f:
    f.write(model_json)

  with tf.io.gfile.GFile(os.path.join(output_metrics_uri, 'metrics.json'),
                         'w') as f:
    f.write(json.dumps(metrics))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--training_data_uri', type=str)
  parser.add_argument('--test_data_uri', type=str)
  parser.add_argument('--config_file_uri', type=str)
  parser.add_argument('--output_model_uri', type=str)
  parser.add_argument('--output_metrics_uri', type=str)

  args = parser.parse_args()

  train_examples(args.training_data_uri, args.test_data_uri,
                 args.config_file_uri, args.output_model_uri,
                 args.output_metrics_uri)
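
The two scripts share the vocabulary size through a small JSON file named `config`: the generate step writes it and the train step reads it. The following sketch reproduces that handoff locally, under the assumption that a temporary directory stands in for the GCS URI and the built-in `open` stands in for `tf.io.gfile.GFile`. The helper names and the vocabulary size value are placeholders.

```python
import json
import os
import tempfile


def write_config(config_dir, vocab_size):
  """Mimics the generate step: writes a JSON file named 'config'."""
  os.makedirs(config_dir, exist_ok=True)
  with open(os.path.join(config_dir, 'config'), 'w') as f:
    f.write(json.dumps({'vocab_size': vocab_size}))


def read_config(config_dir):
  """Mimics the train step: reads the JSON file back into a dict."""
  with open(os.path.join(config_dir, 'config')) as f:
    return json.loads(f.read())


with tempfile.TemporaryDirectory() as tmp:
  config_dir = os.path.join(tmp, 'config_file')
  write_config(config_dir, vocab_size=8000)  # placeholder value
  config = read_config(config_dir)

print(config['vocab_size'])
```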


Next, we'll create a Dockerfile that builds a container to run `train_examples.py`.  Again we're using a Google [Deep Learning Container](https://cloud.google.com/ai-platform/deep-learning-containers) image as our base.

In [None]:
%%writefile train/Dockerfile

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest
WORKDIR /pipeline
COPY train_examples.py train_examples.py
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

We'll use [Cloud Build](https://cloud.google.com/cloud-build/docs) to build the container image and write it to [GCR](https://cloud.google.com/container-registry).

In [None]:
!gcloud builds submit --tag gcr.io/{PROJECT_ID}/custom-container-train:{USER} train

### Create pipeline components using the custom container images

Next, we'll define components for the 'generate' and 'train' steps, using the container images we just built.



In [None]:
import time
from kfp import components
from kfp.v2 import dsl
from kfp.v2 import compiler

The 'generate' component specifies three outputs: training and test data, of type `Dataset`, and a config file, of type `File`. 

The component definition uses `outputUri` placeholders in specifying the `generate_examples.py` script args. These args are set to automatically generated GCS URIs, and when `generate_examples` writes to those URIs, the outputs are available to downstream components.
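
To make the placeholder behavior concrete, the sketch below shows roughly what the script sees at run time once each placeholder has been replaced by a URI. The URIs are hypothetical (the real ones are generated by the pipeline service), and `posixpath.join` is used in place of the script's `os.path.join` so the result is the same on any operating system.

```python
import argparse
import posixpath

# Hypothetical URIs; the real values are generated by the pipeline service.
argv = [
    '--training_data_uri', 'gs://example-bucket/run-1/training_data',
    '--test_data_uri', 'gs://example-bucket/run-1/test_data',
    '--config_file_uri', 'gs://example-bucket/run-1/config_file',
]

# Same argument definitions as in generate_examples.py.
parser = argparse.ArgumentParser()
parser.add_argument('--training_data_uri', type=str)
parser.add_argument('--test_data_uri', type=str)
parser.add_argument('--config_file_uri', type=str)
args = parser.parse_args(argv)

# generate_examples.py writes a fixed file name under each URI.
train_file = posixpath.join(args.training_data_uri, 'train.tfrecord')
print(train_file)
# gs://example-bucket/run-1/training_data/train.tfrecord
```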



In [None]:
generate_op = components.load_component_from_text("""
name: GenerateExamples
outputs:
- {name: training_data, type: Dataset}
- {name: test_data, type: Dataset}
- {name: config_file, type: File}
implementation:
  container:
    image: gcr.io/%s/custom-container-generate:%s
    command:
    - python
    - /pipeline/generate_examples.py
    args:
    - --training_data_uri
    - {outputUri: training_data}
    - --test_data_uri
    - {outputUri: test_data}
    - --config_file_uri
    - {outputUri: config_file}
""" % (PROJECT_ID, USER))

The train component takes as input training and test data of type `Dataset`, and a config `File`: it can consume the outputs of the "generate" component.   It specifies two outputs, one of type `Model` and one of type `Metrics`.

The component definition uses `inputUri` and `outputUri` placeholders when passing args to the `train_examples.py` script. So, the script's arg values will be GCS URIs, from which it will read its inputs and to which it will write its outputs.

In [None]:
train_op = components.load_component_from_text("""
name: Train
inputs:
- {name: training_data, type: Dataset}
- {name: test_data, type: Dataset}
- {name: config_file, type: File}
outputs:
- {name: model, type: Model}
- {name: metrics, type: Metrics}
implementation:
  container:
    image: gcr.io/%s/custom-container-train:%s
    command:
    - python
    - /pipeline/train_examples.py
    args:
    - --training_data_uri
    - {inputUri: training_data}
    - --test_data_uri
    - {inputUri: test_data}
    - --config_file_uri
    - {inputUri: config_file}
    - --output_model_uri
    - {outputUri: model}
    - --output_metrics_uri
    - {outputUri: metrics}
""" % (PROJECT_ID, USER))

## Define a KFP pipeline that uses the components

Now we're ready to define a pipeline that uses these components. The `train` step takes its inputs from the `generate` step's outputs. 

Note also that we can define pipeline *resource* specs, which we do here for the training step: CPU and memory limits, the number of GPUs to allocate, and the type of accelerator to use.

In [None]:
@dsl.pipeline(name='custom-container-pipeline-{}-{}'.format(USER, str(int(time.time()))))
def pipeline():
  generate = generate_op()
  train = (train_op(
      training_data=generate.outputs['training_data'],
      test_data=generate.outputs['test_data'],
      config_file=generate.outputs['config_file']).
    set_cpu_limit('4').
    set_memory_limit('14Gi').
    add_node_selector_constraint(
      'cloud.google.com/gke-accelerator',
      'nvidia-tesla-k80').
    set_gpu_limit(1))
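
The memory limit string `'14Gi'` uses a Kubernetes-style quantity suffix, where `Gi` means 2^30 bytes and `G` means 10^9 bytes. As a rough illustration of what such strings mean, the sketch below converts a small subset of these suffixes to bytes. `memory_to_bytes` is a hypothetical helper, not a KFP function, and it handles only the suffixes listed in the code.

```python
# Binary (power-of-two) and decimal (power-of-ten) suffixes; a simplified subset.
_BINARY = {'Ki': 2**10, 'Mi': 2**20, 'Gi': 2**30, 'Ti': 2**40}
_DECIMAL = {'K': 10**3, 'M': 10**6, 'G': 10**9, 'T': 10**12}


def memory_to_bytes(quantity):
  """Converts a quantity string such as '14Gi' or '500M' to a byte count."""
  # Check two-character binary suffixes first so 'Gi' is not read as 'G'.
  for suffixes in (_BINARY, _DECIMAL):
    for suffix, factor in suffixes.items():
      if quantity.endswith(suffix):
        return int(float(quantity[:-len(suffix)]) * factor)
  return int(quantity)  # A plain number is already in bytes.


print(memory_to_bytes('14Gi'))  # 15032385536
print(memory_to_bytes('14G'))   # 14000000000
```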


Compile the pipeline:

In [None]:
compiler.Compiler().compile(pipeline_func=pipeline, 
                            pipeline_root=PIPELINE_ROOT,
                            output_path='custom_container_pipeline_spec.json')

### Submit the pipeline job

Here, we'll create an API client using the API key you generated.

Then, we'll submit the pipeline job by passing the compiled spec to the `create_run_from_job_spec()` method. This pipeline takes no input parameters, so we pass only the job spec path and a run name.

In [None]:
from aiplatform.pipelines import client

api_client = client.Client(project_id=PROJECT_ID, region=REGION, api_key=API_KEY)

response = api_client.create_run_from_job_spec(
    job_spec_path='custom_container_pipeline_spec.json',
    name='my-pipeline-run-1',  # <- pipeline run name. Must be unique (change if you rerun)
    # pipeline_root=PIPELINE_ROOT,  # optional: use to override the compile-time value
    )
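
Since the run name must be unique, one option is to append a timestamp instead of editing the name by hand on each rerun, as the pipeline definition above does for the pipeline name. The following is a minimal sketch; `unique_run_name` is a hypothetical helper, and names collide only if two runs are created within the same second.

```python
import time


def unique_run_name(prefix, now=None):
  """Appends a Unix timestamp in seconds to `prefix`.

  `now` can be supplied for reproducible output; it defaults to the current
  time.
  """
  timestamp = int(time.time() if now is None else now)
  return '{}-{}'.format(prefix, timestamp)


print(unique_run_name('my-pipeline-run', now=1614816000))
# my-pipeline-run-1614816000
```

If you generate the name this way, keep it in a variable so that you can use the same value when querying the run's metadata.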

## Query the metadata produced by the pipeline

The set of artifacts and executions produced by the pipeline can also be queried using the AI Platform Metadata SDK. The following snippet queries the metadata for a given pipeline run:

In [None]:
from google.cloud import aiplatform

from google import auth
from google.cloud.aiplatform_v1alpha1 import MetadataServiceClient
from google.auth.transport import grpc, requests
from google.cloud.aiplatform_v1alpha1.services.metadata_service.transports import grpc as transports_grpc

import pandas as pd


def _initialize_metadata_service_client() -> MetadataServiceClient:
  scope = 'https://www.googleapis.com/auth/cloud-platform'
  api_uri = 'us-central1-aiplatform.googleapis.com'
  credentials, _ = auth.default(scopes=(scope,))
  request = requests.Request()
  channel = grpc.secure_authorized_channel(credentials, request, api_uri)

  return MetadataServiceClient(
      transport=transports_grpc.MetadataServiceGrpcTransport(channel=channel))

client = _initialize_metadata_service_client()

In [None]:

def get_run_context_name(pipeline_run):
  contexts = client.list_contexts(parent='projects/{}/locations/{}/metadataStores/default'.format(PROJECT_ID, REGION))
  for context in contexts:
    if context.display_name == pipeline_run:
      return context.name
  
run_context_name = get_run_context_name('my-pipeline-run-1')  # <- Name of the pipeline run

client.query_context_lineage_subgraph(context=run_context_name)
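
Note that `get_run_context_name` returns `None` when no context matches, for example if the run name is mistyped, and the lineage query would then be called with `context=None`. The sketch below shows a variant of the lookup that fails with an explicit error instead. `find_context_name` is a hypothetical helper, and `SimpleNamespace` objects stand in for the contexts that `client.list_contexts(...)` would return.

```python
from types import SimpleNamespace


def find_context_name(contexts, display_name):
  """Returns the resource name of the context with the given display name."""
  for context in contexts:
    if context.display_name == display_name:
      return context.name
  raise LookupError('No context with display name {!r}'.format(display_name))


# Stand-in objects; real contexts come from client.list_contexts(...).
fake_contexts = [
    SimpleNamespace(display_name='other-run', name='contexts/123'),
    SimpleNamespace(display_name='my-pipeline-run-1', name='contexts/456'),
]
print(find_context_name(fake_contexts, 'my-pipeline-run-1'))
# contexts/456
```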

### Monitor the pipeline run in the Cloud Console

Once you've deployed the pipeline run, you can monitor it in the [Cloud Console](https://console.cloud.google.com/ai/platform/pipelines) under **AI Platform (Unified)** > **Pipelines**. 

Click into the pipeline run to see the run graph (for our pipeline, this consists of two steps), and click on a step to view the job detail and the logs for that step.

As you look at the pipeline graph, you'll see that you can inspect the artifacts passed between the pipeline steps.

<a href="https://storage.googleapis.com/amy-jo/images/kf-pls/generate_train.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/generate_train.png" width="70%"/></a>

## What next?

Next, try out some of the other notebooks:

- A [KFP intro notebook](https://colab.research.google.com/drive/1mrud9HjsVp5fToHwwNL0RotFtJCKtfZ1#scrollTo=feV62LXyW7cN).
- A simple KFP example that [shows how data can be passed between pipeline steps](https://colab.research.google.com/drive/1NztsGV-FAp71MU7zfMHU0SlfQ8dpw-9u).
- A TFX notebook that [shows the canonical 'Chicago taxi' example](https://colab.research.google.com/drive/1dNLlm21F6f5_4aeIg-Zs_F1iGGRPEvhW), and how to use custom Python functions and custom containers.

-----------------------------
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.