Enable beam args to be populated from environment variables #4147
Conversation
@kennethyang404 @zhitaoli Perhaps we could start a discussion on implementation here?
@jiyongjung0 @kennethyang404 @zhitaoli Instead, the new idea is that I'm extending the BeamExecutableSpec in executable_spec.proto. An upside of this is that it is safer: we don't touch the existing interface. WDYT?
@jiyongjung0 @kennethyang404 @zhitaoli Basically what I've done is:
Protolint fails on pre-existing issues. Let me know if you think this solution works or if you have any objections. Quick feedback would be greatly appreciated :)
@jiyongjung0 There is no placeholder stuff in this implementation; maybe you could review?
@ConverJens - This is totally off-topic, but I'd like to reach out to you directly. Could you email me? robertcrowe@google.com
@rcrowe-google Will do!
@jiyongjung0 Removed WIP. Only review comments (if there are any?) remain.
Ping @jiyongjung0?
Ping @kennethyang404? Review, please.
LGTM.
I'm a little hesitant about adding a new arg to the Pipeline constructor (it's easier to add to a public interface than to remove from one). We probably need to refactor beam_pipeline_args some time in the future (maybe by moving it into platform_config). We can move beam_pipeline_args_from_env as well when the time comes.
@kennethyang404 The build was canceled during the install python deps step. Should it be restarted or given more time somehow? Is there anything else needed from my side?
Ping @kennethyang404
Thank you for the PR and sorry for the late review. I left some comments. This PR touches a few TFX internal modules and we might need some more approvals from related folks.
for beam_pipeline_arg_from_env, env_var in beam_pipeline_args_from_env.items():
  # If an arg is already present in beam_pipeline_args, it should take
  # precedence over env vars.
  if any(beam_pipeline_arg_from_env in beam_pipeline_arg for beam_pipeline_arg in beam_pipeline_args):
This check seems a bit loose and might result in false positives. For example, if we want to set 'runner' via an env variable, it might collide with other flags like "experiments=use_runner_v2".
One suggestion would be to build a prefix string with '--{}=' first and test with beam_pipeline_arg.startswith(beam_pipeline_arg_prefix).
True. Implemented your suggestion.
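For illustration, the false positive the reviewer describes can be reproduced in a few self-contained lines. This is a sketch; only the flag names 'runner' and 'experiments=use_runner_v2' come from the thread, the variable names are illustrative:

```python
beam_pipeline_args = ['--experiments=use_runner_v2']
beam_pipeline_arg_from_env = 'runner'

# Loose substring check: false positive, because 'runner' also occurs
# inside 'use_runner_v2'.
loose_hit = any(beam_pipeline_arg_from_env in arg for arg in beam_pipeline_args)

# Suggested prefix check: only an explicit '--runner=...' flag collides.
beam_pipeline_arg_prefix = '--{}='.format(beam_pipeline_arg_from_env)
strict_hit = any(arg.startswith(beam_pipeline_arg_prefix)
                 for arg in beam_pipeline_args)

print(loose_hit, strict_hit)  # → True False
```

With the substring check, the env-derived '--runner' flag would be silently dropped; the prefix check only suppresses it when an explicit '--runner=...' is already present.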
@@ -193,6 +221,14 @@ def __init__(

    self._executor_operator = None
    if executor_spec:
      # Resolve beam_pipeline_args_from_env and consolidate with beam_pipeline_args
      if isinstance(executor_spec, executable_spec_pb2.BeamExecutableSpec):
How about handling this in BeamExecutorOperator? For example, [1].
It seems a bit unnatural to process Beam-dependent logic here, and there is no guarantee that the launcher and the executor run in the same process.
[1]
self.beam_pipeline_args.extend(beam_executor_spec.beam_pipeline_args)
Refactored the code and test to BeamExecutorOperator.
tfx/orchestration/pipeline.py
if self.beam_pipeline_args_from_env:
  for component in components:
    add_beam_pipeline_args_from_env_to_component(component, beam_pipeline_args_from_env)
Could you add some test cases for changes in this file?
Test added.
def _set_required_env_vars(self, env_vars):
  for k, v in env_vars.items():
    os.environ[k] = v
Please consider using env var overriding utilities so as not to pollute environment variables:
tfx/tfx/utils/test_case_utils.py
Line 30 in 807e9ce
def override_env_var(name: str, value: str):
Switched to using this method.
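For readers without the TFX source at hand, such an env var overriding utility can be sketched as a context manager. This only approximates what tfx.utils.test_case_utils.override_env_var does; the real implementation may differ in detail:

```python
import contextlib
import os


@contextlib.contextmanager
def override_env_var(name: str, value: str):
    """Temporarily set an environment variable, restoring the old state on exit."""
    old_value = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        # Restore: delete the variable if it was unset before, else put the
        # original value back.
        if old_value is None:
            del os.environ[name]
        else:
            os.environ[name] = old_value
```

Used as `with override_env_var('SOME_SECRET', 'dummy'): ...`, the test body sees the variable while the surrounding process environment is left untouched.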
// Args for Apache Beam pipeline to be populated by
// environment variables at runtime.
// Map consists of:
// {beam_arg: environment_variable}
How about adding some comments about the outcome after env variable replacements?
Elaborated this with examples and merging logic.
def with_beam_pipeline_args_from_env(
    self, beam_pipeline_args_from_env: Dict[str, str]) -> 'BaseBeamComponent':
This forking/duplication (across several parts of the code and proto) seems odd. Could we have only one?
Also, what is the merging behavior of these (especially when they disagree), which now becomes a public contract that we have to honour indefinitely?
Not sure where this code is duplicated; can you specify where?
The merging behaviour is the same as for ordinary beam args, i.e. component-specific args override pipeline-level args.
In addition, any arg specified as an ordinary beam arg takes precedence over any kind of beam arg from env.
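That stated precedence (component-specific args override pipeline-level args of the same name) can be sketched with a small helper. The function name and the '--name=value' arg format here are illustrative, not the PR's actual code:

```python
def merge_beam_args(pipeline_level_args, component_level_args):
    """Merge '--name=value' style Beam args; component-level wins on conflicts."""
    def flag_name(arg):
        # '--runner=DirectRunner' -> '--runner'
        return arg.split('=', 1)[0]

    overridden = {flag_name(arg) for arg in component_level_args}
    kept = [arg for arg in pipeline_level_args if flag_name(arg) not in overridden]
    return kept + component_level_args


# Component-level '--runner' replaces the pipeline-level one:
merge_beam_args(['--runner=DataflowRunner', '--region=europe-west1'],
                ['--runner=DirectRunner'])
# → ['--region=europe-west1', '--runner=DirectRunner']
```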
I think that the merging behavior seems clear now thanks to the explanation in the proto. Thanks!
But I am a little bit worried about the duplication of "beam_pipeline_args" and "beam_pipeline_args_from_env" in general, because these two things almost always appear together. Following the first comment by @zhitaoli (#3455 (comment)), we might need to explore ideas using some kind of placeholders. If the existing placeholder in TFX doesn't fit well here, we might be able to invent a new simple placeholder mechanism like shell env var expansion.
We might be able to proceed with the current implementation, but I think we need to get more input on this from other folks. (CC @kennethyang404 )
@jiyongjung0 Thanks for your answer and your comments, I'll start working on them in the coming days.
@jiyongjung0 Update?
@jiyongjung0 @kennethyang404
@jiyongjung0 Why hast thou forsaken me? But seriously, what is the progress on the review?
I'm so sorry for the long delay. The PR is still stuck in the internal review process. Let me contact relevant folks. (CC @kennethyang404 @ruoyu90 )
Thanks for the response!
@jiyongjung0 Ping! Update?
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days |
@jiyongjung0 @kennethyang404 @ruoyu90 Internal review time is over. You've had more than four months of "internal review". We need to get this merged, or you need to state that we aren't doing this because of reasons X, Y and Z. Beam requires one to pass credentials as args for some connections, and TFX needs them at pipeline compile time; ergo, passwords end up in clear text in the pipeline definition. This PR handles that. Why not just merge it? This functionality is safe and doesn't interfere with anything else. If it's the "new public API" concern, then maybe it can be decorated or placed in an "experimental" package and kept until you go in another direction. Why keep TFX open source if the community can't fix its issues?
Hi @ConverJens, sorry for the delay. We discussed this issue today but still felt the implementation in the current PR is not ideal for TFX, because it changes a major public interface, Pipeline, that is already messy enough. We even have a plan to refactor beam_pipeline_args out of the Pipeline constructor. It is always easier to increase the API surface than to maintain it. We would be happy to review this PR if it were implemented using the previously mentioned Placeholder approach. However, because there is no need for this feature among internal users, it is not a high priority task for us and unfortunately we don't have enough resources at this moment to implement it from our side. Let me know if you need some pointers to help with the implementation. My apologies again for not communicating the internal discussions earlier.
@kennethyang404 cc: @jiyongjung0 Well, obviously not the answer I had hoped for, but I figured it was leaning that way. Can we have a short design discussion about this? Do you still propose:
or is there a less involved approach? To me, it seems that 1. is by far the most difficult part. One issue I had when investigating this initially was that I only found usage of placeholders in the container components. I will start investigating a new PR regarding this.
Thanks @ConverJens for taking the time to push this forward. May I suggest that you heavily bump the priority of this? The only way to use TFX with S3 object storage (such as the MinIO bundled with Kubeflow) is to hardcode credentials in Git/CIs and proceed to leak them all over the place. That's kind of insane to me in 2022, especially given the consequences of somebody stealing/deleting/pushing to production as a result of this. Last I checked, the motto of TFX was "ML Pipelines for production", where hard-coding secrets like this is unacceptable. I'm not sure how the internal users use TFX, but if you are to believe the Kubeflow + TFX marketing, it should just work.
@vaskozl Thank you very much for providing your opinion, which I completely agree with! I think an issue is that while the image projected is TFX + Kubeflow, in reality it seems to be TFX + KFP, which is a significant difference. Traces of this can be seen with the kubeflow_runner_v2, which can't be used with Kubeflow since the Kubeflow metadata config is no longer available. In short, TFX is unfortunately geared towards orchestration using KFP, not towards running on Kubeflow. When using GS as storage on GCP, authentication is handled by service accounts. While this is really the best-practice solution, it is simply not available for the general S3 setup. Anyway, I will be having a design meeting with some members of the TFX team to try to find a viable way forward.
@kennethyang404 @jiyongjung0 |
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days |
Currently, beam args are compiled into the pipeline spec, which leaves any connection information in clear text. This is a problem for users who want to use S3 as the artifact dir for Beam.
This PR aims to solve this problem by:
Together this allows a user to pass credentials to beam using standard k8s secrets that can be mounted using pipeline_operator_funcs.
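The runtime resolution step can be simulated in plain Python without any TFX or Kubernetes dependency. All names below are hypothetical; in the real setup the env var would be populated from a k8s secret mounted via pipeline_operator_funcs:

```python
import os


def resolve_beam_args_from_env(beam_pipeline_args, beam_pipeline_args_from_env):
    """Append '--arg=value' for each mapped env var, unless already set explicitly."""
    resolved = list(beam_pipeline_args)
    for arg_name, env_var in beam_pipeline_args_from_env.items():
        prefix = '--{}='.format(arg_name)
        # Explicitly passed beam args take precedence over env-derived ones.
        if env_var in os.environ and not any(a.startswith(prefix) for a in resolved):
            resolved.append(prefix + os.environ[env_var])
    return resolved


# Simulate a k8s secret mounted as an env var at pipeline runtime:
os.environ['S3_SECRET_ACCESS_KEY'] = 'only-present-at-runtime'

args = resolve_beam_args_from_env(
    ['--runner=DirectRunner'],
    {'s3_secret_access_key': 'S3_SECRET_ACCESS_KEY'})
# The credential reaches Beam without ever appearing in the compiled
# pipeline spec.
```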