Skip to content

Commit

Permalink
Support Docker in the IR stack.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 343405676
  • Loading branch information
tfx-copybara committed Nov 20, 2020
1 parent 6838ee7 commit d6f5c52
Show file tree
Hide file tree
Showing 8 changed files with 587 additions and 2 deletions.
59 changes: 58 additions & 1 deletion tfx/dsl/component/experimental/executor_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
from __future__ import division
from __future__ import print_function

from typing import List, Text
import functools
import operator
from typing import List, Optional, Text, Union

from tfx import types
from tfx.dsl.component.experimental import placeholders
from tfx.dsl.components.base import executor_spec
from tfx.dsl.placeholder import placeholder
from tfx.proto.orchestration import executable_spec_pb2
from tfx.proto.orchestration import placeholder_pb2

from google.protobuf import message


class TemplatedExecutorContainerSpec(executor_spec.ExecutorSpec):
Expand Down Expand Up @@ -88,3 +96,52 @@ def __eq__(self, other) -> bool:

def __ne__(self, other) -> bool:
return not self.__eq__(other)

def _recursively_encode(
self, command: placeholders.CommandlineArgumentType
) -> Union[str, placeholder.Placeholder]:
if isinstance(command, str):
return command
elif isinstance(command, placeholders.InputValuePlaceholder):
return placeholder.input(command.input_name)[0]
elif isinstance(command, placeholders.InputUriPlaceholder):
return placeholder.input(command.input_name)[0].uri
elif isinstance(command, placeholders.OutputUriPlaceholder):
return placeholder.output(command.output_name)[0].uri
elif isinstance(command, placeholders.ConcatPlaceholder):
# operator.add wil use the overloaded __add__ operator for Placeholder
# instances.
return functools.reduce(
operator.add,
[self._recursively_encode(item) for item in command.items])
else:
raise TypeError(
('Unsupported type of command-line arguments: "{}".'
' Supported types are {}.')
.format(type(command), str(placeholders.CommandlineArgumentType)))

def encode(
self,
component_spec: Optional[types.ComponentSpec] = None) -> message.Message:
"""Encodes ExecutorSpec into an IR proto for compiling.
This method will be used by DSL compiler to generate the corresponding IR.
Args:
component_spec: Optional. The ComponentSpec to help with the encoding.
Returns:
An executor spec proto.
"""
result = executable_spec_pb2.ContainerExecutableSpec()
result.image = self.image
for command in self.command:
cmd = result.commands.add()
str_or_placeholder = self._recursively_encode(command)
if isinstance(str_or_placeholder, str):
expression = placeholder_pb2.PlaceholderExpression()
expression.value.string_value = str_or_placeholder
cmd.CopyFrom(expression)
else:
cmd.CopyFrom(self._recursively_encode(command).encode())
return result
181 changes: 181 additions & 0 deletions tfx/dsl/component/experimental/executor_specs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tfx.dsl.component.experimental.executor_specs."""

import tensorflow as tf

from tfx.dsl.component.experimental import executor_specs
from tfx.dsl.component.experimental import placeholders


class ExecutorSpecsTest(tf.test.TestCase):

def setUp(self):
super(ExecutorSpecsTest, self).setUp()
self._text = 'text'
self._input_value_placeholder = placeholders.InputValuePlaceholder(
'input_artifact')
self._input_uri_placeholder = placeholders.InputUriPlaceholder('input_uri')
self._output_uri_placeholder = placeholders.OutputUriPlaceholder(
'output_uri')
self._concat_placeholder = placeholders.ConcatPlaceholder([
self._text, self._input_value_placeholder, self._input_uri_placeholder,
self._output_uri_placeholder,
])
self._text_concat_placeholder = placeholders.ConcatPlaceholder(
[self._text, 'text1', placeholders.ConcatPlaceholder(['text2']),])

def testEncodeTemplatedExecutorContainerSpec(self):
specs = executor_specs.TemplatedExecutorContainerSpec(
image='image',
command=[
self._text, self._input_value_placeholder,
self._input_uri_placeholder, self._output_uri_placeholder,
self._concat_placeholder
])
encode_result = specs.encode()
self.assertProtoEquals("""
image: "image"
commands {
value {
string_value: "text"
}
}
commands {
operator {
index_op {
expression {
placeholder {
key: "input_artifact"
}
}
}
}
}
commands {
operator {
artifact_uri_op {
expression {
operator {
index_op {
expression {
placeholder {
key: "input_uri"
}
}
index: 0
}
}
}
}
}
}
commands {
operator {
artifact_uri_op {
expression {
operator {
index_op {
expression {
placeholder {
type: OUTPUT_ARTIFACT
key: "output_uri"
}
}
index: 0
}
}
}
}
}
}
commands {
operator {
concat_op {
expressions {
value {
string_value: "text"
}
}
expressions {
operator {
index_op {
expression {
placeholder {
key: "input_artifact"
}
}
index: 0
}
}
}
expressions {
operator {
artifact_uri_op {
expression {
operator {
index_op {
expression {
placeholder {
key: "input_uri"
}
}
index: 0
}
}
}
}
}
}
expressions {
operator {
artifact_uri_op {
expression {
operator {
index_op {
expression {
placeholder {
type: OUTPUT_ARTIFACT
key: "output_uri"
}
}
index: 0
}
}
}
}
}
}
}
}
}""", encode_result)

def testEncodeTemplatedExecutorContainerSpec_withConcatAllText(self):
specs = executor_specs.TemplatedExecutorContainerSpec(
image='image',
command=[
self._text_concat_placeholder
])
encode_result = specs.encode()
self.assertProtoEquals("""
image: "image"
commands {
value {
string_value: "texttext1text2"
}
}""", encode_result)


if __name__ == '__main__':
tf.test.main()
6 changes: 6 additions & 0 deletions tfx/orchestration/portable/beam_dag_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def _build_executable_spec(
result = local_deployment_config_pb2.ExecutableSpec()
if spec.Is(result.python_class_executable_spec.DESCRIPTOR):
spec.Unpack(result.python_class_executable_spec)
elif spec.Is(result.container_executable_spec.DESCRIPTOR):
spec.Unpack(result.container_executable_spec)
else:
raise ValueError(
'executor spec of {} is expected to be of one of the '
Expand Down Expand Up @@ -214,6 +216,10 @@ def run(self, pipeline: Union[pipeline_pb2.Pipeline,
connection_config = self._connection_config_from_deployment_config(
deployment_config)

logging.info('Running pipeline:\n %s', pipeline)
logging.info('Using deployment config:\n %s', deployment_config)
logging.info('Using connection config:\n %s', connection_config)

with telemetry_utils.scoped_labels(
{telemetry_utils.LABEL_TFX_RUNNER: 'beam'}):
with beam.Pipeline() as p:
Expand Down
98 changes: 98 additions & 0 deletions tfx/orchestration/portable/docker_executor_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Lint as: python2, python3
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Docker component launcher which launches a container in docker environment ."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import Optional, cast

from absl import logging
import docker
from tfx.dsl.compiler import placeholder_utils
from tfx.dsl.component.experimental import executor_specs
from tfx.orchestration.config import docker_component_config
from tfx.orchestration.portable import base_executor_operator
from tfx.orchestration.portable import data_types
from tfx.proto.orchestration import executable_spec_pb2
from tfx.proto.orchestration import execution_result_pb2

from google.protobuf import message


class DockerExecutorOperator(base_executor_operator.BaseExecutorOperator):
"""Responsible for launching a container executor."""
SUPPORTED_EXECUTOR_SPEC_TYPE = [executable_spec_pb2.ContainerExecutableSpec]
SUPPORTED_PLATFORM_CONFIG_TYPE = []

def __init__(self,
executor_spec: message.Message,
platform_config: Optional[message.Message] = None):
super().__init__(executor_spec, platform_config)
self._container_executor_spec = cast(
executable_spec_pb2.ContainerExecutableSpec, self._executor_spec)

def run_executor(
self, execution_info: data_types.ExecutionInfo
) -> execution_result_pb2.ExecutorOutput:
"""Execute underlying component implementation."""

context = placeholder_utils.ResolutionContext(
exec_info=execution_info,
executor_spec=self._executor_spec,
platform_config=self._platform_config)

component_executor_spec = (
executor_specs.TemplatedExecutorContainerSpec(
image=self._container_executor_spec.image,
command=[
placeholder_utils.resolve_placeholder_expression(cmd, context)
for cmd in self._container_executor_spec.commands
]))

docker_config = docker_component_config.DockerComponentConfig()

logging.info('Container spec: %s', vars(component_executor_spec))
logging.info('Docker config: %s', vars(docker_config))

# Call client.containers.run and wait for completion.
# ExecutorContainerSpec follows k8s container spec which has different
# names to Docker's container spec. It's intended to set command to docker's
# entrypoint and args to docker's command.
if docker_config.docker_server_url:
client = docker.DockerClient(base_url=docker_config.docker_server_url)
else:
client = docker.from_env()

run_args = docker_config.to_run_args()
container = client.containers.run(
image=component_executor_spec.image,
command=component_executor_spec.command,
detach=True,
**run_args)

# Streaming logs
for log in container.logs(stream=True):
logging.info('Docker: %s', log.decode('utf-8'))
exit_code = container.wait()['StatusCode']
if exit_code != 0:
raise RuntimeError(
'Container exited with error code "{}"'.format(exit_code))
# TODO(b/141192583): Report data to publisher
# - report container digest
# - report replaced command line entrypoints
# - report docker run args
return execution_result_pb2.ExecutorOutput()

0 comments on commit d6f5c52

Please sign in to comment.