Skip to content

Commit

Permalink
n/a
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 647013745
  • Loading branch information
tfx-copybara committed Jun 26, 2024
1 parent 3dfc4c1 commit 4e71a35
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions tfx/orchestration/portable/python_executor_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import sys
from typing import Optional, cast

from tfx import types
from tfx.dsl.components.base import base_executor
from tfx.dsl.io import fileio
from tfx.orchestration.portable import base_executor_operator
Expand All @@ -31,6 +32,39 @@
_STATEFUL_WORKING_DIR = 'stateful_working_dir'


def hydrate_value_artifacts(input_artifacts: dict[str, list[types.Artifact]]):
"""Reads value of ValueArtifacts into memory."""
for _, artifact_list in input_artifacts.items():
for artifact in artifact_list:
if isinstance(artifact, ValueArtifact):
# Read ValueArtifact into memory.
artifact.read()


def construct_executor_output(
execution_info: data_types.ExecutionInfo,
output_dict: dict[str, list[types.Artifact]],
) -> execution_result_pb2.ExecutorOutput:
"""Constructs final executor output."""
# If result is not returned from the Do function, then try to
# read from the executor_output_uri.
if fileio.exists(execution_info.execution_output_uri):
return execution_result_pb2.ExecutorOutput.FromString(
fileio.open(execution_info.execution_output_uri, 'rb').read()
)
else:
# Old style TFX executor doesn't return executor_output, but modify
# output_dict and exec_properties in place. For backward compatibility,
# we use their executor_output and exec_properties to construct
# ExecutorOutput.
result = execution_result_pb2.ExecutorOutput()
outputs_utils.populate_output_artifact(result, output_dict)
outputs_utils.populate_exec_properties(
result, execution_info.exec_properties
)
return result


def run_with_executor(
execution_info: data_types.ExecutionInfo,
executor: base_executor.BaseExecutor
Expand All @@ -44,31 +78,15 @@ def run_with_executor(
Returns:
The output from executor.
"""
for _, artifact_list in execution_info.input_dict.items():
for artifact in artifact_list:
if isinstance(artifact, ValueArtifact):
# Read ValueArtifact into memory.
artifact.read()
hydrate_value_artifacts(execution_info.input_dict)

output_dict = copy.deepcopy(execution_info.output_dict)
result = executor.Do(execution_info.input_dict, output_dict,
execution_info.exec_properties)
if not result:
# If result is not returned from the Do function, then try to
# read from the executor_output_uri.
if fileio.exists(execution_info.execution_output_uri):
result = execution_result_pb2.ExecutorOutput.FromString(
fileio.open(execution_info.execution_output_uri, 'rb').read())
else:
# Old style TFX executor doesn't return executor_output, but modify
# output_dict and exec_properties in place. For backward compatibility,
# we use their executor_output and exec_properties to construct
# ExecutorOutput.
result = execution_result_pb2.ExecutorOutput()
outputs_utils.populate_output_artifact(result, output_dict)
outputs_utils.populate_exec_properties(result,
execution_info.exec_properties)
return result
result = executor.Do(
execution_info.input_dict, output_dict, execution_info.exec_properties
)
if result:
return result
return construct_executor_output(execution_info, output_dict)


class PythonExecutorOperator(base_executor_operator.BaseExecutorOperator):
Expand Down

0 comments on commit 4e71a35

Please sign in to comment.