Enable users to specify arbitrary ContainerOp modifying functions in
KubeflowRunnerConfig. This enables users to perform operations such as
specifying custom k8s secrets or mounting volumes when running TFX pipelines
under Kubeflow.

Fixes #201 and also resolves the issue addressed by PR #202.

PiperOrigin-RevId: 251677229
tfx-team committed Jun 5, 2019
1 parent 813d67b commit 224c470
Showing 4 changed files with 146 additions and 24 deletions.
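
For context, a minimal sketch of the intended usage (the pipeline construction is elided; my_pipeline stands in for a TFX logical pipeline, and the claim, volume, and mount-path names are hypothetical):

    from kfp import onprem
    from tfx.orchestration.kubeflow import runner

    # Mount a persistent volume onto every container step of the pipeline.
    mount_volume_op = onprem.mount_pvc('my-persistent-volume-claim',
                                       'my-volume-name',
                                       '/mnt/volume-mount-path')
    config = runner.KubeflowRunnerConfig(
        pipeline_operator_funcs=[mount_volume_op])
    runner.KubeflowRunner(config=config).run(my_pipeline)

This is the same pattern exercised by test_volume_mounting_pipeline_operator_funcs in the new tests below.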
3 changes: 3 additions & 0 deletions RELEASE.md
@@ -11,6 +11,8 @@
* Updated tfx docker image to use Python 3.
* Added example for how to define and add a custom component.
* Added Parquet executor for ExampleGen component.
* Enables Kubeflow Pipelines users to specify arbitrary operators that can be
applied to each pipeline step.

## Bug fixes and other changes
* Declared 'cmle_training_args' on trainer and 'cmle_serving_args' on
@@ -25,6 +27,7 @@
* TfxType has been renamed to TfxArtifact.
* Fixes issue #185 preventing the Airflow UI from visualizing the component's
subdag operators and logs.
* Fixes issue #201 so GCP credentials are optional.

## Breaking changes

3 changes: 1 addition & 2 deletions tfx/orchestration/kubeflow/base_component.py
@@ -28,7 +28,6 @@
import collections
import json
from kfp import dsl
from kfp import gcp
from kubernetes import client as k8s_client
from typing import Any, Dict, List, Optional, Text

@@ -123,7 +122,7 @@ def __new__(
image=pipeline_properties.tfx_image,
arguments=arguments,
file_outputs=file_outputs,
).apply(gcp.use_gcp_secret('user-gcp-sa')) # Adds GCP authentication.
)

# Add the Argo workflow ID to the container's environment variable so it
# can be used to uniquely place pipeline outputs under the pipeline_root.
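The deleted .apply(...) call above hard-coded GCP authentication into every step; the equivalent behavior now comes from the default operator list in KubeflowRunnerConfig (see runner.py below). Note that passing an explicit pipeline_operator_funcs list replaces that default, so here is a sketch of keeping GCP auth alongside custom operators, assuming the same 'user-gcp-sa' secret is present:

    from kfp import gcp
    from tfx.orchestration.kubeflow import runner

    config = runner.KubeflowRunnerConfig(pipeline_operator_funcs=[
        gcp.use_gcp_secret('user-gcp-sa'),  # Retains the former default behavior.
        # ... any additional OpFuncs here ...
    ])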
54 changes: 52 additions & 2 deletions tfx/orchestration/kubeflow/runner.py
@@ -21,28 +21,75 @@

from kfp import compiler
from kfp import dsl
from typing import Optional, Text
from kfp import gcp
from typing import Callable, List, Optional, Text

from tfx.components.base import base_component as tfx_base_component
from tfx.orchestration import pipeline as tfx_pipeline
from tfx.orchestration import tfx_runner
from tfx.orchestration.kubeflow import base_component

# OpFunc represents the type of a function that takes as input a
# dsl.ContainerOp and returns the same object. Common operations such as adding
# k8s secrets, mounting volumes, specifying the use of TPUs and so on can be
# specified as an OpFunc.
# See example usage here:
# https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/gcp.py
OpFunc = Callable[[dsl.ContainerOp], dsl.ContainerOp]


class KubeflowRunnerConfig(object):
"""Runtime configuration parameters specific to execution on Kubeflow."""

  def __init__(self, pipeline_operator_funcs: Optional[List[OpFunc]] = None):
    """Creates a KubeflowRunnerConfig object.

    The user can use pipeline_operator_funcs to apply modifications to
    ContainerOps used in the pipeline. For example, to ensure the pipeline
    steps mount a GCP secret and a persistent volume, one can create a config
    object like so:

      from kfp import gcp, onprem
      mount_secret_op = gcp.use_gcp_secret('my-secret-name')
      mount_volume_op = onprem.mount_pvc(
          'my-persistent-volume-claim',
          'my-volume-name',
          '/mnt/volume-mount-path')
      config = KubeflowRunnerConfig(
          pipeline_operator_funcs=[mount_secret_op, mount_volume_op])

    Args:
      pipeline_operator_funcs: A list of ContainerOp modifying functions that
        will be applied to every container step in the pipeline.
    """
    self.pipeline_operator_funcs = pipeline_operator_funcs or [
        # Enables authentication for GCP services in a typical GKE Kubeflow
        # Pipelines installation.
        gcp.use_gcp_secret('user-gcp-sa'),
    ]


class KubeflowRunner(tfx_runner.TfxRunner):
"""Kubeflow Pipelines runner.
Constructs a pipeline definition YAML file based on the TFX logical pipeline.
"""

def __init__(self, output_dir: Optional[Text] = None):
def __init__(self,
output_dir: Optional[Text] = None,
config: Optional[KubeflowRunnerConfig] = None):
"""Initializes KubeflowRunner for compiling a Kubeflow Pipeline.
Args:
output_dir: An optional output directory into which to output the pipeline
definition files. Defaults to the current working directory.
config: An optional KubeflowRunnerConfig object to specify runtime
configuration when running the pipeline under Kubeflow.
"""
self._output_dir = output_dir or os.getcwd()
self._config = config or KubeflowRunnerConfig()

def _prepare_output_dict(self, outputs: tfx_base_component.ComponentOutputs):
return dict((k, v.get()) for k, v in outputs.get_all().items())
@@ -104,6 +151,9 @@ def _construct_pipeline_graph(self, pipeline: tfx_pipeline.Pipeline):
executor_class_path=executor_class_path,
pipeline_properties=pipeline_properties)

for operator in self._config.pipeline_operator_funcs:
kfp_component.container_op.apply(operator)

for channel_name, channel in component.outputs.get_all().items():
producers[channel] = {}
producers[channel]['component'] = kfp_component
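Since OpFunc is simply Callable[[dsl.ContainerOp], dsl.ContainerOp], operators are not limited to the helpers in kfp.gcp or kfp.onprem. A minimal hand-rolled sketch (the environment variable name and value are hypothetical):

    from kfp import dsl
    from kubernetes import client as k8s_client
    from tfx.orchestration.kubeflow import runner

    def add_deploy_env(container_op: dsl.ContainerOp) -> dsl.ContainerOp:
      # Expose a DEPLOY_ENV variable to every pipeline step.
      container_op.add_env_variable(
          k8s_client.V1EnvVar(name='DEPLOY_ENV', value='staging'))
      return container_op

    config = runner.KubeflowRunnerConfig(
        pipeline_operator_funcs=[add_deploy_env])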
110 changes: 90 additions & 20 deletions tfx/orchestration/kubeflow/runner_test.py
@@ -21,6 +21,7 @@
import shutil
import tarfile
import tempfile
from kfp import onprem
import tensorflow as tf
import yaml

@@ -57,8 +58,7 @@ def test_two_step_pipeline(self):
"""Sanity-checks the construction and dependencies for a 2-step pipeline.
"""
runner.KubeflowRunner().run(_two_step_pipeline())
file_path = os.path.join(self.test_dir,
'two_step_pipeline.tar.gz')
file_path = os.path.join(self.test_dir, 'two_step_pipeline.tar.gz')
self.assertTrue(tf.gfile.Exists(file_path))

with tarfile.TarFile.open(file_path).extractfile(
@@ -72,9 +72,7 @@
self.assertEqual(2, len(containers))

big_query_container = [
c
for c in containers
if c['name'] == 'bigqueryexamplegen'
c for c in containers if c['name'] == 'bigqueryexamplegen'
]
self.assertEqual(1, len(big_query_container))
self.assertEqual([
@@ -83,9 +81,7 @@
], big_query_container[0]['container']['command'])

statistics_gen_container = [
c
for c in containers
if c['name'] == 'statisticsgen'
c for c in containers if c['name'] == 'statisticsgen'
]
self.assertEqual(1, len(statistics_gen_container))

@@ -100,20 +96,94 @@
'tasks': [{
'name': 'bigqueryexamplegen',
'template': 'bigqueryexamplegen'
},
{
'name': 'statisticsgen',
'template': 'statisticsgen',
'dependencies': ['bigqueryexamplegen'],
'arguments': {
'parameters': [{
'name': 'bigqueryexamplegen-examples',
'value': parameter_value
}],
},
}]
}, {
'name': 'statisticsgen',
'template': 'statisticsgen',
'dependencies': ['bigqueryexamplegen'],
'arguments': {
'parameters': [{
'name': 'bigqueryexamplegen-examples',
'value': parameter_value
}],
},
}]
}, dag[0]['dag'])

def test_default_pipeline_operator_funcs(self):
runner.KubeflowRunner().run(_two_step_pipeline())
file_path = os.path.join(self.test_dir, 'two_step_pipeline.tar.gz')
self.assertTrue(tf.gfile.Exists(file_path))

with tarfile.TarFile.open(file_path).extractfile(
'pipeline.yaml') as pipeline_file:
self.assertIsNotNone(pipeline_file)
pipeline = yaml.load(pipeline_file)

containers = [
c for c in pipeline['spec']['templates'] if 'container' in c
]
self.assertEqual(2, len(containers))

# Check that each container has default GCP credentials.

container_0 = containers[0]
env = [
env for env in container_0['container']['env']
if env['name'] == 'GOOGLE_APPLICATION_CREDENTIALS'
]
self.assertEqual(1, len(env))
self.assertEqual('/secret/gcp-credentials/user-gcp-sa.json',
env[0]['value'])

container_1 = containers[1]
env = [
env for env in container_1['container']['env']
if env['name'] == 'GOOGLE_APPLICATION_CREDENTIALS'
]
self.assertEqual(1, len(env))
self.assertEqual('/secret/gcp-credentials/user-gcp-sa.json',
env[0]['value'])

def test_volume_mounting_pipeline_operator_funcs(self):
mount_volume_op = onprem.mount_pvc('my-persistent-volume-claim',
'my-volume-name',
'/mnt/volume-mount-path')
config = runner.KubeflowRunnerConfig(
pipeline_operator_funcs=[mount_volume_op])

runner.KubeflowRunner(config=config).run(_two_step_pipeline())
file_path = os.path.join(self.test_dir, 'two_step_pipeline.tar.gz')
self.assertTrue(tf.gfile.Exists(file_path))

with tarfile.TarFile.open(file_path).extractfile(
'pipeline.yaml') as pipeline_file:
self.assertIsNotNone(pipeline_file)
pipeline = yaml.load(pipeline_file)

containers = [
c for c in pipeline['spec']['templates'] if 'container' in c
]
self.assertEqual(2, len(containers))

# Check that each container has the volume mounted.
self.assertEqual([{
'name': 'my-volume-name',
'mountPath': '/mnt/volume-mount-path'
}], containers[0]['container']['volumeMounts'])

self.assertEqual([{
'name': 'my-volume-name',
'mountPath': '/mnt/volume-mount-path'
}], containers[1]['container']['volumeMounts'])

# Check that the PVC is specified.
self.assertEqual([{
'name': 'my-volume-name',
'persistentVolumeClaim': {
'claimName': 'my-persistent-volume-claim'
}
}], pipeline['spec']['volumes'])


if __name__ == '__main__':
tf.test.main()
