From c5c525f3542212dfcd0319234e9be699468cad00 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Thu, 23 Jul 2020 23:28:31 -0700 Subject: [PATCH 01/17] initial untested impl --- .../google_cloud_kubernetes/__init__.py | 13 ++ .../google_cloud_kubernetes/runner.py | 200 ++++++++++++++++++ .../trainer/__init__.py | 13 ++ .../trainer/executor.py | 83 ++++++++ 4 files changed, 309 insertions(+) create mode 100644 tfx/extensions/google_cloud_kubernetes/__init__.py create mode 100644 tfx/extensions/google_cloud_kubernetes/runner.py create mode 100644 tfx/extensions/google_cloud_kubernetes/trainer/__init__.py create mode 100644 tfx/extensions/google_cloud_kubernetes/trainer/executor.py diff --git a/tfx/extensions/google_cloud_kubernetes/__init__.py b/tfx/extensions/google_cloud_kubernetes/__init__.py new file mode 100644 index 0000000000..b179ecb83a --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py new file mode 100644 index 0000000000..8a56c73ed1 --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -0,0 +1,200 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Helper class to start TFX multi-worker training jobs on GKE.""" + +import datetime +import json +import sys +import time +from typing import Any, Dict, List, Optional, Text + +from absl import logging +import tensorflow as tf + +from tfx import types +from tfx import version +from tfx.components.trainer import constants +from tfx.types import artifact_utils +from tfx.utils import telemetry_utils +from tfx.utils import kube_utils +from tfx.orchestration.launcher import kubernetes_component_launcher + +import kubernetes.client as client +from kubernetes.client.rest import ApiException + +#TODO: change +_TFX_IMAGE = "gcr.io/tfx-eric/gpu-tfx" + +_COMMAND = "python /tfx-src/tfx/scripts/run_executor.py" + +def _get_pod_name(name: Text="keras", job: Text="worker", index:int=0): + return name + '-' + job + '-' + str(index) + +def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, + Any], name: Text="keras", job: Text="worker"): + num_workers = training_inputs.get("num_workers", 1) + num_gpus_per_worker = training_inputs.get("num_gpus_per_worker", 0) + api_instance = kube_utils.make_core_v1_api() + worker_hosts = ["{}:5000".format(_get_pod_name(name, job, i)) for i in range(num_workers)] + for i in range(num_workers): + pod = client.V1Pod( + metadata=client.V1ObjectMeta( + name=_get_pod_name(name, job, i), + labels={ + 'name': name, + 'job': job, + 'task': str(i), + }, + ), + spec=client.V1PodSpec( + containers=[ + client.V1Container( + name='worker-pod', + image=_TFX_IMAGE, + # replace with file download + command=_COMMAND, + # add other args + 
args=job_args, + ports=[ + client.V1ContainerPort( + container_port=5000, + ), + ], + resources=client.V1ResourceRequirements( + limits={ + 'nvidia.com/gpu': num_gpus_per_worker, + } if num_gpus_per_worker > 0 else {}, + ), + ), + ], + restart_policy=kube_utils.RestartPolicy.NEVER.value, + ), + ) + try: + api_response = api_instance.create_namespaced_pod(namespace='default', body=pod) + except ApiException as e: + print("Exception when calling CoreV1Api->create_namespaced_pod: %s\n" % e) + print("created {} worker pods".format(num_workers)) + + +def create_worker_services(training_inputs: Dict[Text, + Any], name="keras", job="worker"): + num_workers = training_inputs.get("num_workers", 1) + api_instance = kube_utils.make_core_v1_api() + for i in range(num_workers): + service = client.V1Service( + metadata=client.V1ObjectMeta( + name=_get_pod_name(name, job, i), + ), + spec=client.V1ServiceSpec( + selector={ + 'name': name, + 'job': job, + 'task': str(i), + }, + ports=[ + client.V1ServicePort( + port=5000, + ), + ], + ), + ) + try: + api_response = api_instance.create_namespaced_service(namespace='default', body=service) + except ApiException as e: + # TODO(ericlege): use absl + print("Exception when calling CoreV1Api->create_namespaced_service: %s\n" % e) + print("created {} worker services".format(num_workers)) + + +def delete_worker_services(training_inputs: Dict[Text, + Any], name="keras", job="worker"): + num_workers = training_inputs.get("num_workers", 1) + api_instance = kube_utils.make_core_v1_api() + for i in range(num_workers): + service_name = name + '-' + job + '-' + str(i), + api_response = api_instance.delete_namespaced_service(namespace='default', name=service_name) + except ApiException as e: + # TODO(ericlege): use absl + print("Exception when calling CoreV1Api->delete_namespaced_service: %s\n" % e) + print("Deleted {} worker services".format(num_workers)) + +def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], + output_dict: 
Dict[Text, List[types.Artifact]], + exec_properties: Dict[Text, + Any], executor_class_path: Text, + training_inputs: Dict[Text, + Any]): + """Start a trainer job on Google Kubernetes Engine (GKE). + + This is done by forwarding the inputs/outputs/exec_properties to the + tfx.scripts.run_executor module on a AI Platform training job interpreter. + + Args: + input_dict: Passthrough input dict for tfx.components.Trainer.executor. + output_dict: Passthrough input dict for tfx.components.Trainer.executor. + exec_properties: Passthrough input dict for tfx.components.Trainer.executor. + executor_class_path: class path for TFX core default trainer. + training_inputs: Training input argument for GKE. + 'pythonModule', 'pythonVersion' and 'runtimeVersion' will be inferred. For + the full set of parameters, refer to + https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#TrainingInput + + Returns: + None + Raises: + RuntimeError: if the Google Cloud AI Platform training job failed/cancelled. + """ + training_inputs = training_inputs.copy() + + json_inputs = artifact_utils.jsonify_artifact_dict(input_dict) + logging.info('json_inputs=\'%s\'.', json_inputs) + json_outputs = artifact_utils.jsonify_artifact_dict(output_dict) + logging.info('json_outputs=\'%s\'.', json_outputs) + json_exec_properties = json.dumps(exec_properties, sort_keys=True) + logging.info('json_exec_properties=\'%s\'.', json_exec_properties) + + + # We use custom containers to launch training on GKE, which invokes + # the specified image using the container's entrypoint. The default + # entrypoint for TFX containers is to call scripts/run_executor.py. The + # arguments below are passed to this run_executor entry to run the executor + # specified in `executor_class_path`. 
+ job_args = [ + '--executor_class_path', executor_class_path, '--inputs', json_inputs, + '--outputs', json_outputs, '--exec-properties', json_exec_properties + ] + + # launch the services + create_worker_services(training_inputs=training_inputs) + + # launch the worker pods + create_worker_pods(job_args=job_args, training_inputs=training_inputs) + + # wait for finish. TODO: not use protected members + exit_condition = kubernetes_component_launcher._pod_is_done + kubernetes_component_launcher.KubernetesComponentLauncher()._wait_pod(self, + core_api=kube_utils.make_core_v1_api(), + pod_name=_get_pod_name(), # chief + namespace="default", + exit_condition_lambda=exit_condition, + condition_description="Chief finished", + timeout_sec=1200) # wait for autoscaler + + # clean up + delete_worker_services(training_inputs=training_inputs) + + # GKE training complete + logging.info('Job \'%s\' successful.', job_name) + diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/__init__.py b/tfx/extensions/google_cloud_kubernetes/trainer/__init__.py new file mode 100644 index 0000000000..b179ecb83a --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/trainer/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py new file mode 100644 index 0000000000..7ce298188f --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py @@ -0,0 +1,83 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Helper class to start TFX multi-worker training jobs on GKE.""" + +from typing import Any, Dict, List, Text + +import absl + +from tfx import types +from tfx.components.base import base_executor +from tfx.components.trainer import executor as tfx_trainer_executor +from tfx.extensions.google_cloud_kubernetes import runner +from tfx.utils import json_utils + +# Keys to the items in custom_config passed as a part of exec_properties. +TRAINING_ARGS_KEY = 'gke_training_args' +_CUSTOM_CONFIG_KEY = 'custom_config' + + +class GenericExecutor(base_executor.BaseExecutor): + """Start a trainer job on Google Kubernetes Engine using a generic Trainer.""" + + def _GetExecutorClass(self): + return tfx_trainer_executor.GenericExecutor + + def Do(self, input_dict: Dict[Text, List[types.Artifact]], + output_dict: Dict[Text, List[types.Artifact]], + exec_properties: Dict[Text, Any]): + """Starts a trainer job on Google Kubernetes Engine. + + Args: + input_dict: Passthrough input dict for tfx.components.Trainer.executor. + output_dict: Passthrough input dict for tfx.components.Trainer.executor. 
+ exec_properties: Mostly a passthrough input dict for + tfx.components.Trainer.executor. custom_config.gke_training_args + is consumed by this class. + + Returns: + None + Raises: + ValueError: if gke_training_args is not in + exec_properties.custom_config. + RuntimeError: if the Google Kubernetes Engine training job failed. + """ + self._log_startup(input_dict, output_dict, exec_properties) + + custom_config = json_utils.loads( + exec_properties.get(_CUSTOM_CONFIG_KEY, 'null')) + if custom_config is not None and not isinstance(custom_config, Dict): + raise ValueError('custom_config in execution properties needs to be a ' + 'dict.') + + training_inputs = custom_config.get(TRAINING_ARGS_KEY) + if training_inputs is None: + err_msg = '\'%s\' not found in custom_config.' % TRAINING_ARGS_KEY + absl.logging.error(err_msg) + raise ValueError(err_msg) + + executor_class = self._GetExecutorClass() + executor_class_path = '%s.%s' % (executor_class.__module__, + executor_class.__name__) + # Note: exec_properties['custom_config'] here is a dict. 
+ return runner.start_gke_training(input_dict, output_dict, exec_properties, + executor_class_path, training_inputs) + + +class Executor(GenericExecutor): + """Start a trainer job on Google Kubernetes Engine using a default Trainer.""" + + def _GetExecutorClass(self): + # TODO(ericlege): use Executor + return tfx_trainer_executor.GenericExecutor From 79d9302200d2ae8ef7a295aec5a7c09d62493feb Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Fri, 24 Jul 2020 16:23:19 -0700 Subject: [PATCH 02/17] wip --- .../google_cloud_kubernetes/runner.py | 3 +- .../google_cloud_kubernetes/runner_test.py | 76 +++++++++++++ .../trainer/distributed_training_utils.py | 102 ++++++++++++++++++ .../trainer/executor.py | 3 +- .../trainer/executor_test.py | 94 ++++++++++++++++ 5 files changed, 275 insertions(+), 3 deletions(-) create mode 100644 tfx/extensions/google_cloud_kubernetes/runner_test.py create mode 100644 tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py create mode 100644 tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index 8a56c73ed1..0e52f42d85 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -124,7 +124,8 @@ def delete_worker_services(training_inputs: Dict[Text, api_instance = kube_utils.make_core_v1_api() for i in range(num_workers): service_name = name + '-' + job + '-' + str(i), - api_response = api_instance.delete_namespaced_service(namespace='default', name=service_name) + try: + api_response = api_instance.delete_namespaced_service(namespace='default', name=service_name) except ApiException as e: # TODO(ericlege): use absl print("Exception when calling CoreV1Api->delete_namespaced_service: %s\n" % e) diff --git a/tfx/extensions/google_cloud_kubernetes/runner_test.py b/tfx/extensions/google_cloud_kubernetes/runner_test.py new file mode 100644 
index 0000000000..c0d62f3449 --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/runner_test.py @@ -0,0 +1,76 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for tfx.extensions.google_cloud_kubernetes.runner.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import copy +import os +import sys +from typing import Any, Dict, Text + +import mock +import tensorflow as tf + +from tfx import version +from tfx.extensions.google_cloud_ai_platform import runner +from tfx.extensions.google_cloud_ai_platform.trainer import executor +from tfx.utils import json_utils +from tfx.utils import telemetry_utils + + +class RunnerTest(tf.test.TestCase): + + def setUp(self): + super(RunnerTest, self).setUp() + self._output_data_dir = os.path.join( + os.environ.get('TEST_UNDECLARED_OUTPUTS_DIR', self.get_temp_dir()), + self._testMethodName) + self._mock_k8s_client = mock.Mock() + self._inputs = {} + self._outputs = {} + self._training_inputs = { + '': self._project_id, + } + self._job_id = 'my_jobid' + # Dict format of exec_properties. custom_config needs to be serialized + # before being passed into start_aip_training function. 
+ self._exec_properties = { + 'custom_config': { + executor.TRAINING_ARGS_KEY: self._training_inputs, + }, + } + self._model_name = 'model_name' + self._ai_platform_serving_args = { + 'model_name': self._model_name, + 'project_id': self._project_id, + } + self._executor_class_path = 'my.executor.Executor' + + def _setUpTrainingMocks(self): + # self._mock_create = mock.Mock() + # self._mock_api_client.projects().jobs().create = self._mock_create + # self._mock_get = mock.Mock() + # self._mock_api_client.projects().jobs().get.return_value = self._mock_get + # self._mock_get.execute.return_value = { + # 'state': 'SUCCEEDED', + # } + + def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: + """Converts self._exec_properties['custom_config'] to string.""" + result = copy.deepcopy(self._exec_properties) + result['custom_config'] = json_utils.dumps(result['custom_config']) + return result \ No newline at end of file diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py b/tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py new file mode 100644 index 0000000000..31a8992531 --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py @@ -0,0 +1,102 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Utilies for distributed training with GKE.""" + + +from tfx.utils import kube_utils +from typing import Text +import kubernetes.client as client +from kubernetes.client.rest import ApiException +import json + +_TFX_IMAGE = "gcr.io/tfx-eric/gpu-keras" + +def create_worker_pods(name: Text="keras", job: Text="worker", num_workers: int=1, num_gpus_per_worker: int=1): + api_instance = kube_utils.make_core_v1_api() + worker_hosts = ["{}-{}-{}:5000".format(name, job, i) for i in range(num_workers)] + for i in range(num_workers): + pod = client.V1Pod( + metadata=client.V1ObjectMeta( + name=name + '-' + job + '-' + str(i), + labels={ + 'name': name, + 'job': job, + 'task': str(i), + }, + ), + spec=client.V1PodSpec( + containers=[ + client.V1Container( + name='worker-pod', + image=_TFX_IMAGE, + # replace with file download + command=["python", "train.py"], + # add other args + args=[ + "--worker_index=" + str(i), + "--worker_hosts=" + json.dumps(worker_hosts) + ], + ports=[ + client.V1ContainerPort( + container_port=5000, + ), + ], + resources=client.V1ResourceRequirements( + limits={ + 'nvidia.com/gpu': num_gpus_per_worker, + } if num_gpus_per_worker > 0 else {}, + ), + ), + ], + restart_policy=kube_utils.RestartPolicy.NEVER.value, + ), + ) + try: + api_response = api_instance.create_namespaced_pod(namespace='default', body=pod) + except ApiException as e: + print("Exception when calling CoreV1Api->create_namespaced_pod: %s\n" % e) + print("created {} worker pods".format(num_workers)) + +def create_worker_services(name="keras", job="worker", num_workers=1): + api_instance = kube_utils.make_core_v1_api() + for i in range(num_workers): + service = client.V1Service( + metadata=client.V1ObjectMeta( + name=name + '-' + job + '-' + str(i), + ), + spec=client.V1ServiceSpec( + selector={ + 'name': name, + 'job': job, + 'task': str(i), + }, + ports=[ + client.V1ServicePort( + port=5000, + ), + ], + ), + ) + try: + api_response = 
api_instance.create_namespaced_service(namespace='default', body=service) + except ApiException as e: + # TODO(ericlege): use absl + print("Exception when calling CoreV1Api->create_namespaced_service: %s\n" % e) + print("created {} worker services".format(num_workers)) + + +if __name__ == '__main__': + create_worker_pods(num_workers=8) + create_worker_services(num_workers=8) + # TODO(ericlege): clean up services \ No newline at end of file diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py index 7ce298188f..22e17362c2 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py @@ -79,5 +79,4 @@ class Executor(GenericExecutor): """Start a trainer job on Google Kubernetes Engine using a default Trainer.""" def _GetExecutorClass(self): - # TODO(ericlege): use Executor - return tfx_trainer_executor.GenericExecutor + return tfx_trainer_executor.Executor diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py new file mode 100644 index 0000000000..765a57fdbf --- /dev/null +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py @@ -0,0 +1,94 @@ +# Copyright 2020 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for tfx.extensions.google_cloud_kubernetes.trainer.executor.""" + + +import copy +import os +from typing import Any, Dict, Text + +import mock +import tensorflow as tf + +from tfx.components.trainer import executor as tfx_trainer_executor +from tfx.extensions.google_cloud_kubernetes.trainer import executor as gke_trainer_executor +from tfx.utils import json_utils + + +class ExecutorTest(tf.test.TestCase): + + def setUp(self): + super(ExecutorTest, self).setUp() + + self._output_data_dir = os.path.join( + os.environ.get('TEST_UNDECLARED_OUTPUTS_DIR', self.get_temp_dir()), + self._testMethodName) + self._job_dir = os.path.join(self._output_data_dir, 'jobDir') + self._num_workers = 2 + self._num_gpus_per_worker = 1 + self._inputs = {} + self._outputs = {} + # Dict format of exec_properties. custom_config needs to be serialized + # before being passed into Do function. + self._exec_properties = { + 'custom_config': { + gke_trainer_executor.TRAINING_ARGS_KEY: { + 'num_workers': self._num_workers, + 'num_gpus_per_worker': self._num_gpus_per_worker, + }, + }, + } + self._executor_class_path = '%s.%s' % ( + tfx_trainer_executor.Executor.__module__, + tfx_trainer_executor.Executor.__name__) + self._generic_executor_class_path = '%s.%s' % ( + tfx_trainer_executor.GenericExecutor.__module__, + tfx_trainer_executor.GenericExecutor.__name__) + + self.addCleanup(mock.patch.stopall) + self.mock_runner = mock.patch( + 'tfx.extensions.google_cloud_kubernetes.trainer.executor.runner' + ).start() + + def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: + """Converts self._exec_properties['custom_config'] to string.""" + result = copy.deepcopy(self._exec_properties) + result['custom_config'] = json_utils.dumps(result['custom_config']) + return result + + def testDo(self): + executor = gke_trainer_executor.Executor() + executor.Do(self._inputs, self._outputs, + self._serialize_custom_config_under_test()) + self.mock_runner.start_gke_training.assert_called_with( + 
self._inputs, self._outputs, self._serialize_custom_config_under_test(), + self._executor_class_path, { + 'num_workers': self._num_workers, + 'num_gpus_per_worker': self._num_gpus_per_worker, + }) + + def testDoWithGenericExecutorClass(self): + executor = gke_trainer_executor.GenericExecutor() + executor.Do(self._inputs, self._outputs, + self._serialize_custom_config_under_test()) + self.mock_runner.start_gke_training.assert_called_with( + self._inputs, self._outputs, self._serialize_custom_config_under_test(), + self._generic_executor_class_path, { + 'num_workers': self._num_workers, + 'num_gpus_per_worker': self._num_gpus_per_worker, + }) + + +if __name__ == '__main__': + tf.test.main() From 886574d469282a0cf4d4b871c75ce5c292c569d6 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Mon, 27 Jul 2020 12:10:56 -0700 Subject: [PATCH 03/17] trainer raw --- .../google_cloud_kubernetes/runner.py | 62 ++++++++++------ .../launcher/kubernetes_component_launcher.py | 33 ++++----- tfx/utils/kube_utils.py | 72 ++++++++++++++++++- 3 files changed, 126 insertions(+), 41 deletions(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index 0e52f42d85..d78f50872f 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -28,26 +28,39 @@ from tfx.types import artifact_utils from tfx.utils import telemetry_utils from tfx.utils import kube_utils -from tfx.orchestration.launcher import kubernetes_component_launcher - +from tfx.extensions.google_cloud_kubernetes.trainer import executor import kubernetes.client as client from kubernetes.client.rest import ApiException #TODO: change _TFX_IMAGE = "gcr.io/tfx-eric/gpu-tfx" -_COMMAND = "python /tfx-src/tfx/scripts/run_executor.py" +_COMMAND = ["python", "-m", "tfx.scripts.run_executor"] def _get_pod_name(name: Text="keras", job: Text="worker", index:int=0): return name + '-' + job + '-' + str(index) +def 
_pod_is_done(resp: client.V1Pod): + return kube_utils.PodPhase(resp.status.phase).is_done + def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, - Any], name: Text="keras", job: Text="worker"): + Any], exec_properties: Dict[Text,Any], + name: Text="keras", job: Text="worker"): num_workers = training_inputs.get("num_workers", 1) num_gpus_per_worker = training_inputs.get("num_gpus_per_worker", 0) api_instance = kube_utils.make_core_v1_api() worker_hosts = ["{}:5000".format(_get_pod_name(name, job, i)) for i in range(num_workers)] + # since worker_index is passed through custom_config in exec_properties, + # save an original copy to be restored upon function completion. + original_custom_config = exec_properties[constants.CUSTOM_CONFIG_KEY] + custom_config = json.loads(exec_properties[constants.CUSTOM_CONFIG_KEY]) + training_args = custom_config[executor.TRAINING_ARGS_KEY] for i in range(num_workers): + # replace worker_index in training args + training_args['worker_index'] = i + exec_properties[constants.CUSTOM_CONFIG_KEY] = json.dumps(custom_config, sort_keys=True) + json_exec_properties = json.dumps(exec_properties, sort_keys=True) + exec_properties_args = ['--exec-properties', json_exec_properties] pod = client.V1Pod( metadata=client.V1ObjectMeta( name=_get_pod_name(name, job, i), @@ -65,7 +78,7 @@ def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, # replace with file download command=_COMMAND, # add other args - args=job_args, + args=job_args + exec_properties_args, ports=[ client.V1ContainerPort( container_port=5000, @@ -85,6 +98,7 @@ def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, api_response = api_instance.create_namespaced_pod(namespace='default', body=pod) except ApiException as e: print("Exception when calling CoreV1Api->create_namespaced_pod: %s\n" % e) + exec_properties[constants.CUSTOM_CONFIG_KEY] = original_custom_config print("created {} worker pods".format(num_workers)) @@ -131,6 +145,7 
@@ def delete_worker_services(training_inputs: Dict[Text, print("Exception when calling CoreV1Api->delete_namespaced_service: %s\n" % e) print("Deleted {} worker services".format(num_workers)) + def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], output_dict: Dict[Text, List[types.Artifact]], exec_properties: Dict[Text, @@ -171,31 +186,38 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], # the specified image using the container's entrypoint. The default # entrypoint for TFX containers is to call scripts/run_executor.py. The # arguments below are passed to this run_executor entry to run the executor - # specified in `executor_class_path`. + # specified in `executor_class_path`. Note that exec_properties are supplied + # dynamically in create_worker_service since each pod needs a different + # worker_index passed through it. job_args = [ '--executor_class_path', executor_class_path, '--inputs', json_inputs, - '--outputs', json_outputs, '--exec-properties', json_exec_properties + '--outputs', json_outputs, ] # launch the services create_worker_services(training_inputs=training_inputs) # launch the worker pods - create_worker_pods(job_args=job_args, training_inputs=training_inputs) - - # wait for finish. TODO: not use protected members - exit_condition = kubernetes_component_launcher._pod_is_done - kubernetes_component_launcher.KubernetesComponentLauncher()._wait_pod(self, - core_api=kube_utils.make_core_v1_api(), - pod_name=_get_pod_name(), # chief - namespace="default", - exit_condition_lambda=exit_condition, - condition_description="Chief finished", - timeout_sec=1200) # wait for autoscaler - + create_worker_pods(job_args=job_args, + training_inputs=training_inputs, + exec_properties=exec_properties) + + # wait for finish. 
+ resp = kube_utils.wait_pod(core_api=kube_utils.make_core_v1_api(), + pod_name=_get_pod_name(), # chief + namespace="default", + exit_condition_lambda=_pod_is_done, + condition_description="Chief finished", + timeout_sec=1200, # wait for autoscaler + expotential_backoff=True,) + if resp.status.phase == kube_utils.PodPhase.FAILED.value: + raise RuntimeError('Pod "%s:%s" failed with status "%s".' % + ("default", _get_pod_name(), resp.status)) + # clean up delete_worker_services(training_inputs=training_inputs) # GKE training complete - logging.info('Job \'%s\' successful.', job_name) + #logging.info('Job \'%s\' successful.', job_name) + logging.info('Job successful') diff --git a/tfx/orchestration/launcher/kubernetes_component_launcher.py b/tfx/orchestration/launcher/kubernetes_component_launcher.py index ab42441cdf..df3b75fcc8 100644 --- a/tfx/orchestration/launcher/kubernetes_component_launcher.py +++ b/tfx/orchestration/launcher/kubernetes_component_launcher.py @@ -240,13 +240,11 @@ def _get_pod(self, core_api: client.CoreV1Api, pod_name: Text, Raises: RuntimeError: When it sees unexpected errors from Kubernetes API. """ - try: - return core_api.read_namespaced_pod(name=pod_name, namespace=namespace) - except client.rest.ApiException as e: - if e.status != 404: - raise RuntimeError('Unknown error! \nReason: %s\nBody: %s' % - (e.reason, e.body)) - return None + return kube_utils.get_pod( + core_api, + pod_name, + namespace, + ) def _wait_pod(self, core_api: client.CoreV1Api, @@ -273,19 +271,14 @@ def _wait_pod(self, Raises: RuntimeError: when the function times out. """ - start_time = datetime.datetime.utcnow() - while True: - resp = self._get_pod(core_api, pod_name, namespace) - logging.info(resp.status.phase) - if exit_condition_lambda(resp): - return resp - elapse_time = datetime.datetime.utcnow() - start_time - if elapse_time.seconds >= timeout_sec: - raise RuntimeError( - 'Pod "%s:%s" does not reach "%s" within %s seconds.' 
% - (namespace, pod_name, condition_description, timeout_sec)) - # TODO(hongyes): add exponential backoff here. - time.sleep(1) + return kube_utils.wait_pod( + core_api, + pod_name, + namespace, + exit_condition_lambda, + condition_description, + timeout_sec, + ) def _build_pod_name(self, execution_id: int) -> Text: if self._pipeline_info.run_id: diff --git a/tfx/utils/kube_utils.py b/tfx/utils/kube_utils.py index 694d5cfca8..4519caebbe 100644 --- a/tfx/utils/kube_utils.py +++ b/tfx/utils/kube_utils.py @@ -18,9 +18,12 @@ from __future__ import division from __future__ import print_function +import datetime import enum import os -from typing import Text +import time +from absl import logging +from typing import Callable, Optional, Text from kubernetes import client as k8s_client from kubernetes import config as k8s_config @@ -187,3 +190,70 @@ def get_current_kfp_pod(client: k8s_client.CoreV1Api) -> k8s_client.V1Pod: return client.read_namespaced_pod(name=pod_name, namespace=namespace) except KeyError: raise RuntimeError('Cannot determine KFP pod from the environment.') + + +def get_pod(core_api: k8s_client.CoreV1Api, pod_name: Text, + namespace: Text) -> Optional[k8s_client.V1Pod]: + """Get a pod from Kubernetes metadata API. + + Args: + core_api: Client of Core V1 API of Kubernetes API. + pod_name: The name of the POD. + namespace: The namespace of the POD. + + Returns: + The found POD object. None if it's not found. + + Raises: + RuntimeError: When it sees unexpected errors from Kubernetes API. + """ + try: + return core_api.read_namespaced_pod(name=pod_name, namespace=namespace) + except k8s_client.rest.ApiException as e: + if e.status != 404: + raise RuntimeError('Unknown error! 
\nReason: %s\nBody: %s' % + (e.reason, e.body)) + return None + +def wait_pod(core_api: k8s_client.CoreV1Api, + pod_name: Text, + namespace: Text, + exit_condition_lambda: Callable[[k8s_client.V1Pod], bool], + condition_description: Text, + timeout_sec: int = 300, + expotential_backoff:bool = False) -> k8s_client.V1Pod: + """Wait for a POD to meet an exit condition. + + Args: + core_api: Client of Core V1 API of Kubernetes API. + pod_name: The name of the POD. + namespace: The namespace of the POD. + exit_condition_lambda: A lambda which will be called intervally to wait + for a POD to exit. The function returns True to exit. + condition_description: The description of the exit condition which will be + set in the error message if the wait times out. + timeout_sec: The seconds for the function to wait. Defaults to 300s. + + Returns: + The POD object which meets the exit condition. + + Raises: + RuntimeError: when the function times out. + """ + start_time = datetime.datetime.utcnow() + # Exponential backoff: https://cloud.google.com/storage/docs/exponential-backoff + backoff_interval = 1 + maximum_backoff = 32 + while True: + resp = get_pod(core_api, pod_name, namespace) + logging.info(resp.status.phase) + if exit_condition_lambda(resp): + return resp + elapse_time = datetime.datetime.utcnow() - start_time + if elapse_time.seconds >= timeout_sec: + raise RuntimeError( + 'Pod "%s:%s" does not reach "%s" within %s seconds.' 
% + (namespace, pod_name, condition_description, timeout_sec)) + time.sleep(backoff_interval) + if expotential_backoff and backoff_interval < maximum_backoff: + backoff_interval *= 2 From 765ec1020efc861a3fd6e252b7c07fa32b3b0b89 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Mon, 27 Jul 2020 12:22:49 -0700 Subject: [PATCH 04/17] remove outdated utils --- .../trainer/distributed_training_utils.py | 102 ------------------ 1 file changed, 102 deletions(-) delete mode 100644 tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py b/tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py deleted file mode 100644 index 31a8992531..0000000000 --- a/tfx/extensions/google_cloud_kubernetes/trainer/distributed_training_utils.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""Utilies for distributed training with GKE.""" - - -from tfx.utils import kube_utils -from typing import Text -import kubernetes.client as client -from kubernetes.client.rest import ApiException -import json - -_TFX_IMAGE = "gcr.io/tfx-eric/gpu-keras" - -def create_worker_pods(name: Text="keras", job: Text="worker", num_workers: int=1, num_gpus_per_worker: int=1): - api_instance = kube_utils.make_core_v1_api() - worker_hosts = ["{}-{}-{}:5000".format(name, job, i) for i in range(num_workers)] - for i in range(num_workers): - pod = client.V1Pod( - metadata=client.V1ObjectMeta( - name=name + '-' + job + '-' + str(i), - labels={ - 'name': name, - 'job': job, - 'task': str(i), - }, - ), - spec=client.V1PodSpec( - containers=[ - client.V1Container( - name='worker-pod', - image=_TFX_IMAGE, - # replace with file download - command=["python", "train.py"], - # add other args - args=[ - "--worker_index=" + str(i), - "--worker_hosts=" + json.dumps(worker_hosts) - ], - ports=[ - client.V1ContainerPort( - container_port=5000, - ), - ], - resources=client.V1ResourceRequirements( - limits={ - 'nvidia.com/gpu': num_gpus_per_worker, - } if num_gpus_per_worker > 0 else {}, - ), - ), - ], - restart_policy=kube_utils.RestartPolicy.NEVER.value, - ), - ) - try: - api_response = api_instance.create_namespaced_pod(namespace='default', body=pod) - except ApiException as e: - print("Exception when calling CoreV1Api->create_namespaced_pod: %s\n" % e) - print("created {} worker pods".format(num_workers)) - -def create_worker_services(name="keras", job="worker", num_workers=1): - api_instance = kube_utils.make_core_v1_api() - for i in range(num_workers): - service = client.V1Service( - metadata=client.V1ObjectMeta( - name=name + '-' + job + '-' + str(i), - ), - spec=client.V1ServiceSpec( - selector={ - 'name': name, - 'job': job, - 'task': str(i), - }, - ports=[ - client.V1ServicePort( - port=5000, - ), - ], - ), - ) - try: - api_response = 
api_instance.create_namespaced_service(namespace='default', body=service) - except ApiException as e: - # TODO(ericlege): use absl - print("Exception when calling CoreV1Api->create_namespaced_service: %s\n" % e) - print("created {} worker services".format(num_workers)) - - -if __name__ == '__main__': - create_worker_pods(num_workers=8) - create_worker_services(num_workers=8) - # TODO(ericlege): clean up services \ No newline at end of file From d7842b718f0ed0fa056ba383c0d1857c1edc3e6c Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Tue, 28 Jul 2020 22:42:32 -0700 Subject: [PATCH 05/17] pre-lint --- .../google_cloud_kubernetes/runner.py | 97 +++++++++++-------- .../google_cloud_kubernetes/runner_test.py | 76 ++++++++++----- .../trainer/executor.py | 11 ++- 3 files changed, 123 insertions(+), 61 deletions(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index d78f50872f..fbc4ac003a 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -37,24 +37,41 @@ _COMMAND = ["python", "-m", "tfx.scripts.run_executor"] -def _get_pod_name(name: Text="keras", job: Text="worker", index:int=0): - return name + '-' + job + '-' + str(index) + +def _build_pod_names(num_workers: int, unique_id: Text) -> List[Text]: + return ['training-worker-{}-{}'.format(unique_id, i) for i in range(num_workers)] + + +def _build_service_names(num_workers: int, unique_id: Text) -> List[Text]: + return ['training-service-{}-{}'.format(unique_id, i) for i in range(num_workers)] + def _pod_is_done(resp: client.V1Pod): return kube_utils.PodPhase(resp.status.phase).is_done + def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, Any], exec_properties: Dict[Text,Any], - name: Text="keras", job: Text="worker"): - num_workers = training_inputs.get("num_workers", 1) - num_gpus_per_worker = training_inputs.get("num_gpus_per_worker", 0) + unique_id: Text): + """Create 
worker pods for multi-worker training.""" + num_workers = training_inputs.get('num_workers', 1) + num_gpus_per_worker = training_inputs.get('num_gpus_per_worker', 0) api_instance = kube_utils.make_core_v1_api() - worker_hosts = ["{}:5000".format(_get_pod_name(name, job, i)) for i in range(num_workers)] + + service_names = _build_service_names(num_workers=num_workers, unique_id=unique_id) + pod_names = _build_pod_names(num_workers=num_workers, unique_id=unique_id) + worker_hosts = ['{}:5000'.format(service_name) for service_name in service_names] + # since worker_index is passed through custom_config in exec_properties, # save an original copy to be restored upon function completion. original_custom_config = exec_properties[constants.CUSTOM_CONFIG_KEY] custom_config = json.loads(exec_properties[constants.CUSTOM_CONFIG_KEY]) training_args = custom_config[executor.TRAINING_ARGS_KEY] + + # set worker_hosts in training args + training_args['worker_hosts'] = worker_hosts + + # TODO(ericlege): consider using a jinja2 template instead for i in range(num_workers): # replace worker_index in training args training_args['worker_index'] = i @@ -63,10 +80,10 @@ def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, exec_properties_args = ['--exec-properties', json_exec_properties] pod = client.V1Pod( metadata=client.V1ObjectMeta( - name=_get_pod_name(name, job, i), + name=pod_names[i], labels={ - 'name': name, - 'job': job, + 'name': 'training', + 'id': unique_id, 'task': str(i), }, ), @@ -75,9 +92,7 @@ def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, client.V1Container( name='worker-pod', image=_TFX_IMAGE, - # replace with file download command=_COMMAND, - # add other args args=job_args + exec_properties_args, ports=[ client.V1ContainerPort( @@ -97,24 +112,28 @@ def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, try: api_response = api_instance.create_namespaced_pod(namespace='default', body=pod) except ApiException 
as e: - print("Exception when calling CoreV1Api->create_namespaced_pod: %s\n" % e) + logging.error('Exception when calling CoreV1Api->create_namespaced_pod: %s\n' % e) exec_properties[constants.CUSTOM_CONFIG_KEY] = original_custom_config - print("created {} worker pods".format(num_workers)) + logging.info('created {} worker pods'.format(num_workers)) def create_worker_services(training_inputs: Dict[Text, - Any], name="keras", job="worker"): - num_workers = training_inputs.get("num_workers", 1) + Any], unique_id: Text): + """Create worker services for multi-worker training.""" + num_workers = training_inputs.get('num_workers', 1) + service_names = _build_service_names(num_workers=num_workers, unique_id=unique_id) api_instance = kube_utils.make_core_v1_api() + + # TODO(ericlege): consider using a jinja2 template instead for i in range(num_workers): service = client.V1Service( metadata=client.V1ObjectMeta( - name=_get_pod_name(name, job, i), + name=service_names[i], ), spec=client.V1ServiceSpec( selector={ - 'name': name, - 'job': job, + 'name': 'training', + 'id': unique_id, 'task': str(i), }, ports=[ @@ -127,23 +146,22 @@ def create_worker_services(training_inputs: Dict[Text, try: api_response = api_instance.create_namespaced_service(namespace='default', body=service) except ApiException as e: - # TODO(ericlege): use absl - print("Exception when calling CoreV1Api->create_namespaced_service: %s\n" % e) - print("created {} worker services".format(num_workers)) + logging.error('Exception when calling CoreV1Api->create_namespaced_service: %s\n' % e) + logging.info('created {} worker services'.format(num_workers)) def delete_worker_services(training_inputs: Dict[Text, - Any], name="keras", job="worker"): - num_workers = training_inputs.get("num_workers", 1) + Any], unique_id: Text): + """Clean up worker services deployed to the kubernetes cluster.""" + num_workers = training_inputs.get('num_workers', 1) + service_names = _build_service_names(num_workers=num_workers, 
unique_id=unique_id) api_instance = kube_utils.make_core_v1_api() - for i in range(num_workers): - service_name = name + '-' + job + '-' + str(i), + for service_name in service_names: try: api_response = api_instance.delete_namespaced_service(namespace='default', name=service_name) except ApiException as e: - # TODO(ericlege): use absl - print("Exception when calling CoreV1Api->delete_namespaced_service: %s\n" % e) - print("Deleted {} worker services".format(num_workers)) + logging.error('Exception when calling CoreV1Api->delete_namespaced_service: %s\n' % e) + logging.info('Deleted {} worker services'.format(num_workers)) def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], @@ -151,7 +169,7 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], exec_properties: Dict[Text, Any], executor_class_path: Text, training_inputs: Dict[Text, - Any]): + Any], unique_id: Text): """Start a trainer job on Google Kubernetes Engine (GKE). This is done by forwarding the inputs/outputs/exec_properties to the @@ -195,29 +213,32 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], ] # launch the services - create_worker_services(training_inputs=training_inputs) + create_worker_services(training_inputs=training_inputs, unique_id=unique_id) # launch the worker pods create_worker_pods(job_args=job_args, - training_inputs=training_inputs, - exec_properties=exec_properties) + training_inputs=training_inputs, + exec_properties=exec_properties, + unique_id=unique_id) # wait for finish. 
+ num_workers = training_inputs.get('num_workers', 1) + pod_names = _build_pod_names(unique_id=unique_id, + num_workers=num_workers) resp = kube_utils.wait_pod(core_api=kube_utils.make_core_v1_api(), - pod_name=_get_pod_name(), # chief - namespace="default", + pod_name=pod_names[0], # chief + namespace='default', exit_condition_lambda=_pod_is_done, - condition_description="Chief finished", + condition_description='Chief finished', timeout_sec=1200, # wait for autoscaler expotential_backoff=True,) if resp.status.phase == kube_utils.PodPhase.FAILED.value: raise RuntimeError('Pod "%s:%s" failed with status "%s".' % - ("default", _get_pod_name(), resp.status)) + ('default', _get_pod_name(), resp.status)) # clean up - delete_worker_services(training_inputs=training_inputs) + delete_worker_services(training_inputs=training_inputs, unique_id=unique_id) # GKE training complete #logging.info('Job \'%s\' successful.', job_name) logging.info('Job successful') - diff --git a/tfx/extensions/google_cloud_kubernetes/runner_test.py b/tfx/extensions/google_cloud_kubernetes/runner_test.py index c0d62f3449..5d39bf1608 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner_test.py +++ b/tfx/extensions/google_cloud_kubernetes/runner_test.py @@ -13,25 +13,24 @@ # limitations under the License. 
"""Tests for tfx.extensions.google_cloud_kubernetes.runner.""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - import copy import os import sys -from typing import Any, Dict, Text +from typing import Any, Dict, Text, List import mock import tensorflow as tf from tfx import version -from tfx.extensions.google_cloud_ai_platform import runner -from tfx.extensions.google_cloud_ai_platform.trainer import executor +from tfx.extensions.google_cloud_kubernetes import runner +from tfx.extensions.google_cloud_kubernetes.trainer import executor from tfx.utils import json_utils from tfx.utils import telemetry_utils +def mock_build_service_names(num_workers: int, unique_id: Text) -> List[Text]: + return ['TEST-SERVICE-{}-{}'.format(unique_id, i) for i in range(num_workers)] + class RunnerTest(tf.test.TestCase): def setUp(self): @@ -39,13 +38,18 @@ def setUp(self): self._output_data_dir = os.path.join( os.environ.get('TEST_UNDECLARED_OUTPUTS_DIR', self.get_temp_dir()), self._testMethodName) - self._mock_k8s_client = mock.Mock() + self._mock_api_client = mock.Mock() + self._mock_pod = mock.Mock() + self._mock_service = mock.Mock() self._inputs = {} self._outputs = {} + self._unique_id = "UNIQUE_ID" + self._num_workers = 5 + self._num_gpus_per_worker = 2 self._training_inputs = { - '': self._project_id, + 'num_workers': self._num_workers, + 'num_gpus_per_worker': self._num_gpus_per_worker } - self._job_id = 'my_jobid' # Dict format of exec_properties. custom_config needs to be serialized # before being passed into start_aip_training function. 
self._exec_properties = { @@ -54,23 +58,51 @@ def setUp(self): }, } self._model_name = 'model_name' - self._ai_platform_serving_args = { - 'model_name': self._model_name, - 'project_id': self._project_id, - } self._executor_class_path = 'my.executor.Executor' def _setUpTrainingMocks(self): - # self._mock_create = mock.Mock() - # self._mock_api_client.projects().jobs().create = self._mock_create - # self._mock_get = mock.Mock() - # self._mock_api_client.projects().jobs().get.return_value = self._mock_get - # self._mock_get.execute.return_value = { - # 'state': 'SUCCEEDED', - # } + self._mock_create_pod = mock.Mock() + self._mock_api_client.create_namespaced_pod = self._mock_create_pod + self._mock_create_service = mock.Mock() + self._mock_api_client.create_namespaced_service = self._mock_create_service + self._mock_delete_service = mock.Mock() + self._mock_api_client.create_delete_service = self._mock_delete_service def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: """Converts self._exec_properties['custom_config'] to string.""" result = copy.deepcopy(self._exec_properties) result['custom_config'] = json_utils.dumps(result['custom_config']) - return result \ No newline at end of file + return result + + @mock.patch.object(runner, '_build_service_names',mock_build_service_names) + @mock.patch('tfx.extensions.google_cloud_kubernetes.runner.client') + @mock.patch('tfx.extensions.google_cloud_kubernetes.runner.kube_utils') + def testStartKubernetesTraining(self, mock_kube_utils, mock_client): + mock_client.V1Pod.return_value = self._mock_pod + mock_client.V1Service.return_value = self._mock_service + mock_kube_utils.make_core_v1_api.return_value = self._mock_api_client + mock_kube_utils.wait_pod.return_value = mock.Mock() + self._setUpTrainingMocks() + + runner.start_gke_training(self._inputs, self._outputs, + self._serialize_custom_config_under_test(), + self._executor_class_path, + self._training_inputs, self._unique_id) + + 
self._mock_api_client.create_namespaced_service.assert_called_with( + namespace='default', + body=self._mock_service,) + + self._mock_api_client.create_namespaced_pod.assert_called_with( + namespace='default', + body=self._mock_pod,) + + expected_service_names = mock_build_service_names(self._num_workers, + self._unique_id) + expected_calls = [mock.call(namespace='default', name=expected_service_name) + for expected_service_name in expected_service_names] + self.assertEqual(expected_calls, self._mock_api_client.delete_namespaced_service.mock_calls) + + +if __name__ == '__main__': + tf.test.main() diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py index 22e17362c2..63bc2e5152 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py @@ -21,6 +21,7 @@ from tfx.components.base import base_executor from tfx.components.trainer import executor as tfx_trainer_executor from tfx.extensions.google_cloud_kubernetes import runner +from tfx.orchestration import test_utils from tfx.utils import json_utils # Keys to the items in custom_config passed as a part of exec_properties. @@ -70,9 +71,17 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], executor_class = self._GetExecutorClass() executor_class_path = '%s.%s' % (executor_class.__module__, executor_class.__name__) + + unique_id = str(self._unique_id) + if self._unique_id is None: + absl.logging.warning( + "Missing unique_id in executor, using a random id instead.") + unique_id = test_utils.random_id() + # Note: exec_properties['custom_config'] here is a dict. 
return runner.start_gke_training(input_dict, output_dict, exec_properties, - executor_class_path, training_inputs) + executor_class_path, training_inputs, + unique_id) class Executor(GenericExecutor): From e1002114c78c1e586c80a8fe4ebceca29d37a1e4 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Wed, 29 Jul 2020 11:24:45 -0700 Subject: [PATCH 06/17] draft finished --- tfx/components/base/base_executor.py | 4 + .../google_cloud_kubernetes/runner.py | 167 +++++++++--------- .../google_cloud_kubernetes/runner_test.py | 27 ++- .../trainer/executor.py | 14 +- .../trainer/executor_test.py | 21 ++- .../launcher/kubernetes_component_launcher.py | 20 +-- tfx/utils/kube_utils.py | 6 +- 7 files changed, 134 insertions(+), 125 deletions(-) diff --git a/tfx/components/base/base_executor.py b/tfx/components/base/base_executor.py index b7154c54e5..dbe66ae545 100644 --- a/tfx/components/base/base_executor.py +++ b/tfx/components/base/base_executor.py @@ -59,6 +59,10 @@ def get_tmp_path(self) -> Text: raise RuntimeError('Temp path not available') return os.path.join(self._tmp_dir, str(self._unique_id), '') + @property + def unique_id(self): + return self._unique_id + @abc.abstractmethod def Do(self, input_dict: Dict[Text, List[types.Artifact]], output_dict: Dict[Text, List[types.Artifact]], diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index fbc4ac003a..13e2731aac 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -13,24 +13,18 @@ # limitations under the License. 
"""Helper class to start TFX multi-worker training jobs on GKE.""" -import datetime import json -import sys -import time -from typing import Any, Dict, List, Optional, Text +from typing import Any, Dict, List, Text from absl import logging -import tensorflow as tf from tfx import types -from tfx import version from tfx.components.trainer import constants from tfx.types import artifact_utils -from tfx.utils import telemetry_utils from tfx.utils import kube_utils from tfx.extensions.google_cloud_kubernetes.trainer import executor -import kubernetes.client as client from kubernetes.client.rest import ApiException +import kubernetes.client as client #TODO: change _TFX_IMAGE = "gcr.io/tfx-eric/gpu-tfx" @@ -39,28 +33,32 @@ def _build_pod_names(num_workers: int, unique_id: Text) -> List[Text]: - return ['training-worker-{}-{}'.format(unique_id, i) for i in range(num_workers)] + return ['training-worker-{}-{}'.format(unique_id, + i) for i in range(num_workers)] def _build_service_names(num_workers: int, unique_id: Text) -> List[Text]: - return ['training-service-{}-{}'.format(unique_id, i) for i in range(num_workers)] + return ['training-service-{}-{}'.format(unique_id, + i) for i in range(num_workers)] def _pod_is_done(resp: client.V1Pod): return kube_utils.PodPhase(resp.status.phase).is_done -def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, - Any], exec_properties: Dict[Text,Any], +def create_worker_pods(job_args: List[Text], + training_inputs: Dict[Text, Any], + exec_properties: Dict[Text, Any], unique_id: Text): """Create worker pods for multi-worker training.""" num_workers = training_inputs.get('num_workers', 1) num_gpus_per_worker = training_inputs.get('num_gpus_per_worker', 0) api_instance = kube_utils.make_core_v1_api() - service_names = _build_service_names(num_workers=num_workers, unique_id=unique_id) + service_names = _build_service_names(num_workers=num_workers, + unique_id=unique_id) pod_names = _build_pod_names(num_workers=num_workers, 
unique_id=unique_id) - worker_hosts = ['{}:5000'.format(service_name) for service_name in service_names] + worker_hosts = ['{}:5000'.format(svc_name) for svc_name in service_names] # since worker_index is passed through custom_config in exec_properties, # save an original copy to be restored upon function completion. @@ -75,92 +73,99 @@ def create_worker_pods(job_args:List[Text], training_inputs: Dict[Text, for i in range(num_workers): # replace worker_index in training args training_args['worker_index'] = i - exec_properties[constants.CUSTOM_CONFIG_KEY] = json.dumps(custom_config, sort_keys=True) + exec_properties[constants.CUSTOM_CONFIG_KEY] = json.dumps(custom_config, + sort_keys=True) json_exec_properties = json.dumps(exec_properties, sort_keys=True) exec_properties_args = ['--exec-properties', json_exec_properties] pod = client.V1Pod( - metadata=client.V1ObjectMeta( - name=pod_names[i], - labels={ - 'name': 'training', - 'id': unique_id, - 'task': str(i), - }, - ), - spec=client.V1PodSpec( - containers=[ - client.V1Container( - name='worker-pod', - image=_TFX_IMAGE, - command=_COMMAND, - args=job_args + exec_properties_args, - ports=[ - client.V1ContainerPort( - container_port=5000, - ), + metadata=client.V1ObjectMeta( + name=pod_names[i], + labels={ + 'name': 'training', + 'id': unique_id, + 'task': str(i), + }, + ), + spec=client.V1PodSpec( + containers=[ + client.V1Container( + name='worker-pod', + image=_TFX_IMAGE, + command=_COMMAND, + args=job_args + exec_properties_args, + ports=[ + client.V1ContainerPort( + container_port=5000, + ), + ], + resources=client.V1ResourceRequirements( + limits={ + 'nvidia.com/gpu': num_gpus_per_worker, + } if num_gpus_per_worker > 0 else {}, + ), + ), ], - resources=client.V1ResourceRequirements( - limits={ - 'nvidia.com/gpu': num_gpus_per_worker, - } if num_gpus_per_worker > 0 else {}, - ), - ), - ], - restart_policy=kube_utils.RestartPolicy.NEVER.value, - ), + restart_policy=kube_utils.RestartPolicy.NEVER.value, + ), ) 
try: - api_response = api_instance.create_namespaced_pod(namespace='default', body=pod) + api_instance.create_namespaced_pod(namespace='default', body=pod) except ApiException as e: - logging.error('Exception when calling CoreV1Api->create_namespaced_pod: %s\n' % e) + logging.error( + 'Exception when calling CoreV1Api->create_namespaced_pod: %s' % e) exec_properties[constants.CUSTOM_CONFIG_KEY] = original_custom_config logging.info('created {} worker pods'.format(num_workers)) -def create_worker_services(training_inputs: Dict[Text, - Any], unique_id: Text): +def create_worker_services(training_inputs: Dict[Text, Any], + unique_id: Text): """Create worker services for multi-worker training.""" num_workers = training_inputs.get('num_workers', 1) - service_names = _build_service_names(num_workers=num_workers, unique_id=unique_id) + service_names = _build_service_names(num_workers=num_workers, + unique_id=unique_id) api_instance = kube_utils.make_core_v1_api() # TODO(ericlege): consider using a jinja2 template instead for i in range(num_workers): service = client.V1Service( - metadata=client.V1ObjectMeta( - name=service_names[i], - ), - spec=client.V1ServiceSpec( - selector={ - 'name': 'training', - 'id': unique_id, - 'task': str(i), - }, - ports=[ - client.V1ServicePort( - port=5000, - ), - ], - ), + metadata=client.V1ObjectMeta( + name=service_names[i], + ), + spec=client.V1ServiceSpec( + selector={ + 'name': 'training', + 'id': unique_id, + 'task': str(i), + }, + ports=[ + client.V1ServicePort( + port=5000, + ), + ], + ), ) try: - api_response = api_instance.create_namespaced_service(namespace='default', body=service) + api_instance.create_namespaced_service(namespace='default', body=service) except ApiException as e: - logging.error('Exception when calling CoreV1Api->create_namespaced_service: %s\n' % e) + logging.error( + 'Exception when calling CoreV1Api->create_namespaced_service: %s' % e) logging.info('created {} worker services'.format(num_workers)) -def 
delete_worker_services(training_inputs: Dict[Text, - Any], unique_id: Text): +def delete_worker_services(training_inputs: Dict[Text, Any], + unique_id: Text): """Clean up worker services deployed to the kubernetes cluster.""" num_workers = training_inputs.get('num_workers', 1) - service_names = _build_service_names(num_workers=num_workers, unique_id=unique_id) + service_names = _build_service_names(num_workers=num_workers, + unique_id=unique_id) api_instance = kube_utils.make_core_v1_api() for service_name in service_names: try: - api_response = api_instance.delete_namespaced_service(namespace='default', name=service_name) + api_instance.delete_namespaced_service(namespace='default', + name=service_name) except ApiException as e: - logging.error('Exception when calling CoreV1Api->delete_namespaced_service: %s\n' % e) + logging.error( + 'Exception when calling CoreV1Api->delete_namespaced_service: %s' % e) logging.info('Deleted {} worker services'.format(num_workers)) @@ -181,14 +186,12 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], exec_properties: Passthrough input dict for tfx.components.Trainer.executor. executor_class_path: class path for TFX core default trainer. training_inputs: Training input argument for GKE. - 'pythonModule', 'pythonVersion' and 'runtimeVersion' will be inferred. For - the full set of parameters, refer to - https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#TrainingInput + 'num_workers', and 'num_gpus_per_worker' will be consumed. Returns: None Raises: - RuntimeError: if the Google Cloud AI Platform training job failed/cancelled. + RuntimeError: if the Google Kubernetes Engine training job failed/cancelled. 
""" training_inputs = training_inputs.copy() @@ -226,15 +229,15 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], pod_names = _build_pod_names(unique_id=unique_id, num_workers=num_workers) resp = kube_utils.wait_pod(core_api=kube_utils.make_core_v1_api(), - pod_name=pod_names[0], # chief - namespace='default', - exit_condition_lambda=_pod_is_done, - condition_description='Chief finished', - timeout_sec=1200, # wait for autoscaler - expotential_backoff=True,) + pod_name=pod_names[0], # chief + namespace='default', + exit_condition_lambda=_pod_is_done, + condition_description='Chief finished', + timeout_sec=1200, # wait for autoscaler + expotential_backoff=True,) if resp.status.phase == kube_utils.PodPhase.FAILED.value: - raise RuntimeError('Pod "%s:%s" failed with status "%s".' % - ('default', _get_pod_name(), resp.status)) + raise RuntimeError('Pod "%s:%s" failed with status "%s".' % + ('default', _get_pod_name(), resp.status)) # clean up delete_worker_services(training_inputs=training_inputs, unique_id=unique_id) diff --git a/tfx/extensions/google_cloud_kubernetes/runner_test.py b/tfx/extensions/google_cloud_kubernetes/runner_test.py index 5d39bf1608..8c4f79b03d 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner_test.py +++ b/tfx/extensions/google_cloud_kubernetes/runner_test.py @@ -15,22 +15,20 @@ import copy import os -import sys from typing import Any, Dict, Text, List import mock import tensorflow as tf -from tfx import version from tfx.extensions.google_cloud_kubernetes import runner from tfx.extensions.google_cloud_kubernetes.trainer import executor from tfx.utils import json_utils -from tfx.utils import telemetry_utils def mock_build_service_names(num_workers: int, unique_id: Text) -> List[Text]: return ['TEST-SERVICE-{}-{}'.format(unique_id, i) for i in range(num_workers)] + class RunnerTest(tf.test.TestCase): def setUp(self): @@ -60,7 +58,7 @@ def setUp(self): self._model_name = 'model_name' self._executor_class_path = 
'my.executor.Executor' - def _setUpTrainingMocks(self): + def _set_up_training_mocks(self): self._mock_create_pod = mock.Mock() self._mock_api_client.create_namespaced_pod = self._mock_create_pod self._mock_create_service = mock.Mock() @@ -73,8 +71,8 @@ def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: result = copy.deepcopy(self._exec_properties) result['custom_config'] = json_utils.dumps(result['custom_config']) return result - - @mock.patch.object(runner, '_build_service_names',mock_build_service_names) + + @mock.patch.object(runner, '_build_service_names', mock_build_service_names) @mock.patch('tfx.extensions.google_cloud_kubernetes.runner.client') @mock.patch('tfx.extensions.google_cloud_kubernetes.runner.kube_utils') def testStartKubernetesTraining(self, mock_kube_utils, mock_client): @@ -82,7 +80,7 @@ def testStartKubernetesTraining(self, mock_kube_utils, mock_client): mock_client.V1Service.return_value = self._mock_service mock_kube_utils.make_core_v1_api.return_value = self._mock_api_client mock_kube_utils.wait_pod.return_value = mock.Mock() - self._setUpTrainingMocks() + self._set_up_training_mocks() runner.start_gke_training(self._inputs, self._outputs, self._serialize_custom_config_under_test(), @@ -90,18 +88,19 @@ def testStartKubernetesTraining(self, mock_kube_utils, mock_client): self._training_inputs, self._unique_id) self._mock_api_client.create_namespaced_service.assert_called_with( - namespace='default', - body=self._mock_service,) + namespace='default', + body=self._mock_service,) self._mock_api_client.create_namespaced_pod.assert_called_with( - namespace='default', - body=self._mock_pod,) - + namespace='default', + body=self._mock_pod,) + expected_service_names = mock_build_service_names(self._num_workers, - self._unique_id) + self._unique_id) expected_calls = [mock.call(namespace='default', name=expected_service_name) for expected_service_name in expected_service_names] - self.assertEqual(expected_calls, 
self._mock_api_client.delete_namespaced_service.mock_calls) + self.assertEqual(expected_calls, + self._mock_api_client.delete_namespaced_service.mock_calls) if __name__ == '__main__': diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py index 63bc2e5152..4f9b73cf7b 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py @@ -50,15 +50,14 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], Returns: None Raises: - ValueError: if gke_training_args is not in - exec_properties.custom_config. + ValueError: if gke_training_args is not in exec_properties.custom_config. RuntimeError: if the Google Kubernetes Engine training job failed. """ self._log_startup(input_dict, output_dict, exec_properties) custom_config = json_utils.loads( exec_properties.get(_CUSTOM_CONFIG_KEY, 'null')) - if custom_config is not None and not isinstance(custom_config, Dict): + if custom_config is not None and not isinstance(custom_config, dict): raise ValueError('custom_config in execution properties needs to be a ' 'dict.') @@ -71,11 +70,12 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], executor_class = self._GetExecutorClass() executor_class_path = '%s.%s' % (executor_class.__module__, executor_class.__name__) - - unique_id = str(self._unique_id) - if self._unique_id is None: + + if self._context is not None and self._context.unique_id is not None: + unique_id = str(self._context.unique_id) + else: absl.logging.warning( - "Missing unique_id in executor, using a random id instead.") + "Missing unique_id in executor, using a random id instead.") unique_id = test_utils.random_id() # Note: exec_properties['custom_config'] here is a dict. 
diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py index 765a57fdbf..daab6b39d8 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py @@ -39,6 +39,7 @@ def setUp(self): self._num_gpus_per_worker = 1 self._inputs = {} self._outputs = {} + self._unique_id = 'UNIQUE_ID' # Dict format of exec_properties. custom_config needs to be serialized # before being passed into Do function. self._exec_properties = { @@ -68,26 +69,30 @@ def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: return result def testDo(self): - executor = gke_trainer_executor.Executor() + executor = gke_trainer_executor.Executor( + gke_trainer_executor.Executor.Context(unique_id=self._unique_id) + ) executor.Do(self._inputs, self._outputs, self._serialize_custom_config_under_test()) self.mock_runner.start_gke_training.assert_called_with( self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._executor_class_path, { - 'num_workers': self._num_workers, - 'num_gpus_per_worker': self._num_gpus_per_worker, - }) + 'num_gpus_per_worker': self._num_gpus_per_worker, + 'num_workers': self._num_workers, + }, self._unique_id) def testDoWithGenericExecutorClass(self): - executor = gke_trainer_executor.GenericExecutor() + executor = gke_trainer_executor.GenericExecutor( + tfx_trainer_executor.GenericExecutor.Context(unique_id=self._unique_id) + ) executor.Do(self._inputs, self._outputs, self._serialize_custom_config_under_test()) self.mock_runner.start_gke_training.assert_called_with( self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._generic_executor_class_path, { - 'num_workers': self._num_workers, - 'num_gpus_per_worker': self._num_gpus_per_worker, - }) + 'num_gpus_per_worker': self._num_gpus_per_worker, + 'num_workers': self._num_workers, + }, self._unique_id) if 
__name__ == '__main__': diff --git a/tfx/orchestration/launcher/kubernetes_component_launcher.py b/tfx/orchestration/launcher/kubernetes_component_launcher.py index df3b75fcc8..acd29f9d6f 100644 --- a/tfx/orchestration/launcher/kubernetes_component_launcher.py +++ b/tfx/orchestration/launcher/kubernetes_component_launcher.py @@ -18,9 +18,7 @@ from __future__ import division from __future__ import print_function -import datetime import re -import time from typing import Any, Callable, Dict, List, Optional, Text, cast from absl import logging @@ -241,9 +239,9 @@ def _get_pod(self, core_api: client.CoreV1Api, pod_name: Text, RuntimeError: When it sees unexpected errors from Kubernetes API. """ return kube_utils.get_pod( - core_api, - pod_name, - namespace, + core_api=core_api, + pod_name=pod_name, + namespace=namespace, ) def _wait_pod(self, @@ -272,12 +270,12 @@ def _wait_pod(self, RuntimeError: when the function times out. """ return kube_utils.wait_pod( - core_api, - pod_name, - namespace, - exit_condition_lambda, - condition_description, - timeout_sec, + core_api=core_api, + pod_name=pod_name, + namespace=namespace, + exit_condition_lambda=exit_condition_lambda, + condition_description=condition_description, + timeout_sec=timeout_sec, ) def _build_pod_name(self, execution_id: int) -> Text: diff --git a/tfx/utils/kube_utils.py b/tfx/utils/kube_utils.py index 4519caebbe..646e2794bb 100644 --- a/tfx/utils/kube_utils.py +++ b/tfx/utils/kube_utils.py @@ -53,7 +53,7 @@ class PodPhase(enum.Enum): @property def is_done(self): - return self == self.SUCCEEDED or self == self.FAILED + return self in (self.SUCCEEDED, self.FAILED) class RestartPolicy(enum.Enum): @@ -212,7 +212,7 @@ def get_pod(core_api: k8s_client.CoreV1Api, pod_name: Text, except k8s_client.rest.ApiException as e: if e.status != 404: raise RuntimeError('Unknown error! 
\nReason: %s\nBody: %s' % - (e.reason, e.body)) + (e.reason, e.body)) return None def wait_pod(core_api: k8s_client.CoreV1Api, @@ -221,7 +221,7 @@ def wait_pod(core_api: k8s_client.CoreV1Api, exit_condition_lambda: Callable[[k8s_client.V1Pod], bool], condition_description: Text, timeout_sec: int = 300, - expotential_backoff:bool = False) -> k8s_client.V1Pod: + expotential_backoff: bool = False) -> k8s_client.V1Pod: """Wait for a POD to meet an exit condition. Args: From 418f026821b72e31cea1c39489073cd18e498808 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Wed, 29 Jul 2020 11:29:40 -0700 Subject: [PATCH 07/17] add reference for dockerfile --- tfx/extensions/google_cloud_kubernetes/runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index 13e2731aac..a60528769a 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -26,7 +26,8 @@ from kubernetes.client.rest import ApiException import kubernetes.client as client -#TODO: change +# For maintenance, see: +# https://gist.github.com/Eric-Le-Ge/a7ef6c5ae66d4af9cc886536d6724175 _TFX_IMAGE = "gcr.io/tfx-eric/gpu-tfx" _COMMAND = ["python", "-m", "tfx.scripts.run_executor"] From 2a268521ba6d02d95fbd50bdd9ae35b4836c5fdd Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Sun, 2 Aug 2020 23:16:57 -0700 Subject: [PATCH 08/17] build tf_config into container environment --- .../google_cloud_kubernetes/runner.py | 47 ++++++++----------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index a60528769a..dc0f19105e 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -49,35 +49,25 @@ def _pod_is_done(resp: client.V1Pod): def create_worker_pods(job_args: List[Text], 
training_inputs: Dict[Text, Any], - exec_properties: Dict[Text, Any], unique_id: Text): """Create worker pods for multi-worker training.""" num_workers = training_inputs.get('num_workers', 1) num_gpus_per_worker = training_inputs.get('num_gpus_per_worker', 0) - api_instance = kube_utils.make_core_v1_api() + api_instance = kube_utils.make_core_v1_api() service_names = _build_service_names(num_workers=num_workers, unique_id=unique_id) pod_names = _build_pod_names(num_workers=num_workers, unique_id=unique_id) worker_hosts = ['{}:5000'.format(svc_name) for svc_name in service_names] - # since worker_index is passed through custom_config in exec_properties, - # save an original copy to be restored upon function completion. - original_custom_config = exec_properties[constants.CUSTOM_CONFIG_KEY] - custom_config = json.loads(exec_properties[constants.CUSTOM_CONFIG_KEY]) - training_args = custom_config[executor.TRAINING_ARGS_KEY] - - # set worker_hosts in training args - training_args['worker_hosts'] = worker_hosts - # TODO(ericlege): consider using a jinja2 template instead for i in range(num_workers): - # replace worker_index in training args - training_args['worker_index'] = i - exec_properties[constants.CUSTOM_CONFIG_KEY] = json.dumps(custom_config, - sort_keys=True) - json_exec_properties = json.dumps(exec_properties, sort_keys=True) - exec_properties_args = ['--exec-properties', json_exec_properties] + tf_config = json.dumps({ + 'cluster': { + 'worker': worker_hosts + }, + 'task': {'type': 'worker', 'index': i} + }) pod = client.V1Pod( metadata=client.V1ObjectMeta( name=pod_names[i], @@ -93,7 +83,13 @@ def create_worker_pods(job_args: List[Text], name='worker-pod', image=_TFX_IMAGE, command=_COMMAND, - args=job_args + exec_properties_args, + args=job_args, + env=[ + client.V1EnvVar( + name='TF_CONFIG', + value=tf_config, + ), + ], ports=[ client.V1ContainerPort( container_port=5000, @@ -102,8 +98,8 @@ def create_worker_pods(job_args: List[Text], 
resources=client.V1ResourceRequirements( limits={ 'nvidia.com/gpu': num_gpus_per_worker, - } if num_gpus_per_worker > 0 else {}, - ), + }, + ) if num_gpus_per_worker > 0 else None, ), ], restart_policy=kube_utils.RestartPolicy.NEVER.value, @@ -114,7 +110,7 @@ def create_worker_pods(job_args: List[Text], except ApiException as e: logging.error( 'Exception when calling CoreV1Api->create_namespaced_pod: %s' % e) - exec_properties[constants.CUSTOM_CONFIG_KEY] = original_custom_config + logging.info('created {} worker pods'.format(num_workers)) @@ -208,12 +204,10 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], # the specified image using the container's entrypoint. The default # entrypoint for TFX containers is to call scripts/run_executor.py. The # arguments below are passed to this run_executor entry to run the executor - # specified in `executor_class_path`. Note that exec_properties are supplied - # dynamically in create_worker_service since each pod needs a different - # worker_index passed through it. + # specified in `executor_class_path`. job_args = [ '--executor_class_path', executor_class_path, '--inputs', json_inputs, - '--outputs', json_outputs, + '--outputs', json_outputs, '--exec-properties', json_exec_properties ] # launch the services @@ -222,7 +216,6 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], # launch the worker pods create_worker_pods(job_args=job_args, training_inputs=training_inputs, - exec_properties=exec_properties, unique_id=unique_id) # wait for finish. @@ -238,7 +231,7 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], expotential_backoff=True,) if resp.status.phase == kube_utils.PodPhase.FAILED.value: raise RuntimeError('Pod "%s:%s" failed with status "%s".' 
% - ('default', _get_pod_name(), resp.status)) + ('default', pod_names[0], resp.status)) # clean up delete_worker_services(training_inputs=training_inputs, unique_id=unique_id) From 268143572333224662c7c4ce1dff46bb701fc765 Mon Sep 17 00:00:00 2001 From: Eric Ge <32600494+Eric-Le-Ge@users.noreply.github.com> Date: Fri, 7 Aug 2020 18:04:09 -0700 Subject: [PATCH 09/17] Update kube_utils.py --- tfx/utils/kube_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tfx/utils/kube_utils.py b/tfx/utils/kube_utils.py index 646e2794bb..34ba7b2502 100644 --- a/tfx/utils/kube_utils.py +++ b/tfx/utils/kube_utils.py @@ -215,6 +215,7 @@ def get_pod(core_api: k8s_client.CoreV1Api, pod_name: Text, (e.reason, e.body)) return None + def wait_pod(core_api: k8s_client.CoreV1Api, pod_name: Text, namespace: Text, From 91d5c68b8e64e4d6e993df7a57ddf2cf95a418a6 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Fri, 7 Aug 2020 19:00:45 -0700 Subject: [PATCH 10/17] lint fix --- tfx/extensions/google_cloud_kubernetes/runner.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index dc0f19105e..e1e3fde99a 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -19,10 +19,8 @@ from absl import logging from tfx import types -from tfx.components.trainer import constants from tfx.types import artifact_utils from tfx.utils import kube_utils -from tfx.extensions.google_cloud_kubernetes.trainer import executor from kubernetes.client.rest import ApiException import kubernetes.client as client @@ -63,10 +61,10 @@ def create_worker_pods(job_args: List[Text], # TODO(ericlege): consider using a jinja2 template instead for i in range(num_workers): tf_config = json.dumps({ - 'cluster': { - 'worker': worker_hosts - }, - 'task': {'type': 'worker', 'index': i} + 'cluster': { + 'worker': worker_hosts + }, + 'task': 
{'type': 'worker', 'index': i} }) pod = client.V1Pod( metadata=client.V1ObjectMeta( @@ -86,8 +84,8 @@ def create_worker_pods(job_args: List[Text], args=job_args, env=[ client.V1EnvVar( - name='TF_CONFIG', - value=tf_config, + name='TF_CONFIG', + value=tf_config, ), ], ports=[ From 327b81af819ef526a4f2cbdb077eff77f6ae6f9d Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Sun, 9 Aug 2020 21:20:31 -0700 Subject: [PATCH 11/17] add privilege security context --- tfx/extensions/google_cloud_kubernetes/runner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index e1e3fde99a..1f6b5c2708 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -82,6 +82,9 @@ def create_worker_pods(job_args: List[Text], image=_TFX_IMAGE, command=_COMMAND, args=job_args, + security_context=client.V1SecurityContext( + privileged=True, + ), env=[ client.V1EnvVar( name='TF_CONFIG', From 67f30ab4c5fad6df58b5de82160192d2c8168d0a Mon Sep 17 00:00:00 2001 From: Eric Ge <32600494+Eric-Le-Ge@users.noreply.github.com> Date: Wed, 12 Aug 2020 15:31:10 -0700 Subject: [PATCH 12/17] Update runner.py --- tfx/extensions/google_cloud_kubernetes/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index 1f6b5c2708..e2ae207994 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -229,7 +229,7 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], exit_condition_lambda=_pod_is_done, condition_description='Chief finished', timeout_sec=1200, # wait for autoscaler - expotential_backoff=True,) + exponential_backoff=True,) if resp.status.phase == kube_utils.PodPhase.FAILED.value: raise RuntimeError('Pod "%s:%s" failed with status "%s".' 
% ('default', pod_names[0], resp.status)) From 8e1a53c2c86991958812e8635677adcf9b1ca417 Mon Sep 17 00:00:00 2001 From: Eric Ge <32600494+Eric-Le-Ge@users.noreply.github.com> Date: Wed, 12 Aug 2020 15:31:59 -0700 Subject: [PATCH 13/17] Update kube_utils.py --- tfx/utils/kube_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tfx/utils/kube_utils.py b/tfx/utils/kube_utils.py index 34ba7b2502..bdb6dbeb50 100644 --- a/tfx/utils/kube_utils.py +++ b/tfx/utils/kube_utils.py @@ -222,7 +222,7 @@ def wait_pod(core_api: k8s_client.CoreV1Api, exit_condition_lambda: Callable[[k8s_client.V1Pod], bool], condition_description: Text, timeout_sec: int = 300, - expotential_backoff: bool = False) -> k8s_client.V1Pod: + exponential_backoff: bool = False) -> k8s_client.V1Pod: """Wait for a POD to meet an exit condition. Args: @@ -256,5 +256,5 @@ def wait_pod(core_api: k8s_client.CoreV1Api, 'Pod "%s:%s" does not reach "%s" within %s seconds.' % (namespace, pod_name, condition_description, timeout_sec)) time.sleep(backoff_interval) - if expotential_backoff and backoff_interval < maximum_backoff: + if exponential_backoff and backoff_interval < maximum_backoff: backoff_interval *= 2 From 60e58aa2737db8fdcfed4b2686e5959a0e82d765 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Sun, 23 Aug 2020 23:05:42 -0700 Subject: [PATCH 14/17] allow use for custom image --- .../google_cloud_kubernetes/runner.py | 27 ++++++++++--------- .../trainer/executor.py | 4 +-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index 1f6b5c2708..9b78ba8d77 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -19,14 +19,15 @@ from absl import logging from tfx import types +from tfx import version from tfx.types import artifact_utils from tfx.utils import kube_utils from kubernetes.client.rest import ApiException 
import kubernetes.client as client -# For maintenance, see: -# https://gist.github.com/Eric-Le-Ge/a7ef6c5ae66d4af9cc886536d6724175 -_TFX_IMAGE = "gcr.io/tfx-eric/gpu-tfx" +# Default TFX container image to use in GKE Training. For GPU training, +# specify a custom image in executor.TRAINING_ARGS_KEY. +_TFX_IMAGE = 'tensorflow/tfx:%s' % (version.__version__) _COMMAND = ["python", "-m", "tfx.scripts.run_executor"] @@ -49,6 +50,7 @@ def create_worker_pods(job_args: List[Text], training_inputs: Dict[Text, Any], unique_id: Text): """Create worker pods for multi-worker training.""" + tfx_image = training_inputs.get('tfx_image', _TFX_IMAGE) num_workers = training_inputs.get('num_workers', 1) num_gpus_per_worker = training_inputs.get('num_gpus_per_worker', 0) @@ -79,7 +81,7 @@ def create_worker_pods(job_args: List[Text], containers=[ client.V1Container( name='worker-pod', - image=_TFX_IMAGE, + image=tfx_image, command=_COMMAND, args=job_args, security_context=client.V1SecurityContext( @@ -184,7 +186,7 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], exec_properties: Passthrough input dict for tfx.components.Trainer.executor. executor_class_path: class path for TFX core default trainer. training_inputs: Training input argument for GKE. - 'num_workers', and 'num_gpus_per_worker' will be consumed. + 'num_workers', 'num_gpus_per_worker' and 'tfx_image' will be consumed. Returns: None @@ -211,15 +213,15 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], '--outputs', json_outputs, '--exec-properties', json_exec_properties ] - # launch the services + # Launch the ClusterIP services. create_worker_services(training_inputs=training_inputs, unique_id=unique_id) - # launch the worker pods + # Launch the worker pods. create_worker_pods(job_args=job_args, training_inputs=training_inputs, unique_id=unique_id) - # wait for finish. + # Wait for finish. 
num_workers = training_inputs.get('num_workers', 1) pod_names = _build_pod_names(unique_id=unique_id, num_workers=num_workers) @@ -230,13 +232,14 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], condition_description='Chief finished', timeout_sec=1200, # wait for autoscaler expotential_backoff=True,) + + # Clean up the ClusterIP services. + delete_worker_services(training_inputs=training_inputs, unique_id=unique_id) + if resp.status.phase == kube_utils.PodPhase.FAILED.value: raise RuntimeError('Pod "%s:%s" failed with status "%s".' % ('default', pod_names[0], resp.status)) - # clean up - delete_worker_services(training_inputs=training_inputs, unique_id=unique_id) # GKE training complete - #logging.info('Job \'%s\' successful.', job_name) - logging.info('Job successful') + logging.info('Job successful.') diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py index 4f9b73cf7b..dbcac2064c 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py @@ -50,8 +50,8 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], Returns: None Raises: - ValueError: if gke_training_args is not in exec_properties.custom_config. - RuntimeError: if the Google Kubernetes Engine training job failed. + ValueError: If gke_training_args is not in exec_properties.custom_config. + RuntimeError: If the Google Kubernetes Engine training job failed. 
""" self._log_startup(input_dict, output_dict, exec_properties) From 4c5374643ef8f9d078251037e1434c6468985efa Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Wed, 26 Aug 2020 01:38:28 -0700 Subject: [PATCH 15/17] refactoring to kubernetes --- tfx/components/base/base_executor.py | 4 ---- tfx/extensions/google_cloud_kubernetes/trainer/executor.py | 6 +++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/tfx/components/base/base_executor.py b/tfx/components/base/base_executor.py index dbe66ae545..b7154c54e5 100644 --- a/tfx/components/base/base_executor.py +++ b/tfx/components/base/base_executor.py @@ -59,10 +59,6 @@ def get_tmp_path(self) -> Text: raise RuntimeError('Temp path not available') return os.path.join(self._tmp_dir, str(self._unique_id), '') - @property - def unique_id(self): - return self._unique_id - @abc.abstractmethod def Do(self, input_dict: Dict[Text, List[types.Artifact]], output_dict: Dict[Text, List[types.Artifact]], diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py index dbcac2064c..7ee3623adf 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor.py @@ -71,15 +71,15 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], executor_class_path = '%s.%s' % (executor_class.__module__, executor_class.__name__) - if self._context is not None and self._context.unique_id is not None: - unique_id = str(self._context.unique_id) + if self._context is not None and self._context._unique_id is not None: #pylint: disable=protected-access + unique_id = str(self._context._unique_id) #pylint: disable=protected-access else: absl.logging.warning( "Missing unique_id in executor, using a random id instead.") unique_id = test_utils.random_id() # Note: exec_properties['custom_config'] here is a dict. 
- return runner.start_gke_training(input_dict, output_dict, exec_properties, + return runnerstart_kubernetes_training(input_dict, output_dict, exec_properties, executor_class_path, training_inputs, unique_id) From 831c40d2a49ff7cda0eaac82f9555efc7f9a0df4 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Wed, 26 Aug 2020 01:39:11 -0700 Subject: [PATCH 16/17] refactoring to kubernetes --- .../google_cloud_kubernetes/runner.py | 34 +++++++++---------- .../google_cloud_kubernetes/runner_test.py | 2 +- .../trainer/executor_test.py | 4 +-- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/google_cloud_kubernetes/runner.py index 7192820198..40a66faabd 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/google_cloud_kubernetes/runner.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Helper class to start TFX multi-worker training jobs on GKE.""" +"""Helper class to start TFX multi-worker training jobs on Kubernetes.""" import json from typing import Any, Dict, List, Text @@ -25,8 +25,8 @@ from kubernetes.client.rest import ApiException import kubernetes.client as client -# Default TFX container image to use in GKE Training. For GPU training, -# specify a custom image in executor.TRAINING_ARGS_KEY. +# Default TFX container image to use in Kubernetes Training. For GPU +# training, specify a custom image in executor.TRAINING_ARGS_KEY. 
_TFX_IMAGE = 'tensorflow/tfx:%s' % (version.__version__) _COMMAND = ["python", "-m", "tfx.scripts.run_executor"] @@ -112,7 +112,7 @@ def create_worker_pods(job_args: List[Text], api_instance.create_namespaced_pod(namespace='default', body=pod) except ApiException as e: logging.error( - 'Exception when calling CoreV1Api->create_namespaced_pod: %s' % e) + 'Exception when calling CoreV1Api.create_namespaced_pod: %s' % e) logging.info('created {} worker pods'.format(num_workers)) @@ -148,7 +148,7 @@ def create_worker_services(training_inputs: Dict[Text, Any], api_instance.create_namespaced_service(namespace='default', body=service) except ApiException as e: logging.error( - 'Exception when calling CoreV1Api->create_namespaced_service: %s' % e) + 'Exception when calling CoreV1Api.create_namespaced_service: %s' % e) logging.info('created {} worker services'.format(num_workers)) @@ -165,27 +165,27 @@ def delete_worker_services(training_inputs: Dict[Text, Any], name=service_name) except ApiException as e: logging.error( - 'Exception when calling CoreV1Api->delete_namespaced_service: %s' % e) + 'Exception when calling CoreV1Api.delete_namespaced_service: %s' % e) logging.info('Deleted {} worker services'.format(num_workers)) -def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], - output_dict: Dict[Text, List[types.Artifact]], - exec_properties: Dict[Text, - Any], executor_class_path: Text, - training_inputs: Dict[Text, - Any], unique_id: Text): - """Start a trainer job on Google Kubernetes Engine (GKE). +def start_kubernetes_training(input_dict: Dict[Text, List[types.Artifact]], + output_dict: Dict[Text, List[types.Artifact]], + exec_properties: Dict[Text, Any], + executor_class_path: Text, + training_inputs: Dict[Text,Any], + unique_id: Text): + """Start a trainer job on Kubernetes. This is done by forwarding the inputs/outputs/exec_properties to the - tfx.scripts.run_executor module on a AI Platform training job interpreter. 
+ tfx.scripts.run_executor module on a kubernetes pod. Args: input_dict: Passthrough input dict for tfx.components.Trainer.executor. output_dict: Passthrough input dict for tfx.components.Trainer.executor. exec_properties: Passthrough input dict for tfx.components.Trainer.executor. executor_class_path: class path for TFX core default trainer. - training_inputs: Training input argument for GKE. + training_inputs: Training input argument for Kubernetes. 'num_workers', 'num_gpus_per_worker' and 'tfx_image' will be consumed. Returns: @@ -203,7 +203,7 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], logging.info('json_exec_properties=\'%s\'.', json_exec_properties) - # We use custom containers to launch training on GKE, which invokes + # We use custom containers to launch training on Kubernetes, which invokes # the specified image using the container's entrypoint. The default # entrypoint for TFX containers is to call scripts/run_executor.py. The # arguments below are passed to this run_executor entry to run the executor @@ -241,5 +241,5 @@ def start_gke_training(input_dict: Dict[Text, List[types.Artifact]], ('default', pod_names[0], resp.status)) - # GKE training complete + # Kubernetes training complete. 
logging.info('Job successful.') diff --git a/tfx/extensions/google_cloud_kubernetes/runner_test.py b/tfx/extensions/google_cloud_kubernetes/runner_test.py index 8c4f79b03d..993f6a802d 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner_test.py +++ b/tfx/extensions/google_cloud_kubernetes/runner_test.py @@ -82,7 +82,7 @@ def testStartKubernetesTraining(self, mock_kube_utils, mock_client): mock_kube_utils.wait_pod.return_value = mock.Mock() self._set_up_training_mocks() - runner.start_gke_training(self._inputs, self._outputs, + runnerstart_kubernetes_training(self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._executor_class_path, self._training_inputs, self._unique_id) diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py b/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py index daab6b39d8..bf2a3e9211 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py +++ b/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py @@ -74,7 +74,7 @@ def testDo(self): ) executor.Do(self._inputs, self._outputs, self._serialize_custom_config_under_test()) - self.mock_runner.start_gke_training.assert_called_with( + self.mock_runnerstart_kubernetes_training.assert_called_with( self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._executor_class_path, { 'num_gpus_per_worker': self._num_gpus_per_worker, @@ -87,7 +87,7 @@ def testDoWithGenericExecutorClass(self): ) executor.Do(self._inputs, self._outputs, self._serialize_custom_config_under_test()) - self.mock_runner.start_gke_training.assert_called_with( + self.mock_runnerstart_kubernetes_training.assert_called_with( self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._generic_executor_class_path, { 'num_gpus_per_worker': self._num_gpus_per_worker, From 99cc48ed56d45cc9ad20429fea406ff4389e15a3 Mon Sep 17 00:00:00 2001 From: Eric-Le-Ge Date: Wed, 26 Aug 2020 22:28:04 -0700 Subject: [PATCH 
17/17] refactor kubernetes trainer --- .../kubernetes}/__init__.py | 3 ++- .../kubernetes}/runner.py | 21 ++++++++----------- .../kubernetes}/runner_test.py | 16 +++++++------- .../kubernetes/trainer}/__init__.py | 0 .../kubernetes}/trainer/executor.py | 7 ++++--- .../kubernetes}/trainer/executor_test.py | 10 ++++----- 6 files changed, 28 insertions(+), 29 deletions(-) rename tfx/extensions/{google_cloud_kubernetes/trainer => experimental/kubernetes}/__init__.py (87%) rename tfx/extensions/{google_cloud_kubernetes => experimental/kubernetes}/runner.py (93%) rename tfx/extensions/{google_cloud_kubernetes => experimental/kubernetes}/runner_test.py (89%) rename tfx/extensions/{google_cloud_kubernetes => experimental/kubernetes/trainer}/__init__.py (100%) rename tfx/extensions/{google_cloud_kubernetes => experimental/kubernetes}/trainer/executor.py (94%) rename tfx/extensions/{google_cloud_kubernetes => experimental/kubernetes}/trainer/executor_test.py (91%) diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/__init__.py b/tfx/extensions/experimental/kubernetes/__init__.py similarity index 87% rename from tfx/extensions/google_cloud_kubernetes/trainer/__init__.py rename to tfx/extensions/experimental/kubernetes/__init__.py index b179ecb83a..2439a34718 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/__init__.py +++ b/tfx/extensions/experimental/kubernetes/__init__.py @@ -1,4 +1,5 @@ -# Copyright 2020 Google LLC. All Rights Reserved. +# Lint as: python2, python3 +# Copyright 2019 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/tfx/extensions/google_cloud_kubernetes/runner.py b/tfx/extensions/experimental/kubernetes/runner.py similarity index 93% rename from tfx/extensions/google_cloud_kubernetes/runner.py rename to tfx/extensions/experimental/kubernetes/runner.py index 40a66faabd..2dd9977a51 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner.py +++ b/tfx/extensions/experimental/kubernetes/runner.py @@ -111,9 +111,7 @@ def create_worker_pods(job_args: List[Text], try: api_instance.create_namespaced_pod(namespace='default', body=pod) except ApiException as e: - logging.error( - 'Exception when calling CoreV1Api.create_namespaced_pod: %s' % e) - + raise RuntimeError('Worker pod creation failed.') from e logging.info('created {} worker pods'.format(num_workers)) @@ -147,8 +145,7 @@ def create_worker_services(training_inputs: Dict[Text, Any], try: api_instance.create_namespaced_service(namespace='default', body=service) except ApiException as e: - logging.error( - 'Exception when calling CoreV1Api.create_namespaced_service: %s' % e) + raise RuntimeError('Worker service creation failed.') from e logging.info('created {} worker services'.format(num_workers)) @@ -190,6 +187,7 @@ def start_kubernetes_training(input_dict: Dict[Text, List[types.Artifact]], Returns: None + Raises: RuntimeError: if the Google Kubernetes Engine training job failed/cancelled. """ @@ -204,10 +202,10 @@ def start_kubernetes_training(input_dict: Dict[Text, List[types.Artifact]], # We use custom containers to launch training on Kubernetes, which invokes - # the specified image using the container's entrypoint. The default - # entrypoint for TFX containers is to call scripts/run_executor.py. The - # arguments below are passed to this run_executor entry to run the executor - # specified in `executor_class_path`. + # the specified image using the container's entrypoint. The entrypoint used + # for the worker container is to call scripts/run_executor.py. 
The arguments + # below are passed to this run_executor entry to run the executor specified + # in `executor_class_path`. job_args = [ '--executor_class_path', executor_class_path, '--inputs', json_inputs, '--outputs', json_outputs, '--exec-properties', json_exec_properties @@ -221,7 +219,7 @@ def start_kubernetes_training(input_dict: Dict[Text, List[types.Artifact]], training_inputs=training_inputs, unique_id=unique_id) - # Wait for finish. + # Wait indefinitely until training finishes. num_workers = training_inputs.get('num_workers', 1) pod_names = _build_pod_names(unique_id=unique_id, num_workers=num_workers) @@ -230,8 +228,7 @@ def start_kubernetes_training(input_dict: Dict[Text, List[types.Artifact]], namespace='default', exit_condition_lambda=_pod_is_done, condition_description='Chief finished', - timeout_sec=1200, # wait for autoscaler - exponential_backoff=True,) + exponential_backoff=True) # Clean up the ClusterIP services. delete_worker_services(training_inputs=training_inputs, unique_id=unique_id) diff --git a/tfx/extensions/google_cloud_kubernetes/runner_test.py b/tfx/extensions/experimental/kubernetes/runner_test.py similarity index 89% rename from tfx/extensions/google_cloud_kubernetes/runner_test.py rename to tfx/extensions/experimental/kubernetes/runner_test.py index 993f6a802d..0acd8e12d1 100644 --- a/tfx/extensions/google_cloud_kubernetes/runner_test.py +++ b/tfx/extensions/experimental/kubernetes/runner_test.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-"""Tests for tfx.extensions.google_cloud_kubernetes.runner.""" +"""Tests for tfx.extensions.experimental.kubernetes.runner.""" import copy import os @@ -20,8 +20,8 @@ import mock import tensorflow as tf -from tfx.extensions.google_cloud_kubernetes import runner -from tfx.extensions.google_cloud_kubernetes.trainer import executor +from tfx.extensions.experimental.kubernetes import runner +from tfx.extensions.experimental.kubernetes.trainer import executor from tfx.utils import json_utils @@ -73,8 +73,8 @@ def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: return result @mock.patch.object(runner, '_build_service_names', mock_build_service_names) - @mock.patch('tfx.extensions.google_cloud_kubernetes.runner.client') - @mock.patch('tfx.extensions.google_cloud_kubernetes.runner.kube_utils') + @mock.patch('tfx.extensions.experimental.kubernetes.runner.client') + @mock.patch('tfx.extensions.experimental.kubernetes.runner.kube_utils') def testStartKubernetesTraining(self, mock_kube_utils, mock_client): mock_client.V1Pod.return_value = self._mock_pod mock_client.V1Service.return_value = self._mock_service @@ -82,18 +82,18 @@ def testStartKubernetesTraining(self, mock_kube_utils, mock_client): mock_kube_utils.wait_pod.return_value = mock.Mock() self._set_up_training_mocks() - runnerstart_kubernetes_training(self._inputs, self._outputs, + runner.start_kubernetes_training(self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._executor_class_path, self._training_inputs, self._unique_id) self._mock_api_client.create_namespaced_service.assert_called_with( namespace='default', - body=self._mock_service,) + body=self._mock_service) self._mock_api_client.create_namespaced_pod.assert_called_with( namespace='default', - body=self._mock_pod,) + body=self._mock_pod) expected_service_names = mock_build_service_names(self._num_workers, self._unique_id) diff --git a/tfx/extensions/google_cloud_kubernetes/__init__.py 
b/tfx/extensions/experimental/kubernetes/trainer/__init__.py similarity index 100% rename from tfx/extensions/google_cloud_kubernetes/__init__.py rename to tfx/extensions/experimental/kubernetes/trainer/__init__.py diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py b/tfx/extensions/experimental/kubernetes/trainer/executor.py similarity index 94% rename from tfx/extensions/google_cloud_kubernetes/trainer/executor.py rename to tfx/extensions/experimental/kubernetes/trainer/executor.py index 7ee3623adf..44544ee631 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor.py +++ b/tfx/extensions/experimental/kubernetes/trainer/executor.py @@ -20,7 +20,7 @@ from tfx import types from tfx.components.base import base_executor from tfx.components.trainer import executor as tfx_trainer_executor -from tfx.extensions.google_cloud_kubernetes import runner +from tfx.extensions.experimental.kubernetes import runner from tfx.orchestration import test_utils from tfx.utils import json_utils @@ -49,6 +49,7 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], Returns: None + Raises: ValueError: If gke_training_args is not in exec_properties.custom_config. RuntimeError: If the Google Kubernetes Engine training job failed. @@ -75,11 +76,11 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]], unique_id = str(self._context._unique_id) #pylint: disable=protected-access else: absl.logging.warning( - "Missing unique_id in executor, using a random id instead.") + 'Missing unique_id in executor, using a random id instead.') unique_id = test_utils.random_id() # Note: exec_properties['custom_config'] here is a dict. 
- return runnerstart_kubernetes_training(input_dict, output_dict, exec_properties, + return runner.start_kubernetes_training(input_dict, output_dict, exec_properties, executor_class_path, training_inputs, unique_id) diff --git a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py b/tfx/extensions/experimental/kubernetes/trainer/executor_test.py similarity index 91% rename from tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py rename to tfx/extensions/experimental/kubernetes/trainer/executor_test.py index bf2a3e9211..c347d27fe6 100644 --- a/tfx/extensions/google_cloud_kubernetes/trainer/executor_test.py +++ b/tfx/extensions/experimental/kubernetes/trainer/executor_test.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for tfx.extensions.google_cloud_kubernetes.trainer.executor.""" +"""Tests for tfx.extensions.experimental.kubernetes.trainer.executor.""" import copy @@ -22,7 +22,7 @@ import tensorflow as tf from tfx.components.trainer import executor as tfx_trainer_executor -from tfx.extensions.google_cloud_kubernetes.trainer import executor as gke_trainer_executor +from tfx.extensions.experimental.kubernetes.trainer import executor as gke_trainer_executor from tfx.utils import json_utils @@ -59,7 +59,7 @@ def setUp(self): self.addCleanup(mock.patch.stopall) self.mock_runner = mock.patch( - 'tfx.extensions.google_cloud_kubernetes.trainer.executor.runner' + 'tfx.extensions.experimental.kubernetes.trainer.executor.runner' ).start() def _serialize_custom_config_under_test(self) -> Dict[Text, Any]: @@ -74,7 +74,7 @@ def testDo(self): ) executor.Do(self._inputs, self._outputs, self._serialize_custom_config_under_test()) - self.mock_runnerstart_kubernetes_training.assert_called_with( + self.mock_runner.start_kubernetes_training.assert_called_with( self._inputs, self._outputs, 
self._serialize_custom_config_under_test(), self._executor_class_path, { 'num_gpus_per_worker': self._num_gpus_per_worker, @@ -87,7 +87,7 @@ def testDoWithGenericExecutorClass(self): ) executor.Do(self._inputs, self._outputs, self._serialize_custom_config_under_test()) - self.mock_runnerstart_kubernetes_training.assert_called_with( + self.mock_runner.start_kubernetes_training.assert_called_with( self._inputs, self._outputs, self._serialize_custom_config_under_test(), self._generic_executor_class_path, { 'num_gpus_per_worker': self._num_gpus_per_worker,