diff --git a/RELEASE.md b/RELEASE.md index 78edc22030d..1e9ab157225 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -21,6 +21,7 @@ ## Major Features and Improvements * Dropped python 3.8 support. +* Dropped experimental TFX Centralized Kubernetes Orchestrator * Extend GetPipelineRunExecutions, GetPipelineRunArtifacts APIs to support filtering by execution create_time, type. * ExampleValidator and DistributionValidator now support anomalies alert diff --git a/build/BUILD b/build/BUILD index 0d92eb4f8d2..3921a1e9e69 100644 --- a/build/BUILD +++ b/build/BUILD @@ -23,8 +23,6 @@ sh_binary( "//tfx/examples/custom_components/presto_example_gen/proto:presto_config_pb2.py", "//tfx/extensions/experimental/kfp_compatibility/proto:kfp_component_spec_pb2.py", "//tfx/extensions/google_cloud_big_query/experimental/elwc_example_gen/proto:elwc_config_pb2.py", - "//tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto:service_pb2.py", - "//tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto:service_pb2_grpc.py", "//tfx/orchestration/experimental/core:component_generated_alert_pb2.py", "//tfx/orchestration/kubeflow/proto:kubeflow_pb2.py", "//tfx/proto:bulk_inferrer_pb2.py", diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/README.md b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/README.md deleted file mode 100644 index 12e042cc24b..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/README.md +++ /dev/null @@ -1,84 +0,0 @@ -# TFX Centralized Kubernetes Orchestrator - -Disclaimer: This orchestrator is experimental and we don't have any plans to -support this officially in production, as of July 2022. - -![image](https://user-images.githubusercontent.com/57027695/184351225-3e9c916b-ebaa-4d85-93a5-a9e7e924d747.png) - -This package aims to provide a centralized orchestrator on kubernetes, without -relying on external orchestration tools such as -[KubeFlow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/). -To try it out, please follow the steps below. - -# Setup - -Follow these step if you are running the orchestrator for the first time. - -## Step 1: Set up a Kubernetes cluster - -Refer to -[this link](https://github.com/tensorflow/tfx/tree/master/tfx/orchestration/experimental/kubernetes#step-1-set-up-a-kubernetes-cluster) -for set up. - -## Step 2: Build a new docker image - -Current official tfx image doesn't support this orchestrator, as `entrypoint.py` -is not included in the image. Thus, you need to build a new image before trying -out examples below. - -To fully utilize the features in the orchestrator, you should build your own -image which includes your code on the components you would like to run. - -Under the root directory of github checkout, run `export -DOCKER_IMAGE_REPO=gcr.io/{your_GKE_project_name}/{image_name} -TFX_DEPENDENCY_SELECTOR=NIGHTLY ./tfx/tools/docker/build_docker_image.sh docker -push ${DOCKER_IMAGE_REPO}` to build and push a docker image to your container. - -Then, change the `tfx_image` parameter of -`kubernetes_job_runner.KubernetesJobRunner` (line 90 of -kubernetes_task_scheduler.py) to the name of your image. - -TODO(b/240237394): Read the image information from the platform config. - -## Step 3: Set up MySQL MLMD - -After checking that you are inside the base TFX directory, use the following -command to deploy the MySQL resources: `kubectl apply -f -tfx/orchestration/experimental/kubernetes/yaml/mysql-pv.yaml kubectl apply -f -tfx/orchestration/experimental/kubernetes/yaml/mysql.yaml` - -## Step 4: Create MySQL Database - -Next, you need to create a database you would use for MLMD. Creating a database -locally using port-fowarding is recommended. - -Run `kubectl port-forward {mysql_pod_name} {your_port}:3306` and in a separate -terminal, run `mysql -h localhost -P {your_port} -u root` to make MySQL -connection. - -Create database by `CREATE DATABASE {database_name};` - -# How to Use - -## Running a sample pipeline. - -1) Run main.py with necessary flags, which serves as the orchestration loop. - -Orchestrator loop runs outside the kubernetes cluster for the current -implementation. Thus, while port-forwarding with above command, run `main.py` -with necessary flags as shown below. - -``` -python tfx/orchestration/experimental/centralized_kubernetes_orchestrator/main.py ---mysql_port={your_port} --mysql_host={your_host} --mysql_username={your_username} --mysql_database={your_database_name} -``` - -If you are running using localhost, specify mysql_host as 127.0.0.1, not -localhost. - -2) In a separate terminal, execute `run_sample_pipeline.py` with necessary -flags, as shown below. - -Sample command: `python -tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_pipeline.py ---bucket={your_gcs_bucket_name}` diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/__init__.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/__init__.py deleted file mode 100644 index 86883734410..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/data/schema.pbtxt b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/data/schema.pbtxt deleted file mode 100644 index 1cabf7f60b9..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/data/schema.pbtxt +++ /dev/null @@ -1,65 +0,0 @@ -feature { - name: "body_mass_g" - type: FLOAT - presence { - min_fraction: 1.0 - min_count: 1 - } - shape { - dim { - size: 1 - } - } -} -feature { - name: "culmen_depth_mm" - type: FLOAT - presence { - min_fraction: 1.0 - min_count: 1 - } - shape { - dim { - size: 1 - } - } -} -feature { - name: "culmen_length_mm" - type: FLOAT - presence { - min_fraction: 1.0 - min_count: 1 - } - shape { - dim { - size: 1 - } - } -} -feature { - name: "flipper_length_mm" - type: FLOAT - presence { - min_fraction: 1.0 - min_count: 1 - } - shape { - dim { - size: 1 - } - } -} -feature { - name: "species" - type: INT - presence { - min_fraction: 1.0 - min_count: 1 - } - shape { - dim { - size: 1 - } - } -} \ No newline at end of file diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/entrypoint.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/entrypoint.py deleted file mode 100644 index bbb48cd13fa..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/entrypoint.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Entrypoint for the Kubernetes Job Runner. - -Users can use this entrypoint to run pipeline with the centralized kubernetes -orchestrator. -""" - -from absl import app -from tfx.orchestration.python_execution_binary import entrypoint - - -def main(argv): - entrypoint.main(argv) - - -if __name__ == '__main__': - app.run(main) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/__init__.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/__init__.py deleted file mode 100644 index 86883734410..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/client.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/client.py deleted file mode 100644 index 51806e54220..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/client.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Client for orchestrator. - -A simple client to communicate with the orchestrator server. -""" - -from absl import app -from absl import flags -import grpc -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2 -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2_grpc - -# Flags to use in the command line to specifiy the port and the msg. -# Commands can be changed later. -FLAGS = flags.FLAGS -flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address') -flags.DEFINE_string('msg', 'Hello World', 'default message') - - -def _echo_message(stub, request): - """Echoes user's message.""" - try: - response = stub.Echo(request) - print(response) - return 0 - except grpc.RpcError as rpc_error: - print(rpc_error) - return -1 - - -def main(unused_argv): - channel_creds = grpc.local_channel_credentials() - with grpc.secure_channel(FLAGS.server, channel_creds) as channel: - grpc.channel_ready_future(channel).result() - stub = service_pb2_grpc.KubernetesOrchestratorStub(channel) - request = service_pb2.EchoRequest(msg=FLAGS.msg) - return _echo_message(stub, request) - - -if __name__ == '__main__': - app.run(main) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_component.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_component.py deleted file mode 100644 index 4610f5dc319..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_component.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Run sample component (ImportSchemaGen) in Kubernetes, useful for debugging. - -Sample command: -``` -python tfx/orchestration/experimental/centralized_kubernetes_orchestrator/ -examples/run_sample_component.py docker_image={your_docker_image} -job_prefix={your_job_name} container_name={your_container_name} -storage_bucket={your_gcs_bucket_name} -``` -""" -from absl import app -from absl import flags -from absl import logging - -from tfx import v1 as tfx -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator import kubernetes_job_runner -from tfx.orchestration.portable import data_types -from tfx.proto.orchestration import pipeline_pb2 - -from google.protobuf import text_format - -FLAGS = flags.FLAGS -flags.DEFINE_string('docker_image', '', 'docker image') -flags.DEFINE_string('job_prefix', 'sample-job', 'job prefix') -flags.DEFINE_string('container_name', 'centralized-orchestrator', - 'container name') -flags.DEFINE_string('storage_bucket', '', 'storage bucket') - - -def _prepare_sample_execution_info(bucket, artifact_path, output_path, - data_path): - """Prepare sample ImportSchemaGen execution info.""" - pipeline_root = f'gs://{bucket}' - sample_artifact = tfx.types.standard_artifacts.Schema() - sample_artifact.uri = pipeline_root + artifact_path - - execution_output_uri = pipeline_root + output_path - stateful_working_dir = pipeline_root + '/workding/dir' - exec_properties = { - 'schema_file': pipeline_root + data_path, - } - pipeline_info = pipeline_pb2.PipelineInfo(id='my_pipeline') - pipeline_node = text_format.Parse( - """ - node_info { - id: 'my_node' - } - """, pipeline_pb2.PipelineNode()) - - original = data_types.ExecutionInfo( - input_dict={}, - output_dict={'schema': [sample_artifact]}, - exec_properties=exec_properties, - execution_output_uri=execution_output_uri, - stateful_working_dir=stateful_working_dir, - pipeline_info=pipeline_info, - pipeline_node=pipeline_node) - - return original - - -def _prepare_sample_executable_spec(): - """Prepare sample ImportSchemaGen executable spec.""" - component = tfx.components.ImportSchemaGen.EXECUTOR_SPEC.encode() - return component - - -def main(unused_argv): - logging.set_verbosity(logging.INFO) - execution_info = _prepare_sample_execution_info(FLAGS.storage_bucket, - '/artifact-output', - '/test-output', - '/data/schema.pbtxt') - executable_spec = _prepare_sample_executable_spec() - - runner = kubernetes_job_runner.KubernetesJobRunner( - tfx_image=FLAGS.docker_image, - job_prefix=FLAGS.job_prefix, - container_name=FLAGS.container_name) - _ = runner.run(execution_info=execution_info, executable_spec=executable_spec) - - -if __name__ == '__main__': - app.run(main) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_pipeline.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_pipeline.py deleted file mode 100644 index 4e9152a2e3e..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/examples/run_sample_pipeline.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Client for orchestrator. - -A simple client to communicate with the orchestrator server. -""" - -import datetime - -from absl import app -from absl import flags -import grpc -from tfx import v1 as tfx -from tfx.dsl.compiler import compiler -from tfx.dsl.compiler import constants -from tfx.orchestration import pipeline -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2 -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2_grpc -from tfx.orchestration.portable import runtime_parameter_utils - -# Flags to use in the command line to specifiy the port and the msg. -# Commands can be changed later. -FLAGS = flags.FLAGS -_SERVER_ADDRESS = flags.DEFINE_string('server', 'dns:///[::1]:10000', - 'server address') -_PIPELINE_NAME = flags.DEFINE_string('name', 'test-ImportSchemaGen2', - 'pipeline name') -_STORAGE_BUCKET = flags.DEFINE_string('bucket', '', 'storage bucket') - - -def main(unused_argv): - prefix = f'gs://{_STORAGE_BUCKET.value}' - sample_pipeline = pipeline.Pipeline( - pipeline_name=_PIPELINE_NAME.value, - pipeline_root=prefix + '/tfx/pipelines', - components=[ - tfx.components.ImportSchemaGen(prefix + '/data/schema.pbtxt') - ], - enable_cache=False) - pipeline_ir = compiler.Compiler().compile(sample_pipeline) - runtime_parameter_utils.substitute_runtime_parameter( - pipeline_ir, { - constants.PIPELINE_RUN_ID_PARAMETER_NAME: - datetime.datetime.now().isoformat(), - }) - - channel_creds = grpc.local_channel_credentials() - with grpc.secure_channel(_SERVER_ADDRESS.value, channel_creds) as channel: - grpc.channel_ready_future(channel).result() - stub = service_pb2_grpc.KubernetesOrchestratorStub(channel) - request = service_pb2.StartPipelineRequest(pipeline=pipeline_ir) - stub.StartPipeline(request) - - -if __name__ == '__main__': - app.run(main) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/kubernetes_job_runner.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/kubernetes_job_runner.py deleted file mode 100644 index eaac36ac8f2..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/kubernetes_job_runner.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Kubernetes job runner for orchestrator. - -Runner which executes given pipeline components as a Kubernetes job. -""" -import abc -import datetime -import random -import string -import time - -from absl import logging -from kubernetes import client as k8s_client -from tfx.orchestration.experimental.core import task_scheduler -from tfx.orchestration.python_execution_binary import python_execution_binary_utils -from tfx.utils import kube_utils -from tfx.utils import status as status_lib - -_COMMAND = [ - 'python', - '-m', - 'tfx.orchestration.experimental.centralized_kubernetes_orchestrator.entrypoint', -] - -_DEFAULT_POLLING_INTERVAL_SEC = 2 -_JOB_CREATION_TIMEOUT = 300 - - -def _generate_component_name_suffix() -> str: - letters = string.ascii_lowercase - return '-' + ''.join(random.choice(letters) for i in range(10)) - - -class JobExceptionError(Exception): - """Exception error class to handle exceptions while running Kubernetes job.""" - - def __init__(self, message: str): - super().__init__(message) - self.msg = message - - -class KubernetesJobRunner(abc.ABC): - """A Kubernetes job runner that launches and executes pipeline components in kubernetes cluster.""" - - def __init__(self, - tfx_image, - job_prefix, - container_name, - name_space='default', - stream_logs=False): - """Create a kubernetes model server runner. - - Args: - tfx_image: container image for tfx. - job_prefix: prefix for the job. Unique hash will follow as suffix. - container_name: name of the container. - name_space: namespace of the run. - stream_logs: whether to stream logs from the pod. - """ - self._image = tfx_image - self._k8s_core_api = kube_utils.make_core_v1_api() - self._namespace = name_space - self._container_name = container_name - self._job_name = kube_utils.sanitize_pod_name( - job_prefix + _generate_component_name_suffix()) - # Time to delete the job after completion. - self.ttl_seconds = 5 - # Pod name would be populated once creation request sent. - self._pod_name = None - self._stream_pod_logs = stream_logs - - def run(self, execution_info, - executable_spec) -> task_scheduler.TaskSchedulerResult: - """Execute component in the pod.""" - - try: - self._create_job(execution_info, executable_spec) - self._wait_until_pod_is_runnable() - if self._stream_pod_logs: - self._stream_logs() - self._wait_until_completion() - return task_scheduler.TaskSchedulerResult( - status=status_lib.Status(code=status_lib.Code.OK), - output=task_scheduler.ExecutorNodeOutput()) - except k8s_client.rest.ApiException as e: - # TODO(b/240237394): Error type specification. - msg = 'Unable to run job. \nReason: %s\nBody: %s' % ( - e.reason if not None else '', e.body if not None else '') - logging.info(msg) - return task_scheduler.TaskSchedulerResult( - status=status_lib.Status(code=status_lib.Code.CANCELLED, message=msg)) - except JobExceptionError as e: - logging.info(e.msg) - return task_scheduler.TaskSchedulerResult( - status=status_lib.Status( - code=status_lib.Code.CANCELLED, message=e.msg)) - - def _create_job(self, execution_info, executable_spec) -> None: - """Create a job and wait for the pod to be runnable.""" - - assert not self._pod_name, ('You cannot start a job multiple times.') - serialized_execution_info = python_execution_binary_utils.serialize_execution_info( - execution_info) - serialized_executable_spec = python_execution_binary_utils.serialize_executable_spec( - executable_spec) - - run_arguments = [ - '--tfx_execution_info_b64', - serialized_execution_info, - '--tfx_python_class_executable_spec_b64', - serialized_executable_spec, - ] - orchestrator_commands = _COMMAND + run_arguments - - batch_api = kube_utils.make_batch_v1_api() - job = kube_utils.make_job_object( - name=self._job_name, - container_image=self._image, - command=orchestrator_commands, - container_name=self._container_name, - pod_labels={ - 'job-name': self._job_name, - }, - ttl_seconds_after_finished=self.ttl_seconds, - ) - batch_api.create_namespaced_job(self._namespace, job, pretty=True) - logging.info('Job %s created!', self._job_name) - - def _wait_until_pod_is_runnable(self) -> None: - """Wait for the pod to be created and runnable.""" - - assert self._job_name, ('You should first create a job to run.') - orchestrator_pods = [] - start_time = datetime.datetime.utcnow() - # Wait for the kubernetes job to launch a pod. - while (datetime.datetime.utcnow() - - start_time).seconds < _JOB_CREATION_TIMEOUT: - orchestrator_pods = self._k8s_core_api.list_namespaced_pod( - namespace='default', - label_selector='job-name={}'.format(self._job_name)).items - try: - orchestrator_pods = self._k8s_core_api.list_namespaced_pod( - namespace='default', - label_selector='job-name={}'.format(self._job_name)).items - except k8s_client.rest.ApiException as e: - if e.status != 404: - raise e - time.sleep(_DEFAULT_POLLING_INTERVAL_SEC) - if len(orchestrator_pods) != 1: - continue - pod = orchestrator_pods.pop() - pod_phase = kube_utils.PodPhase(pod.status.phase) - if pod_phase == kube_utils.PodPhase.RUNNING and pod.status.pod_ip: - self._pod_name = pod.metadata.name - logging.info('Pod created with name %s', self._pod_name) - return - if pod_phase.is_done: - raise JobExceptionError( - message='Job has been aborted. Please restart for execution.') - time.sleep(_DEFAULT_POLLING_INTERVAL_SEC) - raise JobExceptionError( - message='Deadline exceeded while waiting for pod to be running.') - - def _stream_logs(self) -> None: - """Stream logs from orchestrator pod.""" - logging.info('Start log streaming for pod %s:%s.', self._namespace, - self._pod_name) - logs = self._k8s_core_api.read_namespaced_pod_log( - name=self._pod_name, - namespace='default', - container=self._container_name, - follow=True, - _preload_content=False).stream() - for log in logs: - logging.info(log.decode().rstrip('\n')) - - def _wait_until_completion(self) -> None: - """Wait until the processs is completed.""" - pod = kube_utils.wait_pod( - self._k8s_core_api, - self._pod_name, - self._namespace, - exit_condition_lambda=kube_utils.pod_is_done, - condition_description='done state', - exponential_backoff=True) - pod_phase = kube_utils.PodPhase(pod.status.phase) - if pod_phase == kube_utils.PodPhase.FAILED: - raise JobExceptionError(message='Pod "%s" failed with status "%s".' % - (self._pod_name, pod.status)) - if pod_phase.is_done: - logging.info('Job completed! Ending log streaming for pod %s:%s.', - self._namespace, self._pod_name) - - if self.ttl_seconds: - logging.info('Job %s will be deleted after %d seconds.', self._job_name, - self.ttl_seconds) - else: - logging.info( - 'To delete the job, please run the following command:\n\n' - 'kubectl delete jobs/%s', self._job_name) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/kubernetes_task_scheduler.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/kubernetes_task_scheduler.py deleted file mode 100644 index e67f6f4a2ad..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/kubernetes_task_scheduler.py +++ /dev/null @@ -1,131 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Kubernetes Task Scheduler. - -First, unpack the deployment config in the given pipeline to obtain an Any type -of executor spec. Since it is an optional value, first check if it’s -None, and proceed to check its type. If it’s either of PythonClassExecutableSpec -or BeamExecutableSpec, obtain executable spec by unpacking executable Any type. - -Then, obtain execution invocation given the pipeline, task, and the node. -Convert execution invocation to execution info, by using from_proto -method in ExecutionInfo class. Finally, return the result of run method in the -Kubernetes runner class, passing the obtained execution info and executable -spec. -""" -import threading - -from tfx.orchestration import data_types_utils -from tfx.orchestration import metadata -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator import kubernetes_job_runner -from tfx.orchestration.experimental.core import task as task_lib -from tfx.orchestration.experimental.core import task_scheduler -from tfx.orchestration.portable import data_types -from tfx.proto.orchestration import executable_spec_pb2 -from tfx.proto.orchestration import execution_invocation_pb2 -from tfx.proto.orchestration import pipeline_pb2 -from tfx.utils import status as status_lib - - -def _create_execution_invocation_proto( - pipeline: pipeline_pb2.Pipeline, task: task_lib.ExecNodeTask, - node: pipeline_pb2.PipelineNode -) -> execution_invocation_pb2.ExecutionInvocation: - """Creates an ExecutionInvocation proto with some initial info.""" - - return execution_invocation_pb2.ExecutionInvocation( - execution_properties=(data_types_utils.build_metadata_value_dict( - task.exec_properties)), - execution_properties_with_schema=( - data_types_utils.build_pipeline_value_dict(task.exec_properties)), - output_metadata_uri=task.executor_output_uri, - input_dict=data_types_utils.build_artifact_struct_dict( - task.input_artifacts), - output_dict=data_types_utils.build_artifact_struct_dict( - task.output_artifacts), - stateful_working_dir=task.stateful_working_dir, - tmp_dir=task.tmp_dir, - pipeline_info=pipeline.pipeline_info, - pipeline_node=node, - execution_id=task.execution_id, - pipeline_run_id=pipeline.runtime_spec.pipeline_run_id.field_value - .string_value) - - -def _get_pipeline_node(pipeline: pipeline_pb2.Pipeline, - node_id: str) -> pipeline_pb2.PipelineNode: - """Gets corresponding pipeline node from IR given the node_id.""" - for node in pipeline.nodes: - if node.pipeline_node and (node.pipeline_node.node_info.id == node_id): - return node.pipeline_node - raise status_lib.StatusNotOkError( - code=status_lib.Code.INVALID_ARGUMENT, - message=f'Failed to find corresponding node in the IR, node id: {node_id}' - ) - - -class KubernetesTaskScheduler( - task_scheduler.TaskScheduler[task_lib.ExecNodeTask]): - """Implementation of Kubernetes Task Scheduler.""" - - def __init__(self, mlmd_handle: metadata.Metadata, - pipeline: pipeline_pb2.Pipeline, task: task_lib.ExecNodeTask): - super().__init__(mlmd_handle, pipeline, task) - self._cancel = threading.Event() - if task.cancel_type: - self._cancel.set() - # TODO(b/240237394): pass tfx_image, job_prefix, container_name as - # arguments. - self._runner = kubernetes_job_runner.KubernetesJobRunner( - tfx_image='', # You need to set tfx_image with your image. - job_prefix='sample-job', - container_name='centralized-orchestrator') - - def schedule(self) -> task_scheduler.TaskSchedulerResult: - """Retreive Executable Spec and Execution Info for run.""" - depl_config = pipeline_pb2.IntermediateDeploymentConfig() - self.pipeline.deployment_config.Unpack(depl_config) - executor_spec_any = depl_config.executor_specs.get( - self.task.node_uid.node_id) - - if not executor_spec_any: - return task_scheduler.TaskSchedulerResult( - status=status_lib.Status( - code=status_lib.Code.INVALID_ARGUMENT, - message='Unknown executable spec type.')) - - if executor_spec_any.Is( - executable_spec_pb2.PythonClassExecutableSpec.DESCRIPTOR): - executable_spec = executable_spec_pb2.PythonClassExecutableSpec() - executor_spec_any.Unpack(executable_spec) - elif executor_spec_any.Is( - executable_spec_pb2.BeamExecutableSpec.DESCRIPTOR): - executable_spec = executable_spec_pb2.BeamExecutableSpec() - executor_spec_any.Unpack(executable_spec) - else: - return task_scheduler.TaskSchedulerResult( - status=status_lib.Status( - code=status_lib.Code.INVALID_ARGUMENT, - message='Unknown executable spec type.')) - - node = _get_pipeline_node(self.pipeline, self.task.node_uid.node_id) - execution_invocation = _create_execution_invocation_proto( - self.pipeline, self.task, node) - execution_info = data_types.ExecutionInfo.from_proto(execution_invocation) - - return self._runner.run(execution_info, executable_spec) - - def cancel(self, cancel_task: task_lib.CancelTask) -> None: - # TODO(b/240237394): implement method. - pass diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/main.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/main.py deleted file mode 100644 index d7f3add3071..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/main.py +++ /dev/null @@ -1,185 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Centralized Kubernetes Orchestrator `main`.""" - -from concurrent import futures -import contextlib -import time - -from absl import app -from absl import flags -from absl import logging -import grpc -from tfx.orchestration import metadata -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator import kubernetes_task_scheduler -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service import kubernetes_orchestrator_service -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2_grpc -from tfx.orchestration.experimental.core import event_observer -from tfx.orchestration.experimental.core import pipeline_ops -from tfx.orchestration.experimental.core import pipeline_state -from tfx.orchestration.experimental.core import service_jobs -from tfx.orchestration.experimental.core import task_manager as tm -from tfx.orchestration.experimental.core import task_queue as tq -from tfx.orchestration.experimental.core import task_scheduler as ts - -FLAGS = flags.FLAGS -_MAX_ACTIVE_TASK_SCHEDULERS_FLAG = flags.DEFINE_integer( - 'tflex_max_active_task_schedulers', 100, - 'Maximum number of active task schedulers.') -_INACTIVITY_TTL_SECS_FLAG = flags.DEFINE_float( - 'tflex_inactivity_ttl_secs', 30, 'Orchestrator inactivity TTL. If set, ' - 'orchestrator will exit after ttl seconds of no orchestration activity.') -_DEFAULT_POLLING_INTERVAL_SECS_FLAG = flags.DEFINE_float( - 'tflex_default_polling_interval_secs', 10.0, - 'Default orchestration polling interval.') -_MYSQL_HOST_FLAG = flags.DEFINE_string( - 'mysql_host', '127.0.0.1', - 'The name or network address of the instance of MySQL to connect to.') -_MYSQL_PORT_FLAG = flags.DEFINE_integer( - 'mysql_port', 8888, 'The port MySQL is using to listen for connections.') -_SERVER_PORT_FLAG = flags.DEFINE_integer( - 'server_port', 10000, - 'The port rpc server is using to listen for connections.') -_MYSQL_DATABASE_FLAG = flags.DEFINE_string( - 'mysql_database', '', 'The name of the MySQL database to use.') -_MYSQL_USERNAME_FLAG = flags.DEFINE_string( - 'mysql_username', 'root', 'The MySQL login account being used.') -_MYSQL_PASSWORD_FLAG = flags.DEFINE_string( - 'mysql_password', '', 'The password for the MySQL account being used.') - -_TICK_DURATION_SECS = 1.0 -_MONITORING_INTERVAL_SECS = 30 - - -def _start_grpc_server( - servicer: kubernetes_orchestrator_service.KubernetesOrchestratorServicer -) -> grpc.Server: - """Starts GRPC server.""" - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - service_pb2_grpc.add_KubernetesOrchestratorServicer_to_server( - servicer, server) - server_creds = grpc.local_server_credentials() - server.add_secure_port(f'[::]:{_SERVER_PORT_FLAG.value}', server_creds) - server.start() - return server - - -def _create_mlmd_connection(): - """Creates connection for MLMD.""" - connection_config = metadata.mysql_metadata_connection_config( - host=_MYSQL_HOST_FLAG.value, - port=_MYSQL_PORT_FLAG.value, - username=_MYSQL_USERNAME_FLAG.value, - database=_MYSQL_DATABASE_FLAG.value, - password=_MYSQL_PASSWORD_FLAG.value) - return metadata.Metadata(connection_config=connection_config) - - -def _run() -> None: - """Runs the main orchestration loop.""" - with contextlib.ExitStack() as stack: - stack.enter_context(event_observer.init()) - - mlmd_handle = stack.enter_context(_create_mlmd_connection()) - orchestrator_servicer = kubernetes_orchestrator_service.KubernetesOrchestratorServicer( - mlmd_handle) - - server = _start_grpc_server(orchestrator_servicer) - stack.callback(server.stop, grace=None) - - task_queue = tq.TaskQueue() - - service_job_manager = service_jobs.DummyServiceJobManager() - task_manager = stack.enter_context( - tm.TaskManager( - mlmd_handle, - task_queue, - max_active_task_schedulers=_MAX_ACTIVE_TASK_SCHEDULERS_FLAG.value)) - last_active = time.time() - - iteration = 0 - while not _INACTIVITY_TTL_SECS_FLAG.value or time.time( - ) - last_active <= _INACTIVITY_TTL_SECS_FLAG.value: - try: - iteration += 1 - logging.info('Orchestration loop: iteration #%d (since process start).', - iteration) - event_observer.check_active() - - # Last pipeline state change time is useful to decide if wait period - # between iterations can be short-circuited. - last_state_change_time_secs = ( - pipeline_state.last_state_change_time_secs()) - - if pipeline_ops.orchestrate(mlmd_handle, task_queue, - service_job_manager): - last_active = time.time() - - time_budget = _DEFAULT_POLLING_INTERVAL_SECS_FLAG.value - logging.info( - 'Orchestration loop: waiting %s seconds before next iteration.', - time_budget) - while time_budget > 0.0: - # Task manager should never be "done" unless there was an error. - if task_manager.done(): - if task_manager.exception(): - raise task_manager.exception() - else: - raise RuntimeError( - 'Task manager unexpectedly stalled due to an internal error.') - - # Short-circuit if state change is detected. - if (pipeline_state.last_state_change_time_secs() > - last_state_change_time_secs): - last_state_change_time_secs = ( - pipeline_state.last_state_change_time_secs()) - logging.info( - 'Orchestration loop: detected state change, exiting wait period ' - 'early (with %s of %s seconds remaining).', time_budget, - _DEFAULT_POLLING_INTERVAL_SECS_FLAG.value) - break - - time_budget = _sleep_tick_duration_secs(time_budget) - except Exception: # pylint: disable=broad-except - logging.exception('Exception in main orchestration loop!') - raise - - logging.info('Exiting due to no pipeline run in %s seconds', - _INACTIVITY_TTL_SECS_FLAG.value) - - -def _sleep_tick_duration_secs(time_budget: float) -> float: - """Sleeps and returns new time budget; standalone fn to mock in tests.""" - time.sleep(_TICK_DURATION_SECS) - return time_budget - _TICK_DURATION_SECS - - -def _register_task_schedulers() -> None: - """Registers task schedulers.""" - ts.TaskSchedulerRegistry.register( - 'type.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec', - kubernetes_task_scheduler.KubernetesTaskScheduler) - ts.TaskSchedulerRegistry.register( - 'type.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec', - kubernetes_task_scheduler.KubernetesTaskScheduler) - - -def main(unused_arg): - logging.set_verbosity(logging.INFO) - _register_task_schedulers() - _run() - - -if __name__ == '__main__': - app.run(main) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/__init__.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/__init__.py deleted file mode 100644 index 86883734410..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/kubernetes_orchestrator_service.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/kubernetes_orchestrator_service.py deleted file mode 100644 index 27265764b0f..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/kubernetes_orchestrator_service.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Centralized Kubernetes Orchestrator Service. - -Implementation of a servicer that will be used for Centralized Kubernetes -Orchestrator. -""" - -from typing import Dict - -import grpc -from tfx.orchestration import metadata -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2 -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2_grpc -from tfx.orchestration.experimental.core import pipeline_ops -from tfx.utils import status as status_lib - -_CANONICAL_TO_GRPC_CODES: Dict[int, grpc.StatusCode] = { - status_lib.Code.OK: grpc.StatusCode.OK, - status_lib.Code.CANCELLED: grpc.StatusCode.CANCELLED, - status_lib.Code.UNKNOWN: grpc.StatusCode.UNKNOWN, - status_lib.Code.INVALID_ARGUMENT: grpc.StatusCode.INVALID_ARGUMENT, - status_lib.Code.DEADLINE_EXCEEDED: grpc.StatusCode.DEADLINE_EXCEEDED, - status_lib.Code.NOT_FOUND: grpc.StatusCode.NOT_FOUND, - status_lib.Code.ALREADY_EXISTS: grpc.StatusCode.ALREADY_EXISTS, - status_lib.Code.PERMISSION_DENIED: grpc.StatusCode.PERMISSION_DENIED, - status_lib.Code.RESOURCE_EXHAUSTED: grpc.StatusCode.RESOURCE_EXHAUSTED, - status_lib.Code.FAILED_PRECONDITION: grpc.StatusCode.FAILED_PRECONDITION, - status_lib.Code.ABORTED: grpc.StatusCode.ABORTED, - status_lib.Code.OUT_OF_RANGE: grpc.StatusCode.OUT_OF_RANGE, - status_lib.Code.UNIMPLEMENTED: grpc.StatusCode.UNIMPLEMENTED, - status_lib.Code.INTERNAL: grpc.StatusCode.INTERNAL, - status_lib.Code.UNAVAILABLE: grpc.StatusCode.UNAVAILABLE, - status_lib.Code.DATA_LOSS: grpc.StatusCode.DATA_LOSS, - status_lib.Code.UNAUTHENTICATED: grpc.StatusCode.UNAUTHENTICATED, -} - - -class KubernetesOrchestratorServicer( - service_pb2_grpc.KubernetesOrchestratorServicer): - """A service interface for pipeline orchestration.""" - - def __init__(self, mlmd_handle: metadata.Metadata): - self._mlmd_handle = mlmd_handle - - def Echo(self, request: service_pb2.EchoRequest, - servicer_context: grpc.ServicerContext): - """Echoes the input user message to test the server. - - Args: - request: A service_pb2.Echo object containing the message user wants to - echo. - servicer_context: A grpc.ServicerContext for use during service of the - RPC. - - Returns: - A service_pb2.Echo object containing the message to echo. - """ - return service_pb2.EchoResponse(msg=request.msg) - - def StartPipeline( - self, request: service_pb2.StartPipelineRequest, - context: grpc.ServicerContext) -> service_pb2.StartPipelineResponse: - try: - pipeline_ops.initiate_pipeline_start(self._mlmd_handle, request.pipeline) - except status_lib.StatusNotOkError as e: - context.set_code(_CANONICAL_TO_GRPC_CODES[e.code]) - context.set_details(e.message) - return service_pb2.StartPipelineResponse() diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/kubernetes_orchestrator_service_test.py b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/kubernetes_orchestrator_service_test.py deleted file mode 100644 index 70a43d296f0..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/kubernetes_orchestrator_service_test.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright 2022 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.kubernetes_orchestrator_service.""" - -from unittest import mock -import grpc -from grpc.framework.foundation import logging_pool -import portpicker -import tensorflow as tf -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service import kubernetes_orchestrator_service -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2 -from tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service.proto import service_pb2_grpc -from tfx.orchestration.experimental.core import pipeline_ops -from tfx.orchestration.experimental.core import task as task_lib -from tfx.proto.orchestration import pipeline_pb2 -from tfx.utils import status as status_lib - - -class KubernetesOrchestratorServiceTest(tf.test.TestCase): - - @classmethod - def setUpClass(cls): - super().setUpClass() - port = portpicker.pick_unused_port() - - server_pool = logging_pool.pool(max_workers=25) - cls._server = grpc.server(server_pool) - cls._server.add_secure_port(f'[::]:{port}'.format(port), - grpc.local_server_credentials()) - servicer = kubernetes_orchestrator_service.KubernetesOrchestratorServicer( - mock.Mock()) - service_pb2_grpc.add_KubernetesOrchestratorServicer_to_server( - servicer, cls._server) - cls._server.start() - cls._channel = grpc.secure_channel(f'localhost:{port}', - grpc.local_channel_credentials()) - cls._stub = service_pb2_grpc.KubernetesOrchestratorStub(cls._channel) - - @classmethod - def tearDownClass(cls): - cls._channel.close() - cls._server.stop(None) - super().tearDownClass() - - def test_echo(self): - msg = 'This is a test message.' - request = service_pb2.EchoRequest(msg=msg) - response = self._stub.Echo(request) - - self.assertEqual(response.msg, msg) - - def test_start_pipeline_success(self): - pipeline_uid = task_lib.PipelineUid(pipeline_id='foo') - with mock.patch.object(pipeline_ops, - 'initiate_pipeline_start') as mock_start: - mock_start.return_value.pipeline_uid = pipeline_uid - pipeline = pipeline_pb2.Pipeline( - pipeline_info=pipeline_pb2.PipelineInfo(id='pipeline1')) - request = service_pb2.StartPipelineRequest(pipeline=pipeline) - response = self._stub.StartPipeline(request) - self.assertEqual(service_pb2.StartPipelineResponse(), response) - mock_start.assert_called_once_with(mock.ANY, pipeline) - - @mock.patch.object(pipeline_ops, 'initiate_pipeline_start') - def test_start_pipeline_failure_to_initiate(self, mock_start): - mock_start.side_effect = status_lib.StatusNotOkError( - code=status_lib.Code.ALREADY_EXISTS, message='already exists') - request = service_pb2.StartPipelineRequest(pipeline=pipeline_pb2.Pipeline()) - with self.assertRaisesRegex(grpc.RpcError, - 'already exists') as exception_context: - self._stub.StartPipeline(request) - self.assertIs(grpc.StatusCode.ALREADY_EXISTS, - exception_context.exception.code()) - mock_start.assert_called_once() - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto/BUILD b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto/BUILD deleted file mode 100644 index a934ccda1d7..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto/BUILD +++ /dev/null @@ -1,29 +0,0 @@ -load("//tfx:tfx.bzl", "tfx_py_proto_library") - -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) # Apache 2.0 - -exports_files(["LICENSE"]) - -tfx_py_proto_library( - name = "service_py_pb2", - srcs = ["service.proto"], - use_grpc_plugin = True, - deps = [ - "//tfx/proto/orchestration:pipeline_py_pb2", - ], -) diff --git a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto/service.proto b/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto/service.proto deleted file mode 100644 index ecdfb362403..00000000000 --- a/tfx/orchestration/experimental/centralized_kubernetes_orchestrator/service/proto/service.proto +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2022 Google LLC. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -syntax = "proto3"; - -package tfx.orchestration.experimental.centralized_kubernetes_orchestrator.service; - -import "tfx/proto/orchestration/pipeline.proto"; - -message EchoRequest { - string msg = 1; -} - -message EchoResponse { - string msg = 1; -} - -// Request to start a pipeline. -message StartPipelineRequest { - // The pipeline IR proto. A pipeline will be started using this pipeline - // definition if there is no currently active pipeline having the same - // pipeline id. Only a previously stopped or a new pipeline can be started. - .tfx.orchestration.Pipeline pipeline = 1; -} - -message StartPipelineResponse {} - -// Request to stop a pipeline. -message StopPipelineRequest { - // The id of the pipeline to be stopped. - string pipeline_id = 1; - - reserved 2; -} - -message StopPipelineResponse {} - -service KubernetesOrchestrator { - // Response returns the same msg as request. - rpc Echo(EchoRequest) returns (EchoResponse) {} - - // Starts a pipeline. A pipeline will be started using the provided pipeline - // definition if there is no currently active pipeline having the same - // `pipeline_id`. Only a previously stopped or a new pipeline can be started. - // The RPC will fail otherwise. - rpc StartPipeline(StartPipelineRequest) returns (StartPipelineResponse) {} - - // Stops a currently active pipeline. - rpc StopPipeline(StopPipelineRequest) returns (StopPipelineResponse) {} -} \ No newline at end of file diff --git a/tfx/orchestration/experimental/kubernetes/README.md b/tfx/orchestration/experimental/kubernetes/README.md deleted file mode 100644 index 057732bb609..00000000000 --- a/tfx/orchestration/experimental/kubernetes/README.md +++ /dev/null @@ -1,86 +0,0 @@ -# TFX Orchestration on Kubernetes - -This orchestrator is experimental and is not suitable for production use. For -pipeline deployment on Kubernetes, we currently recommend that you use the -Kubeflow Pipelines orchestrator found in `tfx/orchestration/kubeflow` - -This package provides experimental support for executing synchronous TFX -pipelines in an on premise Kubernetes cluster as an alternative to -[KubeFlow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) -. Use the workflow below to set up your cluster for pipeline execution. - -## Step 1: Set up a Kubernetes cluster - -### Kubernetes setup - -To create your own on-premise or cloud-based Kubernetes cluster, follow the -[Kubernetes Getting Started Guide](https://kubernetes.io/docs/setup/) to set up -your Kubernetes environment. - -### Creating a Google Kubernetes Engine cluster on Google Cloud Platform - -If you would like to run a managed Kubernetes cluster on Google Cloud, follow -the -[Google Kubernetes Engine Quickstart Guide](https://cloud.google.com/kubernetes-engine/docs/quickstart). - -## Step 2: Set up Jupyter Notebook Service and MySQL MLMD - -First, ensure that you are in the base TFX directory. Use the following command -to deploy the default Jupyter Notebook and MySQL resources: `kubectl apply -k -tfx/orchestration/experimental/kubernetes/yaml/` **Important: If you are using a -Kubernetes cluster other than GKE, go to -tfx/orchestration/experimental/kubernetes/yaml/mysql-pv.yaml and follow the -instructions to modify the configurations for your cluster.** - -### Using the In-Cluster Jupyter Notebook - -The in-cluster Jupyter Notebook allows you to edit files and run pipelines -directly from within your Kubernetes cluster. Note that the contents of this -notebook server are ephemeral, so we suggest using this for testing only. - -To log on to your Jupyter server, you need the log in token. You may customize a -log in password after the first time you log in. To obtain the log in token, -first use `kubectl get pods` to locate the pod name starting with "jupyter-". -Then, read the pod start-up log to obtain the login password by replacing -$YOUR_POD_NAME with the name of the jupyter pod: `kubectl logs $YOUR_POD_NAME` - -Finally, you may use port forwarding to access the server at `localhost:8888`: -`kubectl port-forward $YOUR_POD_NAME 8888:8888` - -### Using the MySQL MLMD - -The MySQL Service will be used as a -[metadata store](https://www.tensorflow.org/tfx/guide/mlmd) for your TFX -pipelines. You do not need to interact with it by default, but it may be useful -for debugging pipeline executions. - -To access the service from the command line, use: `kubectl run -it --rm ---image=mysql:5.6 --restart=Never mysql-client -- mysql --host mysql` - -To use the MySQL instance as a metadata store in your TFX pipeline or -interactive context, first create a custom metadata connection config: -`_metadata_connection_config = metadata.mysql_metadata_connection_config( -host='mysql', port=3306, username='root', database='mysql', password='')` - -Now, you can use this in your pipeline by passing it into the constructor for -`pipeline.Pipeline`: `pipeline.Pipeline( pipeline_name=pipeline_name, -pipeline_root=pipeline_root, components=[ # ... ], -metadata_connection_config=_metadata_connection_config, -beam_pipeline_args=beam_pipeline_args)` - -Similarly, you can initialize a custom interactive context to use this metadata -store with: `context = -InteractiveContext(metadata_connection_config=_metadata_connection_config)` - -## Step 3: Build and upload your TFX image - -The default container image used for executing TFX pipeline components is -`tensorflow/tfx`. If you would like to use a custom container image, you can -start by creating and a custom Dockerfile, for example: `FROM python:3.7 RUN pip -install tfx # Add your dependencies here.` - -Once you have created your Dockerfile, you can build it while tagging your image -name: `docker build -t $YOUR_IMAGE_NAME .` - -Then, upload the image to your cloud container registry: `docker push -$YOUR_IMAGE_NAME` diff --git a/tfx/orchestration/experimental/kubernetes/__init__.py b/tfx/orchestration/experimental/kubernetes/__init__.py deleted file mode 100644 index ca966a36bf0..00000000000 --- a/tfx/orchestration/experimental/kubernetes/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2019 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tfx/orchestration/experimental/kubernetes/container_entrypoint.py b/tfx/orchestration/experimental/kubernetes/container_entrypoint.py deleted file mode 100644 index e04bd59797c..00000000000 --- a/tfx/orchestration/experimental/kubernetes/container_entrypoint.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Main entrypoint for containers with Kubernetes TFX component executors.""" - -import argparse -import json -import logging -import sys - -from tfx.orchestration import data_types -from tfx.orchestration import metadata -from tfx.orchestration.launcher import base_component_launcher -from tfx.utils import import_utils -from tfx.utils import json_utils -from tfx.utils import telemetry_utils - -from google.protobuf import json_format -from ml_metadata.proto import metadata_store_pb2 - - -def main(): - # Log to the container's stdout so it can be streamed by the orchestrator. - logging.basicConfig(stream=sys.stdout, level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) - - parser = argparse.ArgumentParser() - parser.add_argument('--pipeline_name', type=str, required=True) - parser.add_argument('--pipeline_root', type=str, required=True) - parser.add_argument('--run_id', type=str, required=True) - parser.add_argument('--metadata_config', type=str, required=True) - parser.add_argument('--beam_pipeline_args', type=str, required=True) - parser.add_argument('--additional_pipeline_args', type=str, required=True) - parser.add_argument( - '--component_launcher_class_path', type=str, required=True) - parser.add_argument('--enable_cache', action='store_true') - parser.add_argument('--serialized_component', type=str, required=True) - parser.add_argument('--component_config', type=str, required=True) - - args = parser.parse_args() - - component = json_utils.loads(args.serialized_component) - component_config = json_utils.loads(args.component_config) - component_launcher_class = import_utils.import_class_by_path( - args.component_launcher_class_path) - if not issubclass(component_launcher_class, - base_component_launcher.BaseComponentLauncher): - raise TypeError( - 'component_launcher_class "%s" is not subclass of base_component_launcher.BaseComponentLauncher' - % component_launcher_class) - - metadata_config = metadata_store_pb2.ConnectionConfig() - json_format.Parse(args.metadata_config, metadata_config) - driver_args = data_types.DriverArgs(enable_cache=args.enable_cache) - beam_pipeline_args = json.loads(args.beam_pipeline_args) - additional_pipeline_args = json.loads(args.additional_pipeline_args) - - launcher = component_launcher_class.create( - component=component, - pipeline_info=data_types.PipelineInfo( - pipeline_name=args.pipeline_name, - pipeline_root=args.pipeline_root, - run_id=args.run_id, - ), - driver_args=driver_args, - metadata_connection=metadata.Metadata(connection_config=metadata_config), - beam_pipeline_args=beam_pipeline_args, - additional_pipeline_args=additional_pipeline_args, - component_config=component_config) - - # Attach necessary labels to distinguish different runner and DSL. - with telemetry_utils.scoped_labels({ - telemetry_utils.LABEL_TFX_RUNNER: 'kubernetes', - }): - launcher.launch() - - -if __name__ == '__main__': - main() diff --git a/tfx/orchestration/experimental/kubernetes/examples/__init__.py b/tfx/orchestration/experimental/kubernetes/examples/__init__.py deleted file mode 100644 index ca966a36bf0..00000000000 --- a/tfx/orchestration/experimental/kubernetes/examples/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2019 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tfx/orchestration/experimental/kubernetes/examples/download_grep_print_pipeline_on_kubernetes.py b/tfx/orchestration/experimental/kubernetes/examples/download_grep_print_pipeline_on_kubernetes.py deleted file mode 100644 index 8d3eef8fc4b..00000000000 --- a/tfx/orchestration/experimental/kubernetes/examples/download_grep_print_pipeline_on_kubernetes.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Container-based pipeline on kubernetes sample.""" - -import absl - -from tfx.orchestration import pipeline as pipeline_module -from tfx.orchestration.experimental.kubernetes import kubernetes_dag_runner -from tfx.orchestration.test_pipelines.download_grep_print_pipeline import create_pipeline_component_instances - -_pipeline_name = 'download_grep_print_pipeline' - -# Directory and data locations (uses Google Cloud Storage). -_pipeline_root = 'gs://my-bucket' - -absl.logging.set_verbosity(absl.logging.INFO) - - -def _create_pipeline() -> pipeline_module.Pipeline: - """Create sample container component pipeline.""" - - pipeline_name = _pipeline_name - pipeline_root = _pipeline_root - - text_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/370cbcd/data/tinyshakespeare/input.txt' - pattern = 'art thou' - components = create_pipeline_component_instances(text_url, pattern) - - # Use the default in-cluster MySql metadata config. - config = kubernetes_dag_runner.get_default_kubernetes_metadata_config() - - return pipeline_module.Pipeline( - pipeline_name=pipeline_name, - pipeline_root=pipeline_root, - components=components, - metadata_connection_config=config, - enable_cache=False, - ) - - -def main(): - # First, create the tfx pipeline instance. - pipeline = _create_pipeline() - # Use kubernetes dag runner to run the pipeline. - kubernetes_dag_runner.KubernetesDagRunner().run(pipeline=pipeline) - - -if __name__ == '__main__': - main() diff --git a/tfx/orchestration/experimental/kubernetes/examples/taxi_pipeline_kubernetes.py b/tfx/orchestration/experimental/kubernetes/examples/taxi_pipeline_kubernetes.py deleted file mode 100644 index c7b6de60b2e..00000000000 --- a/tfx/orchestration/experimental/kubernetes/examples/taxi_pipeline_kubernetes.py +++ /dev/null @@ -1,179 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Chicago taxi example using TFX Kubernetes Orchestrator.""" - -import os -from typing import List - -import absl -import tensorflow_model_analysis as tfma -from tfx.components import CsvExampleGen -from tfx.components import Evaluator -from tfx.components import ExampleValidator -from tfx.components import Pusher -from tfx.components import SchemaGen -from tfx.components import StatisticsGen -from tfx.components import Trainer -from tfx.components import Transform -from tfx.dsl.components.common import resolver -from tfx.dsl.experimental import latest_blessed_model_resolver -from tfx.orchestration import pipeline -from tfx.orchestration.experimental.kubernetes import kubernetes_dag_runner -from tfx.proto import pusher_pb2 -from tfx.proto import trainer_pb2 -from tfx.types import Channel -from tfx.types.standard_artifacts import Model -from tfx.types.standard_artifacts import ModelBlessing - -_pipeline_name = 'chicago_taxi_beam' - -# Directory and data locations (uses Google Cloud Storage). -_input_bucket = 'gs://my-bucket' -_output_bucket = 'gs://my-bucket' - -# This example assumes that the taxi data is stored in a google cloud storage -# bucket named taxi under `gs://${_input_bucket}/data` and the taxi utility -# function is stored at `gs://${_input_bucket}/taxi_utils.py`. -# Feel free to customize this as needed. -_data_root = os.path.join(_input_bucket, 'data') -_module_file = os.path.join(_input_bucket, 'taxi_utils.py') - -# Directory for pipeline outputs. -_tfx_root = os.path.join(_output_bucket, 'tfx') -_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name) - -# Path which can be listened to by the model server. Pusher will output the -# trained model here. -_serving_model_dir = os.path.join(_tfx_root, 'serving_model', _pipeline_name) - -# Pipeline arguments for Beam powered Components. -_beam_pipeline_args = [ - '--direct_running_mode=multi_processing', - # 0 means auto-detect based on on the number of CPUs available - # during execution time. - '--direct_num_workers=0', -] - - -def create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str, - module_file: str, serving_model_dir: str, - beam_pipeline_args: List[str]) -> pipeline.Pipeline: - """Implements the chicago taxi pipeline with TFX.""" - - # Brings data into the pipeline or otherwise joins/converts training data. - example_gen = CsvExampleGen(input_base=data_root) - - # Computes statistics over data for visualization and example validation. - statistics_gen = StatisticsGen(examples=example_gen.outputs['examples']) - - # Generates schema based on statistics files. - schema_gen = SchemaGen( - statistics=statistics_gen.outputs['statistics'], - infer_feature_shape=False) - - # Performs anomaly detection based on statistics and data schema. - example_validator = ExampleValidator( - statistics=statistics_gen.outputs['statistics'], - schema=schema_gen.outputs['schema']) - - # Performs transformations and feature engineering in training and serving. - transform = Transform( - examples=example_gen.outputs['examples'], - schema=schema_gen.outputs['schema'], - module_file=module_file) - - # Uses user-provided Python function that implements a model. - trainer = Trainer( - module_file=module_file, - transformed_examples=transform.outputs['transformed_examples'], - schema=schema_gen.outputs['schema'], - transform_graph=transform.outputs['transform_graph'], - train_args=trainer_pb2.TrainArgs(num_steps=10000), - eval_args=trainer_pb2.EvalArgs(num_steps=5000)) - - # Get the latest blessed model for model validation. - model_resolver = resolver.Resolver( - strategy_class=latest_blessed_model_resolver.LatestBlessedModelResolver, - model=Channel(type=Model), - model_blessing=Channel( - type=ModelBlessing)).with_id('latest_blessed_model_resolver') - - # Uses TFMA to compute a evaluation statistics over features of a model and - # perform quality validation of a candidate model (compared to a baseline). - eval_config = tfma.EvalConfig( - model_specs=[tfma.ModelSpec(signature_name='eval')], - slicing_specs=[ - tfma.SlicingSpec(), - tfma.SlicingSpec(feature_keys=['trip_start_hour']) - ], - metrics_specs=[ - tfma.MetricsSpec( - thresholds={ - 'accuracy': - tfma.MetricThreshold( - value_threshold=tfma.GenericValueThreshold( - lower_bound={'value': 0.6}), - # Change threshold will be ignored if there is no - # baseline model resolved from MLMD (first run). - change_threshold=tfma.GenericChangeThreshold( - direction=tfma.MetricDirection.HIGHER_IS_BETTER, - absolute={'value': -1e-10})) - }) - ]) - evaluator = Evaluator( - examples=example_gen.outputs['examples'], - model=trainer.outputs['model'], - baseline_model=model_resolver.outputs['model'], - eval_config=eval_config) - - # Checks whether the model passed the validation steps and pushes the model - # to a file destination if check passed. - pusher = Pusher( - model=trainer.outputs['model'], - model_blessing=evaluator.outputs['blessing'], - push_destination=pusher_pb2.PushDestination( - filesystem=pusher_pb2.PushDestination.Filesystem( - base_directory=serving_model_dir))) - - config = kubernetes_dag_runner.get_default_kubernetes_metadata_config() - return pipeline.Pipeline( - pipeline_name=pipeline_name, - pipeline_root=pipeline_root, - components=[ - example_gen, - statistics_gen, - schema_gen, - example_validator, - transform, - trainer, - model_resolver, - evaluator, - pusher, - ], - enable_cache=False, - metadata_connection_config=config, - beam_pipeline_args=beam_pipeline_args) - - -if __name__ == '__main__': - absl.logging.set_verbosity(absl.logging.INFO) - - kubernetes_dag_runner.KubernetesDagRunner().run( - create_pipeline( - pipeline_name=_pipeline_name, - pipeline_root=_pipeline_root, - data_root=_data_root, - module_file=_module_file, - serving_model_dir=_serving_model_dir, - beam_pipeline_args=_beam_pipeline_args)) diff --git a/tfx/orchestration/experimental/kubernetes/examples/taxi_pipeline_kubernetes_test.py b/tfx/orchestration/experimental/kubernetes/examples/taxi_pipeline_kubernetes_test.py deleted file mode 100644 index abc2317a8ab..00000000000 --- a/tfx/orchestration/experimental/kubernetes/examples/taxi_pipeline_kubernetes_test.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.orchestration.experimental.kubernetes.examples.taxi_pipeline_kubernetes.""" - -import os -import tensorflow as tf -from tfx.orchestration.experimental.kubernetes.examples import taxi_pipeline_kubernetes - - -class TaxiPipelineKubernetesTest(tf.test.TestCase): - - def setUp(self): - super().setUp() - self._test_dir = os.path.join( - os.environ.get('TEST_UNDECLARED_OUTPUTS_DIR', self.get_temp_dir()), - self._testMethodName) - - def testTaxiPipelineCheckDagConstruction(self): - logical_pipeline = taxi_pipeline_kubernetes.create_pipeline( - pipeline_name='Test', - pipeline_root=self._test_dir, - data_root=self._test_dir, - module_file=self._test_dir, - serving_model_dir=self._test_dir, - beam_pipeline_args=[]) - self.assertEqual(9, len(logical_pipeline.components)) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/experimental/kubernetes/kubernetes_dag_runner.py b/tfx/orchestration/experimental/kubernetes/kubernetes_dag_runner.py deleted file mode 100644 index a2482939236..00000000000 --- a/tfx/orchestration/experimental/kubernetes/kubernetes_dag_runner.py +++ /dev/null @@ -1,257 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Definition of Kubernetes TFX runner.""" - -import datetime -import json -from typing import List, Optional, Type - -from absl import logging -from tfx.dsl.component.experimental import container_component -from tfx.dsl.components.base import base_node -from tfx.orchestration import data_types -from tfx.orchestration import metadata -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration import tfx_runner -from tfx.orchestration.config import base_component_config -from tfx.orchestration.config import config_utils -from tfx.orchestration.config import pipeline_config -from tfx.orchestration.experimental.kubernetes import kubernetes_remote_runner -from tfx.orchestration.experimental.kubernetes import node_wrapper -from tfx.orchestration.launcher import base_component_launcher -from tfx.orchestration.launcher import in_process_component_launcher -from tfx.orchestration.launcher import kubernetes_component_launcher -from tfx.utils import json_utils -from tfx.utils import kube_utils -from tfx.utils import name_utils - -from google.protobuf import json_format -from ml_metadata.proto import metadata_store_pb2 - -_CONTAINER_COMMAND = [ - 'python', '-m', - 'tfx.orchestration.experimental.kubernetes.container_entrypoint' -] - -# Suffix added to the component id to avoid MLMD conflict when -# registering this component. -_WRAPPER_SUFFIX = '.Wrapper' - -_TFX_IMAGE = 'tensorflow/tfx' - - -def get_default_kubernetes_metadata_config( -) -> metadata_store_pb2.ConnectionConfig: - """Returns the default metadata connection config for a kubernetes cluster. - - Returns: - A config proto that will be serialized as JSON and passed to the running - container so the TFX component driver is able to communicate with MLMD in - a kubernetes cluster. - """ - connection_config = metadata_store_pb2.ConnectionConfig() - connection_config.mysql.host = 'mysql' - connection_config.mysql.port = 3306 - connection_config.mysql.database = 'mysql' - connection_config.mysql.user = 'root' - connection_config.mysql.password = '' - return connection_config - - -def launch_container_component( - component: base_node.BaseNode, - component_launcher_class: Type[ - base_component_launcher.BaseComponentLauncher], - component_config: base_component_config.BaseComponentConfig, - pipeline: tfx_pipeline.Pipeline): - """Use the kubernetes component launcher to launch the component. - - Args: - component: Container component to be executed. - component_launcher_class: The class of the launcher to launch the component. - component_config: component config to launch the component. - pipeline: Logical pipeline that contains pipeline related information. - """ - driver_args = data_types.DriverArgs(enable_cache=pipeline.enable_cache) - metadata_connection = metadata.Metadata(pipeline.metadata_connection_config) - - component_launcher = component_launcher_class.create( - component=component, - pipeline_info=pipeline.pipeline_info, - driver_args=driver_args, - metadata_connection=metadata_connection, - beam_pipeline_args=pipeline.beam_pipeline_args, - additional_pipeline_args=pipeline.additional_pipeline_args, - component_config=component_config) - logging.info('Component %s is running.', component.id) - component_launcher.launch() - logging.info('Component %s is finished.', component.id) - - -class KubernetesDagRunnerConfig(pipeline_config.PipelineConfig): - """Runtime configuration parameters specific to execution on Kubernetes.""" - - def __init__(self, - tfx_image: Optional[str] = None, - supported_launcher_classes: Optional[List[Type[ - base_component_launcher.BaseComponentLauncher]]] = None, - **kwargs): - """Creates a KubernetesDagRunnerConfig object. - - Args: - tfx_image: The TFX container image to use in the pipeline. - supported_launcher_classes: Optional list of component launcher classes - that are supported by the current pipeline. List sequence determines the - order in which launchers are chosen for each component being run. - **kwargs: keyword args for PipelineConfig. - """ - supported_launcher_classes = supported_launcher_classes or [ - in_process_component_launcher.InProcessComponentLauncher, - kubernetes_component_launcher.KubernetesComponentLauncher, - ] - super().__init__( - supported_launcher_classes=supported_launcher_classes, **kwargs) - self.tfx_image = tfx_image or _TFX_IMAGE - - -class KubernetesDagRunner(tfx_runner.TfxRunner): - """TFX runner on Kubernetes.""" - - def __init__(self, config: Optional[KubernetesDagRunnerConfig] = None): - """Initializes KubernetesDagRunner as a TFX orchestrator. - - Args: - config: Optional pipeline config for customizing the launching of each - component. Defaults to pipeline config that supports - InProcessComponentLauncher and KubernetesComponentLauncher. - """ - if config is None: - config = KubernetesDagRunnerConfig() - super().__init__(config) - - def run(self, pipeline: tfx_pipeline.Pipeline) -> None: - """Deploys given logical pipeline on Kubernetes. - - Args: - pipeline: Logical pipeline containing pipeline args and components. - """ - if not pipeline.pipeline_info.run_id: - pipeline.pipeline_info.run_id = datetime.datetime.now().isoformat() - - if not kube_utils.is_inside_cluster(): - kubernetes_remote_runner.run_as_kubernetes_job( - pipeline=pipeline, tfx_image=self._config.tfx_image) - return - # TODO(ericlege): Support running components in parallel. - ran_components = set() - - # Runs component in topological order. - for component in pipeline.components: - # Verify that components are in topological order. - if hasattr(component, 'upstream_nodes') and component.upstream_nodes: - for upstream_node in component.upstream_nodes: - assert upstream_node in ran_components, ('Components is not in ' - 'topological order') - - (component_launcher_class, - component_config) = config_utils.find_component_launch_info( - self._config, component) - - # Check if the component is launchable as a container component. - if kubernetes_component_launcher.KubernetesComponentLauncher.can_launch( - component.executor_spec, component_config): - launch_container_component(component, component_launcher_class, - component_config, pipeline) - # Otherwise, the component should be launchable with the in process - # component launcher. wrap the component to a container component. - elif in_process_component_launcher.InProcessComponentLauncher.can_launch( - component.executor_spec, component_config): - wrapped_component = self._wrap_container_component( - component=component, - component_launcher_class=component_launcher_class, - component_config=component_config, - pipeline=pipeline) - - # Component launch info is updated by wrapping the component into a - # container component. Therefore, these properties need to be reloaded. - (wrapped_component_launcher_class, - wrapped_component_config) = config_utils.find_component_launch_info( - self._config, wrapped_component) - - launch_container_component(wrapped_component, - wrapped_component_launcher_class, - wrapped_component_config, pipeline) - else: - raise ValueError('Can not find suitable launcher for component.') - - ran_components.add(component) - - def _wrap_container_component( - self, - component: base_node.BaseNode, - component_launcher_class: Type[ - base_component_launcher.BaseComponentLauncher], - component_config: Optional[base_component_config.BaseComponentConfig], - pipeline: tfx_pipeline.Pipeline, - ) -> base_node.BaseNode: - """Wrapper for container component. - - Args: - component: Component to be executed. - component_launcher_class: The class of the launcher to launch the - component. - component_config: component config to launch the component. - pipeline: Logical pipeline that contains pipeline related information. - - Returns: - A container component that runs the wrapped component upon execution. - """ - - component_launcher_class_path = name_utils.get_full_name( - component_launcher_class) - - serialized_component = json_utils.dumps(node_wrapper.NodeWrapper(component)) - - arguments = [ - '--pipeline_name', - pipeline.pipeline_info.pipeline_name, - '--pipeline_root', - pipeline.pipeline_info.pipeline_root, - '--run_id', - pipeline.pipeline_info.run_id, - '--metadata_config', - json_format.MessageToJson( - message=get_default_kubernetes_metadata_config(), - preserving_proto_field_name=True), - '--beam_pipeline_args', - json.dumps(pipeline.beam_pipeline_args), - '--additional_pipeline_args', - json.dumps(pipeline.additional_pipeline_args), - '--component_launcher_class_path', - component_launcher_class_path, - '--serialized_component', - serialized_component, - '--component_config', - json_utils.dumps(component_config), - ] - - # Outputs/Parameters fields are not used as they are contained in - # the serialized component. - return container_component.create_container_component( - name=component.__class__.__name__, - outputs={}, - parameters={}, - image=self._config.tfx_image, - command=_CONTAINER_COMMAND + arguments)().with_id(component.id + - _WRAPPER_SUFFIX) diff --git a/tfx/orchestration/experimental/kubernetes/kubernetes_dag_runner_test.py b/tfx/orchestration/experimental/kubernetes/kubernetes_dag_runner_test.py deleted file mode 100644 index 378c21daac5..00000000000 --- a/tfx/orchestration/experimental/kubernetes/kubernetes_dag_runner_test.py +++ /dev/null @@ -1,201 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.orchestration.kubernetes.kubernetes_dag_runner.""" - -from unittest import mock -import tensorflow as tf -from tfx import types -from tfx.dsl.components.base import base_component -from tfx.dsl.components.base import base_executor -from tfx.dsl.components.base import base_node -from tfx.dsl.components.base import executor_spec -from tfx.orchestration import pipeline -from tfx.orchestration.experimental.kubernetes import kubernetes_dag_runner -from tfx.types.component_spec import ChannelParameter - -from ml_metadata.proto import metadata_store_pb2 - -_executed_components = [] - - -class _ArtifactTypeA(types.Artifact): - TYPE_NAME = 'ArtifactTypeA' - - -class _ArtifactTypeB(types.Artifact): - TYPE_NAME = 'ArtifactTypeB' - - -class _ArtifactTypeC(types.Artifact): - TYPE_NAME = 'ArtifactTypeC' - - -class _ArtifactTypeD(types.Artifact): - TYPE_NAME = 'ArtifactTypeD' - - -class _ArtifactTypeE(types.Artifact): - TYPE_NAME = 'ArtifactTypeE' - - -def _initialize_executed_components(): - global _executed_components - _executed_components = [] - - -def _mock_launch_container_component(component: base_node.BaseNode, *_): - _executed_components.append(component.id) - - -# We define fake component spec classes below for testing. Note that we can't -# programmatically generate component using anonymous classes for testing -# because of a limitation in the "dill" pickler component used by Apache Beam. -# An alternative we considered but rejected here was to write a function that -# returns anonymous classes within that function's closure (as is done in -# tfx/orchestration/pipeline_test.py), but that strategy does not work here -# as these anonymous classes cannot be used with Beam, since they cannot be -# pickled with the "dill" library. -class _FakeComponentSpecA(types.ComponentSpec): - PARAMETERS = {} - INPUTS = {} - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeA)} - - -class _FakeComponentSpecB(types.ComponentSpec): - PARAMETERS = {} - INPUTS = {'a': ChannelParameter(type=_ArtifactTypeA)} - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeB)} - - -class _FakeComponentSpecC(types.ComponentSpec): - PARAMETERS = {} - INPUTS = {'a': ChannelParameter(type=_ArtifactTypeA)} - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeC)} - - -class _FakeComponentSpecD(types.ComponentSpec): - PARAMETERS = {} - INPUTS = { - 'b': ChannelParameter(type=_ArtifactTypeB), - 'c': ChannelParameter(type=_ArtifactTypeC), - } - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeD)} - - -class _FakeComponentSpecE(types.ComponentSpec): - PARAMETERS = {} - INPUTS = { - 'a': ChannelParameter(type=_ArtifactTypeA), - 'b': ChannelParameter(type=_ArtifactTypeB), - 'd': ChannelParameter(type=_ArtifactTypeD), - } - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeE)} - - -class _FakeComponentSpecF(types.ComponentSpec): - PARAMETERS = {} - INPUTS = { - 'a': ChannelParameter(type=_ArtifactTypeA), - } - OUTPUTS = {} - - -class _FakeComponent(base_component.BaseComponent): - - SPEC_CLASS = types.ComponentSpec - EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(base_executor.BaseExecutor) - - def __init__(self, spec: types.ComponentSpec): - super().__init__(spec=spec) - self._id = spec.__class__.__name__.replace('_FakeComponentSpec', '').lower() - - -class KubernetesDagRunnerTest(tf.test.TestCase): - - @mock.patch.object( - kubernetes_dag_runner, - 'launch_container_component', - _mock_launch_container_component, - ) - @mock.patch.object(kubernetes_dag_runner, 'kube_utils') - def testRun(self, mock_kube_utils): - _initialize_executed_components() - mock_kube_utils.is_inside_cluster.return_value = True - - component_a = _FakeComponent( - spec=_FakeComponentSpecA(output=types.Channel(type=_ArtifactTypeA))) - component_b = _FakeComponent( - spec=_FakeComponentSpecB( - a=component_a.outputs['output'], - output=types.Channel(type=_ArtifactTypeB))) - component_c = _FakeComponent( - spec=_FakeComponentSpecC( - a=component_a.outputs['output'], - output=types.Channel(type=_ArtifactTypeC))) - component_c.add_upstream_node(component_b) - component_d = _FakeComponent( - spec=_FakeComponentSpecD( - b=component_b.outputs['output'], - c=component_c.outputs['output'], - output=types.Channel(type=_ArtifactTypeD))) - component_e = _FakeComponent( - spec=_FakeComponentSpecE( - a=component_a.outputs['output'], - b=component_b.outputs['output'], - d=component_d.outputs['output'], - output=types.Channel(type=_ArtifactTypeE))) - - test_pipeline = pipeline.Pipeline( - pipeline_name='x', - pipeline_root='y', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[ - component_d, component_c, component_a, component_b, component_e - ]) - - kubernetes_dag_runner.KubernetesDagRunner().run(test_pipeline) - self.assertEqual( - _executed_components, - ['a.Wrapper', 'b.Wrapper', 'c.Wrapper', 'd.Wrapper', 'e.Wrapper']) - - @mock.patch.object( - kubernetes_dag_runner, - 'launch_container_component', - _mock_launch_container_component, - ) - @mock.patch.object(kubernetes_dag_runner, 'kube_utils') - def testRunWithSameSpec(self, mock_kube_utils): - _initialize_executed_components() - mock_kube_utils.is_inside_cluster.return_value = True - - component_a = _FakeComponent( - spec=_FakeComponentSpecA(output=types.Channel(type=_ArtifactTypeA))) - component_f1 = _FakeComponent( - spec=_FakeComponentSpecF(a=component_a.outputs['output'])).with_id('f1') - component_f2 = _FakeComponent( - spec=_FakeComponentSpecF(a=component_a.outputs['output'])).with_id('f2') - component_f2.add_upstream_node(component_f1) - - test_pipeline = pipeline.Pipeline( - pipeline_name='x', - pipeline_root='y', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[component_f1, component_f2, component_a]) - kubernetes_dag_runner.KubernetesDagRunner().run(test_pipeline) - self.assertEqual(_executed_components, - ['a.Wrapper', 'f1.Wrapper', 'f2.Wrapper']) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner.py b/tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner.py deleted file mode 100644 index 496a641cae5..00000000000 --- a/tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner.py +++ /dev/null @@ -1,265 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Kubernetes TFX runner for out-of-cluster orchestration.""" - -import datetime -import json -import time -from typing import Dict, List - -from absl import logging -from kubernetes import client -from tfx.dsl.components.base import base_node -from tfx.dsl.context_managers import dsl_context_registry -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.experimental.kubernetes import node_wrapper -from tfx.utils import json_utils -from tfx.utils import kube_utils - -from google.protobuf import json_format -from ml_metadata.proto import metadata_store_pb2 - -_ORCHESTRATOR_COMMAND = [ - 'python', '-m', - 'tfx.orchestration.experimental.kubernetes.orchestrator_container_entrypoint' -] - -# Number of seconds to wait for a Kubernetes job to spawn a pod. -# This is expected to take only a few seconds. -JOB_CREATION_TIMEOUT = 300 - - -def run_as_kubernetes_job(pipeline: tfx_pipeline.Pipeline, - tfx_image: str) -> None: - """Submits and runs a TFX pipeline from outside the cluster. - - Args: - pipeline: Logical pipeline containing pipeline args and components. - tfx_image: Container image URI for the TFX container. - - Raises: - RuntimeError: When an error is encountered running the Kubernetes Job. - """ - - # TODO(ccy): Look for alternative serialization schemes once available. - serialized_pipeline = _serialize_pipeline(pipeline) - arguments = [ - '--serialized_pipeline', - serialized_pipeline, - '--tfx_image', - tfx_image, - ] - batch_api = kube_utils.make_batch_v1_api() - job_name = 'Job_' + pipeline.pipeline_info.run_id - pod_label = kube_utils.sanitize_pod_name(job_name) - container_name = 'pipeline-orchestrator' - job = kube_utils.make_job_object( - name=job_name, - container_image=tfx_image, - command=_ORCHESTRATOR_COMMAND + arguments, - container_name=container_name, - pod_labels={ - 'job-name': pod_label, - }, - service_account_name=kube_utils.TFX_SERVICE_ACCOUNT, - ) - try: - batch_api.create_namespaced_job('default', job, pretty=True) - except client.rest.ApiException as e: - raise RuntimeError('Failed to submit job! \nReason: %s\nBody: %s' % - (e.reason, e.body)) - - # Wait for pod to start. - orchestrator_pods = [] - core_api = kube_utils.make_core_v1_api() - start_time = datetime.datetime.utcnow() - - # Wait for the kubernetes job to launch a pod. - while not orchestrator_pods and (datetime.datetime.utcnow() - - start_time).seconds < JOB_CREATION_TIMEOUT: - try: - orchestrator_pods = core_api.list_namespaced_pod( - namespace='default', - label_selector='job-name={}'.format(pod_label)).items - except client.rest.ApiException as e: - if e.status != 404: - raise RuntimeError('Unknown error! \nReason: %s\nBody: %s' % - (e.reason, e.body)) - time.sleep(1) - - # Transient orchestrator should only have 1 pod. - if len(orchestrator_pods) != 1: - raise RuntimeError('Expected 1 pod launched by Kubernetes job, found %d' % - len(orchestrator_pods)) - orchestrator_pod = orchestrator_pods.pop() - pod_name = orchestrator_pod.metadata.name - - logging.info('Waiting for pod "default:%s" to start.', pod_name) - kube_utils.wait_pod( - core_api, - pod_name, - 'default', - exit_condition_lambda=kube_utils.pod_is_not_pending, - condition_description='non-pending status') - - # Stream logs from orchestrator pod. - logging.info('Start log streaming for pod "default:%s".', pod_name) - try: - logs = core_api.read_namespaced_pod_log( - name=pod_name, - namespace='default', - container=container_name, - follow=True, - _preload_content=False).stream() - except client.rest.ApiException as e: - raise RuntimeError( - 'Failed to stream the logs from the pod!\nReason: %s\nBody: %s' % - (e.reason, e.body)) - - for log in logs: - logging.info(log.decode().rstrip('\n')) - - resp = kube_utils.wait_pod( - core_api, - pod_name, - 'default', - exit_condition_lambda=kube_utils.pod_is_done, - condition_description='done state', - exponential_backoff=True) - - if resp.status.phase == kube_utils.PodPhase.FAILED.value: - raise RuntimeError('Pod "default:%s" failed with status "%s".' % - (pod_name, resp.status)) - - -def _extract_downstream_ids( - components: List[base_node.BaseNode]) -> Dict[str, List[str]]: - """Extract downstream component ids from a list of components. - - Args: - components: List of TFX Components. - - Returns: - Mapping from component id to ids of its downstream components for - each component. - """ - - downstream_ids = {} - for component in components: - downstream_ids[component.id] = [ - downstream_node.id for downstream_node in component.downstream_nodes - ] - return downstream_ids - - -def _serialize_pipeline(pipeline: tfx_pipeline.Pipeline) -> str: - """Serializes a TFX pipeline. - - To be replaced with the the TFX Intermediate Representation: - tensorflow/community#271. This serialization procedure extracts from - the pipeline properties necessary for reconstructing the pipeline instance - from within the cluster. For properties such as components and metadata - config that can not be directly dumped with JSON, we use NodeWrapper and - MessageToJson to serialize them beforehand. - - Args: - pipeline: Logical pipeline containing pipeline args and components. - - Returns: - Pipeline serialized as JSON string. - """ - serialized_components = [] - for component in pipeline.components: - serialized_components.append( - json_utils.dumps(node_wrapper.NodeWrapper(component))) - # Extract and pass pipeline graph information which are lost during the - # serialization process. The orchestrator container uses downstream_ids - # to reconstruct pipeline graph. - downstream_ids = _extract_downstream_ids(pipeline.components) - return json.dumps({ - 'pipeline_name': - pipeline.pipeline_info.pipeline_name, - 'pipeline_root': - pipeline.pipeline_info.pipeline_root, - 'enable_cache': - pipeline.enable_cache, - 'components': - serialized_components, - 'downstream_ids': - downstream_ids, - 'metadata_connection_config': - json_format.MessageToJson( - message=pipeline.metadata_connection_config, - preserving_proto_field_name=True, - ), - 'beam_pipeline_args': - pipeline.beam_pipeline_args, - }) - - -def deserialize_pipeline(serialized_pipeline: str) -> tfx_pipeline.Pipeline: - """Deserializes a TFX pipeline. - - To be replaced with the the TFX Intermediate Representation: - tensorflow/community#271. This deserialization procedure reverses the - serialization procedure and reconstructs the pipeline instance. - - Args: - serialized_pipeline: Pipeline JSON string serialized with the procedure from - _serialize_pipeline. - - Returns: - Original pipeline containing pipeline args and components. - """ - - pipeline = json.loads(serialized_pipeline) - components = [ - json_utils.loads(component) for component in pipeline['components'] - ] - for c in components: - dsl_context_registry.get().put_node(c) - - metadata_connection_config = metadata_store_pb2.ConnectionConfig() - json_format.Parse(pipeline['metadata_connection_config'], - metadata_connection_config) - - # Restore component dependencies. - downstream_ids = pipeline['downstream_ids'] - if not isinstance(downstream_ids, dict): - raise ValueError("downstream_ids needs to be a 'dict'.") - if len(downstream_ids) != len(components): - raise ValueError( - 'Wrong number of items in downstream_ids. Expected: %s. Actual: %d' % - len(components), len(downstream_ids)) - - id_to_component = {component.id: component for component in components} - for component in components: - # Since downstream and upstream node attributes are discarded during the - # serialization process, we initialize them here. - component._upstream_nodes = set() # pylint: disable=protected-access - component._downstream_nodes = set() # pylint: disable=protected-access - - for upstream_id, downstream_id_list in downstream_ids.items(): - upstream_component = id_to_component[upstream_id] - for downstream_id in downstream_id_list: - upstream_component.add_downstream_node(id_to_component[downstream_id]) - - return tfx_pipeline.Pipeline( - pipeline_name=pipeline['pipeline_name'], - pipeline_root=pipeline['pipeline_root'], - components=components, - enable_cache=pipeline['enable_cache'], - metadata_connection_config=metadata_connection_config, - beam_pipeline_args=pipeline['beam_pipeline_args'], - ) diff --git a/tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner_test.py b/tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner_test.py deleted file mode 100644 index 9a3b46cbbb7..00000000000 --- a/tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner_test.py +++ /dev/null @@ -1,158 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Kubernetes TFX runner for out-of-cluster orchestration.""" - -import json - -import tensorflow as tf -from tfx import types -from tfx.dsl.components.base import base_component -from tfx.dsl.components.base import base_executor -from tfx.dsl.components.base import executor_spec -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.experimental.kubernetes import kubernetes_remote_runner -from tfx.types.component_spec import ChannelParameter -from tfx.utils import json_utils - -from google.protobuf import json_format -from ml_metadata.proto import metadata_store_pb2 - - -class _ArtifactTypeA(types.Artifact): - TYPE_NAME = 'ArtifactTypeA' - - -class _ArtifactTypeB(types.Artifact): - TYPE_NAME = 'ArtifactTypeB' - - -class _ArtifactTypeC(types.Artifact): - TYPE_NAME = 'ArtifactTypeC' - - -class _FakeComponentSpecA(types.ComponentSpec): - PARAMETERS = {} - INPUTS = {} - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeA)} - - -class _FakeComponentSpecB(types.ComponentSpec): - PARAMETERS = {} - INPUTS = {'a': ChannelParameter(type=_ArtifactTypeA)} - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeB)} - - -class _FakeComponentSpecC(types.ComponentSpec): - PARAMETERS = {} - INPUTS = { - 'a': ChannelParameter(type=_ArtifactTypeA), - 'b': ChannelParameter(type=_ArtifactTypeB) - } - OUTPUTS = {'output': ChannelParameter(type=_ArtifactTypeC)} - - -class _FakeComponent(base_component.BaseComponent): - SPEC_CLASS = types.ComponentSpec - EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(base_executor.BaseExecutor) - - def __init__(self, spec: types.ComponentSpec): - super().__init__(spec=spec) - self._id = spec.__class__.__name__.replace('_FakeComponentSpec', '').lower() - - -class KubernetesRemoteRunnerTest(tf.test.TestCase): - - def setUp(self): - super().setUp() - self.component_a = _FakeComponent( - _FakeComponentSpecA(output=types.Channel(type=_ArtifactTypeA))) - self.component_b = _FakeComponent( - _FakeComponentSpecB( - a=self.component_a.outputs['output'], - output=types.Channel(type=_ArtifactTypeB))) - self.component_c = _FakeComponent( - _FakeComponentSpecC( - a=self.component_a.outputs['output'], - b=self.component_b.outputs['output'], - output=types.Channel(type=_ArtifactTypeC))) - self.test_pipeline = tfx_pipeline.Pipeline( - pipeline_name='x', - pipeline_root='y', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[self.component_c, self.component_a, self.component_b]) - - def testSerialization(self): - serialized_pipeline = kubernetes_remote_runner._serialize_pipeline( # pylint: disable=protected-access - self.test_pipeline) - - pipeline = json.loads(serialized_pipeline) - components = [ - json_utils.loads(component) for component in pipeline['components'] - ] - metadata_connection_config = metadata_store_pb2.ConnectionConfig() - json_format.Parse(pipeline['metadata_connection_config'], - metadata_connection_config) - expected_downstream_ids = { - 'a': ['b', 'c'], - 'b': ['c'], - 'c': [], - } - self.assertEqual(self.test_pipeline.pipeline_info.pipeline_name, - pipeline['pipeline_name']) - self.assertEqual(self.test_pipeline.pipeline_info.pipeline_root, - pipeline['pipeline_root']) - self.assertEqual(self.test_pipeline.enable_cache, pipeline['enable_cache']) - self.assertEqual(self.test_pipeline.beam_pipeline_args, - pipeline['beam_pipeline_args']) - self.assertEqual(self.test_pipeline.metadata_connection_config, - metadata_connection_config) - self.assertListEqual([ - component.executor_spec.executor_class - for component in self.test_pipeline.components - ], [component.executor_spec.executor_class for component in components]) - self.assertEqual(self.test_pipeline.metadata_connection_config, - metadata_connection_config) - # Enforce order of downstream ids for comparison. - for downstream_ids in pipeline['downstream_ids'].values(): - downstream_ids.sort() - self.assertEqual(expected_downstream_ids, pipeline['downstream_ids']) - - def testDeserialization(self): - serialized_pipeline = kubernetes_remote_runner._serialize_pipeline( # pylint: disable=protected-access - self.test_pipeline) - pipeline = kubernetes_remote_runner.deserialize_pipeline( - serialized_pipeline) - - self.assertEqual(self.test_pipeline.pipeline_info.pipeline_name, - pipeline.pipeline_info.pipeline_name) - self.assertEqual(self.test_pipeline.pipeline_info.pipeline_root, - pipeline.pipeline_info.pipeline_root) - self.assertEqual(self.test_pipeline.enable_cache, pipeline.enable_cache) - self.assertEqual(self.test_pipeline.beam_pipeline_args, - pipeline.beam_pipeline_args) - self.assertEqual(self.test_pipeline.metadata_connection_config, - pipeline.metadata_connection_config) - self.assertListEqual([ - component.executor_spec.executor_class - for component in self.test_pipeline.components - ], [ - component.executor_spec.executor_class - for component in pipeline.components - ]) - self.assertEqual(self.test_pipeline.metadata_connection_config, - pipeline.metadata_connection_config) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/experimental/kubernetes/node_wrapper.py b/tfx/orchestration/experimental/kubernetes/node_wrapper.py deleted file mode 100644 index 6654967e120..00000000000 --- a/tfx/orchestration/experimental/kubernetes/node_wrapper.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""A wrapper to pass a node without its type information.""" - -from typing import Any, Dict - -from tfx.dsl.components.base import base_node - - -class NodeWrapper(base_node.BaseNode): - """Wrapper of a node. - - The wrapper is needed for container entrypoint to deserialize a component - wihtout knowning it's original python class. This enables users - to use container base component without re-compiling the tfx base image every - time they change the component and spec definitions. - """ - - def __init__(self, node: base_node.BaseNode): - self.executor_spec = node.executor_spec - self.driver_class = node.driver_class - self._type = node.type - self._id = node.id - self._inputs = node.inputs - self._outputs = node.outputs - self._exec_properties = node.exec_properties - # Currently the NodeExecutionOptions in tfx.dsl.experiment.utils is for the - # experimental orchestrator, but we need to set the field here anyways so - # the property can be accessed properly. - self._node_execution_options = None - - @property - def type(self) -> str: - return self._type - - @property - def id(self) -> str: - return self._id - - @property - def inputs(self) -> Dict[str, Any]: - return self._inputs - - @property - def outputs(self) -> Dict[str, Any]: - return self._outputs - - @property - def exec_properties(self) -> Dict[str, Any]: - return self._exec_properties diff --git a/tfx/orchestration/experimental/kubernetes/orchestrator_container_entrypoint.py b/tfx/orchestration/experimental/kubernetes/orchestrator_container_entrypoint.py deleted file mode 100644 index 2c1b0678354..00000000000 --- a/tfx/orchestration/experimental/kubernetes/orchestrator_container_entrypoint.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Main entrypoint for orchestrator container on Kubernetes.""" - -import argparse -import logging -import sys - -from tfx.orchestration.experimental.kubernetes import kubernetes_dag_runner -from tfx.orchestration.experimental.kubernetes import kubernetes_remote_runner - - -def main(): - # Log to the container's stdout so it can be streamed by the client. - logging.basicConfig(stream=sys.stdout, level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) - - parser = argparse.ArgumentParser() - - # Pipeline is serialized via a json format. - # See kubernetes_remote_runner._serialize_pipeline for details. - parser.add_argument('--serialized_pipeline', type=str, required=True) - parser.add_argument('--tfx_image', type=str, required=True) - args = parser.parse_args() - - kubernetes_dag_runner.KubernetesDagRunner( - config=kubernetes_dag_runner.KubernetesDagRunnerConfig( - tfx_image=args.tfx_image)).run( - kubernetes_remote_runner.deserialize_pipeline( - args.serialized_pipeline)) - - -if __name__ == '__main__': - main() diff --git a/tfx/orchestration/experimental/kubernetes/yaml/jupyter.yaml b/tfx/orchestration/experimental/kubernetes/yaml/jupyter.yaml deleted file mode 100644 index 7085a2a456e..00000000000 --- a/tfx/orchestration/experimental/kubernetes/yaml/jupyter.yaml +++ /dev/null @@ -1,19 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: jupyter -spec: - selector: - matchLabels: - app: jupyter - replicas: 1 - template: - metadata: - labels: - app: jupyter - spec: - containers: - - name: jupyter - image: jupyter/tensorflow-notebook:ubuntu-18.04 - ports: - - containerPort: 8888 diff --git a/tfx/orchestration/experimental/kubernetes/yaml/kustomization.yaml b/tfx/orchestration/experimental/kubernetes/yaml/kustomization.yaml deleted file mode 100644 index 9fe16cf8c53..00000000000 --- a/tfx/orchestration/experimental/kubernetes/yaml/kustomization.yaml +++ /dev/null @@ -1,6 +0,0 @@ -resources: -- jupyter.yaml -- mysql.yaml -- mysql-pv.yaml -- roles.yaml -- service-account.yaml diff --git a/tfx/orchestration/experimental/kubernetes/yaml/mysql-pv.yaml b/tfx/orchestration/experimental/kubernetes/yaml/mysql-pv.yaml deleted file mode 100644 index 183aec47f95..00000000000 --- a/tfx/orchestration/experimental/kubernetes/yaml/mysql-pv.yaml +++ /dev/null @@ -1,33 +0,0 @@ -# Uncomment the following lines when running Kubernetes outside -# Google Kubernetes Engine (see -# https://kubernetes.io/docs/tasks/configure-pod-container/configure-persistent-volume-storage/ -# and https://github.com/kubernetes/website/issues/10697) - -# apiVersion: v1 -# kind: PersistentVolume -# metadata: -# name: mysql-pv-volume -# labels: -# type: local -# spec: -# storageClassName: manual -# capacity: -# storage: 20Gi -# accessModes: -# - ReadWriteOnce -# hostPath: -# path: "/mnt/data" -# --- -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: mysql-pv-claim -spec: -# Uncomment the following line when running Kubernetes outside -# Google Kubernetes Engine. -# storageClassName: manual - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 20Gi diff --git a/tfx/orchestration/experimental/kubernetes/yaml/mysql.yaml b/tfx/orchestration/experimental/kubernetes/yaml/mysql.yaml deleted file mode 100644 index e317c4064cb..00000000000 --- a/tfx/orchestration/experimental/kubernetes/yaml/mysql.yaml +++ /dev/null @@ -1,58 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: mysql -spec: - ports: - - port: 3306 - selector: - app: mysql - clusterIP: None ---- -# For Kubeflow compatibility, we forward the MySql service to the -# kubeflow namespace so that resources in this namespace can access the -# same MLMD. Commenting out as it can not be define with above service. -# Interested users can uncomment this part and try it out, after -# commenting above service. -# apiVersion: v1 -# kind: Service -# metadata: -# name: mysql -# namespace: kubeflow -# spec: -# type: ExternalName -# externalName: mysql.default.svc.cluster.local -# ports: -# - port: 3306 -# --- -apiVersion: apps/v1 # for versions before 1.9.0 use apps/v1beta2 -kind: Deployment -metadata: - name: mysql -spec: - selector: - matchLabels: - app: mysql - strategy: - type: Recreate - template: - metadata: - labels: - app: mysql - spec: - containers: - - image: gcr.io/ml-pipeline/mysql:5.6 - name: mysql - env: - - name: MYSQL_ALLOW_EMPTY_PASSWORD - value: "true" - ports: - - containerPort: 3306 - name: mysql - volumeMounts: - - name: mysql-persistent-storage - mountPath: /var/lib/mysql - volumes: - - name: mysql-persistent-storage - persistentVolumeClaim: - claimName: mysql-pv-claim diff --git a/tfx/orchestration/experimental/kubernetes/yaml/roles.yaml b/tfx/orchestration/experimental/kubernetes/yaml/roles.yaml deleted file mode 100644 index 0146e86e8c0..00000000000 --- a/tfx/orchestration/experimental/kubernetes/yaml/roles.yaml +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: rbac.authorization.k8s.io/v1 -# This cluster role binding allows the tfx service account to edit pods -# For Kubeflow compatibility, we bind this role to both the default and -# kubeflow namespace. This may be removed in a future version. -kind: ClusterRoleBinding -metadata: - name: tfx-edit -subjects: -- kind: ServiceAccount - name: tfx-service-account - namespace: default -- kind: ServiceAccount - name: tfx-service-account - namespace: kubeflow -roleRef: - kind: ClusterRole - name: edit - apiGroup: rbac.authorization.k8s.io diff --git a/tfx/orchestration/experimental/kubernetes/yaml/service-account.yaml b/tfx/orchestration/experimental/kubernetes/yaml/service-account.yaml deleted file mode 100644 index 53e3380a5f4..00000000000 --- a/tfx/orchestration/experimental/kubernetes/yaml/service-account.yaml +++ /dev/null @@ -1,15 +0,0 @@ -# For Kubeflow compatibility, we add the service account to both -# the default and kubeflow namespace. This may be removed in a -# future version. -apiVersion: v1 -kind: ServiceAccount -metadata: - name: tfx-service-account - namespace: default -# Uncomment below if you want to add kubeflow service account. -# --- -# apiVersion: v1 -# kind: ServiceAccount -# metadata: -# name: tfx-service-account -# namespace: kubeflow