Skip to content

Commit

Permalink
[Cluster launcher] [vSphere] Fix the bug that multiple worker types d…
Browse files Browse the repository at this point in the history
…oesn't work (#40487)

Currently our code assumes that there is only one worker node type.
In this change I fix the bug to let it support multiple worker node types.

Signed-off-by: Chen Jing <jingch@vmware.com>
Co-authored-by: Archit Kulkarni <architkulkarni@users.noreply.github.com>
  • Loading branch information
JingChen23 and architkulkarni authored Oct 23, 2023
1 parent df3237a commit 7b83147
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
70 changes: 31 additions & 39 deletions python/ray/autoscaler/_private/vsphere/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def validate_frozen_vm_configs(conf: dict):


def check_and_update_frozen_vm_configs_in_provider_section(
config, head_node_config, worker_node_config
config, head_node_config, worker_node_configs
):
provider_config = config["provider"]

Expand All @@ -128,28 +128,31 @@ def check_and_update_frozen_vm_configs_in_provider_section(

head_node_config["frozen_vm"] = vsphere_config["frozen_vm"]

worker_node_config["frozen_vm"] = {}
for worker_node_config in worker_node_configs:

# Copy the fields from head node config to worker node config.
# We don't copy the library_item field into the worker node config as it'll
# trigger creation of frozen VM(s) again when the code executes on the head
# node.
# The copied fields will later be used when the code executes on the head
# node. The fields will determine the frozen VMs to be used for creating
# worker nodes.
if "name" in head_node_config["frozen_vm"]:
worker_node_config["frozen_vm"]["name"] = head_node_config["frozen_vm"]["name"]
worker_node_config["frozen_vm"] = {}

if "resource_pool" in head_node_config["frozen_vm"]:
worker_node_config["frozen_vm"]["resource_pool"] = head_node_config[
"frozen_vm"
]["resource_pool"]
# Copy the fields from head node config to worker node config.
# We don't copy the library_item field into the worker node config as it'll
# trigger creation of frozen VM(s) again when the code executes on the head
# node.
# The copied fields will later be used when the code executes on the head
# node. The fields will determine the frozen VMs to be used for creating
# worker nodes.
worker_frozen_vm_cfg = worker_node_config["frozen_vm"]
if "name" in head_node_config["frozen_vm"]:
worker_frozen_vm_cfg["name"] = head_node_config["frozen_vm"]["name"]

if "resource_pool" in head_node_config["frozen_vm"]:
worker_frozen_vm_cfg["resource_pool"] = head_node_config["frozen_vm"][
"resource_pool"
]


def add_credentials_into_provider_section(config):
provider_config = config["provider"]

# vsphere_config is an optional field as the credentials can also be specified
# vsphere_config is an optional field as the credentials can also be specified
# as env variables so first check verifies if this field is present before
# accessing its properties
if (
Expand All @@ -170,17 +173,17 @@ def add_credentials_into_provider_section(config):
def update_vsphere_configs(config):
available_node_types = config["available_node_types"]

# Fetch worker: field from the YAML file
worker_node = available_node_types["worker"]
worker_node_config = worker_node["node_config"]

# Fetch the head node field name from head_node_type field.
head_node_type = config["head_node_type"]

# Use head_node_type field's value to fetch the head node field
head_node = available_node_types[head_node_type]
head_node_config = head_node["node_config"]

# Fetch every worker node type (all node types other than the head) from the YAML file
worker_nodes = [v for k, v in available_node_types.items() if k != head_node_type]
worker_node_configs = [worker_node["node_config"] for worker_node in worker_nodes]

# A mandatory constraint enforced by Ray's YAML validator
# is to add a resources field for both head and worker nodes.
# For example, to specify resources for the worker the
Expand All @@ -193,34 +196,23 @@ def update_vsphere_configs(config):
# resources
# This enables us to access the field during node creation.
# The same happens for head node too.
worker_node_config["resources"] = worker_node["resources"]
head_node_config["resources"] = head_node["resources"]

head_resource_pool = None
if "resource_pool" in head_node_config:
head_resource_pool = head_node_config["resource_pool"]

# by default create worker nodes in the head node's resource pool
worker_resource_pool = head_resource_pool

# If different resource pool is provided for worker nodes, use it
if "resource_pool" in worker_node_config:
worker_resource_pool = worker_node_config["resource_pool"]

worker_node_config["resource_pool"] = worker_resource_pool

worker_datastore = None

if "datastore" in head_node_config and head_node_config["datastore"]:
worker_datastore = head_node_config["datastore"]
for worker_node in worker_nodes:
worker_node["node_config"]["resources"] = worker_node["resources"]

if "datastore" in worker_node_config and worker_node_config["datastore"]:
worker_datastore = worker_node_config["datastore"]
for worker_node_config in worker_node_configs:
# if the resource pool for the worker node is unspecified then let it be the
# head node's resource pool

worker_node_config["datastore"] = worker_datastore
if not worker_node_config.get("resource_pool"):
worker_node_config["resource_pool"] = head_resource_pool

check_and_update_frozen_vm_configs_in_provider_section(
config, head_node_config, worker_node_config
config, head_node_config, worker_node_configs
)


Expand Down
8 changes: 8 additions & 0 deletions python/ray/tests/vsphere/test_vsphere_node_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def test_update_vsphere_configs():
"node_config": {"resource_pool": "ray", "datastore": "vsan"},
},
"worker": {"resources": {}, "node_config": {}},
"worker1": {"resources": {}, "node_config": {}},
},
"head_node_type": "ray.head.default",
}
Expand All @@ -438,6 +439,13 @@ def test_update_vsphere_configs():
in input_config["available_node_types"]["ray.head.default"]["node_config"]
)
assert "frozen_vm" in input_config["available_node_types"]["worker"]["node_config"]
assert "frozen_vm" in input_config["available_node_types"]["worker1"]["node_config"]
assert (
input_config["available_node_types"]["worker"]["node_config"]["frozen_vm"][
"name"
]
== "frozen"
)


def test_validate_frozen_vm_configs():
Expand Down

0 comments on commit 7b83147

Please sign in to comment.