Skip to content

Commit

Permalink
Launch nodes in separate threads (#2183)
Browse files Browse the repository at this point in the history
Modifies the autoscaler to run launch_new_nodes in a separate thread, keeping track of the number of pending requests.
  • Loading branch information
AdamGleave authored and ericl committed Jun 6, 2018
1 parent 13d4e0d commit 6ef3b25
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 58 deletions.
108 changes: 89 additions & 19 deletions python/ray/autoscaler/autoscaler.py
Expand Up @@ -2,10 +2,14 @@
from __future__ import division
from __future__ import print_function

import copy
import json
import hashlib
import math
import os
import queue
import subprocess
import threading
import time
import traceback

Expand All @@ -16,8 +20,8 @@
import yaml

from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_HEARTBEAT_TIMEOUT_S
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES,\
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.autoscaler.node_provider import get_node_provider, \
get_default_config
from ray.autoscaler.updater import NodeUpdaterProcess
Expand Down Expand Up @@ -199,6 +203,64 @@ def _info(self):
}


class NodeLauncher(threading.Thread):
def __init__(self, queue, pending, *args, **kwargs):
self.queue = queue
self.pending = pending
self.provider = None
super(NodeLauncher, self).__init__(*args, **kwargs)

def _launch_node(self, config, count):
if self.provider is None:
self.provider = get_node_provider(config["provider"],
config["cluster_name"])

tag_filters = {TAG_RAY_NODE_TYPE: "worker"}
before = self.provider.nodes(tag_filters=tag_filters)
launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
self.provider.create_node(
config["worker_nodes"], {
TAG_RAY_NODE_NAME: "ray-{}-worker".format(
config["cluster_name"]),
TAG_RAY_NODE_TYPE: "worker",
TAG_RAY_NODE_STATUS: "uninitialized",
TAG_RAY_LAUNCH_CONFIG: launch_hash,
}, count)
after = self.provider.nodes(tag_filters=tag_filters)
if set(after).issubset(before):
print("Warning: No new nodes reported after node creation")

def run(self):
while True:
config, count = self.queue.get()
try:
self._launch_node(config, count)
finally:
self.pending.dec(count)


class ConcurrentCounter():
def __init__(self):
self._value = 0
self._lock = threading.Lock()

def inc(self, count):
with self._lock:
self._value += count
return self._value

def dec(self, count):
with self._lock:
assert self._value >= count, "counter cannot go negative"
self._value -= count
return self._value

@property
def value(self):
with self._lock:
return self._value


class StandardAutoscaler(object):
"""The autoscaling control loop for a Ray cluster.
Expand All @@ -220,6 +282,7 @@ class StandardAutoscaler(object):
def __init__(self,
config_path,
load_metrics,
max_launch_batch=AUTOSCALER_MAX_LAUNCH_BATCH,
max_concurrent_launches=AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
max_failures=AUTOSCALER_MAX_NUM_FAILURES,
process_runner=subprocess,
Expand All @@ -233,6 +296,7 @@ def __init__(self,
self.config["cluster_name"])

self.max_failures = max_failures
self.max_launch_batch = max_launch_batch
self.max_concurrent_launches = max_concurrent_launches
self.verbose_updates = verbose_updates
self.process_runner = process_runner
Expand All @@ -246,6 +310,17 @@ def __init__(self,
self.last_update_time = 0.0
self.update_interval_s = update_interval_s

# Node launchers
self.launch_queue = queue.Queue()
self.num_launches_pending = ConcurrentCounter()
max_batches = math.ceil(
max_concurrent_launches / float(max_launch_batch))
for i in range(int(max_batches)):
node_launcher = NodeLauncher(
queue=self.launch_queue, pending=self.num_launches_pending)
node_launcher.daemon = True
node_launcher.start()

# Expand local file_mounts to allow ~ in the paths. This can't be done
# earlier when the config is written since we might be on different
# platform and the expansion would result in wrong path.
Expand Down Expand Up @@ -278,6 +353,7 @@ def _update(self):
return

self.last_update_time = time.time()
num_pending = self.num_launches_pending.value
nodes = self.workers()
print(self.debug_string(nodes))
self.load_metrics.prune_active_ips(
Expand Down Expand Up @@ -318,9 +394,11 @@ def _update(self):

# Launch new nodes if needed
target_num = self.target_num_workers()
if len(nodes) < target_num:
self.launch_new_node(
min(self.max_concurrent_launches, target_num - len(nodes)))
num_nodes = len(nodes) + num_pending
if num_nodes < target_num:
max_allowed = min(self.max_launch_batch,
self.max_concurrent_launches - num_pending)
self.launch_new_node(min(max_allowed, target_num - num_nodes))
print(self.debug_string())

# Process any completed updates
Expand Down Expand Up @@ -453,27 +531,19 @@ def can_update(self, node_id):

def launch_new_node(self, count):
print("StandardAutoscaler: Launching {} new nodes".format(count))
num_before = len(self.workers())
self.provider.create_node(
self.config["worker_nodes"], {
TAG_RAY_NODE_NAME: "ray-{}-worker".format(
self.config["cluster_name"]),
TAG_RAY_NODE_TYPE: "worker",
TAG_RAY_NODE_STATUS: "uninitialized",
TAG_RAY_LAUNCH_CONFIG: self.launch_hash,
}, count)
if len(self.workers()) <= num_before:
print("Warning: Num nodes failed to increase after node creation")
self.num_launches_pending.inc(count)
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count))

def workers(self):
return self.provider.nodes(tag_filters={
TAG_RAY_NODE_TYPE: "worker",
})
return self.provider.nodes(tag_filters={TAG_RAY_NODE_TYPE: "worker"})

def debug_string(self, nodes=None):
if nodes is None:
nodes = self.workers()
suffix = ""
if self.num_launches_pending:
suffix += " ({} pending)".format(self.num_launches_pending.value)
if self.updaters:
suffix += " ({} updating)".format(len(self.updaters))
if self.num_failed_updates:
Expand Down
5 changes: 5 additions & 0 deletions python/ray/ray_constants.py
Expand Up @@ -16,6 +16,11 @@ def env_integer(key, default):
# is a safety feature to prevent e.g. runaway node launches.
AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5)

# The maximum number of nodes to launch in a single request.
# Multiple requests may be made for this batch size, up to
# the limit of AUTOSCALER_MAX_CONCURRENT_LAUNCHES.
AUTOSCALER_MAX_LAUNCH_BATCH = env_integer("AUTOSCALER_MAX_LAUNCH_BATCH", 5)

# Max number of nodes to launch at a time.
AUTOSCALER_MAX_CONCURRENT_LAUNCHES = env_integer(
"AUTOSCALER_MAX_CONCURRENT_LAUNCHES", 10)
Expand Down

0 comments on commit 6ef3b25

Please sign in to comment.