diff --git a/README.md b/README.md index fe2fa2ca..49f3b479 100644 --- a/README.md +++ b/README.md @@ -5,33 +5,42 @@ [![Docker Pulls](https://img.shields.io/docker/pulls/frameworkcontroller/frameworkcontroller.svg)](https://hub.docker.com/u/frameworkcontroller) [![License](https://img.shields.io/github/license/microsoft/frameworkcontroller.svg)](https://github.com/microsoft/frameworkcontroller/blob/master/LICENSE) -As one standalone component of [Microsoft OpenPAI](https://github.com/microsoft/pai), FrameworkController is built to orchestrate all kinds of applications on [Kubernetes](https://kubernetes.io) by a single controller. +As one standalone component of [Microsoft OpenPAI](https://github.com/microsoft/pai), FrameworkController (FC) is built to orchestrate all kinds of applications on [Kubernetes](https://kubernetes.io) by a single controller, especially for DeepLearning applications. These kinds of applications include but not limited to: -- __Stateless and Stateful Service__ (Nginx, TensorFlow Serving, HBase, Kafka, etc) -- __Stateless and Stateful Batch__ (KD-Tree Building, Batch Data Processing, etc) -- __Any combination of above applications__ (Distributed TensorFlow Training, Stream Data Processing, etc) +- __Stateless and Stateful Service__: + - DeepLearning Serving: [TensorFlow Serving](https://www.tensorflow.org/tfx/guide/serving), etc. + - Big Data Serving: HDFS, HBase, Kafka, Nginx, etc. +- __Stateless and Stateful Batch__: + - DeepLearning Elastic Training without Server: [PyTorch Elastic Training with whole cluster shared etcd](https://pytorch.org/elastic), etc. + - DeepLearning Batch/Offline Inference: [PyTorch Inference](https://pytorch.org/tutorials/recipes/recipes/saving_and_loading_models_for_inference.html), etc. + - Automated Machine Learning: [NNI](https://nni.readthedocs.io), etc. + - Big Data Batch Processing: [Standalone Spark](http://spark.apache.org/docs/latest/spark-standalone.html), KD-Tree Building, etc. +- __Any combination of above applications__: + - DeepLearning MPI Training: [Horovod MPI Training](https://horovod.readthedocs.io/en/stable/mpirun.html), etc. + - DeepLearning ParameterServer Training: [TensorFlow ParameterServer Training](https://www.tensorflow.org/guide/distributed_training#parameterserverstrategy), etc. + - DeepLearning Interactive Training: [TensorFlow with Jupyter Notebook](https://www.tensorflow.org/tensorboard/tensorboard_in_notebooks), etc. + - DeepLearning Elastic Training with Server: [PyTorch Elastic Training with per-application dedicated etcd](https://pytorch.org/elastic), etc. + - DeepLearning Streaming/Online Inference: [TensorFlow Inference with Streaming I/O](https://www.tensorflow.org/io), etc. + - DeepLearning Incremental/Online Training: [TensorFlow Training with Streaming I/O](https://www.tensorflow.org/io), etc. + - Big Data Stream Processing: [Standalone Flink](https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/cluster_setup.html), etc. ## Why Need It ### Problem -In the open source community, there are so many specialized Kubernetes Pod controllers which are built for a specific kind of application, such as [Kubernetes StatefulSet Controller](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset), [Kubernetes Job Controller](https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion), [KubeFlow TFJob Operator](https://www.kubeflow.org/docs/guides/components/tftraining). 
However, no one is built for all kinds of applications and combination of the existing ones still cannot support some kinds of applications. So, we have to learn, use, develop, deploy and maintain so many Pod controllers. +In the open source community, there are so many specialized Kubernetes Pod controllers which are built for a specific kind of application, such as [Kubernetes StatefulSet Controller](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset), [Kubernetes Job Controller](https://kubernetes.io/docs/concepts/workloads/controllers/job), [KubeFlow TensorFlow Operator](https://github.com/kubeflow/tf-operator), [KubeFlow PyTorch Operator](https://github.com/kubeflow/pytorch-operator). However, no one is built for all kinds of applications and combination of the existing ones still cannot support some kinds of applications. So, we have to learn, use, develop, deploy and maintain so many Pod controllers. ### Solution Build a General-Purpose Kubernetes Pod Controller: FrameworkController. And then we can get below benefits from it: -- __Support Kubernetes official unsupported applications__ - - Such as the [Stateful Batch with Service](example/framework/basic/batchwithservicesucceeded.yaml) application, like [Distributed TensorFlow - Training](https://www.tensorflow.org/deploy/distributed). - +- __Support Kubernetes official unsupported applications__: + - [Stateful Batch with Service](example/framework/basic/batchwithservicesucceeded.yaml) applications, like [TensorFlow ParameterServer Training on FC](example/framework/scenario/tensorflow/ps). + - [ScaleUp/ScaleDown Tolerable Stateful Batch](doc/user-manual.md#FrameworkRescale) applications, like [PyTorch Elastic Training on FC](example/framework/scenario/pytorch/elastic). - __Only need to learn, use, develop, deploy and maintain a single controller__ - -- __All kinds of applications can be orchestrated through the same interface with a unified experience__ - -- __If really required, only need to build specialized controllers on top of it, instead of building from scratch__ - - The similar practice is also adopted by Kubernetes official controllers, such as the [Kubernetes Deployment Controller](https://kubernetes.io/docs/concepts/workloads/controllers/deployment) is built on top of the [Kubernetes ReplicaSet Controller](https://kubernetes.io/docs/concepts/workloads/controllers/replicaset). +- __All kinds of applications can leverage almost all provided features and guarantees__ +- __All kinds of applications can be used through the same interface with a unified experience__ +- __If really required, only need to build specialized controllers on top of it, instead of building from scratch__: + - The similar practice is also adopted by Kubernetes official controllers, such as the [Kubernetes Deployment Controller](https://kubernetes.io/docs/concepts/workloads/controllers/deployment) is built on top of the [Kubernetes ReplicaSet Controller](https://kubernetes.io/docs/concepts/workloads/controllers/replicaset). ## Architecture

@@ -48,6 +57,7 @@ A Framework represents an application with a set of Tasks: 5. With fine grained [RetryPolicy](doc/user-manual.md#RetryPolicy) for each Task and the whole Framework 6. With fine grained [FrameworkAttemptCompletionPolicy](doc/user-manual.md#FrameworkAttemptCompletionPolicy) for each TaskRole 7. With PodGracefulDeletionTimeoutSec for each Task to [tune Consistency vs Availability](doc/user-manual.md#FrameworkConsistencyAvailability) +8. With fine grained [Status](pkg/apis/frameworkcontroller/v1/types.go) for each TaskAttempt/Task, each TaskRole and the whole FrameworkAttempt/Framework ### Controller Feature 1. Highly generalized as it is built for all kinds of applications @@ -55,12 +65,13 @@ A Framework represents an application with a set of Tasks: 3. Well-defined Framework [Consistency vs Availability](doc/user-manual.md#FrameworkConsistencyAvailability), [State Machine](doc/user-manual.md#FrameworkTaskStateMachine) and [Failure Model](doc/user-manual.md#CompletionStatus) 4. Tolerate Pod/ConfigMap unexpected deletion, Node/Network/FrameworkController/Kubernetes failure 5. Support to specify how to [classify and summarize Pod failures](doc/user-manual.md#PodFailureClassification) -6. Support to expose [Framework and Pod history snapshots](doc/user-manual.md#FrameworkPodHistory) to external systems -7. Easy to leverage [FrameworkBarrier](doc/user-manual.md#FrameworkBarrier) to achieve light-weight Gang Execution and Service Discovery -8. Easy to leverage [HiveDScheduler](doc/user-manual.md#HiveDScheduler) to achieve GPU Topology-Aware, Multi-Tenant, Priority and Gang Scheduling -9. Compatible with other Kubernetes features, such as Kubernetes [Service](https://kubernetes.io/docs/concepts/services-networking/service), [Gpu Scheduling](https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus), [Volume](https://kubernetes.io/docs/concepts/storage/volumes/), [Logging](https://kubernetes.io/docs/concepts/cluster-administration/logging) -10. Idiomatic with Kubernetes official controllers, such as [Pod Spec](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates) -11. Aligned with Kubernetes [Controller Design Guidelines](https://github.com/kubernetes/community/blob/f0dd87ad477e1e91c53866902adf7832c32ce543/contributors/devel/sig-api-machinery/controllers.md) and [API Conventions](https://github.com/kubernetes/community/blob/a2cdce51a0bbbc214f0e8813e0a877176ad3b6c9/contributors/devel/sig-architecture/api-conventions.md) +6. Support to [ScaleUp/ScaleDown Framework with Strong Safety Guarantee](doc/user-manual.md#FrameworkRescale) +7. Support to expose [Framework and Pod history snapshots](doc/user-manual.md#FrameworkPodHistory) to external systems +8. Easy to leverage [FrameworkBarrier](doc/user-manual.md#FrameworkBarrier) to achieve light-weight Gang Execution and Service Discovery +9. Easy to leverage [HiveDScheduler](doc/user-manual.md#HiveDScheduler) to achieve GPU Topology-Aware, Multi-Tenant, Priority and Gang Scheduling +10. Compatible with other Kubernetes features, such as Kubernetes [Service](https://kubernetes.io/docs/concepts/services-networking/service), [Gpu Scheduling](https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus), [Volume](https://kubernetes.io/docs/concepts/storage/volumes), [Logging](https://kubernetes.io/docs/concepts/cluster-administration/logging) +11. Idiomatic with Kubernetes official controllers, such as [Pod Spec](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates) +12. 
Aligned with Kubernetes [Controller Design Guidelines](https://github.com/kubernetes/community/blob/f0dd87ad477e1e91c53866902adf7832c32ce543/contributors/devel/sig-api-machinery/controllers.md) and [API Conventions](https://github.com/kubernetes/community/blob/a2cdce51a0bbbc214f0e8813e0a877176ad3b6c9/contributors/devel/sig-architecture/api-conventions.md) ## Prerequisite 1. A Kubernetes cluster, v1.14.2 or above, on-cloud or on-premise. @@ -81,8 +92,9 @@ A Framework represents an application with a set of Tasks: ## Related Project ### Third Party Controller Wrapper A specialized wrapper can be built on top of FrameworkController to optimize for a specific kind of application: -* [OpenPAI Controller Wrapper (Job RestServer)](https://github.com/microsoft/pai/tree/master/src/rest-server): A wrapper client optimized for AI applications -* [NNI Controller Wrapper (TrainingService)](https://github.com/microsoft/nni/blob/master/docs/en_US/TrainingService/FrameworkControllerMode.md): A wrapper client optimized for AutoML applications +* [Microsoft OpenPAI Controller Wrapper (Job RestServer)](https://github.com/microsoft/pai/tree/master/src/rest-server): A wrapper client optimized for AI applications +* [Microsoft DLWorkspace Controller Wrapper (Job Manager)](https://github.com/microsoft/DLWorkspace/blob/914f347d18e852bc6a6d3e86fe25ac040a3f78f9/src/ClusterManager/job_manager.py): A wrapper client optimized for AI applications +* [Microsoft NNI Controller Wrapper (TrainingService)](https://github.com/microsoft/nni/blob/master/docs/en_US/TrainingService/FrameworkControllerMode.md): A wrapper client optimized for AutoML applications ### Recommended Kubernetes Scheduler FrameworkController can directly leverage many [Kubernetes Schedulers](https://kubernetes.io/docs/tasks/administer-cluster/configure-multiple-schedulers) and among them we recommend these best fits: @@ -90,7 +102,7 @@ FrameworkController can directly leverage many [Kubernetes Schedulers](https://k * [HiveDScheduler](doc/user-manual.md#HiveDScheduler): A Kubernetes Scheduler Extender optimized for AI applications ### Similar Offering On Other Cluster Manager -* [YARN FrameworkLauncher](https://github.com/microsoft/pai/blob/master/subprojects/frameworklauncher/yarn): Similar offering natively supports [Apache YARN](http://hadoop.apache.org) +* [YARN FrameworkLauncher](https://github.com/microsoft/pai/blob/master/subprojects/frameworklauncher/yarn): Similar offering on [Apache YARN](http://hadoop.apache.org) ## Contributing This project welcomes contributions and suggestions. 
Most contributions require you to agree to a diff --git a/doc/known-issue-and-upcoming-feature.md b/doc/known-issue-and-upcoming-feature.md index 9b169417..f1792407 100644 --- a/doc/known-issue-and-upcoming-feature.md +++ b/doc/known-issue-and-upcoming-feature.md @@ -13,6 +13,5 @@ Tracked in [Dashboard errors if pod's owner reference is not supported](https://github.com/kubernetes/dashboard/issues/3251) ## Upcoming Feature -- [ ] Support Framework Spec Update - [ ] Support Framework Spec Validation and Defaulting - [ ] Support Framework Status Subresource diff --git a/doc/user-manual.md b/doc/user-manual.md index eea51e71..3726af83 100644 --- a/doc/user-manual.md +++ b/doc/user-manual.md @@ -8,6 +8,7 @@ - [CompletionStatus](#CompletionStatus) - [RetryPolicy](#RetryPolicy) - [FrameworkAttemptCompletionPolicy](#FrameworkAttemptCompletionPolicy) + - [Framework ScaleUp/ScaleDown](#FrameworkRescale) - [Large Scale Framework](#LargeScaleFramework) - [Framework and Pod History](#FrameworkPodHistory) - [Framework and Task State Machine](#FrameworkTaskStateMachine) @@ -37,7 +38,7 @@ As Framework is actually a [Kubernetes CRD](https://kubernetes.io/docs/concepts/ ### Supported Interoperation | API Kind | Operations | |:---- |:---- | -| Framework | [CREATE](#CREATE_Framework) [PATCH](#PATCH_Framework) [DELETE](#DELETE_Framework) [GET](#GET_Framework) [LIST](#LIST_Frameworks) [WATCH](#WATCH_Framework) [WATCH_LIST](#WATCH_LIST_Frameworks) | +| Framework | [CREATE](#CREATE_Framework) [DELETE](#DELETE_Framework) [GET](#GET_Framework) [LIST](#LIST_Frameworks) [WATCH](#WATCH_Framework) [WATCH_LIST](#WATCH_LIST_Frameworks)
[PATCH](#PATCH_Framework) ([Stop](#Stop_Framework), [Add TaskRole](#Add_TaskRole), [Delete TaskRole](#Delete_TaskRole), [Add/Delete Task](#Add_Delete_Task)) | | [ConfigMap](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#configmap-v1-core) | All operations except for [CREATE](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#create-configmap-v1-core) [PUT](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#replace-configmap-v1-core) [PATCH](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#patch-configmap-v1-core) | | [Pod](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#pod-v1-core) | All operations except for [CREATE](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#create-pod-v1-core) [PUT](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#replace-pod-v1-core) [PATCH](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#patch-pod-v1-core) | @@ -72,14 +73,16 @@ Create the specified Framework. Body: ```json -{ - "spec": { - "executionType": "Stop" +[ + { + "op": "replace", + "path": "/spec/executionType", + "value": "Stop" } -} +] ``` -Type: application/merge-patch+json +Type: application/json-patch+json **Description** @@ -94,6 +97,149 @@ All running containers of the Framework will be stopped while the object of the | OK(200) | [Framework](../pkg/apis/frameworkcontroller/v1/types.go) | Return current Framework. | | NotFound(404) | [Status](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#status-v1-meta) | The specified Framework is not found. | +##### Add TaskRole +**Request** + + PATCH /apis/frameworkcontroller.microsoft.com/v1/namespaces/{FrameworkNamespace}/frameworks/{FrameworkName} + +Body: + +*Follow the [TaskRoleSpec](../pkg/apis/frameworkcontroller/v1/types.go) to override below $(TaskRoleSpec) placeholder.* + +```json +[ + { + "op": "add", + "path": "/spec/taskRoles/-", + "value": $(TaskRoleSpec) + } +] +``` + +Type: application/json-patch+json + +**Description** + +Append the specified TaskRole to the specified Framework. + +See more in [Framework ScaleUp/ScaleDown](#FrameworkRescale). + +**Response** + +| Code | Body | Description | +|:---- |:---- |:---- | +| OK(200) | [Framework](../pkg/apis/frameworkcontroller/v1/types.go) | Return current Framework. | +| NotFound(404) | [Status](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#status-v1-meta) | The specified Framework is not found. | + +##### Delete TaskRole +**Request** + + PATCH /apis/frameworkcontroller.microsoft.com/v1/namespaces/{FrameworkNamespace}/frameworks/{FrameworkName} + +Body: + +*Use the current array index of the $(TaskRoleName) to override below $(TaskRoleIndex).* + +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/$(TaskRoleIndex)/name", + "value": "$(TaskRoleName)" + }, + { + "op": "remove", + "path": "/spec/taskRoles/$(TaskRoleIndex)" + } +] +``` + +Type: application/json-patch+json + +**Description** + +Delete the specified TaskRole from the specified Framework. + +See more in [Framework ScaleUp/ScaleDown](#FrameworkRescale). + +Notes: +* Better to first delete all Tasks in the TaskRole and wait until they are all deleted, then delete the whole TaskRole. 
Otherwise, the Tasks in the TaskRole will be deleted according to last observed [PodGracefulDeletionTimeoutSec](../pkg/apis/frameworkcontroller/v1/types.go) in TaskRoleStatus, as the [PodGracefulDeletionTimeoutSec](../pkg/apis/frameworkcontroller/v1/types.go) in TaskSpec is already deleted. + +**Response** + +| Code | Body | Description | +|:---- |:---- |:---- | +| OK(200) | [Framework](../pkg/apis/frameworkcontroller/v1/types.go) | Return current Framework. | +| NotFound(404) | [Status](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#status-v1-meta) | The specified Framework is not found. | +| UnprocessableEntity(422) | [Status](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#status-v1-meta) | The specified $(TaskRoleName) does not exist or does not match the specified $(TaskRoleIndex). | + +##### Add/Delete Task +**Request** + + PATCH /apis/frameworkcontroller.microsoft.com/v1/namespaces/{FrameworkNamespace}/frameworks/{FrameworkName} + +Body: + +*Use the current array index of the $(TaskRoleName) to override below $(TaskRoleIndex).* + +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/$(TaskRoleIndex)/name", + "value": "$(TaskRoleName)" + }, + { + "op": "replace", + "path": "/spec/taskRoles/$(TaskRoleIndex)/taskNumber", + "value": $(TaskNumber) + } +] +``` + +*Generally, you may also need to adjust the TaskRole's [FrameworkAttemptCompletionPolicy](#FrameworkAttemptCompletionPolicy) according to the new $(TaskNumber). It is safe as [Framework ScaleUp/ScaleDown Strong Safety Guarantee](#FrameworkRescaleGuarantee).* + +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/$(TaskRoleIndex)/name", + "value": "$(TaskRoleName)" + }, + { + "op": "replace", + "path": "/spec/taskRoles/$(TaskRoleIndex)/taskNumber", + "value": $(TaskNumber) + }, + { + "op": "replace", + "path": "/spec/taskRoles/$(TaskRoleIndex)/frameworkAttemptCompletionPolicy/minSucceededTaskCount", + "value": $(MinSucceededTaskCount) + }, + { + "op": "replace", + "path": "/spec/taskRoles/$(TaskRoleIndex)/frameworkAttemptCompletionPolicy/minFailedTaskCount", + "value": $(MinFailedTaskCount) + } +] +``` + +Type: application/json-patch+json + +**Description** + +Update the TaskNumber (and the FrameworkAttemptCompletionPolicy) of the specified TaskRole in the specified Framework. + +See more in [Framework ScaleUp/ScaleDown](#FrameworkRescale). + +**Response** + +| Code | Body | Description | +|:---- |:---- |:---- | +| OK(200) | [Framework](../pkg/apis/frameworkcontroller/v1/types.go) | Return current Framework. | +| NotFound(404) | [Status](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#status-v1-meta) | The specified Framework is not found. | +| UnprocessableEntity(422) | [Status](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#status-v1-meta) | The specified $(TaskRoleName) does not exist or does not match the specified $(TaskRoleIndex). | + #### DELETE Framework **Request** @@ -227,10 +373,7 @@ You can leverage the [Predefined CompletionCode](../pkg/apis/frameworkcontroller ### Example Notes: -1. *Italic Conditions* can be inherited from the **DEFAULT** RetryPolicy, so no need to specify them explicitly. - - *You still need to specify them explicitly, as we have not supported the Framework Spec Defaulting yet.* - +1. *Italic Conditions* still need to be specified explicitly, as we have not supported the Framework Spec Defaulting yet. 2. 
For the definition of each [CompletionType](../pkg/apis/frameworkcontroller/v1/types.go), such as Transient Failed, see [CompletionStatus](#CompletionStatus). @@ -300,9 +443,7 @@ Notes: ### Example Notes: -1. *Italic Conditions* can be inherited from the **DEFAULT** FrameworkAttemptCompletionPolicy, so no need to specify them explicitly. - - *You still need to specify them explicitly, as we have not supported the Framework Spec Defaulting yet.* +1. *Italic Conditions* still need to be specified explicitly, as we have not supported the Framework Spec Defaulting yet.
@@ -339,7 +480,7 @@ Notes:
@@ -349,28 +490,225 @@ Notes:

| FrameworkType | TaskRole | FrameworkAttemptCompletionPolicy | Description |
|:---- |:---- |:---- |:---- |
| | | MinFailedTaskCount = {Reduce.TaskNumber} * {mapreduce.reduce.failures.maxpercent} + 1<br>MinSucceededTaskCount = -1 | |
| TensorFlow → Worker Dominated: TensorFlow ParameterServer Training | ParameterServer | MinFailedTaskCount = 1<br>MinSucceededTaskCount = -1 | Succeeding a certain TaskRole is enough, and there is no need to wait for all Tasks to succeed:<br>Fail the FrameworkAttempt immediately if any Task failed.<br>Succeed the FrameworkAttempt immediately if all of Worker's Tasks succeeded. |
| | Worker | MinFailedTaskCount = 1<br>MinSucceededTaskCount = {Worker.TaskNumber} | |
| Arbitrator Dominated → Master Dominated: Horovod MPI Training | Arbitrator → Master | MinFailedTaskCount = 1<br>MinSucceededTaskCount = 1 | The FrameworkAttemptCompletionPolicy is fully delegated to the single-instance master of the user application:<br>Fail the FrameworkAttempt immediately if the master failed.<br>Succeed the FrameworkAttempt immediately if the master succeeded. |
| | TaskRole-A, TaskRole-B → Worker | MinFailedTaskCount = -1<br>MinSucceededTaskCount = -1 | |
| First Completed Task Dominated → First Completed Task Dominated: PyTorch Elastic Training | TaskRole-A → Worker | MinFailedTaskCount = 1<br>MinSucceededTaskCount = 1 | The FrameworkAttemptCompletionPolicy is fully delegated to the first completed Task of the user application:<br>Fail the FrameworkAttempt immediately if any Task failed.<br>Succeed the FrameworkAttempt immediately if any Task succeeded. |
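As a rough illustration of how a row of this table maps onto a Framework spec, the sketch below shows only the `taskRoles` part of a `Master Dominated` style Framework; the TaskRole names (`master`, `worker`) and the taskNumber values are illustrative assumptions, and the per-TaskRole `task` (pod template) sections are omitted:

```yaml
taskRoles:
- name: master            # assumed single-instance master TaskRole
  taskNumber: 1
  frameworkAttemptCompletionPolicy:
    # Delegate FrameworkAttempt completion to the master:
    # fail the attempt as soon as the master fails,
    # succeed the attempt as soon as the master succeeds.
    minFailedTaskCount: 1
    minSucceededTaskCount: 1
- name: worker            # assumed worker TaskRole
  taskNumber: 4
  frameworkAttemptCompletionPolicy:
    # Workers never trigger FrameworkAttempt completion on their own.
    minFailedTaskCount: -1
    minSucceededTaskCount: -1
```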
+## Framework ScaleUp/ScaleDown +Framework ScaleUp/ScaleDown (Rescale) refers to taking any of the below actions for an existing Framework on the fly: +1. Add/Delete TaskRole without touching other TaskRoles. +2. Add/Delete Task without touching other Tasks. + +### Application Assumption +Before you start to Rescale a Framework, make sure the application executed by the Framework can tolerate Rescale, such as: +1. Your application should be able to **rebalance** its workload after Rescale: + 1. A Service application may need to rebalance its serving traffic by a load balancer, and/or re-replicate its state. + 2. A Batch application may need to rebalance its work items by a work queue, and/or adjust its Task membership, like [PyTorch Elastic Training](https://pytorch.org/elastic). +2. A **Batch** application had better **not rerun** after ScaleUp: + 1. A rerun may happen if the application has already completed but the completion event has not yet been observed by FrameworkController. So, if you ScaleUp the Framework during this period, the application may rerun. + 2. To mitigate it, the ScaleUp Task can immediately complete itself by leveraging an empty work queue or an existing checkpoint from the previous run. +3. A **Batch** application had better **not succeed too early** after ScaleDown: + 1. Succeeding too early may happen if all Tasks have succeeded except one still-running Task, and you ScaleDown that running Task. + 2. To resolve it, make sure it is safe to ScaleDown the running Task, such as by leveraging the `First Completed Task Dominated` or `Master Dominated` FrameworkType in [FrameworkAttemptCompletionPolicy](#FrameworkAttemptCompletionPolicy). For the `First Completed Task Dominated` FrameworkType, an exit barrier may be needed to ensure that any Task succeeding means the whole application has already succeeded, like [PyTorch Elastic Training](https://github.com/pytorch/elastic/blob/adb5f057359e87a2cb804c7354af5a4e424961c3/torchelastic/agent/server/local_elastic_agent.py#L122-L127). For the `Master Dominated` FrameworkType, a master TaskRole is needed, and the master TaskRole should not be ScaledDown. + +### API +- [Add TaskRole](#Add_TaskRole) +- [Delete TaskRole](#Delete_TaskRole) +- [Add/Delete Task](#Add_Delete_Task) + +### Example +#### Basic Example +This example demonstrates the basic usage of Framework Rescale, as well as its [Strong Safety Guarantee](#FrameworkRescaleGuarantee). +1. [Create Framework](#CREATE_Framework) as below, and wait until all Tasks are AttemptRunning: +```yaml +apiVersion: frameworkcontroller.microsoft.com/v1 +kind: Framework +metadata: + name: rescalebasic +spec: + executionType: Start + retryPolicy: + fancyRetryPolicy: false + maxRetryCount: 0 + taskRoles: + - name: a + taskNumber: 4 + frameworkAttemptCompletionPolicy: + minFailedTaskCount: 4 + minSucceededTaskCount: 1 + task: + retryPolicy: + fancyRetryPolicy: false + maxRetryCount: 0 + podGracefulDeletionTimeoutSec: 600 + pod: + spec: + restartPolicy: Never + containers: + - name: ubuntu + image: ubuntu:trusty + command: ["sh", "-c", "printenv && sleep infinity"] +``` +2. Delete Pods `rescalebasic-a-2` and `rescalebasic-a-3`, and wait until their Tasks are Completed (Failed). +3.
[ScaleDown Framework](#Add_Delete_Task): Decrease the taskNumber and minFailedTaskCount from 4 to 2 by below patch: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/0/name", + "value": "a" + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/taskNumber", + "value": 2 + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/frameworkAttemptCompletionPolicy/minFailedTaskCount", + "value": 2 + } +] +``` +4. The Completed Tasks `rescalebasic-a-2`, `rescalebasic-a-3` are immediately deleted, and the Framework stays AttemptRunning. + - This demonstrates [SafetyGuarantee1](#FrameworkRescaleGuarantee), as otherwise, the old Failed Tasks `rescalebasic-a-2`, `rescalebasic-a-3` may be wrongly considered in the new FrameworkAttemptCompletionPolicy.minFailedTaskCount (2) and triggers the completion. +5. [ScaleUp Framework](#Add_Delete_Task): Increase the taskNumber and minFailedTaskCount from 2 to 4 by below patch, and wait until all Tasks are AttemptRunning: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/0/name", + "value": "a" + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/taskNumber", + "value": 4 + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/frameworkAttemptCompletionPolicy/minFailedTaskCount", + "value": 4 + } +] +``` +6. Delete Pod `rescalebasic-a-2`, and wait until its Task is Completed (Failed). +7. Redo Step 3, and wait until the `rescalebasic-a-2`, `rescalebasic-a-3` Tasks are DeletionPending, but before `rescalebasic-a-3` is deleted, then immediately [ScaleUp Framework](#Add_Delete_Task): Increase the taskNumber and minFailedTaskCount from 2 to 3 by below patch: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/0/name", + "value": "a" + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/taskNumber", + "value": 3 + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/frameworkAttemptCompletionPolicy/minFailedTaskCount", + "value": 3 + } +] +``` +8. The Completed Task `rescalebasic-a-2` is immediately replaced by a new Task instance, the AttemptRunning Task `rescalebasic-a-3` is eventually deleted, and the Framework stays AttemptRunning. + - This demonstrates [SafetyGuarantee2](#FrameworkRescaleGuarantee), as otherwise, the previous ScaleDown Task `rescalebasic-a-2` may be wrongly reused in later ScaleUp. +9. [ScaleUp Framework](#Add_Delete_Task): Increase the taskNumber and minFailedTaskCount from 3 to 4 by below patch, and wait until all Tasks are AttemptRunning: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/0/name", + "value": "a" + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/taskNumber", + "value": 4 + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/frameworkAttemptCompletionPolicy/minFailedTaskCount", + "value": 4 + } +] +``` +10. Delete Pod `rescalebasic-a-2`, and wait until its Task is Completed (Failed). +11. Redo Step 3, and wait until the `rescalebasic-a-2`, `rescalebasic-a-3` Tasks are DeletionPending, but before `rescalebasic-a-3` is deleted, then immediately [ScaleUp Framework](#Add_Delete_Task): Increase the taskNumber and minFailedTaskCount from 2 to 5 by below patch: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/0/name", + "value": "a" + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/taskNumber", + "value": 5 + }, + { + "op": "replace", + "path": "/spec/taskRoles/0/frameworkAttemptCompletionPolicy/minFailedTaskCount", + "value": 5 + } +] +``` +12. 
The Completed Task `rescalebasic-a-2` is immediately replaced by a new Task instance, the AttemptRunning Task `rescalebasic-a-3` is eventually replaced by a new Task instance, the new Task `rescalebasic-a-4` is immediately added, and the Framework stays AttemptRunning. + - This demonstrates [SafetyGuarantee2](#FrameworkRescaleGuarantee), as otherwise, the previous ScaleDown Tasks `rescalebasic-a-2` and `rescalebasic-a-3` might be wrongly reused in a later ScaleUp. + +#### PyTorch Elastic Training Example +[PyTorch Elastic Training On FrameworkController](../example/framework/scenario/pytorch/elastic) + +### Pipeline +**ScaleUp Pipeline**: +1. As soon as FrameworkController observes a ScaleUp request for a Framework that is not Completing/Completed, it will immediately mark the ScaleUp Task as [AttemptCreationPending](../pkg/apis/frameworkcontroller/v1/types.go) and then persist (expose) the Task, before taking any other action for the Framework, such as starting to create its TaskAttempt or evaluating the Task's impact on the Framework, such as the [FrameworkAttemptCompletionPolicy](#FrameworkAttemptCompletionPolicy). +2. As soon as the AttemptCreationPending Task is persisted (exposed), the Task will impact its Framework in the future, such as: + 1. The Task will be considered in FrameworkAttemptCompletionPolicy. + 2. The Task will be deleted in a later ScaleDown. +3. Only then will FrameworkController start to create the AttemptCreationPending TaskAttempt. + +**ScaleDown Pipeline**: +1. As soon as FrameworkController observes a ScaleDown request for a Framework that is not Completing/Completed, it will immediately mark the ScaleDown Task as [DeletionPending](../pkg/apis/frameworkcontroller/v1/types.go) and then persist (expose) the Task, before taking any other action for the Framework, such as starting to delete the Task or evaluating the Task's impact on the Framework, such as the [FrameworkAttemptCompletionPolicy](#FrameworkAttemptCompletionPolicy). +2. As soon as the DeletionPending Task is persisted (exposed), the Task will never impact its Framework in the future anymore, except for the Task's graceful deletion period itself, such as: + 1. The Task will never be considered in FrameworkAttemptCompletionPolicy. + 2. The Task will never be reused in a later ScaleUp. +3. Only then will FrameworkController start to delete the DeletionPending Task. +4. After the Framework is Completing/Completed, the Framework may still contain DeletionPending Tasks, but these Tasks must be Completed. + +**Notes**: +1. A user also needs to explicitly ignore the DeletionPending Task if they do not care about the Task's graceful deletion period, like [FrameworkBarrier](../pkg/barrier/barrier.go) does. + +### Strong Safety Guarantee +Besides these general [Framework ConsistencyGuarantees](#ConsistencyGuarantees), FrameworkController also provides the below Strong Safety Guarantees for Framework Rescale: +- **SafetyGuarantee1**: + - A user can always safely Rescale a Framework and update its other Spec within a single Framework update, as if the Rescale were already done, such as [also updating the FrameworkAttemptCompletionPolicy according to the new scale](#Add_Delete_Task), because: + - The ScaleUp Task will be immediately considered, and the ScaleDown Task immediately ignored, in the updated Framework.
+- **SafetyGuarantee2**: + - A user can always safely Rescale before any previous Rescale has totally finished (including the final deletion of the ScaleDown Task), because: + - For a ScaleUp immediately followed by a ScaleDown: + - If the user observed a new non-[DeletionPending](../pkg/apis/frameworkcontroller/v1/types.go) Task (caused by the ScaleUp), the later ScaleDown will delete it (i.e. the ScaleUp committed); otherwise, the later ScaleDown may not delete it, but the previous ScaleUp must not have impacted it, such as by starting to create its TaskAttempt (i.e. the ScaleUp rolled back). + - For a ScaleDown immediately followed by a ScaleUp: + - If the user observed a new [DeletionPending](../pkg/apis/frameworkcontroller/v1/types.go) Task (caused by the ScaleDown), the later ScaleUp will not reuse it (i.e. the ScaleDown committed); otherwise, the later ScaleUp may reuse it, but the previous ScaleDown must not have impacted it, such as by starting to delete it (i.e. the ScaleDown rolled back). + +**See the [Framework Rescale Basic Example](#FrameworkRescaleBasicExample) for a demonstration of these Strong Safety Guarantees.** + ## Large Scale Framework To safely run large scale Framework, i.e. the total task number in a single Framework is greater than 300, you just need to enable the [LargeFrameworkCompression](../pkg/apis/frameworkcontroller/v1/config.go). However, you may also need to decompress the Framework by yourself. @@ -395,7 +733,7 @@ For a specific Task identified by {FrameworkName}-{TaskRoleName}-{TaskIndex}: - **ConsistencyGuarantee2**: - No instance of the Task is running if it is TaskAttemptCompleted, TaskCompleted or the whole Framework is deleted. + No instance of the Task is running if it is deleted (or does not exist), TaskAttemptCompleted, TaskCompleted or the whole Framework is deleted (or does not exist). For a specific Framework identified by {FrameworkName}: @@ -405,7 +743,7 @@ For a specific Framework identified by {FrameworkName}: - **ConsistencyGuarantee4**: - No instance of the Framework is running if it is FrameworkAttemptCompleted, FrameworkCompleted or the whole Framework is deleted. + No instance of the Framework is running if it is FrameworkAttemptCompleted, FrameworkCompleted or the whole Framework is deleted (or does not exist). #### How to achieve ConsistencyGuarantees @@ -462,11 +800,11 @@ See more in: ## Controller Extension ### FrameworkBarrier 1. [Usage](../pkg/barrier/barrier.go) -2. Example: [FrameworkBarrier Example](../example/framework/extension/frameworkbarrier.yaml), [TensorFlow Example](../example/framework/scenario/tensorflow), [etc](../example/framework/scenario). +2. Example: [FrameworkBarrier Example](../example/framework/extension/frameworkbarrier.yaml), [TensorFlow ParameterServer Training Example](../example/framework/scenario/tensorflow/ps), [etc](../example/framework/scenario). ### HiveDScheduler 1. [Usage](https://github.com/microsoft/hivedscheduler) -2. Example: [TensorFlow Example](../example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml), [etc](https://github.com/microsoft/hivedscheduler/example/request/design/request.yaml). +2. Example: [TensorFlow ParameterServer Training Example](../example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml), [etc](https://github.com/microsoft/hivedscheduler/blob/master/example/request/design/request.yaml).
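For orientation only: Kubernetes selects the scheduler for each Pod through the `schedulerName` field of the Pod Spec, and FrameworkController creates its Pods from the per-TaskRole Pod Spec, so a Framework can opt into an extender-backed scheduler this way. The fragment below is a minimal sketch; the scheduler name `hivedscheduler` is an assumed example, and any scheduler-specific annotations depend on your deployment (see the HiveDScheduler usage link above):

```yaml
task:
  pod:
    spec:
      # The name must match the scheduler actually deployed in the cluster;
      # "hivedscheduler" here is only an assumed example.
      schedulerName: hivedscheduler
      restartPolicy: Never
      containers:
      - name: ubuntu
        image: ubuntu:trusty
        command: ["sh", "-c", "printenv"]
```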
## Best Practice [Best Practice](../pkg/apis/frameworkcontroller/v1/types.go) diff --git a/example/framework/basic/batchfailedpermanent.yaml b/example/framework/basic/batchfailedpermanent.yaml index d95c2787..eca68364 100644 --- a/example/framework/basic/batchfailedpermanent.yaml +++ b/example/framework/basic/batchfailedpermanent.yaml @@ -18,6 +18,7 @@ spec: retryPolicy: fancyRetryPolicy: true maxRetryCount: 1 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/batchfailedtransient.yaml b/example/framework/basic/batchfailedtransient.yaml index e8bc573d..6ae99023 100644 --- a/example/framework/basic/batchfailedtransient.yaml +++ b/example/framework/basic/batchfailedtransient.yaml @@ -18,6 +18,7 @@ spec: retryPolicy: fancyRetryPolicy: true maxRetryCount: 1 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/batchfailedtransientconflict.yaml b/example/framework/basic/batchfailedtransientconflict.yaml index 8acc2136..c511f261 100644 --- a/example/framework/basic/batchfailedtransientconflict.yaml +++ b/example/framework/basic/batchfailedtransientconflict.yaml @@ -18,6 +18,7 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/batchfailedunknown.yaml b/example/framework/basic/batchfailedunknown.yaml index 0c13fa7a..dc25cb07 100644 --- a/example/framework/basic/batchfailedunknown.yaml +++ b/example/framework/basic/batchfailedunknown.yaml @@ -18,6 +18,7 @@ spec: retryPolicy: fancyRetryPolicy: true maxRetryCount: 1 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/batchstatefulfailed.yaml b/example/framework/basic/batchstatefulfailed.yaml index 054b6e88..df14d0a9 100644 --- a/example/framework/basic/batchstatefulfailed.yaml +++ b/example/framework/basic/batchstatefulfailed.yaml @@ -18,6 +18,8 @@ spec: retryPolicy: fancyRetryPolicy: true maxRetryCount: 1 + # Large timeout to force delete Pod as it may break the stateful batch. 
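+      # (The controller waits up to podGracefulDeletionTimeoutSec for graceful Pod deletion before it force deletes the Pod, so a large value here postpones force deletion.)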
+ podGracefulDeletionTimeoutSec: 1800 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/batchsucceeded.yaml b/example/framework/basic/batchsucceeded.yaml index 523d31c4..7e8a97c6 100644 --- a/example/framework/basic/batchsucceeded.yaml +++ b/example/framework/basic/batchsucceeded.yaml @@ -18,6 +18,7 @@ spec: retryPolicy: fancyRetryPolicy: true maxRetryCount: 1 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/batchwithservicesucceeded.yaml b/example/framework/basic/batchwithservicesucceeded.yaml index 3fc0aedb..e181cddd 100644 --- a/example/framework/basic/batchwithservicesucceeded.yaml +++ b/example/framework/basic/batchwithservicesucceeded.yaml @@ -18,6 +18,7 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never @@ -36,6 +37,7 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/basic/service.yaml b/example/framework/basic/service.yaml index 68326276..8a1b0fc6 100644 --- a/example/framework/basic/service.yaml +++ b/example/framework/basic/service.yaml @@ -7,17 +7,18 @@ spec: executionType: Start retryPolicy: fancyRetryPolicy: false - maxRetryCount: -1 + maxRetryCount: -2 taskRoles: - name: server - taskNumber: 1 + taskNumber: 3 frameworkAttemptCompletionPolicy: minFailedTaskCount: 1 minSucceededTaskCount: -1 task: retryPolicy: fancyRetryPolicy: false - maxRetryCount: -1 + maxRetryCount: -2 + podGracefulDeletionTimeoutSec: 600 pod: #metadata: # labels: @@ -31,10 +32,11 @@ spec: - containerPort: 80 --- # Post to {kubeApiServerAddress}/api/v1/namespaces/default/services/ +# Then the above nginx server can be accessed by load-balancer http://service:30170 apiVersion: v1 kind: Service metadata: - name: server + name: service spec: selector: # Using predefined labels @@ -42,6 +44,8 @@ spec: FC_TASKROLE_NAME: server # Also can use customized labels #app: server - ports: - - port: 80 type: NodePort + ports: + - targetPort: 80 + port: 30170 + nodePort: 30170 diff --git a/example/framework/basic/servicestateful.yaml b/example/framework/basic/servicestateful.yaml index 8ad4ac32..3ccedd6f 100644 --- a/example/framework/basic/servicestateful.yaml +++ b/example/framework/basic/servicestateful.yaml @@ -7,7 +7,7 @@ spec: executionType: Start retryPolicy: fancyRetryPolicy: false - maxRetryCount: -1 + maxRetryCount: -2 taskRoles: - name: serverstateful taskNumber: 3 @@ -17,9 +17,15 @@ spec: task: retryPolicy: fancyRetryPolicy: false - maxRetryCount: -1 + maxRetryCount: -2 + # Never force delete Pod as it may break the stateful service. + podGracefulDeletionTimeoutSec: null pod: spec: + # By leveraging below Headless Service, the Pod can be discovered by its DNS FQDN: + # {FC_TASKROLE_NAME}-{FC_TASK_INDEX}.{FC_FRAMEWORK_NAME}.{FC_FRAMEWORK_NAMESPACE}.svc.cluster.local + hostname: "{{FC_TASKROLE_NAME}}-{{FC_TASK_INDEX}}" + subdomain: "{{FC_FRAMEWORK_NAME}}" # Using Never restartPolicy, ActiveDeadlineSeconds and initContainers # just to demonstrate consistent identities in this example. 
restartPolicy: Never @@ -33,18 +39,34 @@ spec: - name: ubuntu image: ubuntu:trusty # See comments in batchstatefulfailed.yaml - command: ["sh", "-c", "printenv"] + command: [ + "sh", "-c", + "printenv && + getent ahosts && + echo Local DNS FQDN Resolved: $(hostname -f) && + PEER_FQDN=${FC_TASKROLE_NAME}-$((2-${FC_TASK_INDEX})).${FC_FRAMEWORK_NAME}.${FC_FRAMEWORK_NAMESPACE}.svc.cluster.local && + until getent ahosts ${PEER_FQDN}; + do echo Waiting Peer DNS FQDN Resolved: ${PEER_FQDN}; sleep 3; + done; + echo Peer DNS FQDN Resolved: ${PEER_FQDN}"] --- # Post to {kubeApiServerAddress}/api/v1/namespaces/default/services/ +# It is a Headless Service which does not create a load-balancer, but still +# creates DNS records for each Pod of above Framework, so that they can resolve +# each other's IP. apiVersion: v1 kind: Service metadata: - name: serverstateful + name: servicestateful spec: + # Headless Service + clusterIP: None + # Create DNS records regardless of Pod readiness, to simply peer discovery. + publishNotReadyAddresses: true selector: # See comments in service.yaml FC_FRAMEWORK_NAME: servicestateful FC_TASKROLE_NAME: serverstateful ports: - - port: 80 - type: NodePort + - targetPort: 80 + port: 30180 diff --git a/example/framework/extension/frameworkbarrier.yaml b/example/framework/extension/frameworkbarrier.yaml index b1cb6eeb..33b96c2e 100644 --- a/example/framework/extension/frameworkbarrier.yaml +++ b/example/framework/extension/frameworkbarrier.yaml @@ -23,6 +23,7 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never @@ -96,6 +97,7 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + podGracefulDeletionTimeoutSec: 600 pod: spec: restartPolicy: Never diff --git a/example/framework/scenario/pytorch/README.md b/example/framework/scenario/pytorch/README.md new file mode 100644 index 00000000..0bf2c5d4 --- /dev/null +++ b/example/framework/scenario/pytorch/README.md @@ -0,0 +1,9 @@ +# PyTorch On FrameworkController + +## Feature +1. Support [almost all kinds of PyTorch applications](../../../../README.md#microsoft-openpai-frameworkcontroller), including [PyTorch Elastic Training](elastic) +2. [Common Feature](../../../../README.md#Feature) + +## Quick Start +1. [Common Quick Start](../../../../README.md#Quick-Start) +2. [PyTorch Elastic Training Example](elastic) diff --git a/example/framework/scenario/pytorch/elastic/README.md b/example/framework/scenario/pytorch/elastic/README.md new file mode 100644 index 00000000..a0cbc354 --- /dev/null +++ b/example/framework/scenario/pytorch/elastic/README.md @@ -0,0 +1,585 @@ +# [PyTorch Elastic Training](https://pytorch.org/elastic) On FrameworkController + +## Feature +1. Support to [ScaleUp/ScaleDown with Strong Safety Guarantee](../../../../../doc/user-manual.md#FrameworkRescale) +2. Support to use whole cluster shared etcd or per-application dedicated etcd. If latter is used, the etcd will be automatically cleaned up when the whole FrameworkAttempt is completed +3. [Common Feature](../../../../../README.md#Feature) + +## Prerequisite +1. Need to setup [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service) +2. Need to setup [Kubernetes GPU Device Plugin](https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus) for at least 4 NVIDIA GPUs +3. 
Need to setup [Kubernetes Cluster-Level Logging](https://kubernetes.io/docs/concepts/cluster-administration/logging), if you need to persist and expose the log for deleted Pod + +## ImageNet Example +1. Create Service for etcd as below, so that etcd can be discovered by training workers: +```yaml +apiVersion: v1 +kind: Service +metadata: + name: pet-etcd +spec: + selector: + FC_FRAMEWORK_NAME: pet + FC_TASKROLE_NAME: etcd + ports: + - targetPort: 2379 + port: 2379 +``` +2. Create Service for training worker as below, so that training workers can be discovered by each other: +```yaml +# See comments in ./example/framework/basic/servicestateful.yaml +apiVersion: v1 +kind: Service +metadata: + name: pet-worker +spec: + clusterIP: None + publishNotReadyAddresses: true + selector: + FC_FRAMEWORK_NAME: pet + FC_TASKROLE_NAME: worker +``` +3. [Create Framework](../../../../../doc/user-manual.md#CREATE_Framework) for training as below, and wait until all Tasks are AttemptRunning: +```yaml +apiVersion: frameworkcontroller.microsoft.com/v1 +kind: Framework +metadata: + name: pet +spec: + executionType: Start + retryPolicy: + fancyRetryPolicy: true + maxRetryCount: 2 + taskRoles: + - name: etcd + taskNumber: 1 + frameworkAttemptCompletionPolicy: + minFailedTaskCount: 1 + minSucceededTaskCount: -1 + task: + # Always retry etcd if there is only etcd failure + retryPolicy: + fancyRetryPolicy: false + maxRetryCount: -2 + # Large timeout to force delete Pod as it may break the stateful batch. + podGracefulDeletionTimeoutSec: 1800 + pod: + spec: + restartPolicy: Always + containers: + - name: etcd + image: quay.io/coreos/etcd:v3.4.9 + command: [ + "sh", "-c", + "/usr/local/bin/etcd + --data-dir /var/lib/etcd --enable-v2 + --listen-client-urls http://0.0.0.0:2379 + --advertise-client-urls http://0.0.0.0:2379 + --initial-cluster-state new"] + ports: + - containerPort: 2379 + - name: worker + # Should within torchelastic nnodes range. + taskNumber: 4 + # As exit barrier is not yet supported in below torchelastic image, it is better + # to still wait until all workers succeeded and only until then succeed the + # whole training. + frameworkAttemptCompletionPolicy: + minFailedTaskCount: 1 + minSucceededTaskCount: 4 + task: + retryPolicy: + fancyRetryPolicy: true + maxRetryCount: 2 + # Large timeout to force delete Pod as it may break the stateful batch. + podGracefulDeletionTimeoutSec: 1800 + pod: + spec: + # See comments in ./example/framework/basic/servicestateful.yaml + hostname: "{{FC_TASKROLE_NAME}}-{{FC_TASK_INDEX}}" + subdomain: "{{FC_FRAMEWORK_NAME}}-{{FC_TASKROLE_NAME}}" + restartPolicy: Never + containers: + - name: pytorch + # Using official image to demonstrate this example. + # The imagenet example does not require a distributed shared file system to broadcast checkpoint: + # https://github.com/pytorch/elastic/blob/45dc33f3eca1344fe1fd84634fb0d62767822f3e/examples/imagenet/main.py#L336-L343 + image: torchelastic/examples:0.2.0 + command: [ + "sh", "-c", + "python -m torchelastic.distributed.launch + --rdzv_backend=etcd + --rdzv_endpoint=${FC_FRAMEWORK_NAME}-etcd:2379 + --rdzv_id=${FC_FRAMEWORK_NAME} + --nnodes=1:4 + --nproc_per_node=1 + /workspace/examples/imagenet/main.py + --arch=resnet18 --batch-size=32 --epochs=2 + /workspace/data/tiny-imagenet-200"] + resources: + limits: + # Should equal to torchelastic nproc_per_node. + nvidia.com/gpu: 1 + volumeMounts: + # Mount shared memory otherwise pytorch data loaders may be OOM. 
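+          # (PyTorch DataLoader worker processes exchange tensors through /dev/shm; the container runtime's default /dev/shm is typically only 64MB, so the Memory-medium emptyDir below provides a larger tmpfs for it.)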
+ - name: shm-volume + mountPath: /dev/shm + volumes: + - name: shm-volume + emptyDir: + medium: Memory +``` +4. All workers will train the model, with log like below: +```text +[INFO] 2020-07-31 06:52:44,968 launch: Running torchelastic.distributed.launch with args: ['/opt/conda/lib/python3.7/site-packages/torchelastic/distributed/launch.py', '--rdzv_backend=etcd', '--rdzv_endpoint=pet-etcd:2379', '--rdzv_id=pet', '--nnodes=1:4', '--nproc_per_node=1', '/workspace/examples/imagenet/main.py', '--arch=resnet18', '--batch-size=32', '--epochs=2', '/workspace/data/tiny-imagenet-200'] +INFO 2020-07-31 06:52:44,976 Etcd machines: ['http://0.0.0.0:2379'] +[INFO] 2020-07-31 06:52:44,985 launch: Using nproc_per_node=1. +[INFO] 2020-07-31 06:52:45,715 api: [default] starting workers for function: wrapper_fn +[INFO] 2020-07-31 06:52:45,715 api: [default] Rendezvous'ing worker group +INFO 2020-07-31 06:52:45,715 Attempting to join next rendezvous +INFO 2020-07-31 06:52:45,723 New rendezvous state created: {'status': 'joinable', 'version': '1', 'participants': []} +INFO 2020-07-31 06:52:45,739 Joined rendezvous version 1 as rank 0. Full state: {'status': 'joinable', 'version': '1', 'participants': [0]} +INFO 2020-07-31 06:52:45,739 Rank 0 is responsible for join last call. +INFO 2020-07-31 06:52:46,942 Rank 0 finished join last call. +INFO 2020-07-31 06:52:46,942 Waiting for remaining peers. +INFO 2020-07-31 06:52:46,943 All peers arrived. Confirming membership. +INFO 2020-07-31 06:52:46,954 Waiting for confirmations from all peers. +INFO 2020-07-31 06:52:47,064 Rendezvous version 1 is complete. Final state: {'status': 'final', 'version': '1', 'participants': [0, 1, 2, 3], 'keep_alives': ['/torchelastic/p2p/run_pet/rdzv/v_1/rank_0', '/torchelastic/p2p/run_pet/rdzv/v_1/rank_3', '/torchelastic/p2p/run_pet/rdzv/v_1/rank_2', '/torchelastic/p2p/run_pet/rdzv/v_1/rank_1'], 'num_workers_waiting': 0} +INFO 2020-07-31 06:52:47,064 Creating EtcdStore as the c10d::Store implementation +[INFO] 2020-07-31 06:52:47,071 api: [default] Rendezvous complete for workers. +Result: + restart_count=0 + group_rank=0 + group_world_size=4 + rank stride=1 + assigned global_ranks=[0] + master_addr=worker-0.pet-worker.default.svc.cluster.local + master_port=43429 + +[INFO] 2020-07-31 06:52:47,071 api: [default] Starting worker group +=> set cuda device = 0 +=> creating model: resnet18 +=> no workers have checkpoints, starting from epoch 0 +=> start_epoch: 0, best_acc1: 0 +Epoch: [0][ 0/782] Time 3.613 ( 3.613) Data 0.079 ( 0.079) Loss 7.0412e+00 (7.0412e+00) Acc@1 0.00 ( 0.00) Acc@5 0.00 ( 0.00) +Epoch: [0][ 10/782] Time 1.638 ( 1.849) Data 0.086 ( 0.300) Loss 5.7640e+00 (6.3097e+00) Acc@1 0.00 ( 0.00) Acc@5 0.00 ( 1.99) +...... +Test: [300/313] Time 0.122 ( 0.167) Loss 7.0159e+00 (7.0172e+00) Acc@1 0.00 ( 0.40) Acc@5 6.25 ( 1.43) +Test: [310/313] Time 0.139 ( 0.166) Loss 7.3541e+00 (7.0174e+00) Acc@1 0.00 ( 0.39) Acc@5 3.12 ( 1.43) + * Acc@1 0.390 Acc@5 1.420 +=> saved checkpoint for epoch 0 at /tmp/checkpoint.pth.tar +=> best model found at epoch 0 saving to /tmp/model_best.pth.tar +Epoch: [1][ 0/782] Time 6.522 ( 6.522) Data 0.052 ( 0.052) Loss 4.4326e+00 (4.4326e+00) Acc@1 3.12 ( 3.12) Acc@5 15.62 ( 15.62) +Epoch: [1][ 10/782] Time 1.427 ( 1.703) Data 0.045 ( 0.202) Loss 4.3480e+00 (4.4527e+00) Acc@1 0.00 ( 7.67) Acc@5 34.38 ( 22.73) +...... +``` +5. 
[ScaleDown Framework](../../../../../doc/user-manual.md#Add_Delete_Task): Decrease `worker` taskNumber and minSucceededTaskCount from 4 to 2 by below patch: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/1/name", + "value": "worker" + }, + { + "op": "replace", + "path": "/spec/taskRoles/1/taskNumber", + "value": 2 + }, + { + "op": "replace", + "path": "/spec/taskRoles/1/frameworkAttemptCompletionPolicy/minSucceededTaskCount", + "value": 2 + } +] +``` +6. Remaining workers `pet-worker-0`, `pet-worker-1` will re-rendezvous and recover from last epoch checkpoint, with log like below: +```text +...... +Epoch: [1][180/782] Time 1.186 ( 1.230) Data 0.095 ( 0.179) Loss 4.2262e+00 (4.3819e+00) Acc@1 9.38 ( 9.12) Acc@5 34.38 ( 25.57) +Epoch: [1][190/782] Time 1.580 ( 1.230) Data 0.782 ( 0.180) Loss 3.9395e+00 (4.3789e+00) Acc@1 9.38 ( 9.18) Acc@5 34.38 ( 25.59) +Traceback (most recent call last): + File "/workspace/examples/imagenet/main.py", line 603, in + main() + File "/workspace/examples/imagenet/main.py", line 188, in main + train(train_loader, model, criterion, optimizer, epoch, device_id, print_freq) + File "/workspace/examples/imagenet/main.py", line 471, in train + loss.backward() + File "/opt/conda/lib/python3.7/site-packages/torch/tensor.py", line 198, in backward + torch.autograd.backward(self, gradient, retain_graph, create_graph) + File "/opt/conda/lib/python3.7/site-packages/torch/autograd/__init__.py", line 100, in backward + allow_unreachable=True) # allow_unreachable flag +RuntimeError: NCCL error: unhandled system error, NCCL version 2.4.8 +[ERROR] 2020-07-31 07:15:34,783 local_elastic_agent: [default] Worker group failed +Traceback (most recent call last): + File "/opt/conda/lib/python3.7/site-packages/torchelastic/agent/server/local_elastic_agent.py", line 190, in _monitor_workers + if self._process_context.join(timeout=-1): + File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 119, in join + raise Exception(msg) +Exception: + +-- Process 0 terminated with the following error: +Traceback (most recent call last): + File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap + fn(i, *args) + File "/opt/conda/lib/python3.7/site-packages/torchelastic/agent/server/local_elastic_agent.py", line 79, in _wrap + ret = fn(*args) + File "/opt/conda/lib/python3.7/site-packages/torchelastic/distributed/launch.py", line 392, in wrapper_fn + raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd) +subprocess.CalledProcessError: Command '['/opt/conda/bin/python', '-u', '/workspace/examples/imagenet/main.py', '--arch=resnet18', '--batch-size=32', '--epochs=2', '/workspace/data/tiny-imagenet-200']' returned non-zero exit status 1. + +[INFO] 2020-07-31 07:15:34,785 api: [default] Worker group FAILED. 3/3 attempts left; will restart worker group +[INFO] 2020-07-31 07:15:34,785 api: [default] Stopping worker group +[INFO] 2020-07-31 07:15:34,785 api: [default] Rendezvous'ing worker group +INFO 2020-07-31 07:15:34,785 Attempting to join next rendezvous +INFO 2020-07-31 07:15:34,791 Observed existing rendezvous state: {'status': 'joinable', 'version': '2', 'participants': [0]} +INFO 2020-07-31 07:15:34,826 Joined rendezvous version 2 as rank 1. Full state: {'status': 'joinable', 'version': '2', 'participants': [0, 1]} +INFO 2020-07-31 07:15:34,826 Waiting for remaining peers. +INFO 2020-07-31 07:16:04,896 All peers arrived. Confirming membership. 
+INFO 2020-07-31 07:16:04,904 Waiting for confirmations from all peers. +INFO 2020-07-31 07:16:04,908 Rendezvous version 2 is complete. Final state: {'status': 'final', 'version': '2', 'participants': [0, 1], 'keep_alives': ['/torchelastic/p2p/run_pet/rdzv/v_2/rank_1', '/torchelastic/p2p/run_pet/rdzv/v_2/rank_0'], 'num_workers_waiting': 0} +INFO 2020-07-31 07:16:04,908 Creating EtcdStore as the c10d::Store implementation +[INFO] 2020-07-31 07:16:04,915 api: [default] Rendezvous complete for workers. +Result: + restart_count=1 + group_rank=1 + group_world_size=2 + rank stride=1 + assigned global_ranks=[1] + master_addr=worker-1.pet-worker.default.svc.cluster.local + master_port=55787 + +[INFO] 2020-07-31 07:16:04,915 api: [default] Starting worker group +=> set cuda device = 0 +=> creating model: resnet18 +=> loading checkpoint file: /tmp/checkpoint.pth.tar +=> loaded checkpoint file: /tmp/checkpoint.pth.tar +=> using checkpoint from rank: 1, max_epoch: 0 +=> checkpoint broadcast size is: 93588276 +/opt/conda/conda-bld/pytorch_1587428398394/work/torch/csrc/utils/tensor_numpy.cpp:141: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. +=> done broadcasting checkpoint +=> done restoring from previous checkpoint +=> start_epoch: 1, best_acc1: 0.38999998569488525 +Epoch: [1][ 0/1563] Time 2.916 ( 2.916) Data 0.095 ( 0.095) Loss 4.3512e+00 (4.3512e+00) Acc@1 15.62 ( 15.62) Acc@5 21.88 ( 21.88) +Epoch: [1][ 10/1563] Time 0.650 ( 0.833) Data 0.043 ( 0.090) Loss 4.4603e+00 (4.4707e+00) Acc@1 12.50 ( 8.52) Acc@5 21.88 ( 20.45) +...... +``` +7. [ScaleUp Framework](../../../../../doc/user-manual.md#Add_Delete_Task): Increase `worker` taskNumber and minSucceededTaskCount from 2 to 3 by below patch: +```json +[ + { + "op": "test", + "path": "/spec/taskRoles/1/name", + "value": "worker" + }, + { + "op": "replace", + "path": "/spec/taskRoles/1/taskNumber", + "value": 3 + }, + { + "op": "replace", + "path": "/spec/taskRoles/1/frameworkAttemptCompletionPolicy/minSucceededTaskCount", + "value": 3 + } +] +``` +8. Augmented workers `pet-worker-0`, `pet-worker-1`, `pet-worker-2` will re-rendezvous and recover from last epoch checkpoint, with log like below: +```text +...... +Epoch: [1][1450/1563] Time 0.563 ( 0.783) Data 0.108 ( 0.177) Loss 4.0248e+00 (4.2794e+00) Acc@1 12.50 ( 10.54) Acc@5 37.50 ( 28.52) +Epoch: [1][1460/1563] Time 0.560 ( 0.783) Data 0.074 ( 0.179) Loss 4.5901e+00 (4.2787e+00) Acc@1 6.25 ( 10.55) Acc@5 18.75 ( 28.53) +[INFO] 2020-07-31 07:35:16,348 api: [default] Detected 1 new nodes from group_rank=1; will restart worker group +[INFO] 2020-07-31 07:35:16,349 api: [default] Stopping worker group +[INFO] 2020-07-31 07:35:21,306 api: [default] Rendezvous'ing worker group +INFO 2020-07-31 07:35:21,307 Attempting to join next rendezvous +INFO 2020-07-31 07:35:21,310 Observed existing rendezvous state: {'status': 'final', 'version': '2', 'participants': [0, 1], 'keep_alives': ['/torchelastic/p2p/run_pet/rdzv/v_2/rank_1', '/torchelastic/p2p/run_pet/rdzv/v_2/rank_0'], 'num_workers_waiting': 1} +INFO 2020-07-31 07:35:21,363 Announce self as waiting CAS unsuccessful, retrying +INFO 2020-07-31 07:35:21,411 Added self to waiting list. 
Rendezvous full state: {"status": "final", "version": "2", "participants": [0, 1], "keep_alives": ["/torchelastic/p2p/run_pet/rdzv/v_2/rank_1", "/torchelastic/p2p/run_pet/rdzv/v_2/rank_0"], "num_workers_waiting": 3} +INFO 2020-07-31 07:35:30,806 Keep-alive key /torchelastic/p2p/run_pet/rdzv/v_2/rank_1 is not renewed. +INFO 2020-07-31 07:35:30,807 Rendevous version 2 is incomplete. +INFO 2020-07-31 07:35:30,807 Attempting to destroy it. +INFO 2020-07-31 07:35:30,808 Rendezvous attempt failed, will retry. Reason: Key not found : /torchelastic/p2p/run_pet/rdzv/active_version +INFO 2020-07-31 07:35:31,810 Attempting to join next rendezvous +INFO 2020-07-31 07:35:31,813 Observed existing rendezvous state: {'status': 'joinable', 'version': '3', 'participants': [0]} +INFO 2020-07-31 07:35:31,867 Joined rendezvous version 3 as rank 1. Full state: {'status': 'joinable', 'version': '3', 'participants': [0, 1]} +INFO 2020-07-31 07:35:31,868 Waiting for remaining peers. +INFO 2020-07-31 07:36:01,831 All peers arrived. Confirming membership. +INFO 2020-07-31 07:36:01,882 Waiting for confirmations from all peers. +INFO 2020-07-31 07:36:01,915 Rendezvous version 3 is complete. Final state: {'status': 'final', 'version': '3', 'participants': [0, 1, 2], 'keep_alives': ['/torchelastic/p2p/run_pet/rdzv/v_3/rank_0', '/torchelastic/p2p/run_pet/rdzv/v_3/rank_1', '/torchelastic/p2p/run_pet/rdzv/v_3/rank_2'], 'num_workers_waiting': 0} +INFO 2020-07-31 07:36:01,915 Creating EtcdStore as the c10d::Store implementation +[INFO] 2020-07-31 07:36:01,919 api: [default] Rendezvous complete for workers. +Result: + restart_count=1 + group_rank=1 + group_world_size=3 + rank stride=1 + assigned global_ranks=[1] + master_addr=worker-1.pet-worker.default.svc.cluster.local + master_port=44823 + +[INFO] 2020-07-31 07:36:01,919 api: [default] Starting worker group +=> set cuda device = 0 +=> creating model: resnet18 +=> loading checkpoint file: /tmp/checkpoint.pth.tar +=> loaded checkpoint file: /tmp/checkpoint.pth.tar +=> using checkpoint from rank: 1, max_epoch: 0 +=> checkpoint broadcast size is: 93588276 +/opt/conda/conda-bld/pytorch_1587428398394/work/torch/csrc/utils/tensor_numpy.cpp:141: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. +=> done broadcasting checkpoint +=> done restoring from previous checkpoint +=> start_epoch: 1, best_acc1: 0.38999998569488525 +Epoch: [1][ 0/1042] Time 2.741 ( 2.741) Data 0.073 ( 0.073) Loss 4.7370e+00 (4.7370e+00) Acc@1 0.00 ( 0.00) Acc@5 15.62 ( 15.62) +Epoch: [1][ 10/1042] Time 0.732 ( 1.339) Data 0.045 ( 0.292) Loss 4.5558e+00 (4.4728e+00) Acc@1 3.12 ( 7.39) Acc@5 12.50 ( 23.86) +...... +``` +9. When training completed, all workers will succeed, with log like below: +```text +...... +Test: [300/313] Time 0.118 ( 0.179) Loss 8.1714e+00 (8.1763e+00) Acc@1 3.12 ( 1.15) Acc@5 6.25 ( 3.87) +Test: [310/313] Time 0.281 ( 0.177) Loss 8.7371e+00 (8.1767e+00) Acc@1 0.00 ( 1.14) Acc@5 6.25 ( 3.86) + * Acc@1 1.130 Acc@5 3.850 +=> saved checkpoint for epoch 1 at /tmp/checkpoint.pth.tar +=> best model found at epoch 1 saving to /tmp/model_best.pth.tar +[INFO] 2020-07-31 07:55:16,413 api: [default] All workers successfully finished. +``` +10. 
Finally, the whole Framework will succeed, with Status like below: +```yaml +apiVersion: frameworkcontroller.microsoft.com/v1 +kind: Framework +metadata: + name: pet + namespace: default + creationTimestamp: '2020-07-31T06:52:25Z' + generation: 43 + resourceVersion: '35492058' + selfLink: "/apis/frameworkcontroller.microsoft.com/v1/namespaces/default/frameworks/pet" + uid: c3f9e3a1-314d-4b5f-94b9-9287d15ac5d6 +spec: + executionType: Start + retryPolicy: + fancyRetryPolicy: true + maxRetryCount: 2 + taskRoles: + - name: etcd + taskNumber: 1 + frameworkAttemptCompletionPolicy: + minFailedTaskCount: 1 + minSucceededTaskCount: -1 + task: + retryPolicy: + fancyRetryPolicy: false + maxRetryCount: -2 + podGracefulDeletionTimeoutSec: 1800 + pod: + spec: + restartPolicy: Always + containers: + - name: etcd + image: quay.io/coreos/etcd:v3.4.9 + command: + - sh + - "-c" + - "/usr/local/bin/etcd --data-dir /var/lib/etcd --enable-v2 --listen-client-urls + http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --initial-cluster-state + new" + ports: + - containerPort: 2379 + - name: worker + taskNumber: 3 + frameworkAttemptCompletionPolicy: + minFailedTaskCount: 1 + minSucceededTaskCount: 3 + task: + retryPolicy: + fancyRetryPolicy: true + maxRetryCount: 2 + podGracefulDeletionTimeoutSec: 1800 + pod: + spec: + hostname: "{{FC_TASKROLE_NAME}}-{{FC_TASK_INDEX}}" + subdomain: "{{FC_FRAMEWORK_NAME}}-{{FC_TASKROLE_NAME}}" + restartPolicy: Never + containers: + - name: pytorch + image: torchelastic/examples:0.2.0 + command: + - sh + - "-c" + - python -m torchelastic.distributed.launch --rdzv_backend=etcd --rdzv_endpoint=${FC_FRAMEWORK_NAME}-etcd:2379 + --rdzv_id=${FC_FRAMEWORK_NAME} --nnodes=1:4 --nproc_per_node=1 /workspace/examples/imagenet/main.py + --arch=resnet18 --batch-size=32 --epochs=2 /workspace/data/tiny-imagenet-200 + resources: + limits: + nvidia.com/gpu: '1' + volumeMounts: + - name: shm-volume + mountPath: "/dev/shm" + volumes: + - name: shm-volume + emptyDir: + medium: Memory +status: + startTime: '2020-07-31T06:52:25Z' + completionTime: '2020-07-31T07:56:14Z' + state: Completed + transitionTime: '2020-07-31T07:56:14Z' + retryPolicyStatus: + accountableRetriedCount: 0 + retryDelaySec: + totalRetriedCount: 0 + attemptStatus: + id: 0 + startTime: '2020-07-31T06:52:25Z' + runTime: '2020-07-31T06:52:30Z' + completionTime: '2020-07-31T07:56:14Z' + instanceUID: 0_da3b7866-d1b2-45be-8222-ff85ac67ef23 + configMapName: pet-attempt + configMapUID: da3b7866-d1b2-45be-8222-ff85ac67ef23 + completionStatus: + code: 0 + phrase: Succeeded + type: + attributes: [] + name: Succeeded + diagnostics: Pod succeeded + trigger: + message: SucceededTaskCount 3 has reached MinSucceededTaskCount 3 in the TaskRole + taskRoleName: worker + taskIndex: 1 + taskRoleStatuses: + - name: etcd + podGracefulDeletionTimeoutSec: 1800 + taskStatuses: + - index: 0 + startTime: '2020-07-31T06:52:25Z' + completionTime: '2020-07-31T07:56:14Z' + state: Completed + transitionTime: '2020-07-31T07:56:14Z' + deletionPending: false + retryPolicyStatus: + accountableRetriedCount: 0 + retryDelaySec: + totalRetriedCount: 0 + attemptStatus: + id: 0 + startTime: '2020-07-31T06:52:25Z' + runTime: '2020-07-31T06:52:30Z' + completionTime: '2020-07-31T07:56:13Z' + instanceUID: 0_7870a450-4eb4-4596-a0d2-be5c6727de03 + podName: pet-etcd-0 + podUID: 7870a450-4eb4-4596-a0d2-be5c6727de03 + podNodeName: node11 + podIP: 10.207.128.5 + podHostIP: 10.151.40.232 + completionStatus: + code: -220 + phrase: FrameworkAttemptCompletion + type: + attributes: 
+ - Permanent + name: Failed + diagnostics: Stop to complete current FrameworkAttempt + - name: worker + podGracefulDeletionTimeoutSec: 1800 + taskStatuses: + - index: 0 + startTime: '2020-07-31T06:52:25Z' + completionTime: '2020-07-31T07:55:18Z' + state: Completed + transitionTime: '2020-07-31T07:55:18Z' + deletionPending: false + retryPolicyStatus: + accountableRetriedCount: 0 + retryDelaySec: + totalRetriedCount: 0 + attemptStatus: + id: 0 + startTime: '2020-07-31T06:52:25Z' + runTime: '2020-07-31T06:52:30Z' + completionTime: '2020-07-31T07:55:18Z' + instanceUID: 0_a550a22a-6371-4ac3-991b-fc5957fb0dac + podName: pet-worker-0 + podUID: a550a22a-6371-4ac3-991b-fc5957fb0dac + podNodeName: node9 + podIP: 10.204.128.1 + podHostIP: 10.151.40.230 + completionStatus: + code: 0 + phrase: Succeeded + type: + attributes: [] + name: Succeeded + diagnostics: Pod succeeded + pod: + containers: + - code: 0 + name: pytorch + reason: Completed + - index: 1 + startTime: '2020-07-31T06:52:25Z' + completionTime: '2020-07-31T07:55:30Z' + state: Completed + transitionTime: '2020-07-31T07:55:30Z' + deletionPending: false + retryPolicyStatus: + accountableRetriedCount: 0 + retryDelaySec: + totalRetriedCount: 0 + attemptStatus: + id: 0 + startTime: '2020-07-31T06:52:25Z' + runTime: '2020-07-31T06:52:31Z' + completionTime: '2020-07-31T07:55:29Z' + instanceUID: 0_6213facb-d11d-416d-9530-32adeb708439 + podName: pet-worker-1 + podUID: 6213facb-d11d-416d-9530-32adeb708439 + podNodeName: node10 + podIP: 10.201.0.2 + podHostIP: 10.151.40.231 + completionStatus: + code: 0 + phrase: Succeeded + type: + attributes: [] + name: Succeeded + diagnostics: Pod succeeded + pod: + containers: + - code: 0 + name: pytorch + reason: Completed + - index: 2 + startTime: '2020-07-31T07:34:55Z' + completionTime: '2020-07-31T07:55:22Z' + state: Completed + transitionTime: '2020-07-31T07:55:22Z' + deletionPending: false + retryPolicyStatus: + accountableRetriedCount: 0 + retryDelaySec: + totalRetriedCount: 0 + attemptStatus: + id: 0 + startTime: '2020-07-31T07:34:55Z' + runTime: '2020-07-31T07:34:58Z' + completionTime: '2020-07-31T07:55:22Z' + instanceUID: 0_d12681fc-986d-4f36-95f3-f0edde5750ca + podName: pet-worker-2 + podUID: d12681fc-986d-4f36-95f3-f0edde5750ca + podNodeName: node6 + podIP: 10.202.128.6 + podHostIP: 10.151.40.227 + completionStatus: + code: 0 + phrase: Succeeded + type: + attributes: [] + name: Succeeded + diagnostics: Pod succeeded + pod: + containers: + - code: 0 + name: pytorch + reason: Completed +``` diff --git a/example/framework/scenario/tensorflow/README.md b/example/framework/scenario/tensorflow/README.md index 8f0a6858..1c094b95 100644 --- a/example/framework/scenario/tensorflow/README.md +++ b/example/framework/scenario/tensorflow/README.md @@ -1,17 +1,9 @@ # TensorFlow On FrameworkController ## Feature -1. Support both GPU and CPU Distributed Training -2. Automatically clean up PS when the whole FrameworkAttempt is completed -3. No need to adjust existing TensorFlow image -4. No need to setup [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service) and [Kubernetes Service](https://kubernetes.io/docs/concepts/services-networking/service) -5. [Common Feature](../../../../README.md#Feature) - -## Prerequisite -1. See `[PREREQUISITE]` in each specific Framework yaml file. -2. Need to setup [Kubernetes Cluster-Level Logging](https://kubernetes.io/docs/concepts/cluster-administration/logging), if you need to persist and expose the log for deleted Pod. +1. 
Support [almost all kinds of TensorFlow applications](../../../../README.md#microsoft-openpai-frameworkcontroller), including [TensorFlow ParameterServer Training](ps) +2. [Common Feature](../../../../README.md#Feature) ## Quick Start 1. [Common Quick Start](../../../../README.md#Quick-Start) -2. [CPU Example](cpu) -3. [GPU Example](gpu) +2. [TensorFlow ParameterServer Training Example](ps) diff --git a/example/framework/scenario/tensorflow/ps/README.md b/example/framework/scenario/tensorflow/ps/README.md new file mode 100644 index 00000000..e0cf475c --- /dev/null +++ b/example/framework/scenario/tensorflow/ps/README.md @@ -0,0 +1,17 @@ +# [TensorFlow ParameterServer Training](https://www.tensorflow.org/guide/distributed_training#parameterserverstrategy) On FrameworkController + +## Feature +1. Support both GPU and CPU Distributed Training +2. Automatically clean up PS when the whole FrameworkAttempt is completed +3. No need to adjust existing TensorFlow image +4. No need to setup [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service) and [Kubernetes Service](https://kubernetes.io/docs/concepts/services-networking/service) +5. [Common Feature](../../../../../README.md#Feature) + +## Prerequisite +1. See `[PREREQUISITE]` in each specific Framework yaml file +2. Need to setup [Kubernetes Cluster-Level Logging](https://kubernetes.io/docs/concepts/cluster-administration/logging), if you need to persist and expose the log for deleted Pod + +## Quick Start +1. [Common Quick Start](../../../../../README.md#Quick-Start) +2. [CPU Example](cpu) +3. [GPU Example](gpu) diff --git a/example/framework/scenario/tensorflow/cpu/build/Dockerfile b/example/framework/scenario/tensorflow/ps/cpu/build/Dockerfile similarity index 100% rename from example/framework/scenario/tensorflow/cpu/build/Dockerfile rename to example/framework/scenario/tensorflow/ps/cpu/build/Dockerfile diff --git a/example/framework/scenario/tensorflow/cpu/build/docker-build.sh b/example/framework/scenario/tensorflow/ps/cpu/build/docker-build.sh similarity index 100% rename from example/framework/scenario/tensorflow/cpu/build/docker-build.sh rename to example/framework/scenario/tensorflow/ps/cpu/build/docker-build.sh diff --git a/example/framework/scenario/tensorflow/cpu/tensorflowdistributedtrainingwithcpu.yaml b/example/framework/scenario/tensorflow/ps/cpu/tensorflowdistributedtrainingwithcpu.yaml similarity index 96% rename from example/framework/scenario/tensorflow/cpu/tensorflowdistributedtrainingwithcpu.yaml rename to example/framework/scenario/tensorflow/ps/cpu/tensorflowdistributedtrainingwithcpu.yaml index 2aff55ef..e10e000e 100644 --- a/example/framework/scenario/tensorflow/cpu/tensorflowdistributedtrainingwithcpu.yaml +++ b/example/framework/scenario/tensorflow/ps/cpu/tensorflowdistributedtrainingwithcpu.yaml @@ -23,6 +23,8 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + # Large timeout to force delete Pod as it may break the stateful batch. + podGracefulDeletionTimeoutSec: 1800 pod: spec: restartPolicy: Never @@ -117,6 +119,8 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + # Large timeout to force delete Pod as it may break the stateful batch. 
+ podGracefulDeletionTimeoutSec: 1800 pod: spec: restartPolicy: Never diff --git a/example/framework/scenario/tensorflow/gpu/build/Dockerfile b/example/framework/scenario/tensorflow/ps/gpu/build/Dockerfile similarity index 100% rename from example/framework/scenario/tensorflow/gpu/build/Dockerfile rename to example/framework/scenario/tensorflow/ps/gpu/build/Dockerfile diff --git a/example/framework/scenario/tensorflow/gpu/build/docker-build.sh b/example/framework/scenario/tensorflow/ps/gpu/build/docker-build.sh similarity index 100% rename from example/framework/scenario/tensorflow/gpu/build/docker-build.sh rename to example/framework/scenario/tensorflow/ps/gpu/build/docker-build.sh diff --git a/example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithdefaultscheduledgpu.yaml b/example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithdefaultscheduledgpu.yaml similarity index 97% rename from example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithdefaultscheduledgpu.yaml rename to example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithdefaultscheduledgpu.yaml index 209e4ef2..b5a0e50c 100644 --- a/example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithdefaultscheduledgpu.yaml +++ b/example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithdefaultscheduledgpu.yaml @@ -23,6 +23,8 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + # Large timeout to force delete Pod as it may break the stateful batch. + podGracefulDeletionTimeoutSec: 1800 pod: spec: restartPolicy: Never @@ -123,6 +125,8 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + # Large timeout to force delete Pod as it may break the stateful batch. + podGracefulDeletionTimeoutSec: 1800 pod: spec: restartPolicy: Never diff --git a/example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml b/example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml similarity index 97% rename from example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml rename to example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml index b4dfeb37..2bb76d6e 100644 --- a/example/framework/scenario/tensorflow/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml +++ b/example/framework/scenario/tensorflow/ps/gpu/tensorflowdistributedtrainingwithhivedscheduledgpu.yaml @@ -23,6 +23,8 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + # Large timeout to force delete Pod as it may break the stateful batch. + podGracefulDeletionTimeoutSec: 1800 pod: metadata: annotations: @@ -143,6 +145,8 @@ spec: retryPolicy: fancyRetryPolicy: false maxRetryCount: 0 + # Large timeout to force delete Pod as it may break the stateful batch. 
+ podGracefulDeletionTimeoutSec: 1800 pod: metadata: annotations: diff --git a/example/run/README.md b/example/run/README.md index 08f370cb..3af894d5 100644 --- a/example/run/README.md +++ b/example/run/README.md @@ -14,7 +14,7 @@ Notes: - This approach is better for production, since StatefulSet by itself provides [self-healing](https://kubernetes.io/docs/concepts/workloads/pods/pod/#durability-of-pods-or-lack-thereof) and can ensure [at most one instance](https://github.com/kubernetes/community/blob/ee8998b156031f6b363daade51ca2d12521f4ac0/contributors/design-proposals/storage/pod-safety.md) of FrameworkController is running at any point in time. - Using official image to demonstrate this example. -**Prerequisite** +### Prerequisite If the k8s cluster enforces [Authorization](https://kubernetes.io/docs/reference/access-authn-authz/authorization/#using-flags-for-your-authorization-module), you need to first create a [Service Account](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account) with granted permission for FrameworkController. For example, if the cluster enforces [RBAC](https://kubernetes.io/docs/reference/access-authn-authz/rbac/#kubectl-create-clusterrolebinding): ```shell @@ -24,14 +24,80 @@ kubectl create clusterrolebinding frameworkcontroller \ --user=system:serviceaccount:default:frameworkcontroller ``` -**Run** +### Run Run FrameworkController with above Service Account and the [k8s inClusterConfig](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod): + +#### Run with [default config](../../example/config/default/frameworkcontroller.yaml) ```shell -kubectl create -f frameworkcontroller.yaml +kubectl create -f frameworkcontroller-with-default-config.yaml +``` + +frameworkcontroller-with-default-config.yaml: +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: frameworkcontroller + namespace: default +spec: + serviceName: frameworkcontroller + selector: + matchLabels: + app: frameworkcontroller + replicas: 1 + template: + metadata: + labels: + app: frameworkcontroller + spec: + # Using the service account with granted permission + # if the k8s cluster enforces authorization. 
+ serviceAccountName: frameworkcontroller + containers: + - name: frameworkcontroller + image: frameworkcontroller/frameworkcontroller + # Using k8s inClusterConfig, so usually, no need to specify + # KUBE_APISERVER_ADDRESS or KUBECONFIG + #env: + #- name: KUBE_APISERVER_ADDRESS + # value: {http[s]://host:port} + #- name: KUBECONFIG + # value: {Pod Local KubeConfig File Path} +``` + +#### Run with customized config +```shell +kubectl create -f frameworkcontroller-customized-config.yaml +kubectl create -f frameworkcontroller-with-customized-config.yaml +``` + +frameworkcontroller-customized-config.yaml: +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: frameworkcontroller-config + namespace: default +data: + frameworkcontroller.yaml: | + workerNumber: 20 + largeFrameworkCompression: true + frameworkCompletedRetainSec: 2592000 + #podFailureSpec: + #- code: 221 + # phrase: ContainerTensorflowOOMKilled + # type: + # attributes: [Permanent] + # podPatterns: + # - containers: + # - messageRegex: '(?msi)tensorflow.*ResourceExhaustedError.*OOM.*' + # codeRange: {min: 1} + # nameRegex: '(?ms).*' + #- {More customized podFailureSpec, better to also include these in the default config} ``` -frameworkcontroller.yaml: +frameworkcontroller-with-customized-config.yaml: ```yaml apiVersion: apps/v1 kind: StatefulSet @@ -62,13 +128,24 @@ spec: # value: {http[s]://host:port} #- name: KUBECONFIG # value: {Pod Local KubeConfig File Path} + command: [ + "bash", "-c", + "cp /frameworkcontroller-config/frameworkcontroller.yaml . && + ./start.sh"] + volumeMounts: + - name: frameworkcontroller-config + mountPath: /frameworkcontroller-config + volumes: + - name: frameworkcontroller-config + configMap: + name: frameworkcontroller-config ``` ## Run By Docker Container - This approach may be better for development sometimes. - Using official image to demonstrate this example. -**Run** +### Run If you have an insecure ApiServer address (can be got from [Insecure ApiServer](https://kubernetes.io/docs/reference/access-authn-authz/controlling-access/#api-server-ports-and-ips) or [kubectl proxy](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#using-kubectl-proxy)) which does not enforce authentication, you only need to provide the address: ```shell @@ -95,7 +172,7 @@ docker run -e KUBECONFIG=/mnt/.kube/config \ - This approach may be better for development sometimes. - Using local built binary distribution to demonstrate this example. -**Prerequisite** +### Prerequisite Ensure you have installed [Golang 1.12.6 or above](https://golang.org/doc/install#install) and the [${GOPATH}](https://golang.org/doc/code.html#GOPATH) is valid. 
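
A quick sanity check of this prerequisite (a minimal sketch, assuming a Bash-like shell and that `go` is already on your `PATH`) could look like below:
```shell
# Should report go1.12.6 or a later version.
go version

# Should print a valid, writable workspace directory.
echo "${GOPATH:-$(go env GOPATH)}"
```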
@@ -109,7 +186,7 @@ cd ${PROJECT_DIR} ./build/frameworkcontroller/go-build.sh ``` -**Run** +### Run If you have an insecure ApiServer address (can be got from [Insecure ApiServer](https://kubernetes.io/docs/reference/access-authn-authz/controlling-access/#api-server-ports-and-ips) or [kubectl proxy](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#using-kubectl-proxy)) which does not enforce authentication, you only need to provide the address: ```shell diff --git a/pkg/apis/frameworkcontroller/v1/completion.go b/pkg/apis/frameworkcontroller/v1/completion.go index fb8f7f30..ea51d89a 100644 --- a/pkg/apis/frameworkcontroller/v1/completion.go +++ b/pkg/apis/frameworkcontroller/v1/completion.go @@ -71,6 +71,7 @@ const ( CompletionCodePodSpecPermanentError CompletionCode = -200 CompletionCodeStopFrameworkRequested CompletionCode = -210 CompletionCodeFrameworkAttemptCompletion CompletionCode = -220 + CompletionCodeDeleteTaskRequested CompletionCode = -230 // -3XX: Unknown Error CompletionCodePodFailedWithoutFailedContainer CompletionCode = -300 ) @@ -180,6 +181,12 @@ func initCompletionCodeInfos() { Type: CompletionType{CompletionTypeNameFailed, []CompletionTypeAttribute{CompletionTypeAttributePermanent}}, }, + { + Code: CompletionCodeDeleteTaskRequested.Ptr(), + Phrase: "DeleteTaskRequested", + Type: CompletionType{CompletionTypeNameFailed, + []CompletionTypeAttribute{CompletionTypeAttributePermanent}}, + }, { Code: CompletionCodePodFailedWithoutFailedContainer.Ptr(), Phrase: "PodFailedWithoutFailedContainer", @@ -530,7 +537,8 @@ func (re Regex) FindString(s string) *string { return &s } if loc := re.Regexp.FindStringIndex(s); loc != nil { - return common.PtrString(s[loc[0]:loc[1]]) + // Decouple string garbage collection + return common.PtrString(string([]byte(s[loc[0]:loc[1]]))) } return nil } diff --git a/pkg/apis/frameworkcontroller/v1/config.go b/pkg/apis/frameworkcontroller/v1/config.go index 7ba101cc..11a28174 100644 --- a/pkg/apis/frameworkcontroller/v1/config.go +++ b/pkg/apis/frameworkcontroller/v1/config.go @@ -148,6 +148,7 @@ type LogObjectSnapshot struct { type LogFrameworkSnapshot struct { OnTaskRetry *bool `yaml:"onTaskRetry"` OnFrameworkRetry *bool `yaml:"onFrameworkRetry"` + OnFrameworkRescale *bool `yaml:"onFrameworkRescale"` OnFrameworkDeletion *bool `yaml:"onFrameworkDeletion"` } @@ -245,6 +246,9 @@ func NewConfig() *Config { if c.LogObjectSnapshot.Framework.OnFrameworkRetry == nil { c.LogObjectSnapshot.Framework.OnFrameworkRetry = common.PtrBool(true) } + if c.LogObjectSnapshot.Framework.OnFrameworkRescale == nil { + c.LogObjectSnapshot.Framework.OnFrameworkRescale = common.PtrBool(true) + } if c.LogObjectSnapshot.Framework.OnFrameworkDeletion == nil { c.LogObjectSnapshot.Framework.OnFrameworkDeletion = common.PtrBool(true) } diff --git a/pkg/apis/frameworkcontroller/v1/funcs.go b/pkg/apis/frameworkcontroller/v1/funcs.go index 3d86c7fb..1e1b56f5 100644 --- a/pkg/apis/frameworkcontroller/v1/funcs.go +++ b/pkg/apis/frameworkcontroller/v1/funcs.go @@ -29,6 +29,7 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog" + "sort" "strconv" "strings" ) @@ -142,10 +143,75 @@ func GetAllContainerStatuses(pod *core.Pod) []core.ContainerStatus { pod.Status.ContainerStatuses...) 
} +func BindIDP( + selectorIDP TaskStatusSelectorIDP, + ignoreDeletionPending bool) TaskStatusSelector { + return func(taskStatus *TaskStatus) bool { + return selectorIDP(taskStatus, ignoreDeletionPending) + } +} + +func NewFailedTaskTriggeredCompletionStatus( + triggerTaskStatus *TaskStatus, + triggerTaskRoleName string, + failedTaskCount int32, + minFailedTaskCount int32) *FrameworkAttemptCompletionStatus { + return &FrameworkAttemptCompletionStatus{ + CompletionStatus: triggerTaskStatus.AttemptStatus.CompletionStatus.CompletionStatus, + Trigger: &CompletionPolicyTriggerStatus{ + Message: fmt.Sprintf( + "FailedTaskCount %v has reached MinFailedTaskCount %v in the TaskRole", + failedTaskCount, minFailedTaskCount), + TaskRoleName: triggerTaskRoleName, + TaskIndex: triggerTaskStatus.Index, + }, + } +} + +func NewSucceededTaskTriggeredCompletionStatus( + triggerTaskStatus *TaskStatus, + triggerTaskRoleName string, + succeededTaskCount int32, + minSucceededTaskCount int32) *FrameworkAttemptCompletionStatus { + return &FrameworkAttemptCompletionStatus{ + CompletionStatus: triggerTaskStatus.AttemptStatus.CompletionStatus.CompletionStatus, + Trigger: &CompletionPolicyTriggerStatus{ + Message: fmt.Sprintf( + "SucceededTaskCount %v has reached MinSucceededTaskCount %v in the TaskRole", + succeededTaskCount, minSucceededTaskCount), + TaskRoleName: triggerTaskRoleName, + TaskIndex: triggerTaskStatus.Index, + }, + } +} + +func NewCompletedTaskTriggeredCompletionStatus( + triggerTaskStatus *TaskStatus, + triggerTaskRoleName string, + completedTaskCount int32, + totalTaskCount int32) *FrameworkAttemptCompletionStatus { + diag := fmt.Sprintf( + "CompletedTaskCount %v has reached TotalTaskCount %v and no user specified "+ + "conditions in FrameworkAttemptCompletionPolicy have ever been triggered", + completedTaskCount, totalTaskCount) + if triggerTaskStatus == nil { + return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag, nil) + } else { + return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag, + &CompletionPolicyTriggerStatus{ + Message: diag, + TaskRoleName: triggerTaskRoleName, + TaskIndex: triggerTaskStatus.Index, + }, + ) + } +} + /////////////////////////////////////////////////////////////////////////////////////// // Interfaces /////////////////////////////////////////////////////////////////////////////////////// -type TaskStatusSelector func(*TaskStatus) bool +type TaskStatusSelector func(taskStatus *TaskStatus) bool +type TaskStatusSelectorIDP func(taskStatus *TaskStatus, ignoreDeletionPending bool) bool /////////////////////////////////////////////////////////////////////////////////////// // Spec Read Methods @@ -154,15 +220,38 @@ func (f *Framework) Key() string { return f.Namespace + "/" + f.Name } -func (f *Framework) TaskRoleSpec(taskRoleName string) *TaskRoleSpec { +// Return nil if and only if TaskRoleSpec is deleted while the TaskRole's +// TaskRoleStatus still exist due to graceful deletion. +func (f *Framework) GetTaskRoleSpec(taskRoleName string) *TaskRoleSpec { for _, taskRole := range f.Spec.TaskRoles { if taskRole.Name == taskRoleName { return taskRole } } + return nil +} + +// Panic if and only if TaskRoleSpec is deleted while the TaskRole's +// TaskRoleStatus still exist due to graceful deletion. 
+func (f *Framework) TaskRoleSpec(taskRoleName string) *TaskRoleSpec { + if taskRole := f.GetTaskRoleSpec(taskRoleName); taskRole != nil { + return taskRole + } panic(fmt.Errorf("[%v]: TaskRole is not found in Spec", taskRoleName)) } +func (f *Framework) GetTaskCountSpec() int32 { + taskCount := int32(0) + for _, taskRole := range f.Spec.TaskRoles { + taskCount += taskRole.TaskNumber + } + return taskCount +} + +func (f *Framework) GetTotalTaskCountSpec() int32 { + return f.GetTaskCountSpec() +} + /////////////////////////////////////////////////////////////////////////////////////// // Status Read Methods /////////////////////////////////////////////////////////////////////////////////////// @@ -210,28 +299,49 @@ func (f *Framework) TaskRoleStatuses() []*TaskRoleStatus { return f.Status.AttemptStatus.TaskRoleStatuses } -func (f *Framework) TaskRoleStatus(taskRoleName string) *TaskRoleStatus { +func (f *Framework) GetTaskRoleStatus(taskRoleName string) *TaskRoleStatus { for _, taskRoleStatus := range f.TaskRoleStatuses() { if taskRoleStatus.Name == taskRoleName { return taskRoleStatus } } + return nil +} + +func (f *Framework) TaskRoleStatus(taskRoleName string) *TaskRoleStatus { + if taskRoleStatus := f.GetTaskRoleStatus(taskRoleName); taskRoleStatus != nil { + return taskRoleStatus + } panic(fmt.Errorf("[%v]: TaskRole is not found in Status", taskRoleName)) } -func (f *Framework) TaskStatus(taskRoleName string, taskIndex int32) *TaskStatus { +func (f *Framework) GetTaskStatus(taskRoleName string, taskIndex int32) *TaskStatus { taskRoleStatus := f.TaskRoleStatus(taskRoleName) if 0 <= taskIndex && taskIndex < int32(len(taskRoleStatus.TaskStatuses)) { return taskRoleStatus.TaskStatuses[taskIndex] } + return nil +} + +func (f *Framework) TaskStatus(taskRoleName string, taskIndex int32) *TaskStatus { + if taskStatus := f.GetTaskStatus(taskRoleName, taskIndex); taskStatus != nil { + return taskStatus + } panic(fmt.Errorf("[%v][%v]: Task is not found in Status", taskRoleName, taskIndex)) } +func (ts *TaskStatus) IsDeletionPendingIgnored(ignoreDeletionPending bool) bool { + return ts.DeletionPending && ignoreDeletionPending +} + func (f *Framework) IsCompleted() bool { return f.Status.State == FrameworkCompleted } -func (ts *TaskStatus) IsCompleted() bool { +func (ts *TaskStatus) IsCompleted(ignoreDeletionPending bool) bool { + if ts.IsDeletionPendingIgnored(ignoreDeletionPending) { + return false + } return ts.State == TaskCompleted } @@ -239,7 +349,10 @@ func (f *Framework) IsRunning() bool { return f.Status.State == FrameworkAttemptRunning } -func (ts *TaskStatus) IsRunning() bool { +func (ts *TaskStatus) IsRunning(ignoreDeletionPending bool) bool { + if ts.IsDeletionPendingIgnored(ignoreDeletionPending) { + return false + } return ts.State == TaskAttemptRunning } @@ -249,7 +362,10 @@ func (f *Framework) IsCompleting() bool { f.Status.State == FrameworkAttemptDeleting } -func (ts *TaskStatus) IsCompleting() bool { +func (ts *TaskStatus) IsCompleting(ignoreDeletionPending bool) bool { + if ts.IsDeletionPendingIgnored(ignoreDeletionPending) { + return false + } return ts.State == TaskAttemptDeletionPending || ts.State == TaskAttemptDeletionRequested || ts.State == TaskAttemptDeleting @@ -259,50 +375,72 @@ func (f *Framework) IsSucceeded() bool { return f.IsCompleted() && f.CompletionType().IsSucceeded() } -func (ts *TaskStatus) IsSucceeded() bool { - return ts.IsCompleted() && ts.CompletionType().IsSucceeded() +func (ts *TaskStatus) IsSucceeded(ignoreDeletionPending bool) bool { + return 
ts.IsCompleted(ignoreDeletionPending) && ts.CompletionType().IsSucceeded() } func (f *Framework) IsFailed() bool { return f.IsCompleted() && f.CompletionType().IsFailed() } -func (ts *TaskStatus) IsFailed() bool { - return ts.IsCompleted() && ts.CompletionType().IsFailed() +func (ts *TaskStatus) IsFailed(ignoreDeletionPending bool) bool { + return ts.IsCompleted(ignoreDeletionPending) && ts.CompletionType().IsFailed() } -func (trs *TaskRoleStatus) GetTaskStatuses(selector TaskStatusSelector) []*TaskStatus { - if selector == nil { - return trs.TaskStatuses +func (trs *TaskRoleStatus) CompletionTimeOrderedTaskStatus( + selector TaskStatusSelector, orderIndex int32) *TaskStatus { + orderedTasks := trs.GetTaskStatuses(selector) + sort.SliceStable(orderedTasks, func(i, j int) bool { + return orderedTasks[i].CompletionTime.Before(orderedTasks[j].CompletionTime) + }) + + if 0 <= orderIndex && orderIndex < int32(len(orderedTasks)) { + return orderedTasks[orderIndex] } + panic(fmt.Errorf( + "Task orderIndex %v is not found in CompletionTime orderedTasks: %v", + orderIndex, orderedTasks)) +} +func (trs *TaskRoleStatus) GetTaskStatuses(selector TaskStatusSelector) []*TaskStatus { taskStatuses := []*TaskStatus{} for _, taskStatus := range trs.TaskStatuses { - if selector(taskStatus) { + if selector == nil || selector(taskStatus) { taskStatuses = append(taskStatuses, taskStatus) } } return taskStatuses } -func (trs *TaskRoleStatus) GetTaskCount(selector TaskStatusSelector) int32 { - return int32(len(trs.GetTaskStatuses(selector))) +func (trs *TaskRoleStatus) GetTaskCountStatus(selector TaskStatusSelector) int32 { + if selector == nil { + return int32(len(trs.TaskStatuses)) + } + + taskCount := int32(0) + for _, taskStatus := range trs.TaskStatuses { + if selector(taskStatus) { + taskCount++ + } + } + return taskCount } -func (f *Framework) GetTaskCount(selector TaskStatusSelector) int32 { +func (f *Framework) GetTaskCountStatus(selector TaskStatusSelector) int32 { taskCount := int32(0) for _, taskRoleStatus := range f.TaskRoleStatuses() { - taskCount += taskRoleStatus.GetTaskCount(selector) + taskCount += taskRoleStatus.GetTaskCountStatus(selector) } return taskCount } -func (f *Framework) AreAllTasksCompleted() bool { - return f.GetTaskCount((*TaskStatus).IsCompleted) == f.GetTaskCount(nil) +func (f *Framework) GetTotalTaskCountStatus() int32 { + return f.GetTaskCountStatus(nil) } -func (f *Framework) IsAnyTaskRunning() bool { - return f.GetTaskCount((*TaskStatus).IsRunning) > 0 +func (f *Framework) IsAnyTaskRunning(ignoreDeletionPending bool) bool { + return f.GetTaskCountStatus(BindIDP( + (*TaskStatus).IsRunning, ignoreDeletionPending)) > 0 } func (f *Framework) NewConfigMap() *core.ConfigMap { @@ -484,11 +622,12 @@ func (f *Framework) NewTaskRoleStatuses() []*TaskRoleStatus { func (f *Framework) NewTaskStatus(taskRoleName string, taskIndex int32) *TaskStatus { return &TaskStatus{ - Index: taskIndex, - StartTime: meta.Now(), - CompletionTime: nil, - State: TaskAttemptCreationPending, - TransitionTime: meta.Now(), + Index: taskIndex, + StartTime: meta.Now(), + CompletionTime: nil, + State: TaskAttemptCreationPending, + TransitionTime: meta.Now(), + DeletionPending: false, RetryPolicyStatus: RetryPolicyStatus{ TotalRetriedCount: 0, AccountableRetriedCount: 0, @@ -539,7 +678,8 @@ func (rp RetryPolicySpec) ShouldRetry( // 0. 
Built-in Always-on RetryPolicy if cs.Code == CompletionCodeStopFrameworkRequested || - cs.Code == CompletionCodeFrameworkAttemptCompletion { + cs.Code == CompletionCodeFrameworkAttemptCompletion || + cs.Code == CompletionCodeDeleteTaskRequested { return RetryDecision{false, true, 0, fmt.Sprintf( "CompletionCode is %v, %v", cs.Code, cs.Phrase)} } @@ -693,3 +833,18 @@ func (f *Framework) Decompress() error { return nil } + +func (ts *TaskStatus) MarkAsDeletionPending() (isNewDeletionPendingTask bool) { + if ts.DeletionPending { + return false + } + + ts.DeletionPending = true + if ts.AttemptStatus.CompletionStatus == nil { + ts.AttemptStatus.CompletionStatus = + CompletionCodeDeleteTaskRequested. + NewTaskAttemptCompletionStatus( + "User has requested to delete the Task by Framework ScaleDown", nil) + } + return true +} diff --git a/pkg/apis/frameworkcontroller/v1/types.go b/pkg/apis/frameworkcontroller/v1/types.go index 22793770..e5adf44e 100644 --- a/pkg/apis/frameworkcontroller/v1/types.go +++ b/pkg/apis/frameworkcontroller/v1/types.go @@ -48,6 +48,8 @@ type FrameworkList struct { // 5. With fine grained RetryPolicy for each Task and the whole Framework // 6. With fine grained FrameworkAttemptCompletionPolicy for each TaskRole // 7. With PodGracefulDeletionTimeoutSec for each Task to tune Consistency vs Availability +// 8. With fine grained Status for each TaskAttempt/Task, each TaskRole and the whole +// FrameworkAttempt/Framework // // Notes: // 1. Status field should only be modified by FrameworkController, and @@ -135,7 +137,8 @@ const ( // // Usage: // If the ExecutionType is ExecutionStop or -// the Task's FrameworkAttempt is completing, +// the Task's FrameworkAttempt is completing or +// the Task is DeletionPending (ScaleDown), // will not retry. // // If the FancyRetryPolicy is enabled, @@ -184,19 +187,20 @@ type RetryPolicySpec struct { // 1. If the ExecutionType is ExecutionStop, immediately complete the FrameworkAttempt, // regardless of any uncompleted Task, and the CompletionStatus is failed which // is not inherited from any Task. -// 2. If MinFailedTaskCount != -1 and MinFailedTaskCount <= failed Task count of +// 2. If MinFailedTaskCount >= 1 and MinFailedTaskCount <= failed Task count of // current TaskRole, immediately complete the FrameworkAttempt, regardless of // any uncompleted Task, and the CompletionStatus is failed which is inherited // from the Task which triggers the completion. -// 3. If MinSucceededTaskCount != -1 and MinSucceededTaskCount <= succeeded Task +// 3. If MinSucceededTaskCount >= 1 and MinSucceededTaskCount <= succeeded Task // count of current TaskRole, immediately complete the FrameworkAttempt, regardless // of any uncompleted Task, and the CompletionStatus is succeeded which is // inherited from the Task which triggers the completion. -// 4. If multiple above 1. and 2. conditions of all TaskRoles are satisfied at the -// same time, the behavior can be any one of these satisfied conditions. -// 5. If none of above 1. and 2. conditions of all TaskRoles are satisfied until all -// Tasks of the Framework are completed, immediately complete the FrameworkAttempt -// and the CompletionStatus is succeeded which is not inherited from any Task. +// 4. If multiple above conditions are satisfied at the same time, the behavior can +// be any one of these satisfied conditions. +// 5. 
If none of above conditions are satisfied until all Tasks of the Framework are +// completed (including a special case that the Framework does even not have any +// Task), immediately complete the FrameworkAttempt and the CompletionStatus is +// succeeded which is not inherited from any Task. // // Notes: // 1. When the FrameworkAttempt is completed, the FrameworkState is transitioned to @@ -279,6 +283,12 @@ type TaskRoleStatus struct { // TaskRoleName Name string `json:"name"` + // Effective and Backup PodGracefulDeletionTimeoutSec: + // It is the immediate backup of corresponding field in TaskRoleSpec.TaskSpec, + // in case the TaskRoleSpec is directly deleted later while the TaskRole's + // TaskRoleStatus still exist due to graceful deletion. + PodGracefulDeletionTimeoutSec *int64 `json:"podGracefulDeletionTimeoutSec"` + // Tasks with TaskIndex in range [0, TaskNumber) TaskStatuses []*TaskStatus `json:"taskStatuses"` } @@ -287,10 +297,16 @@ type TaskStatus struct { // TaskIndex Index int32 `json:"index"` - StartTime meta.Time `json:"startTime"` - CompletionTime *meta.Time `json:"completionTime"` - State TaskState `json:"state"` - TransitionTime meta.Time `json:"transitionTime"` + StartTime meta.Time `json:"startTime"` + CompletionTime *meta.Time `json:"completionTime"` + State TaskState `json:"state"` + TransitionTime meta.Time `json:"transitionTime"` + + // Task DeletionPending is caused by Framework ScaleDown. + // If a Task is DeletionPending, it is logically detached from its Framework + // immediately, and will be proactively but still gracefully completed and + // finally deleted. + DeletionPending bool `json:"deletionPending"` RetryPolicyStatus RetryPolicyStatus `json:"retryPolicyStatus"` AttemptStatus TaskAttemptStatus `json:"attemptStatus"` } @@ -381,6 +397,7 @@ type ContainerCompletionStatus struct { type TaskAttemptCompletionStatus struct { // Summary + // Must be not nil for TaskAttemptCompleted and TaskCompleted Task. *CompletionStatus `json:",inline"` // Detail Pod *PodCompletionStatus `json:"pod,omitempty"` @@ -394,6 +411,7 @@ type CompletionPolicyTriggerStatus struct { type FrameworkAttemptCompletionStatus struct { // Summary + // Must be not nil for FrameworkAttemptCompleted and FrameworkCompleted Framework. *CompletionStatus `json:",inline"` // Detail Trigger *CompletionPolicyTriggerStatus `json:"trigger,omitempty"` diff --git a/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go b/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go index a4c2428b..b75d2adc 100644 --- a/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go +++ b/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go @@ -497,6 +497,11 @@ func (in *LogFrameworkSnapshot) DeepCopyInto(out *LogFrameworkSnapshot) { *out = new(bool) **out = **in } + if in.OnFrameworkRescale != nil { + in, out := &in.OnFrameworkRescale, &out.OnFrameworkRescale + *out = new(bool) + **out = **in + } if in.OnFrameworkDeletion != nil { in, out := &in.OnFrameworkDeletion, &out.OnFrameworkDeletion *out = new(bool) @@ -855,6 +860,11 @@ func (in *TaskRoleSpec) DeepCopy() *TaskRoleSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TaskRoleStatus) DeepCopyInto(out *TaskRoleStatus) { *out = *in + if in.PodGracefulDeletionTimeoutSec != nil { + in, out := &in.PodGracefulDeletionTimeoutSec, &out.PodGracefulDeletionTimeoutSec + *out = new(int64) + **out = **in + } if in.TaskStatuses != nil { in, out := &in.TaskStatuses, &out.TaskStatuses *out = make([]*TaskStatus, len(*in)) diff --git a/pkg/barrier/barrier.go b/pkg/barrier/barrier.go index d3d3a5a2..0d5ad356 100644 --- a/pkg/barrier/barrier.go +++ b/pkg/barrier/barrier.go @@ -288,24 +288,37 @@ func (b *FrameworkBarrier) Run() { } func isBarrierPassed(f *ci.Framework) bool { + // Fully counting Tasks in f.Status against f.Spec, as FrameworkController may + // have not persist DeletionPending (ScaleDown) Tasks according to current + // f.Spec. totalTaskCount := int32(0) - for _, taskRole := range f.Spec.TaskRoles { - totalTaskCount += taskRole.TaskNumber - } - readyTaskCount := int32(0) - if f.Status != nil { - for _, taskRoleStatus := range f.TaskRoleStatuses() { - for _, taskStatus := range taskRoleStatus.TaskStatuses { - if isTaskReady(taskStatus) { - readyTaskCount++ - } + for _, taskRoleSpec := range f.Spec.TaskRoles { + taskRoleName := taskRoleSpec.Name + taskCountSpec := taskRoleSpec.TaskNumber + totalTaskCount += taskCountSpec + + if f.Status == nil { + continue + } + + taskRoleStatus := f.GetTaskRoleStatus(taskRoleName) + if taskRoleStatus == nil { + continue + } + + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + taskCountStatusAndSpec := common.MinInt32(taskCountStatus, taskCountSpec) + for taskIndex := int32(0); taskIndex < taskCountStatusAndSpec; taskIndex++ { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + if isTaskReady(taskStatus, true) { + readyTaskCount++ } } } // Wait until readyTaskCount is consistent with totalTaskCount. - if readyTaskCount == totalTaskCount { + if readyTaskCount >= totalTaskCount { klog.Infof("BarrierPassed: "+ "%v/%v Tasks are ready with not nil PodIP.", readyTaskCount, totalTaskCount) @@ -318,9 +331,11 @@ func isBarrierPassed(f *ci.Framework) bool { } } -func isTaskReady(taskStatus *ci.TaskStatus) bool { - return taskStatus.AttemptStatus.PodIP != nil && - *taskStatus.AttemptStatus.PodIP != "" +func isTaskReady(ts *ci.TaskStatus, ignoreDeletionPending bool) bool { + if ts.IsDeletionPendingIgnored(ignoreDeletionPending) { + return false + } + return ts.AttemptStatus.PodIP != nil && *ts.AttemptStatus.PodIP != "" } func dumpFramework(f *ci.Framework) { @@ -341,6 +356,8 @@ func getTaskRoleEnvName(taskRoleName string, suffix string) string { return strings.Join([]string{"FB", strings.ToUpper(taskRoleName), suffix}, "_") } +// All Tasks in f.Spec must be also included in f.Status as Ready, so inject from +// f.Status is enough. 
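+// For example, a TaskRole named "worker" yields injected lines like
+// "export FB_WORKER_IPS=ip0,ip1,..." and
+// "export FB_WORKER_ADDRESSES=ip0:${FB_WORKER_PORT},ip1:${FB_WORKER_PORT},...",
+// built from at most TaskRole.TaskNumber Tasks in f.Status.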
func generateInjector(f *ci.Framework) { var injector strings.Builder injector.WriteString("#!/bin/bash") @@ -357,17 +374,28 @@ func generateInjector(f *ci.Framework) { // {Task[TaskRole.TaskNumber-1].PodIP} injector.WriteString("\n") for _, taskRoleStatus := range f.TaskRoleStatuses() { - ipsEnvName := getTaskRoleEnvName(taskRoleStatus.Name, "IPS") + taskRoleName := taskRoleStatus.Name + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + taskRoleSpec := f.GetTaskRoleSpec(taskRoleName) + if taskRoleSpec == nil { + continue + } + + ipsEnvName := getTaskRoleEnvName(taskRoleName, "IPS") injector.WriteString("export " + ipsEnvName + "=") - for _, taskStatus := range taskRoleStatus.TaskStatuses { - taskIndex := taskStatus.Index + + taskCountSpec := taskRoleSpec.TaskNumber + taskCountStatusAndSpec := common.MinInt32(taskCountStatus, taskCountSpec) + for taskIndex := int32(0); taskIndex < taskCountStatusAndSpec; taskIndex++ { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + taskIP := *taskStatus.AttemptStatus.PodIP if taskIndex > 0 { injector.WriteString(",") } - taskIP := *taskStatus.AttemptStatus.PodIP injector.WriteString(taskIP) } + injector.WriteString("\n") injector.WriteString("echo " + ipsEnvName + "=${" + ipsEnvName + "}") injector.WriteString("\n") @@ -378,18 +406,29 @@ func generateInjector(f *ci.Framework) { // {Task[TaskRole.TaskNumber-1].PodIP}:${FB_{UpperCase({TaskRoleName})}_PORT} injector.WriteString("\n") for _, taskRoleStatus := range f.TaskRoleStatuses() { - addrsEnvName := getTaskRoleEnvName(taskRoleStatus.Name, "ADDRESSES") - portEnvName := getTaskRoleEnvName(taskRoleStatus.Name, "PORT") + taskRoleName := taskRoleStatus.Name + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + taskRoleSpec := f.GetTaskRoleSpec(taskRoleName) + if taskRoleSpec == nil { + continue + } + + addrsEnvName := getTaskRoleEnvName(taskRoleName, "ADDRESSES") + portEnvName := getTaskRoleEnvName(taskRoleName, "PORT") injector.WriteString("export " + addrsEnvName + "=") - for _, taskStatus := range taskRoleStatus.TaskStatuses { - taskIndex := taskStatus.Index + + taskCountSpec := taskRoleSpec.TaskNumber + taskCountStatusAndSpec := common.MinInt32(taskCountStatus, taskCountSpec) + for taskIndex := int32(0); taskIndex < taskCountStatusAndSpec; taskIndex++ { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + taskAddr := *taskStatus.AttemptStatus.PodIP + ":" + "${" + portEnvName + "}" if taskIndex > 0 { injector.WriteString(",") } - taskAddr := *taskStatus.AttemptStatus.PodIP + ":" + "${" + portEnvName + "}" injector.WriteString(taskAddr) } + injector.WriteString("\n") injector.WriteString("echo " + addrsEnvName + "=${" + addrsEnvName + "}") injector.WriteString("\n") diff --git a/pkg/common/utils.go b/pkg/common/utils.go index df67cddd..34895c39 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -53,6 +53,42 @@ func ReferPlaceholder(name string) string { return "{{" + name + "}}" } +func MinInt32(x, y int32) int32 { + if x < y { + return x + } else { + return y + } +} + +func MaxInt32(x, y int32) int32 { + if x > y { + return x + } else { + return y + } +} + +func EqualsPtrInt64(x, y *int64) bool { + if x == y { + return true + } else if x == nil { + return false + } else if y == nil { + return false + } else { + return *x == *y + } +} + +func DeepCopyInt64(o *int64) *int64 { + if o == nil { + return o + } else { + return PtrInt64(*o) + } +} + func PtrString(o string) *string { return &o } @@ -175,6 +211,14 @@ func RandInt64(min int64, max int64) int64 { return min 
+ rand.Int63n(max-min+1) } +func SprintPtrInt32(obj *int32) string { + if obj == nil { + return fmt.Sprintf("%v", obj) + } else { + return fmt.Sprintf("%v", *obj) + } +} + func ToYaml(obj interface{}) string { yamlBytes, err := yaml.Marshal(obj) if err != nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2b6ba76f..170c72dc 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -662,15 +662,14 @@ func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck( } func (c *FrameworkController) enqueuePodGracefulDeletionTimeoutCheck( - f *ci.Framework, taskRoleName string, + f *ci.Framework, timeoutSec *int64, failIfTimeout bool, pod *core.Pod) bool { - taskSpec := f.TaskRoleSpec(taskRoleName).Task if pod.DeletionTimestamp == nil { return false } return c.enqueueFrameworkTimeoutCheck( - f, *internal.GetPodDeletionStartTime(pod), taskSpec.PodGracefulDeletionTimeoutSec, + f, *internal.GetPodDeletionStartTime(pod), timeoutSec, failIfTimeout, "PodGracefulDeletionTimeoutCheck") } @@ -708,13 +707,297 @@ func (c *FrameworkController) syncFrameworkStatus(f *ci.Framework) error { if f.Status == nil { f.Status = f.NewFrameworkStatus() + + // To ensure FrameworkAttemptCreationPending is persisted before creating + // its cm, we need to wait until next sync to create the cm, so manually + // enqueue a sync. + c.enqueueFrameworkSync(f, "FrameworkAttemptCreationPending") + klog.Infof(logPfx + "Waiting FrameworkAttemptCreationPending to be persisted") + return nil } else { - // TODO: Support Framework.Spec Update + if c.syncFrameworkScale(f) || c.compactFrameworkScale(f) { + // To ensure TaskAttemptCreationPending is persisted before creating + // its pod, we need to wait until next sync to create the pod, so manually + // enqueue a sync. + // To ensure the Task[DeletionPending] is persisted before deleting its pod + // or deleting/replacing its Task instance, we need to wait until next sync + // to delete its pod or delete/replace the Task instance, so manually enqueue + // a sync. + c.enqueueFrameworkSync(f, "TaskAttemptCreationPending/Task[DeletionPending]") + klog.Infof(logPfx + + "Waiting TaskAttemptCreationPending/Task[DeletionPending] to be persisted") + return nil + } + + if c.updatePodGracefulDeletionTimeoutSec(f) { + // To ensure PodGracefulDeletionTimeoutSec is persisted before gracefully + // delete any pod, we need to wait until next sync to gracefully delete, + // so manually enqueue a sync. + c.enqueueFrameworkSync(f, "Task[PodGracefulDeletionTimeoutSec][Changed]") + klog.Infof(logPfx + + "Waiting Task[PodGracefulDeletionTimeoutSec][Changed] to be persisted") + return nil + } } return c.syncFrameworkState(f) } +// Rescale not Completing/Completed Framework according to its current f.Spec. +// After this, all ScaleUp TaskRoles and Tasks are added, and all ScaleDown Tasks +// are marked as DeletionPending for later lazy graceful deletion, thus: +// 1. TaskRoles/Tasks in f.Status must fully contain TaskRoles/Tasks in f.Spec. +// 2. TaskRoles/Tasks in f.Spec must fully contain not DeletionPending (ScaleDown) +// TaskRoles/Tasks in f.Status. +// +// This helps to ensure the Rescale is effective immediately, as essentially, +// ScaleUp/ScaleDown is to setup/destroy the relationship between Framework and +// its TaskRoles/Tasks, which does not have to wait until, such as +// FrameworkAttemptInstance (ConfigMap) is created or any DeletionPending +// (ScaleDown) TaskAttemptInstance (Pod) is gracefully deleted. 
+func (c *FrameworkController) syncFrameworkScale( + f *ci.Framework) (producedNewPendingTask bool) { + logPfx := fmt.Sprintf("[%v]: syncFrameworkScale: ", f.Key()) + klog.Infof(logPfx + "Started") + defer func() { klog.Infof(logPfx + "Completed") }() + + producedNewPendingTask = false + + // No longer react to Rescale after the whole FrameworkAttempt Completing, + // to ensure DeletionPending (ScaleDown) Task will never trigger (impact) + // Framework/FrameworkAttempt completion. + if f.IsCompleting() || + f.Status.State == ci.FrameworkAttemptCompleted || + f.Status.State == ci.FrameworkCompleted { + klog.Infof(logPfx+"Skipped: Framework is already %v", f.Status.State) + return producedNewPendingTask + } + + for _, taskRoleSpec := range f.Spec.TaskRoles { + taskRoleName := taskRoleSpec.Name + taskCountSpec := taskRoleSpec.TaskNumber + taskRoleStatus := f.GetTaskRoleStatus(taskRoleName) + + if taskRoleStatus == nil { + // ScaleUp: Directly add TaskRole that need to bring up. + klog.Infof("[%v][%v]: syncFrameworkScale: ScaleUp: Goal: %v -> %v", + f.Key(), taskRoleName, nil, taskCountSpec) + + trs := ci.TaskRoleStatus{Name: taskRoleName, TaskStatuses: []*ci.TaskStatus{}} + for taskIndex := int32(0); taskIndex < taskCountSpec; taskIndex++ { + trs.TaskStatuses = + append(trs.TaskStatuses, f.NewTaskStatus(taskRoleName, taskIndex)) + producedNewPendingTask = true + } + f.Status.AttemptStatus.TaskRoleStatuses = + append(f.Status.AttemptStatus.TaskRoleStatuses, &trs) + } else { + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + if taskCountStatus < taskCountSpec { + // ScaleUp: Directly add Task that need to bring up. + klog.Infof("[%v][%v]: syncFrameworkScale: ScaleUp: Goal: %v -> %v", + f.Key(), taskRoleName, taskCountStatus, taskCountSpec) + + for taskIndex := taskCountStatus; taskIndex < taskCountSpec; taskIndex++ { + taskRoleStatus.TaskStatuses = + append(taskRoleStatus.TaskStatuses, f.NewTaskStatus(taskRoleName, taskIndex)) + producedNewPendingTask = true + } + } else if taskCountStatus > taskCountSpec { + // ScaleDown: Just mark Task that need to bring down as DeletionPending. + klog.Infof("[%v][%v]: syncFrameworkScale: ScaleDown: Goal: %v -> %v", + f.Key(), taskRoleName, taskCountStatus, taskCountSpec) + + for taskIndex := taskCountStatus - 1; taskIndex >= taskCountSpec; taskIndex-- { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + if taskStatus.MarkAsDeletionPending() { + producedNewPendingTask = true + } + } + } + } + } + + for _, taskRoleStatus := range f.TaskRoleStatuses() { + taskRoleName := taskRoleStatus.Name + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + taskRoleSpec := f.GetTaskRoleSpec(taskRoleName) + + if taskRoleSpec == nil { + // ScaleDown: Just mark Task that need to bring down as DeletionPending. + klog.Infof("[%v][%v]: syncFrameworkScale: ScaleDown: Goal: %v -> %v", + f.Key(), taskRoleName, taskCountStatus, nil) + + for taskIndex := taskCountStatus - 1; taskIndex >= 0; taskIndex-- { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + if taskStatus.MarkAsDeletionPending() { + producedNewPendingTask = true + } + } + } + } + + return producedNewPendingTask +} + +// Compact not Completing/Completed Framework scale by cleaning up its Completed +// DeletionPending TaskRoles/Tasks. +// It drives the Completed DeletionPending TaskRoles/Tasks to be deleted or +// replaced by new Task instance. 
+// Before calling it, ensure the Completed DeletionPending TaskRoles/Tasks has +// been persisted, so it is safe to also expose them as history snapshots here. +func (c *FrameworkController) compactFrameworkScale( + f *ci.Framework) (producedNewPendingTask bool) { + logPfx := fmt.Sprintf("[%v]: compactFrameworkScale: ", f.Key()) + klog.Infof(logPfx + "Started") + defer func() { klog.Infof(logPfx + "Completed") }() + + producedNewPendingTask = false + + // Align with syncFrameworkScale to simplify completing. + if f.IsCompleting() || + f.Status.State == ci.FrameworkAttemptCompleted || + f.Status.State == ci.FrameworkCompleted { + klog.Infof(logPfx+"Skipped: Framework is already %v", f.Status.State) + return producedNewPendingTask + } + + // For TaskRoles/Tasks which no longer belong to its current f.Spec, try to + // delete the Completed DeletionPending ones. + taskRoleStatuses := &f.Status.AttemptStatus.TaskRoleStatuses + for taskRoleIndex := len(*taskRoleStatuses) - 1; taskRoleIndex >= 0; taskRoleIndex-- { + taskRoleStatus := (*taskRoleStatuses)[taskRoleIndex] + taskRoleName := taskRoleStatus.Name + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + // Will delete Tasks in in range [taskIndexDeleteStart, taskCountStatus) + taskIndexDeleteStart := taskCountStatus + + taskRoleSpec := f.GetTaskRoleSpec(taskRoleName) + var taskCountSpec int32 + if taskRoleSpec == nil { + taskCountSpec = 0 + } else { + taskCountSpec = taskRoleSpec.TaskNumber + } + + for taskIndex := taskCountStatus - 1; taskIndex >= taskCountSpec; taskIndex-- { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + if taskStatus.DeletionPending && taskStatus.State == ci.TaskCompleted { + taskIndexDeleteStart = taskIndex + } else { + // Cannot continue graceful deletion anymore + break + } + } + + var newTaskCountStatus *int32 + if taskIndexDeleteStart == 0 && taskRoleSpec == nil { + // Delete the whole Completed DeletionPending TaskRole + newTaskCountStatus = nil + } else { + // Delete tail Completed DeletionPending Tasks + newTaskCountStatus = &taskIndexDeleteStart + } + + if newTaskCountStatus != nil && *newTaskCountStatus == taskCountStatus { + // Nothing can be deleted + continue + } + + // Start deletion + logSfx := "" + if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkRescale { + // Ensure the FrameworkSnapshot is exposed before the deletion. + logSfx = ci.GetFrameworkSnapshotLogTail(f) + } + klog.Info(fmt.Sprintf( + "[%v][%v]: compactFrameworkScale: ScaleDown: Deletion: %v -> %v", + f.Key(), taskRoleName, taskCountStatus, + common.SprintPtrInt32(newTaskCountStatus)) + logSfx) + + if newTaskCountStatus == nil { + taskRoleLastIndex := len(*taskRoleStatuses) - 1 + (*taskRoleStatuses)[taskRoleIndex] = (*taskRoleStatuses)[taskRoleLastIndex] + (*taskRoleStatuses)[taskRoleLastIndex] = nil + *taskRoleStatuses = (*taskRoleStatuses)[:taskRoleLastIndex] + } else { + for taskIndex := taskCountStatus - 1; taskIndex >= *newTaskCountStatus; taskIndex-- { + taskRoleStatus.TaskStatuses[taskIndex] = nil + } + taskRoleStatus.TaskStatuses = taskRoleStatus.TaskStatuses[:*newTaskCountStatus] + } + } + + // For TaskRoles/Tasks which still belong to its current f.Spec, replace all + // Completed DeletionPending ones with new Task instances. 
+ for _, taskRoleStatus := range f.TaskRoleStatuses() { + taskRoleName := taskRoleStatus.Name + taskCountStatus := int32(len(taskRoleStatus.TaskStatuses)) + taskRoleSpec := f.GetTaskRoleSpec(taskRoleName) + + if taskRoleSpec != nil { + taskCountSpec := taskRoleSpec.TaskNumber + taskCountStatusAndSpec := common.MinInt32(taskCountStatus, taskCountSpec) + for taskIndex := taskCountStatusAndSpec - 1; taskIndex >= 0; taskIndex-- { + taskStatus := taskRoleStatus.TaskStatuses[taskIndex] + + if taskStatus.DeletionPending && taskStatus.State == ci.TaskCompleted { + // Replace the Completed DeletionPending Task with new instance + logSfx := "" + if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkRescale { + // Ensure the FrameworkSnapshot is exposed before the deletion. + logSfx = ci.GetFrameworkSnapshotLogTail(f) + } + klog.Info(fmt.Sprintf( + "[%v][%v][%v]: compactFrameworkScale: ScaleDown: Replacement", + f.Key(), taskRoleName, taskIndex) + logSfx) + + taskRoleStatus.TaskStatuses[taskIndex] = + f.NewTaskStatus(taskRoleName, taskIndex) + producedNewPendingTask = true + } + } + } + } + + return producedNewPendingTask +} + +func (c *FrameworkController) updatePodGracefulDeletionTimeoutSec( + f *ci.Framework) (changed bool) { + logPfx := fmt.Sprintf("[%v]: updatePodGracefulDeletionTimeoutSec: ", f.Key()) + klog.Infof(logPfx + "Started") + defer func() { klog.Infof(logPfx + "Completed") }() + + changed = false + + if f.Status.State == ci.FrameworkCompleted { + klog.Infof(logPfx+"Skipped: Framework is already %v", f.Status.State) + return changed + } + + for _, taskRoleSpec := range f.Spec.TaskRoles { + taskRoleName := taskRoleSpec.Name + taskRoleStatus := f.GetTaskRoleStatus(taskRoleName) + if taskRoleStatus == nil { + // Unreachable + continue + } + + if !common.EqualsPtrInt64( + taskRoleStatus.PodGracefulDeletionTimeoutSec, + taskRoleSpec.Task.PodGracefulDeletionTimeoutSec) { + taskRoleStatus.PodGracefulDeletionTimeoutSec = + common.DeepCopyInt64(taskRoleSpec.Task.PodGracefulDeletionTimeoutSec) + changed = true + } + } + + return changed +} + +// Sync Framework with other related objects. +// It also drives the DeletionPending TaskRoles/Tasks to be Completed. 
func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { logPfx := fmt.Sprintf("[%v]: syncFrameworkState: ", f.Key()) klog.Infof(logPfx + "Started") @@ -722,8 +1005,9 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { if f.Status.State == ci.FrameworkCompleted { if c.enqueueFrameworkCompletedRetainTimeoutCheck(f, true) { - klog.Infof(logPfx + "Skipped: Framework is already completed, " + - "and waiting to be deleted after FrameworkCompletedRetainSec") + klog.Infof(logPfx+"Skipped: Framework is already %v, "+ + "and waiting to be deleted after FrameworkCompletedRetainSec", + f.Status.State) return nil } @@ -755,7 +1039,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { var diag string var code ci.CompletionCode if f.Spec.ExecutionType == ci.ExecutionStop { - diag = fmt.Sprintf("User has requested to stop the Framework") + diag = "User has requested to stop the Framework" code = ci.CompletionCodeStopFrameworkRequested klog.Info(logPfx + diag) } else { @@ -909,6 +1193,13 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { f.Status.AttemptStatus = f.NewFrameworkAttemptStatus( f.Status.RetryPolicyStatus.TotalRetriedCount) f.TransitionFrameworkState(ci.FrameworkAttemptCreationPending) + + // To ensure FrameworkAttemptCreationPending is persisted before creating + // its cm, we need to wait until next sync to create the cm, so manually + // enqueue a sync. + c.enqueueFrameworkSync(f, "FrameworkAttemptCreationPending") + klog.Infof(logPfx + "Waiting FrameworkAttemptCreationPending to be persisted") + return nil } } // At this point, f.Status.State must be in: @@ -924,7 +1215,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { } if f.Spec.ExecutionType == ci.ExecutionStop { - diag := fmt.Sprintf("User has requested to stop the Framework") + diag := "User has requested to stop the Framework" klog.Info(logPfx + diag) // Ensure cm is deleted in remote to avoid managed cm leak after @@ -972,7 +1263,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { f.Status.State == ci.FrameworkAttemptDeleting { if !f.IsCompleting() { if f.Spec.ExecutionType == ci.ExecutionStop { - diag := fmt.Sprintf("User has requested to stop the Framework") + diag := "User has requested to stop the Framework" klog.Info(logPfx + diag) c.completeFrameworkAttempt(f, false, ci.CompletionCodeStopFrameworkRequested. @@ -980,10 +1271,14 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { } } + if !f.IsCompleting() { + c.syncFrameworkAttemptCompletionPolicy(f) + } + err := c.syncTaskRoleStatuses(f, cm) if f.Status.State == ci.FrameworkAttemptPreparing { - if f.IsAnyTaskRunning() { + if f.IsAnyTaskRunning(true) { f.TransitionFrameworkState(ci.FrameworkAttemptRunning) } } @@ -1167,6 +1462,125 @@ func (c *FrameworkController) createConfigMap( } } +// FrameworkAttemptCompletionPolicy can be triggered by not only completed Tasks +// increased in f.Status, but also FrameworkAttemptCompletionPolicy or TotalTaskCount +// decreased in f.Spec, so full sync here is needed. +// Note, the sync is relatively very cheap, so it is fine to call the sync during +// all kinds of FrameworkSync. 
+func (c *FrameworkController) syncFrameworkAttemptCompletionPolicy(
+	f *ci.Framework) (completionPolicyTriggered bool) {
+	logPfx := fmt.Sprintf("[%v]: syncFrameworkAttemptCompletionPolicy: ", f.Key())
+	klog.Infof(logPfx + "Started")
+	defer func() { klog.Infof(logPfx + "Completed") }()
+
+	failedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsFailed, true)
+	succeededTaskSelector := ci.BindIDP((*ci.TaskStatus).IsSucceeded, true)
+	completedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsCompleted, true)
+
+	var firstTriggerTime *meta.Time
+	var firstTriggerCompletionStatus *ci.FrameworkAttemptCompletionStatus
+	for _, taskRoleSpec := range f.Spec.TaskRoles {
+		taskRoleName := taskRoleSpec.Name
+		taskRoleStatus := f.GetTaskRoleStatus(taskRoleName)
+		if taskRoleStatus == nil {
+			// Unreachable
+			continue
+		}
+
+		completionPolicy := taskRoleSpec.FrameworkAttemptCompletionPolicy
+		minFailedTaskCount := completionPolicy.MinFailedTaskCount
+		minSucceededTaskCount := completionPolicy.MinSucceededTaskCount
+
+		if minFailedTaskCount >= 1 {
+			failedTaskCount := taskRoleStatus.GetTaskCountStatus(failedTaskSelector)
+			if failedTaskCount >= minFailedTaskCount {
+				trigger := taskRoleStatus.CompletionTimeOrderedTaskStatus(
+					failedTaskSelector, minFailedTaskCount-1)
+
+				if firstTriggerTime == nil || trigger.CompletionTime.Before(firstTriggerTime) {
+					firstTriggerTime = trigger.CompletionTime
+					firstTriggerCompletionStatus = ci.NewFailedTaskTriggeredCompletionStatus(
+						trigger, taskRoleName, failedTaskCount, minFailedTaskCount)
+				}
+			}
+		}
+
+		if minSucceededTaskCount >= 1 {
+			succeededTaskCount := taskRoleStatus.GetTaskCountStatus(succeededTaskSelector)
+			if succeededTaskCount >= minSucceededTaskCount {
+				trigger := taskRoleStatus.CompletionTimeOrderedTaskStatus(
+					succeededTaskSelector, minSucceededTaskCount-1)
+
+				if firstTriggerTime == nil || trigger.CompletionTime.Before(firstTriggerTime) {
+					firstTriggerTime = trigger.CompletionTime
+					firstTriggerCompletionStatus = ci.NewSucceededTaskTriggeredCompletionStatus(
+						trigger, taskRoleName, succeededTaskCount, minSucceededTaskCount)
+				}
+			}
+		}
+	}
+
+	if firstTriggerCompletionStatus != nil {
+		klog.Infof("[%v][%v][%v]: syncFrameworkAttemptCompletionPolicy: %v", f.Key(),
+			firstTriggerCompletionStatus.Trigger.TaskRoleName,
+			firstTriggerCompletionStatus.Trigger.TaskIndex,
+			firstTriggerCompletionStatus.Trigger.Message)
+		c.completeFrameworkAttempt(f, false, firstTriggerCompletionStatus)
+		return true
+	}
+
+	// The Framework must not be Completing or Completed, so the TaskRoles/Tasks
+	// in f.Spec must fully contain the not DeletionPending (ScaleDown)
+	// TaskRoles/Tasks in f.Status, thus completedTaskCount must be <= totalTaskCount.
+	totalTaskCount := f.GetTotalTaskCountSpec()
+	completedTaskCount := f.GetTaskCountStatus(completedTaskSelector)
+	if completedTaskCount >= totalTaskCount {
+		var lastCompletedTaskStatus *ci.TaskStatus
+		var lastCompletedTaskRoleName string
+		for _, taskRoleSpec := range f.Spec.TaskRoles {
+			taskRoleName := taskRoleSpec.Name
+			taskRoleStatus := f.GetTaskRoleStatus(taskRoleName)
+			if taskRoleStatus == nil {
+				// Unreachable
+				continue
+			}
+
+			roleTotalTaskCount := taskRoleSpec.TaskNumber
+			if roleTotalTaskCount == 0 {
+				continue
+			}
+
+			roleLastCompletedTask := taskRoleStatus.CompletionTimeOrderedTaskStatus(
+				completedTaskSelector, roleTotalTaskCount-1)
+
+			if lastCompletedTaskStatus == nil ||
+				roleLastCompletedTask.CompletionTime.Time.After(
+					lastCompletedTaskStatus.CompletionTime.Time) {
+				lastCompletedTaskStatus = roleLastCompletedTask
+				lastCompletedTaskRoleName = taskRoleName
+			}
+		}
+
+		firstTriggerCompletionStatus = ci.NewCompletedTaskTriggeredCompletionStatus(
+			lastCompletedTaskStatus, lastCompletedTaskRoleName,
+			completedTaskCount, totalTaskCount)
+
+		if firstTriggerCompletionStatus.Trigger == nil {
+			klog.Infof("[%v]: syncFrameworkAttemptCompletionPolicy: %v", f.Key(),
+				firstTriggerCompletionStatus.Diagnostics)
+		} else {
+			klog.Infof("[%v][%v][%v]: syncFrameworkAttemptCompletionPolicy: %v", f.Key(),
+				firstTriggerCompletionStatus.Trigger.TaskRoleName,
+				firstTriggerCompletionStatus.Trigger.TaskIndex,
+				firstTriggerCompletionStatus.Trigger.Message)
+		}
+		c.completeFrameworkAttempt(f, false, firstTriggerCompletionStatus)
+		return true
+	}
+
+	return false
+}
+
 func (c *FrameworkController) syncTaskRoleStatuses(
 	f *ci.Framework, cm *core.ConfigMap) (err error) {
 	logPfx := fmt.Sprintf("[%v]: syncTaskRoleStatuses: ", f.Key())
@@ -1199,18 +1613,19 @@ func (c *FrameworkController) syncTaskState(
 	klog.Infof(logPfx + "Started")
 	defer func() { klog.Infof(logPfx + "Completed") }()
 
-	taskRoleSpec := f.TaskRoleSpec(taskRoleName)
-	taskSpec := taskRoleSpec.Task
+	taskRoleSpec := f.GetTaskRoleSpec(taskRoleName)
 	taskRoleStatus := f.TaskRoleStatus(taskRoleName)
 	taskStatus := f.TaskStatus(taskRoleName, taskIndex)
 
 	if taskStatus.State == ci.TaskCompleted {
-		// The TaskCompleted should not trigger FrameworkAttemptDeletionPending, so
-		// it is safe to skip the attemptToCompleteFrameworkAttempt.
-		// Otherwise, given it is impossible that the TaskCompleted is persisted
-		// but the FrameworkAttemptDeletionPending is not persisted, the TaskCompleted
-		// should have already triggered and persisted FrameworkAttemptDeletionPending
-		// in previous sync, so current sync should have already been skipped but not.
+		// The TaskCompleted has already been considered during the above
+		// syncFrameworkAttemptCompletionPolicy, so it is safe to skip the below
+		// attemptToCompleteFrameworkAttempt.
+		//
+		// If the Task is DeletionPending but not yet deleted, it must be because
+		// some Task behind it has not yet become TaskCompleted.
+		// Once that following Task becomes TaskCompleted later, a sync will be
+		// enqueued to trigger the deletion.
 		klog.Infof(logPfx + "Skipped: Task is already completed")
 		return nil
 	}
@@ -1228,17 +1643,26 @@ func (c *FrameworkController) syncTaskState(
 
 	// Avoid sync with outdated object:
 	// pod is remote creation requested but not found in the local cache.
if taskStatus.State == ci.TaskAttemptCreationRequested { - if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) { - klog.Infof(logPfx + - "Waiting Pod to appear in the local cache or timeout") - return nil - } + var diag string + var code ci.CompletionCode + if taskStatus.DeletionPending { + diag = "User has requested to delete the Task by Framework ScaleDown" + code = ci.CompletionCodeDeleteTaskRequested + klog.Info(logPfx + diag) + } else { + if c.enqueueTaskAttemptCreationTimeoutCheck(f, taskRoleName, taskIndex, true) { + klog.Infof(logPfx + + "Waiting Pod to appear in the local cache or timeout") + return nil + } - diag := fmt.Sprintf( - "Pod does not appear in the local cache within timeout %v, "+ - "so consider it was deleted and explicitly delete it", - common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec)) - klog.Warning(logPfx + diag) + diag = fmt.Sprintf( + "Pod does not appear in the local cache within timeout %v, "+ + "so consider it was deleted and explicitly delete it", + common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec)) + code = ci.CompletionCodePodCreationTimeout + klog.Warning(logPfx + diag) + } // Ensure pod is deleted in remote to avoid managed pod leak after // TaskAttemptCompleted. @@ -1248,8 +1672,7 @@ func (c *FrameworkController) syncTaskState( } c.completeTaskAttempt(f, taskRoleName, taskIndex, true, - ci.CompletionCodePodCreationTimeout. - NewTaskAttemptCompletionStatus(diag, nil)) + code.NewTaskAttemptCompletionStatus(diag, nil)) return nil } @@ -1295,31 +1718,26 @@ func (c *FrameworkController) syncTaskState( f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing) } - // Possibly due to the NodeController has not heard from the kubelet who - // manages the Pod for more than node-monitor-grace-period but less than - // pod-eviction-timeout. - // And after pod-eviction-timeout, the Pod will be marked as deleting, but - // it will only be automatically deleted after the kubelet comes back and - // kills the Pod. - if pod.Status.Phase == core.PodUnknown { - klog.Infof(logPfx+ - "Waiting Pod to be deleted or deleting or transitioned from %v", - pod.Status.Phase) - return nil - } - // Below Pod fields may be available even when PodPending, such as the Pod // has been bound to a Node, but one or more Containers has not been started. taskStatus.AttemptStatus.PodNodeName = &pod.Spec.NodeName taskStatus.AttemptStatus.PodIP = &pod.Status.PodIP taskStatus.AttemptStatus.PodHostIP = &pod.Status.HostIP - if pod.Status.Phase == core.PodPending { + if pod.Status.Phase == core.PodUnknown { + // Possibly due to the NodeController has not heard from the kubelet who + // manages the Pod for more than node-monitor-grace-period but less than + // pod-eviction-timeout. + // And after pod-eviction-timeout, the Pod will be marked as deleting, but + // it will only be automatically deleted after the kubelet comes back and + // kills the Pod. 
+ klog.Infof(logPfx+ + "Waiting Pod to be deleted or deleting or transitioned from %v", + pod.Status.Phase) + } else if pod.Status.Phase == core.PodPending { f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptPreparing) - return nil } else if pod.Status.Phase == core.PodRunning { f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptRunning) - return nil } else if pod.Status.Phase == core.PodSucceeded { diag := fmt.Sprintf("Pod succeeded") klog.Info(logPfx + diag) @@ -1362,14 +1780,36 @@ func (c *FrameworkController) syncTaskState( } } // At this point, taskStatus.State must be in: + // {TaskAttemptCreationPending, TaskAttemptPreparing, + // TaskAttemptRunning, TaskAttemptCompleted} + + if taskStatus.State == ci.TaskAttemptPreparing || + taskStatus.State == ci.TaskAttemptRunning { + if taskStatus.DeletionPending { + diag := "User has requested to delete the Task by Framework ScaleDown" + klog.Info(logPfx + diag) + c.completeTaskAttempt(f, taskRoleName, taskIndex, false, + ci.CompletionCodeDeleteTaskRequested. + NewTaskAttemptCompletionStatus(diag, nil)) + } + return nil + } + // At this point, taskStatus.State must be in: // {TaskAttemptCreationPending, TaskAttemptCompleted} if taskStatus.State == ci.TaskAttemptCompleted { // attemptToRetryTask - retryDecision := taskSpec.RetryPolicy.ShouldRetry( - taskStatus.RetryPolicyStatus, - taskStatus.AttemptStatus.CompletionStatus.CompletionStatus, - 0, 0) + var retryDecision ci.RetryDecision + if taskRoleSpec == nil { + retryDecision = ci.RetryDecision{ + ShouldRetry: false, IsAccountable: true, + DelaySec: 0, Reason: "TaskRoleSpec is already deleted"} + } else { + retryDecision = taskRoleSpec.Task.RetryPolicy.ShouldRetry( + taskStatus.RetryPolicyStatus, + taskStatus.AttemptStatus.CompletionStatus.CompletionStatus, + 0, 0) + } if taskStatus.RetryPolicyStatus.RetryDelaySec == nil { // RetryTask is not yet scheduled, so need to be decided. @@ -1393,9 +1833,15 @@ func (c *FrameworkController) syncTaskState( if taskStatus.RetryPolicyStatus.RetryDelaySec != nil { // RetryTask is already scheduled, so just need to check whether it // should be executed now. - if c.enqueueTaskRetryDelayTimeoutCheck(f, taskRoleName, taskIndex, true) { - klog.Infof(logPfx + "Waiting Task to retry after delay") - return nil + if taskStatus.DeletionPending { + klog.Infof(logPfx + + "User has requested to delete the Task by Framework ScaleDown, " + + "so immediately retry without delay") + } else { + if c.enqueueTaskRetryDelayTimeoutCheck(f, taskRoleName, taskIndex, true) { + klog.Infof(logPfx + "Waiting Task to retry after delay") + return nil + } } // retryTask @@ -1415,6 +1861,13 @@ func (c *FrameworkController) syncTaskState( taskStatus.AttemptStatus = f.NewTaskAttemptStatus( taskRoleName, taskIndex, taskStatus.RetryPolicyStatus.TotalRetriedCount) f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptCreationPending) + + // To ensure TaskAttemptCreationPending is persisted before creating + // its pod, we need to wait until next sync to create the pod, so manually + // enqueue a sync. 
+ c.enqueueFrameworkSync(f, "TaskAttemptCreationPending") + klog.Infof(logPfx + "Waiting TaskAttemptCreationPending to be persisted") + return nil } } // At this point, taskStatus.State must be in: @@ -1427,6 +1880,23 @@ func (c *FrameworkController) syncTaskState( return nil } + if taskStatus.DeletionPending || taskRoleSpec == nil { + diag := "User has requested to delete the Task by Framework ScaleDown" + klog.Info(logPfx + diag) + + // Ensure pod is deleted in remote to avoid managed pod leak after + // TaskAttemptCompleted. + _, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true) + if err != nil { + return err + } + + c.completeTaskAttempt(f, taskRoleName, taskIndex, true, + ci.CompletionCodeDeleteTaskRequested. + NewTaskAttemptCompletionStatus(diag, nil)) + return nil + } + // createTaskAttempt pod, err = c.createPod(f, cm, taskRoleName, taskIndex) if err != nil { @@ -1478,72 +1948,61 @@ func (c *FrameworkController) syncTaskState( return nil } + if taskStatus.DeletionPending || taskRoleSpec == nil { + klog.Infof(logPfx + "Skip to attemptToCompleteFrameworkAttempt: " + + "Task is DeletionPending") + + // To ensure the TaskCompleted[DeletionPending] is persisted before + // deleting/replacing its Task instance, we need to wait until next + // sync to delete/replace the Task instance, so manually enqueue a sync. + c.enqueueFrameworkSync(f, "TaskCompleted[DeletionPending]") + klog.Infof(logPfx + "Waiting TaskCompleted[DeletionPending] to be persisted") + return nil + } + // attemptToCompleteFrameworkAttempt + failedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsFailed, true) + succeededTaskSelector := ci.BindIDP((*ci.TaskStatus).IsSucceeded, true) + completedTaskSelector := ci.BindIDP((*ci.TaskStatus).IsCompleted, true) + completionPolicy := taskRoleSpec.FrameworkAttemptCompletionPolicy minFailedTaskCount := completionPolicy.MinFailedTaskCount minSucceededTaskCount := completionPolicy.MinSucceededTaskCount - if taskStatus.IsFailed() && minFailedTaskCount != ci.UnlimitedValue { - failedTaskCount := taskRoleStatus.GetTaskCount((*ci.TaskStatus).IsFailed) + var triggerCompletionStatus *ci.FrameworkAttemptCompletionStatus + if taskStatus.IsFailed(true) && minFailedTaskCount >= 1 { + failedTaskCount := taskRoleStatus.GetTaskCountStatus(failedTaskSelector) if failedTaskCount >= minFailedTaskCount { - msg := fmt.Sprintf( - "FailedTaskCount %v has reached MinFailedTaskCount %v in the TaskRole", - failedTaskCount, minFailedTaskCount) - klog.Info(logPfx + msg) - c.completeFrameworkAttempt(f, false, - &ci.FrameworkAttemptCompletionStatus{ - CompletionStatus: taskStatus.AttemptStatus.CompletionStatus.CompletionStatus, - Trigger: &ci.CompletionPolicyTriggerStatus{ - Message: msg, - TaskRoleName: taskRoleName, - TaskIndex: taskIndex, - }, - }, - ) - return nil + triggerCompletionStatus = ci.NewFailedTaskTriggeredCompletionStatus( + taskStatus, taskRoleName, failedTaskCount, minFailedTaskCount) } } - if taskStatus.IsSucceeded() && minSucceededTaskCount != ci.UnlimitedValue { - succeededTaskCount := taskRoleStatus.GetTaskCount((*ci.TaskStatus).IsSucceeded) + if taskStatus.IsSucceeded(true) && minSucceededTaskCount >= 1 { + succeededTaskCount := taskRoleStatus.GetTaskCountStatus(succeededTaskSelector) if succeededTaskCount >= minSucceededTaskCount { - msg := fmt.Sprintf( - "SucceededTaskCount %v has reached MinSucceededTaskCount %v in the TaskRole", - succeededTaskCount, minSucceededTaskCount) - klog.Info(logPfx + msg) - c.completeFrameworkAttempt(f, false, - 
ci.CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus( - taskStatus.AttemptStatus.CompletionStatus.Diagnostics, - &ci.CompletionPolicyTriggerStatus{ - Message: msg, - TaskRoleName: taskRoleName, - TaskIndex: taskIndex, - }, - ), - ) - return nil + triggerCompletionStatus = ci.NewSucceededTaskTriggeredCompletionStatus( + taskStatus, taskRoleName, succeededTaskCount, minSucceededTaskCount) } } - if f.AreAllTasksCompleted() { - totalTaskCount := f.GetTaskCount(nil) - failedTaskCount := f.GetTaskCount((*ci.TaskStatus).IsFailed) - msg := fmt.Sprintf( - "All Tasks are completed and no user specified conditions in "+ - "FrameworkAttemptCompletionPolicy have ever been triggered: "+ - "TotalTaskCount: %v, FailedTaskCount: %v", - totalTaskCount, failedTaskCount) - klog.Info(logPfx + msg) - c.completeFrameworkAttempt(f, false, - ci.CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus( - taskStatus.AttemptStatus.CompletionStatus.Diagnostics, - &ci.CompletionPolicyTriggerStatus{ - Message: msg, - TaskRoleName: taskRoleName, - TaskIndex: taskIndex, - }, - ), - ) + if triggerCompletionStatus != nil { + klog.Info(logPfx + triggerCompletionStatus.Trigger.Message) + c.completeFrameworkAttempt(f, false, triggerCompletionStatus) + return nil + } + + // The Framework must not Completing or Completed, so TaskRoles/Tasks in + // f.Spec must fully contain not DeletionPending (ScaleDown) TaskRoles/Tasks + // in f.Status, thus completedTaskCount must <= totalTaskCount. + totalTaskCount := f.GetTotalTaskCountSpec() + completedTaskCount := f.GetTaskCountStatus(completedTaskSelector) + if completedTaskCount >= totalTaskCount { + triggerCompletionStatus = ci.NewCompletedTaskTriggeredCompletionStatus( + taskStatus, taskRoleName, completedTaskCount, totalTaskCount) + + klog.Info(logPfx + triggerCompletionStatus.Trigger.Message) + c.completeFrameworkAttempt(f, false, triggerCompletionStatus) return nil } @@ -1563,23 +2022,24 @@ func (c *FrameworkController) handlePodGracefulDeletion( f *ci.Framework, taskRoleName string, taskIndex int32, pod *core.Pod) error { logPfx := fmt.Sprintf("[%v][%v][%v]: handlePodGracefulDeletion: ", f.Key(), taskRoleName, taskIndex) - taskSpec := f.TaskRoleSpec(taskRoleName).Task + taskStatus := f.TaskRoleStatus(taskRoleName) + timeoutSec := taskStatus.PodGracefulDeletionTimeoutSec if pod.DeletionTimestamp == nil { return nil } - if taskSpec.PodGracefulDeletionTimeoutSec == nil { + if timeoutSec == nil { klog.Infof(logPfx + "Waiting Pod to be deleted") return nil } - if c.enqueuePodGracefulDeletionTimeoutCheck(f, taskRoleName, true, pod) { + if c.enqueuePodGracefulDeletionTimeoutCheck(f, timeoutSec, true, pod) { klog.Infof(logPfx + "Waiting Pod to be deleted or timeout") return nil } klog.Warningf(logPfx+ "Pod cannot be deleted within timeout %v, so force delete it", - common.SecToDuration(taskSpec.PodGracefulDeletionTimeoutSec)) + common.SecToDuration(timeoutSec)) // Always confirm the force deletion to expose the failure that even force // deletion cannot delete the Pod, such as the Pod Finalizers is not empty. return c.deletePod(f, taskRoleName, taskIndex, pod.UID, true, true) @@ -1649,6 +2109,10 @@ func (c *FrameworkController) getOrCleanupPod( // Using UID to ensure we delete the right object. // The podUID should be controlled by f's cm. 
+// Note: Pod force deletion can only be done after PodGracefulDeletionTimeoutSec
+// has expired, which is mostly caused by a bad node. For other cases, such as
+// deleting an unmanaged Pod, the force deletion may cause a local node resource
+// conflict since the node may still be healthy.
 func (c *FrameworkController) deletePod(
 	f *ci.Framework, taskRoleName string, taskIndex int32,
 	podUID types.UID, confirm bool, force bool) error {
@@ -1751,18 +2215,19 @@ func (c *FrameworkController) completeTaskAttempt(
 			common.ToJson(taskStatus.AttemptStatus.CompletionStatus))
 	}
 
-	// To ensure the completed TaskAttempt is persisted before exposed,
-	// we need to wait until next sync to expose it, so manually enqueue a sync.
+	// To ensure TaskAttemptCompleted is persisted before exposing its TaskAttempt,
+	// we need to wait until next sync to expose the TaskAttempt, so manually
+	// enqueue a sync.
 		c.enqueueFrameworkSync(f, "TaskAttemptCompleted")
-		klog.Infof(logPfx + "Waiting the completed TaskAttempt to be persisted")
+		klog.Infof(logPfx + "Waiting TaskAttemptCompleted to be persisted")
 	} else {
 		f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeletionPending)
 
-		// To ensure the CompletionStatus is persisted before deleting the pod,
+		// To ensure TaskAttemptDeletionPending is persisted before deleting its pod,
 		// we need to wait until next sync to delete the pod, so manually enqueue
 		// a sync.
 		c.enqueueFrameworkSync(f, "TaskAttemptDeletionPending")
-		klog.Infof(logPfx + "Waiting the CompletionStatus to be persisted")
+		klog.Infof(logPfx + "Waiting TaskAttemptDeletionPending to be persisted")
 	}
 }
 
@@ -1817,18 +2282,19 @@ func (c *FrameworkController) completeFrameworkAttempt(
 			common.ToJson(f.Status.AttemptStatus.CompletionStatus))
 	}
 
-	// To ensure the completed FrameworkAttempt is persisted before exposed,
-	// we need to wait until next sync to expose it, so manually enqueue a sync.
+	// To ensure FrameworkAttemptCompleted is persisted before exposing its
+	// FrameworkAttempt, we need to wait until next sync to expose the
+	// FrameworkAttempt, so manually enqueue a sync.
 		c.enqueueFrameworkSync(f, "FrameworkAttemptCompleted")
-		klog.Infof(logPfx + "Waiting the completed FrameworkAttempt to be persisted")
+		klog.Infof(logPfx + "Waiting FrameworkAttemptCompleted to be persisted")
 	} else {
 		f.TransitionFrameworkState(ci.FrameworkAttemptDeletionPending)
 
-		// To ensure the CompletionStatus is persisted before deleting the cm,
-		// we need to wait until next sync to delete the cm, so manually enqueue
-		// a sync.
+		// To ensure FrameworkAttemptDeletionPending is persisted before deleting
+		// its cm, we need to wait until next sync to delete the cm, so manually
+		// enqueue a sync.
 		c.enqueueFrameworkSync(f, "FrameworkAttemptDeletionPending")
-		klog.Infof(logPfx + "Waiting the CompletionStatus to be persisted")
+		klog.Infof(logPfx + "Waiting FrameworkAttemptDeletionPending to be persisted")
	}
 }
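To make the rescale flow introduced by `syncFrameworkScale`/`compactFrameworkScale` easier to follow, here is a minimal, self-contained sketch of the same idea: ScaleUp appends new Tasks, ScaleDown only marks tail Tasks as DeletionPending, and compaction later removes only the trailing Tasks that are both DeletionPending and Completed. The types and helper names below are simplified stand-ins for illustration, not the controller's actual API.

```go
package main

import "fmt"

// task is a simplified stand-in for a TaskStatus.
type task struct {
	index           int32
	deletionPending bool
	completed       bool
}

// rescale aligns the status Task list with the desired spec count:
// ScaleUp appends new Tasks, ScaleDown only marks tail Tasks as DeletionPending.
func rescale(status []task, specCount int32) []task {
	for i := int32(len(status)); i < specCount; i++ { // ScaleUp
		status = append(status, task{index: i})
	}
	for i := int32(len(status)) - 1; i >= specCount; i-- { // ScaleDown
		status[i].deletionPending = true
	}
	return status
}

// compact removes only the trailing Tasks that are both DeletionPending and
// Completed, stopping at the first Task that cannot be removed yet, mirroring
// the graceful tail deletion in compactFrameworkScale.
func compact(status []task) []task {
	end := len(status)
	for end > 0 && status[end-1].deletionPending && status[end-1].completed {
		end--
	}
	return status[:end]
}

func main() {
	status := rescale(nil, 4)   // initial bring-up: Tasks 0..3
	status = rescale(status, 2) // ScaleDown to 2: Tasks 2 and 3 become DeletionPending
	status[3].completed = true  // only the last DeletionPending Task has completed
	status = compact(status)    // Task 3 is removed; Task 2 must complete first
	fmt.Println(len(status), status[2].deletionPending) // 3 true
}
```

Marking instead of deleting immediately is what keeps a ScaleDown from ever force-completing the FrameworkAttempt: a DeletionPending Task still goes through its normal completion path before it is removed or replaced.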
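Similarly, the threshold logic behind `syncFrameworkAttemptCompletionPolicy` can be pictured as a pure function over a status snapshot: the attempt completes once any TaskRole reaches MinFailedTaskCount or MinSucceededTaskCount, or once every Task counted by the spec has completed. This is a sketch with assumed simplified types only; the real controller additionally orders competing triggers by Task completion time and excludes DeletionPending Tasks from the spec totals.

```go
package main

import "fmt"

// roleStatus and policy are simplified stand-ins for a TaskRoleStatus and its
// FrameworkAttemptCompletionPolicy.
type roleStatus struct {
	failed, succeeded, total int32
}

type policy struct {
	minFailed, minSucceeded int32 // values < 1 disable the condition
}

// evaluate returns a completion reason once any per-role threshold is reached,
// or once every Task counted by the spec has completed; it returns "" while
// the attempt should keep running.
func evaluate(roles map[string]roleStatus, policies map[string]policy) string {
	var completed, total int32
	for name, rs := range roles {
		p := policies[name]
		if p.minFailed >= 1 && rs.failed >= p.minFailed {
			return fmt.Sprintf("role %v reached MinFailedTaskCount %v", name, p.minFailed)
		}
		if p.minSucceeded >= 1 && rs.succeeded >= p.minSucceeded {
			return fmt.Sprintf("role %v reached MinSucceededTaskCount %v", name, p.minSucceeded)
		}
		completed += rs.failed + rs.succeeded
		total += rs.total
	}
	if completed >= total {
		return "all Tasks completed without triggering any per-role condition"
	}
	return ""
}

func main() {
	roles := map[string]roleStatus{
		"ps":     {failed: 0, succeeded: 0, total: 2},
		"worker": {failed: 0, succeeded: 4, total: 4},
	}
	policies := map[string]policy{
		"ps":     {minFailed: 1},
		"worker": {minSucceeded: 4},
	}
	fmt.Println(evaluate(roles, policies))
	// Output: role worker reached MinSucceededTaskCount 4
}
```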
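Finally, several comments above repeat the same "persist first, act on the next sync" pattern: a state transition such as TaskAttemptDeletionPending or FrameworkAttemptCompleted is written, a sync is manually enqueued, and the dependent side effect (deleting the pod/cm, exposing the attempt) only runs in the following sync once the transition is known to be persisted. Below is a toy model of that pattern; all names here are invented for illustration and do not correspond to the controller's real types.

```go
package main

import "fmt"

// object and store are toy stand-ins for a Framework/Task status and for the
// persisted status plus the controller's sync queue.
type object struct{ state string }

type store struct {
	persisted string
	queue     []string
}

func (s *store) persist(o *object)      { s.persisted = o.state }
func (s *store) enqueueSync(why string) { s.queue = append(s.queue, why) }

// sync performs at most one step per call, so every state transition is
// persisted before the side effect that depends on it runs in the next sync.
func sync(o *object, s *store) {
	if o.state == "DeletionPending" {
		if s.persisted != "DeletionPending" {
			s.persist(o)
			s.enqueueSync("DeletionPending persisted, delete on next sync")
			return
		}
		// The transition is persisted, so it is now safe to perform the
		// deletion and record the completion.
		o.state = "Completed"
		s.persist(o)
	}
}

func main() {
	o := &object{state: "DeletionPending"}
	s := &store{}
	sync(o, s) // 1st sync: only persists the transition and enqueues a sync
	sync(o, s) // 2nd sync: performs the deletion and completes
	fmt.Println(o.state, s.queue)
}
```

Splitting the transition and the action across two syncs is what guarantees that, after a crash, the controller never observes a side effect whose triggering state was lost.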