Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable beam args to be populated from environment variables #4147

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e1cd066
Added env var type and method to placeholder
ConverJens Aug 16, 2021
708e24d
Initial draft of resolving beam args from env
ConverJens Aug 19, 2021
e70d854
Beam args can now be resolved from env when executen on KFP.
ConverJens Aug 20, 2021
ef40c1c
Moved logic to TFX Launcher to not be dependent on specific runners.
ConverJens Aug 20, 2021
3d3b5c9
Reverted container_entrypoint.py and container_entrypoint_test.py.
ConverJens Aug 20, 2021
eb5d789
Fixed failing unit tests.
ConverJens Aug 20, 2021
a927160
Fixed failing proto unit tests.
ConverJens Aug 20, 2021
5803f3d
Removed debug logging. Tested and works well in KFP.
ConverJens Aug 23, 2021
ccff7d0
Merge branch 'master' into enable_beam_to_fetch_args_from_secrets
kennethyang404 Sep 10, 2021
1f5fd95
Update from PR comments: Move logic from launcher to beam_executor_op…
ConverJens Nov 17, 2021
3e727ce
Merge branch 'enable_beam_to_fetch_args_from_secrets' of github.com:C…
ConverJens Nov 17, 2021
2707b8f
Merge branch 'master' into enable_beam_to_fetch_args_from_secrets
ConverJens Nov 17, 2021
05420e3
Update from PR comments: Fix string handling and how to treat env var…
ConverJens Nov 18, 2021
097a676
Fixed typo
ConverJens Nov 19, 2021
86ad813
Merge branch 'tensorflow:master' into enable_beam_to_fetch_args_from_…
ConverJens Nov 23, 2021
aae2b55
Fixed linting.
ConverJens Nov 26, 2021
f9057ab
Merge branch 'tensorflow:master' into enable_beam_to_fetch_args_from_…
ConverJens Nov 26, 2021
4e702c4
Merge branch 'tensorflow:master' into enable_beam_to_fetch_args_from_…
ConverJens Dec 20, 2021
e2dd8a7
Merge branch 'tensorflow:master' into enable_beam_to_fetch_args_from_…
ConverJens Jan 17, 2022
f593fb4
Merge branch 'tensorflow:master' into enable_beam_to_fetch_args_from_…
ConverJens Feb 25, 2022
5f8ebd6
Merge branch 'master' into enable_beam_to_fetch_args_from_secrets
ConverJens Mar 8, 2022
f100a4b
Merge branch 'master' into enable_beam_to_fetch_args_from_secrets
ConverJens Mar 9, 2022
2418f38
Merge branch 'tensorflow:master' into enable_beam_to_fetch_args_from_…
ConverJens Apr 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion tfx/dsl/components/base/base_beam_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""Base class for TFX Beam components."""

from typing import Iterable, cast
from typing import Iterable, cast, Dict

from tfx.dsl.components.base import base_component
from tfx.dsl.components.base import executor_spec
Expand Down Expand Up @@ -46,6 +46,22 @@ def with_beam_pipeline_args(
self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
return self

def with_beam_pipeline_args_from_env(
self, beam_pipeline_args_from_env: Dict[str, str]) -> 'BaseBeamComponent':
Comment on lines +49 to +50

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This forking/duplication (across several parts of code and proto) seems odd. Could we have only one?

Also what is the merging behavior of these (especially when they disagree) which now becomes a public contract which we have to honour indefinitely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure where this code is duplicated, can you specify where?

The merging behaviour is the same as for ordinary beam args, i.e. component specific args override pipeline level args.

And in addition, any arg specified as an ordinary beam arg will take precedence over any kind of beam arg from env.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the merging behavior seems clear now thanks to the explanation in the proto. Thanks!

But I am a little bit worried about the duplication of "beam_pipeline_args" and "beam_pipeline_args_from_env" in general. Because these two things appears together almost always. From the first comment by @zhitaoli (#3455 (comment) ) we might need to explore ideas using some kind of placeholders. If the existing placeholder in TFX doesn't fit well here, we might be able to invent a new simple placeholder mechanism like shell env var expansion.

We might be able to proceed with the current implementation, but I think that we might need to get more inputs on this from other folks. (CC @kennethyang404 )

"""Add per component Beam pipeline args to be populated from environment
variables at runtime.

Args:
beam_pipeline_args_from_env: Dict of Beam pipeline args and environment
variable names to be added to the Beam executor spec.

Returns:
the same component itself.
"""
cast(executor_spec.BeamExecutorSpec, self.executor_spec)\
.add_beam_pipeline_args_from_env(beam_pipeline_args_from_env)
return self

@classmethod
def _validate_component_class(cls):
"""Validate that the SPEC_CLASSES property of this class is set properly."""
Expand Down
14 changes: 14 additions & 0 deletions tfx/dsl/components/base/base_beam_component_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from tfx.dsl.components.base import executor_spec

_TestBeamPipelineArgs = ["--my_testing_beam_pipeline_args=foo"]
_TestBeamPipelineArgsFromEnv = {"--my_testing_beam_pipeline_args_from_env":
"FOO"}


class _EmptyComponentSpec(types.ComponentSpec):
Expand All @@ -43,6 +45,18 @@ class BeamComponent(base_beam_component.BaseBeamComponent):
self.assertEqual(beam_component.executor_spec.beam_pipeline_args,
_TestBeamPipelineArgs)

def testWithBeamPipelineArgsFromEnv(self):

class BeamComponent(base_beam_component.BaseBeamComponent):
EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(
base_beam_executor.BaseBeamExecutor)
SPEC_CLASS = _EmptyComponentSpec

beam_component = BeamComponent(spec=_EmptyComponentSpec(
)).with_beam_pipeline_args_from_env(_TestBeamPipelineArgsFromEnv)
self.assertEqual(beam_component.executor_spec.beam_pipeline_args_from_env,
_TestBeamPipelineArgsFromEnv)

def testComponentExecutorClass(self):

class InvalidExecutorComponent(base_beam_component.BaseBeamComponent):
Expand Down
9 changes: 8 additions & 1 deletion tfx/dsl/components/base/executor_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""Executor specifications for defining what to to execute."""

import copy
from typing import cast, Iterable, List, Optional, Type
from typing import cast, Iterable, List, Optional, Type, Dict

from tfx import types
from tfx.dsl.components.base import base_executor
Expand Down Expand Up @@ -148,6 +148,7 @@ class BeamExecutorSpec(ExecutorClassSpec):
def __init__(self, executor_class: Type[base_executor.BaseExecutor]):
super().__init__(executor_class=executor_class)
self.beam_pipeline_args = []
self.beam_pipeline_args_from_env = {}

def encode(
self,
Expand All @@ -156,11 +157,17 @@ def encode(
result.python_executor_spec.CopyFrom(
super().encode(component_spec=component_spec))
result.beam_pipeline_args.extend(self.beam_pipeline_args)
for beam_pipeline_arg_from_env, env_var in self.beam_pipeline_args_from_env.items():
result.beam_pipeline_args_from_env[beam_pipeline_arg_from_env] = env_var
return result

def add_beam_pipeline_args(self, beam_pipeline_args: Iterable[str]) -> None:
self.beam_pipeline_args.extend(beam_pipeline_args)

def add_beam_pipeline_args_from_env(self, beam_pipeline_args_from_env: Dict[str, str]) -> None:
for beam_pipeline_arg_from_env, env_var in beam_pipeline_args_from_env.items():
self.beam_pipeline_args_from_env[beam_pipeline_arg_from_env] = env_var

def copy(self) -> 'BeamExecutorSpec':
return cast(self.__class__, super().copy())

Expand Down
5 changes: 5 additions & 0 deletions tfx/dsl/components/base/executor_spec_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def testBeamExecutorSpecCopy(self):
spec = executor_spec.BeamExecutorSpec(_DummyExecutor)
spec.add_extra_flags('a')
spec.add_beam_pipeline_args('b')
spec.add_beam_pipeline_args_from_env({'c': 'd'})
spec_copy = spec.copy()
del spec
self.assertProtoEquals(
Expand All @@ -62,6 +63,10 @@ def testBeamExecutorSpecCopy(self):
extra_flags: "a"
}
beam_pipeline_args: "b"
beam_pipeline_args_from_env: {
key: "c"
value: "d"
}
""", spec_copy.encode())

def testExecutorContainerSpecCopy(self):
Expand Down
35 changes: 34 additions & 1 deletion tfx/orchestration/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import copy
import enum
from typing import Collection, List, Optional, Union, cast
from typing import Collection, List, Optional, Union, Dict, cast
import warnings

from tfx.dsl.compiler import constants
Expand Down Expand Up @@ -74,6 +74,22 @@ def add_beam_pipeline_args_to_component(component, beam_pipeline_args):
component.executor_spec).beam_pipeline_args


def add_beam_pipeline_args_from_env_to_component(component,
beam_pipeline_args_from_env:
Dict[str, str]):
if isinstance(component.executor_spec, executor_spec.BeamExecutorSpec):
# Update pipeline-level beam_pipeline_args_from_env with component specific
# ones to make component-level override pipeline-level args.
beam_pipeline_args_from_env_copy = beam_pipeline_args_from_env.copy()
beam_pipeline_args_from_env_copy.update(cast(
executor_spec.BeamExecutorSpec,
component.executor_spec).beam_pipeline_args_from_env)
cast(
executor_spec.BeamExecutorSpec,
component.executor_spec).beam_pipeline_args_from_env \
= beam_pipeline_args_from_env_copy


class RunOptions:
r"""Run-time options for running a pipeline (such as partial run).

Expand Down Expand Up @@ -220,6 +236,7 @@ def __init__(self,
components: Optional[List[base_node.BaseNode]] = None,
enable_cache: Optional[bool] = False,
beam_pipeline_args: Optional[List[str]] = None,
beam_pipeline_args_from_env: Optional[Dict[str, str]] = None,
platform_config: Optional[message.Message] = None,
execution_mode: Optional[ExecutionMode] = ExecutionMode.SYNC,
**kwargs):
Expand All @@ -235,6 +252,8 @@ def __init__(self,
components: Optional list of components to construct the pipeline.
enable_cache: Whether or not cache is enabled for this run.
beam_pipeline_args: Pipeline arguments for Beam powered Components.
beam_pipeline_args_from_env: Pipeline arguments to be replace with
environment variables for Beam powered Components.
platform_config: Pipeline level platform config, in proto form.
execution_mode: The execution mode of the pipeline, can be SYNC or ASYNC.
**kwargs: Additional kwargs forwarded as pipeline args.
Expand All @@ -257,6 +276,7 @@ def __init__(self,
self.execution_mode = execution_mode

self._beam_pipeline_args = beam_pipeline_args or []
self._beam_pipeline_args_from_env = beam_pipeline_args_from_env or {}

self.platform_config = platform_config

Expand Down Expand Up @@ -287,6 +307,10 @@ def beam_pipeline_args(self):
return self._beam_pipeline_args

@property
def beam_pipeline_args_from_env(self):
"""Beam pipeline args from env used for all components in the pipeline."""
return self._beam_pipeline_args_from_env

@doc_controls.do_not_generate_docs
def dsl_context_registry(self) -> dsl_context_registry.DslContextRegistry: # pylint: disable=g-missing-from-attributes
if self._dsl_context_registry is None:
Expand Down Expand Up @@ -358,6 +382,14 @@ def _set_components(self, components: List[base_node.BaseNode]) -> None:

if self.beam_pipeline_args:
for component in self._components:
add_beam_pipeline_args_to_component(
component, self.beam_pipeline_args)

if self.beam_pipeline_args_from_env:
for component in components:
add_beam_pipeline_args_from_env_to_component(
component, self.beam_pipeline_args_from_env)

add_beam_pipeline_args_to_component(component, self.beam_pipeline_args)

@doc_controls.do_not_generate_docs
Expand All @@ -379,3 +411,4 @@ def _persist_dsl_context_registry(self):
'or interleaved pipeline definitions. Make sure each component '
'belong to exactly one pipeline, and pipeline definitions are '
'separated.')

21 changes: 21 additions & 0 deletions tfx/orchestration/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,27 @@ def testComponentsSetAfterCreationWithBeamPipelineArgs(self):
self.assertEqual(expected_args,
p.components[0].executor_spec.beam_pipeline_args)

def testPipelineWithBeamPipelineArgsFromEnv(self):
first_arg = {'my_first_beam_pipeline_args': 'foo'}
second_arg = {'my_second_beam_pipeline_args': 'bar'}
second_arg_override = {'my_second_beam_pipeline_args': 'baz'}

expected_args = {**first_arg, **second_arg_override}

p = pipeline.Pipeline(
pipeline_name='a',
pipeline_root='b',
log_root='c',
components=[
_make_fake_component_instance(
'component_a', _OutputTypeA, {}, {},
with_beam=True).with_beam_pipeline_args_from_env({**first_arg, **second_arg_override})
],
beam_pipeline_args_from_env=second_arg,
metadata_connection_config=self._metadata_connection_config)
self.assertEqual(expected_args,
p.components[0].executor_spec.beam_pipeline_args_from_env)


if __name__ == '__main__':
tf.test.main()
37 changes: 37 additions & 0 deletions tfx/orchestration/portable/beam_executor_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"""Base class to define how to operator a Beam based executor."""

from typing import Any, Callable, Optional, cast
from absl import logging
import os

from tfx.dsl.components.base import base_beam_executor
from tfx.orchestration.portable import base_executor_operator
Expand All @@ -31,6 +33,34 @@
_BeamPipeline = Any


def resolve_beam_args_from_env(beam_pipeline_args,
beam_pipeline_args_from_env) -> list:
resolved_beam_pipeline_args_from_env = []

for beam_pipeline_arg_from_env, env_var \
in beam_pipeline_args_from_env.items():
# If an arg is already present in beam_pipeline_args, it should take precedence
# over env vars.
if any(beam_pipeline_arg.startswith(f"--{beam_pipeline_arg_from_env}=")
for beam_pipeline_arg in beam_pipeline_args):
logging.info('Arg %s already present in '
'beam_pipeline_args and will not be fetched from env.',
beam_pipeline_arg_from_env)
continue

env_var_value = os.getenv(env_var)
if env_var_value is None:
raise ValueError(f"Env var {env_var} not present")
else:
if beam_pipeline_arg_from_env.startswith('--'):
resolved_beam_pipeline_args_from_env.append(
f'{beam_pipeline_arg_from_env}={env_var_value}')
else:
resolved_beam_pipeline_args_from_env\
.append(f'--{beam_pipeline_arg_from_env}={env_var_value}')
return resolved_beam_pipeline_args_from_env


class BeamExecutorOperator(base_executor_operator.BaseExecutorOperator):
"""BeamExecutorOperator handles Beam based executor's init and execution.

Expand Down Expand Up @@ -69,6 +99,13 @@ def __init__(self,
self.beam_pipeline_args = []
self.beam_pipeline_args.extend(beam_executor_spec.beam_pipeline_args)

# Resolve beam_pipeline_args_from_env and consolidate with beam_pipeline_args
resolved_beam_pipeline_args_from_env = resolve_beam_args_from_env(
beam_executor_spec.beam_pipeline_args,
beam_executor_spec.beam_pipeline_args_from_env)

self.beam_pipeline_args.extend(resolved_beam_pipeline_args_from_env)

def run_executor(
self,
execution_info: data_types.ExecutionInfo,
Expand Down
61 changes: 61 additions & 0 deletions tfx/orchestration/portable/beam_executor_operator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,66 @@ def testRunExecutorWithBeamPipelineArgs(self):
}""", executor_output)


class BeamArgsTest(test_case_utils.TfxTest):

def testResolveBeamArgsFromEnv(self):
self.enter_context(test_case_utils.override_env_var('BAR', 'baz'))
self.enter_context(test_case_utils.override_env_var('S3_SECRET_ACCESS_KEY',
'minio123'))
self.enter_context(test_case_utils.override_env_var('S3_VERIFY', '1'))

beam_pipeline_args = ['--s3_endpoint_url=s3_endpoint_url',
'--s3_access_key_id=minio',
'--s3_verify=0'
]
beam_pipeline_args_from_env = {
'--foo': 'BAR',
's3_secret_access_key': 'S3_SECRET_ACCESS_KEY',
's3_verify': 'S3_VERIFY'
}

resolved_beam_pipeline_args_from_env = beam_executor_operator\
.resolve_beam_args_from_env(
beam_pipeline_args=beam_pipeline_args,
beam_pipeline_args_from_env=beam_pipeline_args_from_env)
self.assertEqual(
set(beam_pipeline_args + resolved_beam_pipeline_args_from_env),
{'--s3_endpoint_url=s3_endpoint_url',
'--s3_access_key_id=minio',
'--s3_secret_access_key=minio123',
'--s3_verify=0',
'--foo=baz'
})

def testResolveBeamArgsFromEnvWithMissingEnvVar(self):
beam_pipeline_args = ['--foo=bar']
beam_pipeline_args_from_env = {
'--bar': 'BAR',
}

with self.assertRaises(ValueError):
beam_executor_operator.resolve_beam_args_from_env(
beam_pipeline_args=beam_pipeline_args,
beam_pipeline_args_from_env=beam_pipeline_args_from_env)

def testResolveBeamArgsFromEnvWithEmptyEnvVar(self):
self.enter_context(test_case_utils.override_env_var('BAR', ''))

beam_pipeline_args = ['--foo=bar']
beam_pipeline_args_from_env = {
'--bar': 'BAR',
}

resolved_beam_pipeline_args_from_env = beam_executor_operator\
.resolve_beam_args_from_env(
beam_pipeline_args=beam_pipeline_args,
beam_pipeline_args_from_env=beam_pipeline_args_from_env)

self.assertEqual(
set(beam_pipeline_args + resolved_beam_pipeline_args_from_env),
{'--foo=bar',
'--bar='})


if __name__ == '__main__':
tf.test.main()
26 changes: 26 additions & 0 deletions tfx/proto/orchestration/executable_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ message BeamExecutableSpec {
PythonClassExecutableSpec python_executor_spec = 1;
// Args for Apache Beam pipeline.
repeated string beam_pipeline_args = 2;
// Args for Apache Beam pipeline to be populated by
// environment variables at runtime. The result will be merged with
// beam_pipeline_args.
// Map consists of:
// {beam_arg: environment_variable}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding some comments about the outcome after env variable replacements?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elaborated this with examples and merging logic.

//
// Example:
// beam_pipeline_args = [--runner=DirectRunner]
// os.environ[BAR] = "bar"
// beam_pipeline_args_from_env = {foo: BAR}
// beam_pipeline_args at runtime will be:
// beam_pipeline_args = [--runner=DirectRunner,
// --foo=bar]
//
// Args specified in beam_pipeline_args will take precedence
// over args specified in beam_pipeline_args_from_env in case of
// a clash.
// Example:
// beam_pipeline_args = [--runner=DirectRunner,
// --foo=baz]
// os.environ[BAR] = "bar"
// beam_pipeline_args_from_env = {foo: BAR}
// beam_pipeline_args at runtime will be:
// beam_pipeline_args = [--runner=DirectRunner,
// --foo=baz]
map<string, string> beam_pipeline_args_from_env = 3;
}

// Specification for Container based executables.
Expand Down