TPU Multi-Host Support #1913

Merged: 16 commits into ray-project:master on Feb 22, 2024

Conversation

@ryanaoleary (Contributor) commented Feb 7, 2024:

Why are these changes needed?

Fix reconciliation logic for multi-host worker groups.

For NumOfHosts > 1, the controller now treats each replica as a multi-host group and scales by NumOfHosts Pods per replica. Additionally, if a Pod in a multi-host group fails or is deleted, the entire multi-host group is deleted. This PR adds unit tests covering multi-host Pod creation, deletion, and reconciliation logic.
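
For illustration, a minimal sketch of the per-replica creation idea, using the field names and the createWorkerPod call quoted in the review snippets below; the outer loop and the replicasToCreate variable are assumptions, not the exact controller code:

	// Sketch only: scaling up creates one group of NumOfHosts Pods per new replica.
	for i := int32(0); i < replicasToCreate; i++ {
		group := rand.Uint32() // shared identifier for every host Pod in this replica
		for host := uint32(0); host < uint32(worker.NumOfHosts); host++ {
			if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy(), group, host); err != nil {
				return err
			}
		}
	}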

Related issue number

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

ray-operator/controllers/ray/raycluster_controller.go (outdated review thread, on the following diff):
	r.Log.Info("reconcilePods", "removing worker groups in the scaleStrategy of", worker.GroupName)
	for _, groupToDelete := range worker.ScaleStrategy.MultihostGroupsToDelete {
		for _, pod := range workerPods.Items {
			if pod.Labels[utils.RayNodeGroupLabelKey] == groupToDelete {
Contributor:

Thinking some more about this, I am not sure this would work: if this is the worker group name, we'll end up deleting all Pods from the worker group, including all the replicas. I think the intention here should be to delete only a specific replica, including all of its hosts.

Also I am not sure how the autoscaler logic would work. I am assuming the resource scheduler should know if certain pods have been idle, and it adds those pods to the deletion request. I don't think the autoscaler has any idea if an entire multihost replica has been idle.

@kevin85421 FYI

Contributor:

After giving this a bit more thought, I think there are a couple of ways to solve this:

  1. In the get_node_data function in the KubeRay node provider, add some code to parse the replica index for each node.
  2. Then build a reverse lookup map that finds all other worker Pods belonging to the same replica.
  3. When one Pod needs to scale down, make sure that all other Pods from the same replica are also sent as part of WorkersToDelete.

OR:

Similar to the above, but in step 3 we send the name of the replica in the scale-down request. On the KubeRay operator side, we figure out which exact Pods belong to that replica and delete them together.
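
For illustration, a rough sketch of what the second option could look like on the operator side, assuming Pods carry the ray.io/multihost-replica label proposed in this PR, that MultihostGroupsToDelete names replicas rather than worker groups, and that corev1 is the k8s.io/api/core/v1 import. This per-replica deletion path was ultimately dropped in favor of Strategy No. 1 (see the summary later in this thread):

	// Sketch only: group worker Pods by their replica label so an entire replica
	// can be deleted together when Ray asks to scale it down.
	podsByReplica := make(map[string][]corev1.Pod)
	for _, pod := range workerPods.Items {
		replicaID := pod.Labels[utils.MultihostReplicaKey]
		podsByReplica[replicaID] = append(podsByReplica[replicaID], pod)
	}
	for _, replicaID := range worker.ScaleStrategy.MultihostGroupsToDelete {
		for _, pod := range podsByReplica[replicaID] {
			pod := pod // avoid taking the address of the shared loop variable
			if err := r.Delete(ctx, &pod); err != nil {
				return err
			}
		}
	}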

Another review thread, on the following diff:

	return err
	// Due to pods being scaled down, we are not guaranteed that the multihost group name will always be
	// incremental. So we just need to use some random integer here.
	group := rand.Uint32()
Contributor:

Given the above comment, perhaps we should make this id more deterministic?

Contributor:

(Update) I think it should be fine to leave it as is; see the reply on the first comment.

ray-operator/controllers/ray/utils/constant.go (resolved review thread)
ray-operator/controllers/ray/raycluster_controller.go (resolved review thread)
ray-operator/controllers/ray/utils/constant.go (outdated review thread, on the following diff):
@@ -22,6 +22,8 @@ const (
 	RayClusterServingServiceLabelKey = "ray.io/serve"
 	HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete"
 	NumWorkerGroupsKey = "ray.io/num-worker-groups"
+	MultihostReplicaKey = "ray.io/multihost-replica"
+	RayNodeHostIndexKey = "ray.io/host-index"
Member:

In my understanding, this label is similar to TPU_WORKER_ID. We should not add this label because it will add a lot of complexity to KubeRay's reconciliation logic.

  • No. 1: KubeRay is not aware of multi-host PodSlice and TPU_WORKER_ID. All Pods in the same worker group are the same. If a Pod is deleted accidentally, we just create a new one.
  • No. 2: KubeRay is aware of multi-host PodSlice but not of TPU_WORKER_ID. Pods in the same worker group are different. If a Pod is deleted accidentally, we need to figure out which PodSlice doesn't have enough Pods and create one for that PodSlice.
  • No. 3: KubeRay is aware of both multi-host PodSlice and TPU_WORKER_ID. If a Pod is accidentally deleted, we need to determine which PodSlice it belonged to and which TPU_WORKER_ID it had in order to create a new Pod.

We should do our best to implement strategy No. 1. If that's not possible, we at least need to adhere to strategy No. 2.

ray-operator/controllers/ray/raycluster_controller.go (resolved review thread)
ray-operator/controllers/ray/raycluster_controller.go (resolved review thread)
ray-operator/controllers/ray/raycluster_controller.go (outdated review thread, on the following diff):
	group := rand.Uint32()
	var j uint32
	for j = 0; j < uint32(worker.NumOfHosts); j++ {
		if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy(), group, j); err != nil {
Member:

What will happen if we manually delete a Pod in a multi-host PodSlice? It seems the implementation may not be able to handle it.

ray-operator/controllers/ray/raycluster_controller.go (resolved review thread)
@kevin85421 (Member):

We need to address #1913 (comment) before everything moves forward.

@kevin85421 (Member):

Summarize the offline discussion:

  • We will go with the No. 1 strategy.
    • KubeRay is not responsible for labels ray.io/multihost-replica and ray.io/host-index.
    • KubeRay doesn't need to handle MultihostReplicasToDelete. Ray should tell KubeRay which 4 Pods need to be deleted instead of the ID of the multi-host PodSlice.
    • For multi-host cases (NumOfHosts > 1), we don't need to handle random deletion at this moment. If users want to manually delete a PodSlice, they should update both replicas and WorkersToDelete manually. See TPU Multi-Host Support #1913 (comment) for more details.
    • KubeRay only promises to create the correct number of Pods for the worker group.
      • The GKE webhook is responsible for:
        • Setting labels so that Pods are scheduled on the correct Kubernetes nodes.
        • Injecting environment variables so that Ray knows which Ray nodes belong to the same PodSlice.
        • Injecting the TPU_WORKER_ID environment variable.
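
For illustration, a hypothetical example of what a scale-down request could look like under Strategy No. 1 for a NumOfHosts=4 worker group: Ray names every host Pod of the replica in WorkersToDelete. The workerGroupSpec variable, the rayv1 alias for the KubeRay API package, and the Pod names are all assumptions:

	// Hypothetical sketch: scaling one multi-host replica down means listing every host
	// Pod of that replica in WorkersToDelete (and decrementing Replicas by one as well).
	workerGroupSpec.ScaleStrategy = rayv1.ScaleStrategy{
		WorkersToDelete: []string{
			"raycluster-sample-tpu-group-worker-aaaaa",
			"raycluster-sample-tpu-group-worker-bbbbb",
			"raycluster-sample-tpu-group-worker-ccccc",
			"raycluster-sample-tpu-group-worker-ddddd",
		},
	}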

@kevin85421 (Member):

@ryanaoleary @richardsliu any update?

@ryanaoleary (Contributor, Author) commented Feb 21, 2024:

> @ryanaoleary @richardsliu any update?

The PR should be ready for review now, going with Strategy No. 1. I took out the new labels and all of the deletion logic, since Ray should now use WorkersToDelete to tell KubeRay which Pods in a multi-host PodSlice to remove.

@@ -758,7 +757,10 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
runningPods.Items = append(runningPods.Items, pod)
}
}
-diff := workerReplicas - int32(len(runningPods.Items))
+// A replica can contain multiple hosts, so we need to calculate this based on the number of hosts per replica.
+runningReplicas := int32(len(runningPods.Items)) / worker.NumOfHosts
Member:

Consider the following case:

  • len(runningPods.Items): 7
  • NumOfHosts: 4
  • workerReplicas: 2

	runningReplicas := int32(len(runningPods.Items)) / worker.NumOfHosts // 1
	diff := workerReplicas - runningReplicas // 2 - 1 = 1 => create 4 new Pods => 11 Pods in total.

Maybe we should use:

	numExpectedPods := workerReplicas * worker.NumOfHosts
	diff := numExpectedPods - int32(len(runningPods.Items))
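
(With the numbers above, numExpectedPods = 2 * 4 = 8 and diff = 8 - 7 = 1, so exactly one replacement Pod is created rather than four.)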

Contributor (Author):

I'm a bit unsure how we're supposed to handle the case where NumOfHosts > 1 and len(runningPods.Items) % NumOfHosts != 0. This would only happen when a Pod in the multi-host PodSlice crashed or was deleted, in which case should the entire PodSlice be deleted and then recreated if necessary? If that's not the case, then I think this makes sense.

Contributor (Author):

I added this change in b038e4b.

Member:

Let's take a step back:

  • KubeRay doesn't handle any logic about multi-host PodSlice ("when a pod in the multi-host podslice crashed ... entire podslice be deleted ...").
  • KubeRay only promises to create replicas * NumOfHosts Pods for a worker group.
  • Pod scheduling is handled by GKE webhook.
  • Scaling up and down is handled by the Ray Autoscaler (ex: "... should the entire podslice be deleted and then recreated ..." ).

Another review thread, on the following test code:

	// (1) 1 workerGroup (2) disable autoscaling
	assert.Equal(t, 1, len(testRayCluster.Spec.WorkerGroupSpecs), "This test assumes only one worker group.")

	// Disable autoscaling so that the random Pod deletion is enabled.
Member:

Random pod deletion is a pretty bad behavior for a multi-host setup. I am considering disabling it.

@kevin85421 merged commit dbd6b72 into ray-project:master on Feb 22, 2024. 23 checks passed.