[autoscaler v2][5/n] introducing reconciler (ray-project#34985)
Why are these changes needed?
This is the stack of PRs that introduces the new node_provider for autoscaler v2.

Stack of PRs
ray-project#34976
ray-project#34977
ray-project#34979
ray-project#34983
ray-project#34985 <- this PR

Added a reconciler that reconciles state from instance_storage to node_provider. Specifically, it subscribes to changes in the storage and triggers the following operations in sequence (a minimal sketch of this flow follows the list):

launch new instances based on the queued requests
install Ray on newly allocated instances
handle various Ray failures on allocated instances (Ray install failed, or Ray stopped)
reconcile the instance status with the node provider.
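
A minimal sketch of how such a subscriber-driven reconciler could be shaped (illustrative only: the class and helper-method names below are assumptions, not the code added in this PR):

from typing import List

from ray.autoscaler.v2.instance_manager.instance_storage import (
    InstanceStorage,
    InstanceUpdatedSuscriber,
    InstanceUpdateEvent,
)
from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider


class ReconcilerSketch(InstanceUpdatedSuscriber):
    """Illustrative sketch; the Reconciler added in this PR may differ."""

    def __init__(
        self, instance_storage: InstanceStorage, node_provider: NodeProvider
    ) -> None:
        self._instance_storage = instance_storage
        self._node_provider = node_provider

    def notify(self, events: List[InstanceUpdateEvent]) -> None:
        # Invoked on every change written to InstanceStorage; run the
        # reconciliation steps in sequence.
        self._launch_queued_instances()
        self._install_ray_on_allocated_instances()
        self._handle_ray_failures()
        self._reconcile_with_node_provider()

    def _launch_queued_instances(self) -> None:
        # Ask the node provider to allocate cloud instances for queued requests.
        pass

    def _install_ray_on_allocated_instances(self) -> None:
        # Start ray on instances that have just been allocated.
        pass

    def _handle_ray_failures(self) -> None:
        # Handle instances whose ray install failed or whose ray process stopped.
        pass

    def _reconcile_with_node_provider(self) -> None:
        # Cross-check stored instance status against the node provider's view.
        pass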

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
scv119 authored and arvind-chandra committed Aug 31, 2023
1 parent a9f8424 commit b20e982
Showing 16 changed files with 869 additions and 118 deletions.
23 changes: 23 additions & 0 deletions python/ray/autoscaler/v2/BUILD
@@ -54,6 +54,29 @@ py_test(
deps = ["//:ray_lib",],
)

py_test(
name = "test_instance_launcher",
size = "small",
srcs = ["tests/test_instance_launcher.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_reconciler",
size = "small",
srcs = ["tests/test_reconciler.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_threaded_ray_installer",
size = "small",
srcs = ["tests/test_threaded_ray_installer.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_sdk",
Empty file.
88 changes: 57 additions & 31 deletions python/ray/autoscaler/v2/instance_manager/instance_storage.py
@@ -1,4 +1,6 @@
import copy
import logging
import time
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple
@@ -15,6 +17,7 @@ class InstanceUpdateEvent:

instance_id: str
new_status: int
new_ray_status: int = Instance.RAY_STATUS_UNKOWN


class InstanceUpdatedSuscriber(metaclass=ABCMeta):
@@ -39,7 +42,12 @@ def __init__(
self._storage = storage
self._cluster_id = cluster_id
self._table_name = f"instance_table@{cluster_id}"
self._status_change_subscriber = status_change_subscriber
self._status_change_subscribers = []
if status_change_subscriber:
self._status_change_subscribers.append(status_change_subscriber)

def add_status_change_subscriber(self, subscriber: InstanceUpdatedSuscriber):
self._status_change_subscribers.append(subscriber)

def batch_upsert_instances(
self,
@@ -68,25 +76,29 @@ def batch_upsert_instances(
return StoreStatus(False, version)

for instance in updates:
instance = copy.deepcopy(instance)
# the instance version is set to 0, it will be
# populated by the storage entry's version on read
instance.version = 0
instance.timestamp_since_last_modified = int(time.time())
mutations[instance.instance_id] = instance.SerializeToString()

result, version = self._storage.batch_update(
self._table_name, mutations, {}, expected_storage_version
)

if result and self._status_change_subscriber:
self._status_change_subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
)
for instance in updates
],
)
if result:
for subscriber in self._status_change_subscribers:
subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
new_ray_status=instance.ray_status,
)
for instance in updates
],
)

return StoreStatus(result, version)

@@ -114,9 +126,11 @@ def upsert_instance(
Returns:
StoreStatus: A tuple of (success, storage_version).
"""
instance = copy.deepcopy(instance)
# the instance version is set to 0, it will be
# populated by the storage entry's version on read
instance.version = 0
instance.timestamp_since_last_modified = int(time.time())
result, version = self._storage.update(
self._table_name,
key=instance.instance_id,
@@ -126,26 +140,34 @@ def upsert_instance(
insert_only=False,
)

if result and self._status_change_subscriber:
self._status_change_subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
)
],
)
if result:
for subscriber in self._status_change_subscribers:
subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance.instance_id,
new_status=instance.status,
new_ray_status=instance.ray_status,
)
],
)

return StoreStatus(result, version)

def get_instances(
self, instance_ids: List[str] = None, status_filter: Set[int] = None
self,
instance_ids: List[str] = None,
status_filter: Set[int] = None,
ray_status_filter: Set[int] = None,
) -> Tuple[Dict[str, Instance], int]:
"""Get instances from the storage.
Args:
instance_ids: A list of instance ids to be retrieved. If empty, all
instances will be retrieved.
status_filter: Only instances with the specified status will be returned.
ray_status_filter: Only instances with the specified ray status will
be returned.
Returns:
Tuple[Dict[str, Instance], int]: A tuple of (instances, version).
@@ -161,6 +183,8 @@ def get_instances(
instance.version = entry_version
if status_filter and instance.status not in status_filter:
continue
if ray_status_filter and instance.ray_status not in ray_status_filter:
continue
instances[instance_id] = instance
return instances, version

@@ -186,14 +210,16 @@ def batch_delete_instances(
self._table_name, {}, instance_ids, expected_storage_version
)

if result[0] and self._status_change_subscriber:
self._status_change_subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance_id,
new_status=Instance.GARAGE_COLLECTED,
)
for instance_id in instance_ids
],
)
if result[0]:
for subscriber in self._status_change_subscribers:
subscriber.notify(
[
InstanceUpdateEvent(
instance_id=instance_id,
new_status=Instance.GARBAGE_COLLECTED,
new_ray_status=Instance.RAY_STATUS_UNKOWN,
)
for instance_id in instance_ids
],
)
return result
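
For illustration, a hypothetical read that combines the new ray_status_filter with status_filter (not part of this diff; instance_storage is assumed to be an existing InstanceStorage):

# Instances that the cloud provider has allocated but where ray has not
# yet been installed/started.
allocated_pending_ray, _version = instance_storage.get_instances(
    status_filter={Instance.ALLOCATED},
    ray_status_filter={Instance.RAY_STATUS_UNKOWN},
)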
4 changes: 2 additions & 2 deletions python/ray/autoscaler/v2/instance_manager/node_provider.py
@@ -118,11 +118,11 @@ def _get_instance(self, cloud_instance_id: str) -> Instance:
instance = Instance()
instance.cloud_instance_id = cloud_instance_id
if self._provider.is_running(cloud_instance_id):
instance.status = Instance.STARTING
instance.status = Instance.ALLOCATED
elif self._provider.is_terminated(cloud_instance_id):
instance.status = Instance.STOPPED
else:
instance.status = Instance.INSTANCE_STATUS_UNSPECIFIED
instance.status = Instance.UNKNOWN
instance.internal_ip = self._provider.internal_ip(cloud_instance_id)
instance.external_ip = self._provider.external_ip(cloud_instance_id)
instance.instance_type = self._provider.node_tags(cloud_instance_id)[
Empty file.
162 changes: 162 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/instance_launcher.py
@@ -0,0 +1,162 @@
import logging
import math
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import List

from ray.autoscaler._private.constants import (
AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
AUTOSCALER_MAX_LAUNCH_BATCH,
)
from ray.autoscaler.v2.instance_manager.instance_storage import (
InstanceStorage,
InstanceUpdatedSuscriber,
InstanceUpdateEvent,
)
from ray.autoscaler.v2.instance_manager.node_provider import NodeProvider
from ray.core.generated.instance_manager_pb2 import Instance

logger = logging.getLogger(__name__)


class InstanceLauncher(InstanceUpdatedSuscriber):
"""InstanceLauncher is responsible for provisioning new instances."""

def __init__(
self,
instance_storage: InstanceStorage,
node_provider: NodeProvider,
max_concurrent_requests: int = math.ceil(
AUTOSCALER_MAX_CONCURRENT_LAUNCHES / float(AUTOSCALER_MAX_LAUNCH_BATCH)
),
max_instances_per_request: int = AUTOSCALER_MAX_LAUNCH_BATCH,
) -> None:
self._instance_storage = instance_storage
self._node_provider = node_provider
self._max_concurrent_requests = max_concurrent_requests
self._max_instances_per_request = max_instances_per_request
self._executor = ThreadPoolExecutor(max_workers=1)
self._launch_instance_executor = ThreadPoolExecutor(
max_workers=self._max_concurrent_requests
)

def notify(self, events: List[InstanceUpdateEvent]) -> None:
# TODO: we should do reconciliation based on events.
has_new_request = any(
[event.new_status == Instance.UNKNOWN for event in events]
)
if has_new_request:
self._executor.submit(self._may_launch_new_instances)

def _may_launch_new_instances(self):
new_instances, _ = self._instance_storage.get_instances(
status_filter={Instance.UNKNOWN}
)

if not new_instances:
logger.debug("No instances to launch")
return

queued_instances = []
for instance in new_instances.values():
instance.status = Instance.QUEUED
success, version = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)
if success:
instance.version = version
queued_instances.append(instance)
else:
logger.error(f"Failed to update {instance} QUEUED")

instances_by_type = defaultdict(list)
for instance in queued_instances:
instances_by_type[instance.instance_type].append(instance)

for instance_type, instances in instances_by_type.items():
for i in range(0, len(instances), self._max_instances_per_request):
self._launch_instance_executor.submit(
self._launch_new_instances_by_type,
instance_type,
instances[
i : min(
i + self._max_instances_per_request,
len(instances),
)
],
)

def _launch_new_instances_by_type(
self, instance_type: str, instances: List[Instance]
) -> int:
"""Launches instances of the given type.
Args:
instance_type: type of instance to launch.
instances: list of instances to launch. These instances should
have been marked as QUEUED with instance_type set.
Returns:
num of instances launched.
"""
logger.info(f"Launching {len(instances)} instances of type {instance_type}")
instances_selected = []
for instance in instances:
instance.status = Instance.REQUESTED
result, version = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)
if not result:
logger.warn(f"Failed to update instance {instance}")
continue
instance.version = version
instances_selected.append(instance)

if not instances_selected:
return 0

# TODO: idempotency token.
created_cloud_instances = self._node_provider.create_nodes(
instance_type, len(instances_selected)
)

assert len(created_cloud_instances) <= len(instances_selected)

instances_launched = 0
while created_cloud_instances and instances_selected:
cloud_instance = created_cloud_instances.pop()
instance = instances_selected.pop()
instance.cloud_instance_id = cloud_instance.cloud_instance_id
instance.internal_ip = cloud_instance.internal_ip
instance.external_ip = cloud_instance.external_ip
instance.status = Instance.ALLOCATED
instance.ray_status = Instance.RAY_STATUS_UNKOWN

# update instance status into the storage
result, _ = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)

if not result:
# TODO: this could only happen when the request is canceled.
logger.warn(f"Failed to update instance {instance}")
# push the cloud instance back
created_cloud_instances.append(cloud_instance)
continue

instances_launched += 1

if created_cloud_instances:
# instances are leaked, we probably need to terminate them
for instance in created_cloud_instances:
self._node_provider.terminate_node(instance.cloud_instance_id)

if instances_selected:
# instance creation failed, we need to mark them allocation failed.
for instance in instances_selected:
instance.status = Instance.ALLOCATION_FAILED
# TODO: add more information about the failure.
result, _ = self._instance_storage.upsert_instance(
instance, expected_instance_version=instance.version
)
# TODO: this could only happen when the request is canceled.
return instances_launched
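
For context, a hypothetical wiring of the launcher as a storage subscriber (not part of this diff; instance_storage and node_provider are assumed to be already-constructed InstanceStorage and NodeProvider objects):

launcher = InstanceLauncher(
    instance_storage=instance_storage,
    node_provider=node_provider,
)
# Once registered, every write to the storage triggers notify(); new
# (UNKNOWN) instances are queued and driven to REQUESTED/ALLOCATED.
instance_storage.add_status_change_subscriber(launcher)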