##### Copyright &copy; 2020 Google Inc.

<font size=-1>Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at [https://www.apache.org/licenses/LICENSE-2.0](https://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.</font>
<hr/>

# Managed Pipelines EAP: Create custom functions and containers

## Introduction

[AI Platform Pipelines - Managed (Managed Pipelines)](https://docs.google.com/document/d/1FAyZhXRmZwJ7oCjRZZmzRG-ERYxyZyUQikrjR28Ev4E/edit?ts=5ec30a40#) makes it easier for you to run your ML pipelines in a scalable and cost-effective way, while offering you ‘no lock-in’ flexibility. You build your pipelines in Python using [TensorFlow Extended (TFX)](https://www.tensorflow.org/tfx), and then execute them on Google Cloud serverlessly. You don’t have to worry about scale, and you pay only for what you use. (You can also take the same TFX pipelines and run them using Kubeflow Pipelines.)

This notebook shows examples of how to create custom AI Platform Pipelines components. If you find that you need to build your own components, there are three ways to do this:

1. You can use the [TFX Custom Components SDK](https://www.tensorflow.org/tfx/guide/custom_component)
1. You can convert any Python function into a TFX Custom Component 
1. You can take any container and use that as a component. 

This notebook walks through examples for the 2nd and 3rd options. It also shows an example of how you can specify a pipeline with task-based dependencies.


The notebook is designed to run on AI Platform Notebooks. If you want to run this notebook in your own development environment, you will need to do a bit more setup first.  See [these instructions](<https://docs.google.com/document/d/1FAyZhXRmZwJ7oCjRZZmzRG-ERYxyZyUQikrjR28Ev4E/edit?ts=5ec30a40#heading=h.pyk4nfqsszzz>).  


## Step 1: Follow the 'before you begin' steps in the Managed Pipelines User Guide

Before proceeding, make sure that you've followed all the steps in the ["Before you Begin" section](https://docs.google.com/document/d/1FAyZhXRmZwJ7oCjRZZmzRG-ERYxyZyUQikrjR28Ev4E/edit?ts=5ec30a40#heading=h.65kbhyyf93x0) of the Managed Pipelines User Guide. You'll need the API key that you created there for this notebook.

## Step 2: Set up your environment

First, ensure that Python 3 is being used.

In [None]:
import sys
sys.version

### Install the TFX SDK

Next, we'll upgrade pip and install the TFX SDK.

In [None]:
SDK_LOCATION = 'gs://caip-pipelines-sdk/releases/20200727/tfx-0.22.0.caip20200727-py3-none-any.whl'

In [None]:
%%capture
!pip install pip --upgrade
!gsutil cp {SDK_LOCATION} /tmp/tfx-0.22.0.caip20200727-py3-none-any.whl && pip install --no-cache-dir /tmp/tfx-0.22.0.caip20200727-py3-none-any.whl

Next, install Skaffold. 

> Note: if you're running this notebook in a non-Linux local development environment, see [these Skaffold installation instructions](https://skaffold.dev/docs/install/) instead.

In [None]:
# Install skaffold.
!curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold && mkdir -p /home/jupyter/.local/bin && mv skaffold /home/jupyter/.local/bin/

# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

Ensure that you can import TFX and that its version is >= 0.22.

In [None]:
# Check version
import tfx
tfx.__version__
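If you'd rather have the notebook fail loudly than eyeball the printed version, here is a minimal sketch of a programmatic check. It assumes the version string starts with dot-separated integers, as in the `0.22.0.caip20200727` wheel installed above; the helper name `tfx_version_at_least` is hypothetical. The parse is hand-rolled because the EAP suffix is not a standard PEP 440 version, so a generic version parser may reject it.

```python
import itertools


def tfx_version_at_least(version, minimum=(0, 22)):
    """Return True if `version` >= `minimum`, comparing only the leading integer parts.

    Trailing non-numeric segments (such as the EAP 'caip20200727' suffix) are ignored.
    """
    numeric_parts = itertools.takewhile(str.isdigit, version.split('.'))
    return tuple(int(p) for p in numeric_parts) >= tuple(minimum)


# In the notebook you would check the installed package:
#   import tfx
#   assert tfx_version_at_least(tfx.__version__), tfx.__version__
print(tfx_version_at_least('0.22.0.caip20200727'))  # True
print(tfx_version_at_least('0.21.4'))  # False
```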

### Identify or Create a GCS bucket to use for your pipeline

Below, you will need to specify a Google Cloud Storage (GCS) bucket for the Pipelines run to use. If you do not already have one that you want to use, you can [create a new bucket](https://cloud.google.com/storage/docs/creating-buckets).

### Set up variables

Let's set up some variables used to customize the pipelines below. **Before you execute the following cell, make the indicated 'Change this' edits**.

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
    
USER = 'YOUR_USERNAME'  # Change this to your username.
BUCKET_NAME = 'YOUR_GCS_BUCKET'  # Change this to your GCS bucket name.  Do not include the `gs://`.

# It is not necessary to append your username to the pipeline root, 
# but this may be useful if multiple people are using the same project.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)
PROJECT_ID = 'YOUR_PROJECT_ID' # Change this to your project id
BASE_IMAGE = 'gcr.io/caip-pipelines-assets/tfx:0.22.0.caip20200727'

API_KEY = 'YOUR_API_KEY'  # Change this to the API key that you created during initial setup
# ENDPOINT = 'alpha-ml.googleapis.com'  # this is the default during EAP

PIPELINE_ROOT
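Forgetting one of the 'Change this' edits usually surfaces much later as a confusing build or permissions error. A minimal sanity-check sketch, assuming the `YOUR_...` placeholder values used in the cell above; `find_setting_problems` is a hypothetical helper, not part of the TFX SDK.

```python
def find_setting_problems(settings):
    """Return human-readable problems with the notebook's user-edited settings."""
    problems = []
    for name, value in settings.items():
        # The setup cell uses 'YOUR_...' placeholders for every value you must edit.
        if value.startswith('YOUR_'):
            problems.append('{} still has its placeholder value {!r}'.format(name, value))
    # BUCKET_NAME is interpolated into 'gs://{}/...', so it must not carry the scheme itself.
    if settings.get('BUCKET_NAME', '').startswith('gs://'):
        problems.append('BUCKET_NAME should not include the gs:// prefix')
    return problems


# In the notebook:
#   problems = find_setting_problems({'USER': USER, 'BUCKET_NAME': BUCKET_NAME,
#                                     'PROJECT_ID': PROJECT_ID, 'API_KEY': API_KEY})
#   assert not problems, problems
print(find_setting_problems({'USER': 'alice', 'BUCKET_NAME': 'gs://my-bucket',
                             'PROJECT_ID': 'YOUR_PROJECT_ID'}))
```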

## Step 3: Custom Python functions

In this section, we will create components from dummy Python functions (that don't do much).

We begin by writing a dummy 'preprocess' function.

In [None]:
%%writefile my_preprocess.py

import os
import tensorflow as tf  # Used for writing files.

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import OutputArtifact

@component
def MyPreprocess(training_data: OutputArtifact[Dataset]):
  with tf.io.gfile.GFile(os.path.join(training_data.uri, 'training_data_file.txt'), 'w') as f:
    f.write('Dummy training data')
    
  # We'll modify metadata and ensure that it gets passed to downstream components.  
  training_data.set_string_custom_property('my_custom_field', 'my_custom_value')
  training_data.set_string_custom_property('uri_for_output', training_data.uri)


Next, we'll write a second component that uses the training data produced.

In [None]:
%%writefile my_trainer.py

import os
import tensorflow as tf

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import Model
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact, Parameter


@component
def MyTraining(training_data: InputArtifact[Dataset],
               model: OutputArtifact[Model],
               num_iterations: Parameter[int] = 100):
  # Let's read the contents of training data and write to the metadata.
  with tf.io.gfile.GFile(
      os.path.join(training_data.uri, 'training_data_file.txt'), 'r') as f:
    contents = f.read()
    model.set_string_custom_property('contents_of_training_data', contents)
    model.set_int_custom_property('num_iterations_used', num_iterations)
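The two components communicate only through the artifact: `MyPreprocess` writes a file under `training_data.uri` and attaches custom properties, and `MyTraining` reads from that same URI. The sketch below mimics the hand-off in plain Python so it runs without TFX. `FakeArtifact` is a hypothetical stand-in for a TFX artifact (just a `uri` and a dict of custom properties), and the orchestrator's job of choosing URIs and passing the same artifact downstream is done by hand with a temporary directory.

```python
import os
import tempfile


class FakeArtifact:
    """Hypothetical stand-in for a TFX artifact: a URI plus custom properties."""

    def __init__(self, uri):
        self.uri = uri
        self.custom_properties = {}

    def set_string_custom_property(self, key, value):
        self.custom_properties[key] = str(value)

    def set_int_custom_property(self, key, value):
        self.custom_properties[key] = int(value)


def preprocess(training_data):
    # Mirrors MyPreprocess, with plain file I/O in place of tf.io.gfile.
    path = os.path.join(training_data.uri, 'training_data_file.txt')
    with open(path, 'w') as f:
        f.write('Dummy training data')
    training_data.set_string_custom_property('my_custom_field', 'my_custom_value')
    training_data.set_string_custom_property('uri_for_output', training_data.uri)


def train(training_data, model, num_iterations=100):
    # Mirrors MyTraining: read the upstream file and record metadata on the model.
    path = os.path.join(training_data.uri, 'training_data_file.txt')
    with open(path) as f:
        model.set_string_custom_property('contents_of_training_data', f.read())
    model.set_int_custom_property('num_iterations_used', num_iterations)


with tempfile.TemporaryDirectory() as root:
    # The orchestrator normally allocates these URIs; here we pick them by hand.
    data = FakeArtifact(os.path.join(root, 'training_data'))
    model = FakeArtifact(os.path.join(root, 'model'))
    os.makedirs(data.uri)
    preprocess(data)
    train(data, model, num_iterations=10000)  # same value the pipeline below passes

print(model.custom_properties)
# {'contents_of_training_data': 'Dummy training data', 'num_iterations_used': 10000}
```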


Then, we'll write a finalizer component that collects the metadata of both upstream artifacts and writes it to a JSON file. Ensure that `PIPELINE_ROOT` was set properly above.

In [None]:
collector_template = f"""
import os
import tensorflow as tf
import json

from google.protobuf import json_format

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import Model
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import *
from tfx.utils import json_utils

OUTPUT_LOCATION = '{PIPELINE_ROOT}/python_function_pipeline/metadata.json'

@component
def MetadataCollector(training_data: InputArtifact[Dataset],
                      model: InputArtifact[Model]):
  artifacts = [
      json_format.MessageToDict(x)
      for x in [training_data.mlmd_artifact, model.mlmd_artifact]
  ]
  with tf.io.gfile.GFile(OUTPUT_LOCATION, 'w') as f:
    f.write(json.dumps(artifacts, indent=4))
"""
with open('metadata_collector.py', 'w') as f:
    f.write(collector_template)
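Because `collector_template` is an f-string, the value of `PIPELINE_ROOT` is baked into `metadata_collector.py` at the moment this cell runs; if you change `PIPELINE_ROOT` later, re-run this cell and rebuild the container. A quick, TFX-free way to confirm what was written is to parse the generated source with the standard `ast` module and read back the `OUTPUT_LOCATION` assignment. The helper `baked_output_location` and the example root below are hypothetical.

```python
import ast


def baked_output_location(source):
    """Return the string assigned to OUTPUT_LOCATION in generated module source."""
    tree = ast.parse(source)  # Raises SyntaxError if the template produced invalid Python.
    for node in tree.body:
        if isinstance(node, ast.Assign) and any(
                isinstance(t, ast.Name) and t.id == 'OUTPUT_LOCATION' for t in node.targets):
            return ast.literal_eval(node.value)
    return None


# In the notebook: baked_output_location(open('metadata_collector.py').read())
example_root = 'gs://my-bucket/pipeline_root/alice'  # hypothetical PIPELINE_ROOT value
example_source = f"""
import json
OUTPUT_LOCATION = '{example_root}/python_function_pipeline/metadata.json'
"""
print(baked_output_location(example_source))
# gs://my-bucket/pipeline_root/alice/python_function_pipeline/metadata.json
```

Parsing (rather than importing) the file avoids needing TFX installed just to inspect it.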

Next, let's package the above into a container. In the future, it will be possible to do this via the TFX CLI; for now, we'll use a Dockerfile and Skaffold.

> Note: If you're running this notebook on AI Platform Notebooks, Docker will be installed.  If you're running the notebook in a local development environment, you'll need to have Docker installed there. Confirm that you have [installed Skaffold](https://skaffold.dev/docs/install/) locally as well.

First, we'll define a `skaffold.yaml` file. We start by building the image tag template string that the file will use.

In [None]:
SK_TEMPLATE = "{{{{.IMAGE_NAME}}}}:{}".format(USER)
print(SK_TEMPLATE)

Now we'll write out the Skaffold yaml file.

In [None]:
skaffold_template = f"""
apiVersion: skaffold/v2beta3
kind: Config
metadata:
  name: my-pipeline
build:
  artifacts:
  - image: 'gcr.io/{PROJECT_ID}/caip-tfx-custom'
    context: .
    docker:
      dockerfile: Dockerfile
  tagPolicy:
    envTemplate:
      template: "{{SK_TEMPLATE}}"
"""
with open('skaffold.yaml', 'w') as f:
    f.write(skaffold_template.format(**globals()))
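The brace escaping in the two cells above is easy to get wrong. `SK_TEMPLATE` is built with `str.format`, where `{{{{` collapses to `{{`; the YAML is an f-string, where `{{SK_TEMPLATE}}` collapses to `{SK_TEMPLATE}` and is then filled in by `.format(**globals())`. The result is the Go template `{{.IMAGE_NAME}}:<USER>` in `envTemplate`. This sketch reproduces the rendering with hypothetical user and project values (stored under different names so they don't overwrite your settings) and shows that a single f-string produces the same text.

```python
example_user = 'alice'  # hypothetical username
example_project = 'my-project'  # hypothetical project id

# Two-stage approach used in the notebook.
sk_template = '{{{{.IMAGE_NAME}}}}:{}'.format(example_user)  # '{{.IMAGE_NAME}}:alice'
stage_one = f"""image: 'gcr.io/{example_project}/caip-tfx-custom'
template: "{{sk_template}}"
"""
two_stage = stage_one.format(sk_template=sk_template)

# Equivalent single f-string: each '{{{{' becomes '{{' in the output.
single_pass = f"""image: 'gcr.io/{example_project}/caip-tfx-custom'
template: "{{{{.IMAGE_NAME}}}}:{example_user}"
"""

print(two_stage)
```

The two-stage form lets the notebook reuse one `SK_TEMPLATE` across several Skaffold files; the single f-string is shorter but repeats the escaping in every cell.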

Next, we'll define the `Dockerfile`.

In [None]:
%%writefile Dockerfile
FROM gcr.io/caip-pipelines-assets/tfx:latest
WORKDIR /pipeline
COPY ./ ./
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"


Next, we'll build the container image. Below, we'll use it as the `default_image` when we run the pipeline.

> Note: if you get a permissions error running the build, try first running `gcloud auth login` in the notebook terminal window.

In [None]:
!skaffold build

Next, let's specify a pipeline that uses the components we just defined.

In [None]:
import os

# Only required for local run.
from tfx.orchestration.metadata import sqlite_metadata_connection_config

from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.ai_platform_pipelines import ai_platform_pipelines_dag_runner

from my_preprocess import MyPreprocess
from my_trainer import MyTraining
from metadata_collector import MetadataCollector

PIPELINE_NAME = "function-based-pipeline-{}".format(USER)

def function_based_pipeline(pipeline_root):
    preprocess = MyPreprocess()

    training = MyTraining(
        training_data=preprocess.outputs['training_data'],
        num_iterations=10000)
    
    collect = MetadataCollector(
        training_data=preprocess.outputs['training_data'],
        model=training.outputs['model'])
    
    return Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        # Only needed for local runs.
        metadata_connection_config=sqlite_metadata_connection_config('metadata.sqlite'),
        components=[preprocess, training, collect])
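The pipeline never states an execution order explicitly: TFX derives it from which component consumes which output. `training` reads `preprocess.outputs['training_data']`, and `collect` reads outputs of both, so the only valid order is preprocess, then training, then collect. A minimal sketch of that inference using Kahn's topological sort over a hypothetical dict mapping each component to the components whose outputs it consumes (an illustration of the idea, not TFX's implementation):

```python
from collections import deque


def execution_order(consumes_from):
    """Kahn's algorithm over {component: [components whose outputs it consumes]}."""
    upstream = {node: list(dict.fromkeys(ups)) for node, ups in consumes_from.items()}
    indegree = {node: len(ups) for node, ups in upstream.items()}
    downstream = {node: [] for node in upstream}
    for node, ups in upstream.items():
        for up in ups:
            downstream[up].append(node)
    ready = deque(node for node, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in downstream[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(upstream):
        raise ValueError('dependency cycle detected')
    return order


# Edges read off function_based_pipeline above.
print(execution_order({
    'preprocess': [],
    'training': ['preprocess'],             # consumes training_data
    'collect': ['preprocess', 'training'],  # consumes training_data and model
}))
# ['preprocess', 'training', 'collect']
```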



Let's make sure this pipeline works locally, using the Beam runner:

In [None]:
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

BeamDagRunner().run(function_based_pipeline(pipeline_root='/tmp/pipeline_root'))

Check that the metadata was produced locally.

In [None]:
from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'metadata.sqlite'
connection_config.sqlite.connection_mode = 3 # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
store.get_artifacts()

Now that the local run has confirmed that things work, let's run this pipeline on Managed Pipelines:

In [None]:
config = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunnerConfig(
    project_id=PROJECT_ID,
    display_name='function-based-pipeline-{}'.format(USER),
    default_image='gcr.io/{}/caip-tfx-custom:{}'.format(PROJECT_ID, USER))

runner = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunner(config=config)

# If you want to inspect the pipeline proto, run the following and look at the file contents.
# runner = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunner(config=config, output_filename='pipeline.json')
# runner.compile(function_based_pipeline(pipeline_root=os.path.join(PIPELINE_ROOT, PIPELINE_NAME)))

runner.run(function_based_pipeline(pipeline_root=os.path.join(PIPELINE_ROOT, PIPELINE_NAME)), api_key=API_KEY)

When the pipeline is complete, we can check the final output file to see the metadata produced.

In [None]:
MD_URI = 'gs://{}/pipeline_root/{}/python_function_pipeline/metadata.json'.format(BUCKET_NAME, USER)
MD_URI

In [None]:
!gsutil cat {MD_URI}
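The file is a JSON list with one entry per artifact, produced by `json_format.MessageToDict` on the MLMD `Artifact` protos. Under the protobuf JSON mapping, field names become lowerCamelCase (`customProperties`, `stringValue`, `intValue`) and 64-bit integers are written as strings. A minimal sketch for extracting just the custom properties, assuming that shape; the sample is hand-written for illustration (URIs and ids are made up), not actual pipeline output. In the notebook you could instead pass it `tf.io.gfile.GFile(MD_URI).read()`.

```python
import json


def custom_properties(metadata_json):
    """Map each artifact's URI to a plain dict of its custom properties."""
    result = {}
    for artifact in json.loads(metadata_json):
        props = {}
        for key, value in artifact.get('customProperties', {}).items():
            if 'intValue' in value:
                props[key] = int(value['intValue'])  # int64 values arrive as JSON strings
            elif 'doubleValue' in value:
                props[key] = float(value['doubleValue'])
            else:
                props[key] = value.get('stringValue')
        result[artifact.get('uri')] = props
    return result


# Hand-written sample in the assumed shape.
sample = '''[
  {"id": "1", "uri": "gs://my-bucket/training_data",
   "customProperties": {"my_custom_field": {"stringValue": "my_custom_value"}}},
  {"id": "2", "uri": "gs://my-bucket/model",
   "customProperties": {"num_iterations_used": {"intValue": "10000"}}}
]'''
print(custom_properties(sample))
```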

## Step 4: Custom containers

In this section, we will build custom containers and chain them together as a pipeline.

The model here is deliberately minimal (a small Keras classifier on the IMDB reviews dataset); the point is to illustrate how data can be passed, as URIs, between custom containers. We will write one script that generates examples and another that trains on examples in that format, and build a separate container for each. Then we'll use the containers to create a container-based pipeline.

### Container 1: Generate examples

First, we'll define and write out the `generate_examples.py` code:

In [None]:
%%writefile generate_examples.py

import argparse
import json
import os

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds


def _serialize_example(example, label):
  example_value = tf.io.serialize_tensor(example).numpy()
  label_value = tf.io.serialize_tensor(label).numpy()
  feature = {
      'examples':
          tf.train.Feature(
              bytes_list=tf.train.BytesList(value=[example_value])),
      'labels':
          tf.train.Feature(bytes_list=tf.train.BytesList(value=[label_value])),
  }
  return tf.train.Example(features=tf.train.Features(
      feature=feature)).SerializeToString()


def _tf_serialize_example(example, label):
  serialized_tensor = tf.py_function(_serialize_example, (example, label),
                                     tf.string)
  return tf.reshape(serialized_tensor, ())


def generate_examples(training_data_uri, test_data_uri, config_file_uri):
  (train_data, test_data), info = tfds.load(
      # Use the version pre-encoded with an ~8k vocabulary.
      'imdb_reviews/subwords8k',
      # Return the train/test datasets as a tuple.
      split=(tfds.Split.TRAIN, tfds.Split.TEST),
      # Return (example, label) pairs from the dataset (instead of a dictionary).
      as_supervised=True,
      with_info=True)

  serialized_train_examples = train_data.map(_tf_serialize_example)
  serialized_test_examples = test_data.map(_tf_serialize_example)

  filename = os.path.join(training_data_uri, "train.tfrecord")
  writer = tf.data.experimental.TFRecordWriter(filename)
  writer.write(serialized_train_examples)

  filename = os.path.join(test_data_uri, "test.tfrecord")
  writer = tf.data.experimental.TFRecordWriter(filename)
  writer.write(serialized_test_examples)

  encoder = info.features['text'].encoder
  config = {
      'vocab_size': encoder.vocab_size,
  }
  config_file = os.path.join(config_file_uri, "config")
  with tf.io.gfile.GFile(config_file, 'w') as f:
    f.write(json.dumps(config))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--training_data_uri', type=str)
  parser.add_argument('--test_data_uri', type=str)
  parser.add_argument('--config_file_uri', type=str)

  args = parser.parse_args()
  generate_examples(args.training_data_uri, args.test_data_uri,
                    args.config_file_uri)


Next, we'll create a Dockerfile that builds a container to run `generate_examples.py`.  Note that we're also installing the `tensorflow_datasets` library.

In [None]:
%%writefile Dockerfile.generate_examples

FROM gcr.io/caip-pipelines-assets/tfx:latest
WORKDIR /pipeline
COPY generate_examples.py generate_examples.py
RUN pip install tensorflow_datasets
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

Then, we'll write out the Skaffold file and build the container image:

In [None]:
skaffold_template = f"""
apiVersion: skaffold/v2beta3
kind: Config
metadata:
  name: my-pipeline
build:
  artifacts:
  - image: 'gcr.io/{PROJECT_ID}/caip-tfx-custom-container-generate'
    context: .
    docker:
      dockerfile: Dockerfile.generate_examples
  tagPolicy:
    envTemplate:
      template: "{{SK_TEMPLATE}}"
"""
with open('skaffold_generate_examples.yaml', 'w') as f:
    f.write(skaffold_template.format(**globals()))

In [None]:
!skaffold build -f skaffold_generate_examples.yaml

### Container 2: Train Examples

Next, we'll do the same for the 'Train Examples' custom container.  We'll first write out a `train_examples.py` file, then build a container that runs it.

In [None]:
%%writefile train_examples.py

import argparse
import json
import os

import numpy as np
import tensorflow as tf


def _parse_example(record):
  f = {
      'examples': tf.io.FixedLenFeature((), tf.string, default_value=''),
      'labels': tf.io.FixedLenFeature((), tf.string, default_value='')
  }
  return tf.io.parse_single_example(record, f)


def _to_tensor(record):
  examples = tf.io.parse_tensor(record['examples'], tf.int64)
  labels = tf.io.parse_tensor(record['labels'], tf.int64)
  return (examples, labels)


def train_examples(training_data_uri, test_data_uri, config_file_uri,
                   output_model_uri, output_metrics_uri):
  train_examples = tf.data.TFRecordDataset(
      [os.path.join(training_data_uri, 'train.tfrecord')])
  test_examples = tf.data.TFRecordDataset(
      [os.path.join(test_data_uri, 'test.tfrecord')])

  train_batches = train_examples.map(_parse_example).map(_to_tensor)
  test_batches = test_examples.map(_parse_example).map(_to_tensor)

  with tf.io.gfile.GFile(os.path.join(config_file_uri, 'config')) as f:
    config = json.loads(f.read())

  model = tf.keras.Sequential([
      tf.keras.layers.Embedding(config['vocab_size'], 16),
      tf.keras.layers.GlobalAveragePooling1D(),
      tf.keras.layers.Dense(1, activation='sigmoid')
  ])

  model.summary()

  model.compile(
      optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

  train_batches = train_batches.shuffle(1000).padded_batch(
      32, (tf.TensorShape([None]), tf.TensorShape([])))

  test_batches = test_batches.padded_batch(
      32, (tf.TensorShape([None]), tf.TensorShape([])))

  history = model.fit(
      train_batches,
      epochs=10,
      validation_data=test_batches,
      validation_steps=30)

  loss, accuracy = model.evaluate(test_batches)

  metrics = {
      'loss': str(loss),
      'accuracy': str(accuracy),
  }

  model_json = model.to_json()
  with tf.io.gfile.GFile(os.path.join(output_model_uri, 'model.json'),
                         'w') as f:
    f.write(model_json)

  with tf.io.gfile.GFile(os.path.join(output_metrics_uri, 'metrics.json'),
                         'w') as f:
    f.write(json.dumps(metrics))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--training_data_uri', type=str)
  parser.add_argument('--test_data_uri', type=str)
  parser.add_argument('--config_file_uri', type=str)
  parser.add_argument('--output_model_uri', type=str)
  parser.add_argument('--output_metrics_uri', type=str)

  args = parser.parse_args()

  train_examples(args.training_data_uri, args.test_data_uri,
                 args.config_file_uri, args.output_model_uri,
                 args.output_metrics_uri)
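The two containers share no code; they agree only on file names inside the directories whose URIs they receive. `generate_examples.py` writes `train.tfrecord`, `test.tfrecord`, and a JSON `config` file containing `vocab_size`, and `train_examples.py` reads exactly those paths. If you change one script, a quick check like the following can catch a broken contract before a long pipeline run. It is a minimal sketch for local directories only (for `gs://` URIs you would swap `os.path`/`open` for `tf.io.gfile`), and `check_generate_outputs` is a hypothetical helper.

```python
import json
import os
import tempfile


def check_generate_outputs(training_data_uri, test_data_uri, config_file_uri):
    """Return a list of problems with the files train_examples.py expects to read."""
    problems = []
    for path in (os.path.join(training_data_uri, 'train.tfrecord'),
                 os.path.join(test_data_uri, 'test.tfrecord')):
        if not os.path.exists(path):
            problems.append('missing ' + path)
    config_path = os.path.join(config_file_uri, 'config')
    try:
        with open(config_path) as f:
            config = json.load(f)
        if 'vocab_size' not in config:
            problems.append(config_path + ' has no vocab_size')
    except (OSError, ValueError) as e:  # missing file or malformed JSON
        problems.append('cannot read {}: {}'.format(config_path, e))
    return problems


# Usage: fake a generate step's output, omitting test.tfrecord on purpose.
with tempfile.TemporaryDirectory() as root:
    train_dir, test_dir, config_dir = (os.path.join(root, d) for d in ('train', 'test', 'config'))
    for d in (train_dir, test_dir, config_dir):
        os.makedirs(d)
    open(os.path.join(train_dir, 'train.tfrecord'), 'wb').close()
    with open(os.path.join(config_dir, 'config'), 'w') as f:
        json.dump({'vocab_size': 100}, f)  # placeholder value
    problems = check_generate_outputs(train_dir, test_dir, config_dir)
    print(problems)
```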


Next, we'll create a Dockerfile that builds a container to run `train_examples.py`:

In [None]:
%%writefile Dockerfile.train_examples

FROM gcr.io/caip-pipelines-assets/tfx:latest
WORKDIR /pipeline
COPY train_examples.py train_examples.py
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

Then, we'll write out the Skaffold file and build the container image:

In [None]:
skaffold_template = f"""
apiVersion: skaffold/v2beta3
kind: Config
metadata:
  name: my-pipeline
build:
  artifacts:
  - image: gcr.io/{PROJECT_ID}/caip-tfx-custom-container-train
    context: .
    docker:
      dockerfile: Dockerfile.train_examples
  tagPolicy:
    envTemplate:
      template: "{{SK_TEMPLATE}}"
"""
with open('skaffold_train_examples.yaml', 'w') as f:
    f.write(skaffold_template.format(**globals()))

In [None]:
!skaffold build -f skaffold_train_examples.yaml

### Define a container-based pipeline

Now we're ready to define a pipeline that uses these containers.

In [None]:
import os

from tfx.orchestration.pipeline import Pipeline
from tfx.types.standard_artifacts import Model
from tfx.orchestration.ai_platform_pipelines import ai_platform_pipelines_dag_runner
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.experimental.simple_artifacts import File
from tfx.types.experimental.simple_artifacts import Metrics
from tfx.dsl.component.experimental.container_component import create_container_component
from tfx.dsl.component.experimental.placeholders import InputUriPlaceholder
from tfx.dsl.component.experimental.placeholders import OutputUriPlaceholder


def container_based_pipeline():
  generate = create_container_component(
      name='GenerateExamples',
      outputs={
          'training_data': Dataset,
          'test_data': Dataset,
          'config_file': File,
      },
      image='gcr.io/{}/caip-tfx-custom-container-generate:{}'.format(PROJECT_ID, USER),
      command=[
        'python', '/pipeline/generate_examples.py',
        '--training_data_uri', OutputUriPlaceholder('training_data'),
        '--test_data_uri', OutputUriPlaceholder('test_data'),
        '--config_file_uri', OutputUriPlaceholder('config_file'),
      ])

  train = create_container_component(
      name='Train',
      inputs={
          'training_data': Dataset,
          'test_data': Dataset,
          'config_file': File,
      },
      outputs={
          'model': Model,
          'metrics': Metrics,
      },
      image='gcr.io/{}/caip-tfx-custom-container-train:{}'.format(PROJECT_ID, USER),
      command=[
        'python', '/pipeline/train_examples.py',
        '--training_data_uri', InputUriPlaceholder('training_data'),
        '--test_data_uri', InputUriPlaceholder('test_data'),
        '--config_file_uri', InputUriPlaceholder('config_file'),
        '--output_model_uri', OutputUriPlaceholder('model'),
        '--output_metrics_uri', OutputUriPlaceholder('metrics'),
      ])

  generate_component = generate()
  train_component = train(
    training_data=generate_component.outputs['training_data'],
    test_data=generate_component.outputs['test_data'],
    config_file=generate_component.outputs['config_file'])

  pipeline_name = "container_based_pipeline_{}".format(USER)
  return Pipeline(
        pipeline_name=pipeline_name,
        enable_cache=True,
        pipeline_root=os.path.join(PIPELINE_ROOT, pipeline_name),
        components=[generate_component, train_component])

container_based_pipeline = container_based_pipeline()
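At run time, each `InputUriPlaceholder`/`OutputUriPlaceholder` in `command` is replaced with the URI of the corresponding artifact, so the script inside the container just sees ordinary command-line flags. The sketch below mimics that substitution in plain Python, which lets you check a command line against the script's `argparse` setup without building an image. `Uri` and `resolve_command` are hypothetical stand-ins, not TFX classes, and the URIs are made up.

```python
import argparse
from collections import namedtuple

# Hypothetical stand-in for InputUriPlaceholder / OutputUriPlaceholder.
Uri = namedtuple('Uri', ['artifact_name'])


def resolve_command(command, artifact_uris):
    """Replace every Uri placeholder with the URI assigned to that artifact."""
    return [artifact_uris[c.artifact_name] if isinstance(c, Uri) else c for c in command]


command = [
    'python', '/pipeline/generate_examples.py',
    '--training_data_uri', Uri('training_data'),
    '--test_data_uri', Uri('test_data'),
    '--config_file_uri', Uri('config_file'),
]
argv = resolve_command(command, {
    'training_data': 'gs://my-bucket/fake/training_data',
    'test_data': 'gs://my-bucket/fake/test_data',
    'config_file': 'gs://my-bucket/fake/config_file',
})

# Same flags as the __main__ block of generate_examples.py.
parser = argparse.ArgumentParser()
parser.add_argument('--training_data_uri', type=str)
parser.add_argument('--test_data_uri', type=str)
parser.add_argument('--config_file_uri', type=str)
args = parser.parse_args(argv[2:])  # skip 'python' and the script path
print(args)
```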

Now let's run the pipeline!

In [None]:
config = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunnerConfig(
    project_id=PROJECT_ID,
    display_name='container-based-pipeline-{}'.format(USER))

runner = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunner(config=config)

# If you want to inspect the pipeline proto, run the following and look at the file contents.
# runner = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunner(config=config, output_filename='pipeline.json')
# runner.compile(container_based_pipeline)

runner.run(container_based_pipeline, api_key=API_KEY)

## Step 5: Specifying task-based dependencies

In this section, we will run two steps of a pipeline using task-based dependencies (rather than I/O dependencies) to schedule them. We'll build and use the same container for both steps.

In [None]:
%%writefile task_based_step.py

import os
import tensorflow as tf

from tfx.types.experimental.simple_artifacts import File
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import OutputArtifact, Parameter


@component
def MyTaskBasedStep(
    output_file: OutputArtifact[File], 
    step_number: Parameter[int] = 0, 
    contents: Parameter[str] = ''):
  # Write out whatever string was passed in to the file.
  
  with tf.io.gfile.GFile(os.path.join(output_file.uri, 'output.txt'), 'w') as f:
    f.write('Step {}: Contents: {}'.format(step_number, contents))
    

Write out the Dockerfile...

In [None]:
%%writefile Dockerfile.task_based

FROM gcr.io/caip-pipelines-assets/tfx:latest
WORKDIR /pipeline
COPY task_based_step.py task_based_step.py
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

...then write out the Skaffold file:

In [None]:
skaffold_template = f"""
apiVersion: skaffold/v2beta3
kind: Config
metadata:
  name: my-pipeline
build:
  artifacts:
  - image: gcr.io/{PROJECT_ID}/caip-tfx-custom-task-based
    context: .
    docker:
      dockerfile: Dockerfile.task_based
  tagPolicy:
    envTemplate:
      template: "{{SK_TEMPLATE}}"
"""
with open('skaffold_task_based.yaml', 'w') as f:
    f.write(skaffold_template.format(**globals()))

Build a container image for the new component.

In [None]:
!skaffold build -f skaffold_task_based.yaml

Let's create a pipeline with this simple component. Note the `add_upstream_node` call.

In [None]:
import os

from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.ai_platform_pipelines import ai_platform_pipelines_dag_runner

from task_based_step import MyTaskBasedStep

def task_based_pipeline():
    step_1 = MyTaskBasedStep(step_number=1, contents="This is step 1")
    step_2 = MyTaskBasedStep(
        step_number=2, 
        contents="This is step 2", 
        instance_name='MyTaskBasedStep2')
    step_2.add_upstream_node(step_1)
     
    pipeline_name = "task_dependency_based_pipeline_{}".format(USER)
    return Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=os.path.join(PIPELINE_ROOT, pipeline_name),
        components=[step_1, step_2])

task_based_pipeline = task_based_pipeline()
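`add_upstream_node` adds an ordering edge that carries no artifact; it is the only thing linking `step_1` and `step_2`, since neither consumes the other's `output_file`. Without it, an orchestrator is free to run both steps at the same time. The sketch below makes that concrete by grouping steps into stages that could run in parallel. `parallel_stages` is a hypothetical helper illustrating the scheduling idea, not how Managed Pipelines schedules steps internally.

```python
def parallel_stages(upstream):
    """Group nodes into stages: a node runs one stage after its latest upstream node."""
    stage = {}

    def stage_of(node, visiting=()):
        if node in visiting:
            raise ValueError('dependency cycle at {}'.format(node))
        if node not in stage:
            ups = upstream[node]
            stage[node] = 1 + max((stage_of(u, visiting + (node,)) for u in ups), default=-1)
        return stage[node]

    for node in upstream:
        stage_of(node)
    stages = [[] for _ in range(max(stage.values(), default=-1) + 1)]
    for node in upstream:
        stages[stage[node]].append(node)
    return stages


# Without add_upstream_node there is no edge, so both steps share a stage.
print(parallel_stages({'step_1': [], 'step_2': []}))
# [['step_1', 'step_2']]
# With step_2.add_upstream_node(step_1), step_2 moves to a later stage.
print(parallel_stages({'step_1': [], 'step_2': ['step_1']}))
# [['step_1'], ['step_2']]
```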

Now we'll run our pipeline.  You'll see that `step_2` waits until `step_1` has completed.

In [None]:
config = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunnerConfig(
    project_id=PROJECT_ID,
    display_name='task-based-pipeline-{}'.format(USER),
    default_image='gcr.io/{}/caip-tfx-custom-task-based:{}'.format(PROJECT_ID, USER))

runner = ai_platform_pipelines_dag_runner.AIPlatformPipelinesDagRunner(config=config)
runner.run(task_based_pipeline, api_key=API_KEY)

## Cleanup

If you like, you can do some cleanup to avoid storage costs.

To remove the files that this notebook wrote to your GCS bucket (everything under `PIPELINE_ROOT`), run:

In [None]:
!gsutil -m rm -r {PIPELINE_ROOT}

You can remove your GCR container images by visiting the [Container Registry](https://console.cloud.google.com/gcr/) panel in the Cloud Console.  Click on an image name to list and remove any of its versions.
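If you prefer the command line to the console, each Skaffold build above pushed one image tagged with your username via the `{{.IMAGE_NAME}}:<USER>` template. This sketch lists those four image references and prints a `gcloud container images delete` command for each; it only prints the commands, so you can review them and run them in a terminal (gcloud asks for confirmation before deleting). The repository names come from the Skaffold files in this notebook; the project and user values below are placeholders.

```python
import shlex

# Image repositories built by the skaffold*.yaml files in this notebook.
IMAGE_REPOS = [
    'caip-tfx-custom',
    'caip-tfx-custom-container-generate',
    'caip-tfx-custom-container-train',
    'caip-tfx-custom-task-based',
]


def image_cleanup_commands(project_id, user):
    """Return one gcloud delete command per image tag pushed by this notebook."""
    return [
        'gcloud container images delete {}'.format(
            shlex.quote('gcr.io/{}/{}:{}'.format(project_id, repo, user)))
        for repo in IMAGE_REPOS
    ]


# In the notebook, pass PROJECT_ID and USER instead of these placeholders.
for cmd in image_cleanup_commands('my-project', 'alice'):
    print(cmd)
```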

## Summary

This notebook showed examples of how to use custom functions and containers on AI Platform Managed Pipelines, and how to specify task-based dependencies between pipeline steps.

You can also explore notebooks that show how to specify TFX pipelines using prebuilt components and how to run a TFX Templates pipeline on Managed Pipelines. See the EAP guide for the links.

