From 6cd02d71f8ee87c2ef9079d2a205082f91debfac Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 21 Dec 2016 18:53:12 -0800 Subject: [PATCH] Fixes and cleanups for the multinode setting. (#143) * Add function for driver to get address info from Redis. * Use Redis address instead of Redis port. * Configure Redis to run in unprotected mode. * Add method for starting Ray processes on non-head node. * Pass in correct node ip address to start_plasma_manager. * Script for starting Ray processes. * Handle the case where an object already exists in the store. Maybe this should also compare the object hashes. * Have driver get info from Redis when start_ray_local=False. * Fix. * Script for killing ray processes. * Catch some errors when the main_loop in a worker throws an exception. * Allow redirecting stdout and stderr to /dev/null. * Wrap start_ray.py in a shell script. * More helpful error messages. * Fixes. * Wait for redis server to start up before configuring it. * Allow seeding of deterministic object ID generation. * Small change. --- lib/python/ray/services.py | 167 +++++++++++++--- lib/python/ray/worker.py | 160 +++++++++++++---- lib/python/ray/workers/default_worker.py | 53 ++++-- scripts/start_ray.py | 40 +++++ scripts/start_ray.sh | 5 + scripts/stop_ray.sh | 3 + src/common/redis_module/ray_redis_module.c | 26 ++- src/common/state/redis.c | 14 +- .../lib/python/global_scheduler_services.py | 25 +-- src/photon/photon/photon_services.py | 25 +-- src/photon/photon_scheduler.c | 26 +-- src/photon/photon_scheduler.h | 1 + src/photon/test/photon_tests.c | 1 + src/plasma/plasma/plasma.py | 47 +++-- 14 files changed, 467 insertions(+), 126 deletions(-) create mode 100644 scripts/start_ray.py create mode 100755 scripts/start_ray.sh create mode 100755 scripts/stop_ray.sh diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 5b949756bbf1e..3733bdb595e90 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -5,6 +5,7 @@ import psutil import os import random +import redis import signal import string import subprocess @@ -62,7 +63,8 @@ def cleanup(): continue successfully_shut_down = False if successfully_shut_down: - print("Successfully shut down Ray.") + if len(all_processes) > 0: + print("Successfully shut down Ray.") else: print("Ray did not shut down properly.") all_processes = [] @@ -70,7 +72,7 @@ def cleanup(): def all_processes_alive(): return all([p.poll() is None for p in all_processes]) -def start_redis(num_retries=20, cleanup=True): +def start_redis(num_retries=20, cleanup=True, redirect_output=False): """Start a Redis server. Args: @@ -78,6 +80,8 @@ def start_redis(num_retries=20, cleanup=True): cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. Returns: The port used by Redis.
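The retry loop that follows draws a fresh random port on every attempt, because a failed bind makes redis-server exit almost immediately. A minimal sketch of the loop's shape, assuming new_port, redis_filepath, and redis_module as defined in services.py:

    import subprocess
    import time

    for counter in range(num_retries):
        port = new_port()  # A random port; a collision makes redis-server exit.
        p = subprocess.Popen([redis_filepath, "--port", str(port),
                              "--loglevel", "warning",
                              "--loadmodule", redis_module])
        time.sleep(0.1)
        if p.poll() is None:
            # The server survived 0.1 seconds, so the port bind succeeded.
            break
    else:
        raise Exception("Couldn't start Redis.")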
@@ -94,18 +98,48 @@ def start_redis(num_retries=20, cleanup=True): if counter > 0: print("Redis failed to start, retrying now.") port = new_port() - p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning", "--loadmodule", redis_module]) + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning", "--loadmodule", redis_module], stdout=stdout, stderr=stderr) time.sleep(0.1) # Check if Redis successfully started (or at least that the executable did # not exit within 0.1 seconds). if p.poll() is None: if cleanup: all_processes.append(p) - return port + break counter += 1 - raise Exception("Couldn't start Redis.") + if counter == num_retries: + raise Exception("Couldn't start Redis.") -def start_global_scheduler(redis_address, cleanup=True): + # Create a Redis client just for configuring Redis. + redis_client = redis.StrictRedis(host="127.0.0.1", port=port) + + # Wait for the Redis server to start. + counter = 0 + while counter < num_retries: + try: + # Run some random command and see if it worked. + redis_client.client_list() + except redis.ConnectionError as e: + # Wait a little bit. + time.sleep(1) + counter += 1 + else: + break + if counter == num_retries: + raise Exception("The Redis server did not start properly.") + + # Configure Redis to generate keyspace notifications. TODO(rkn): Change this + # to only generate notifications for the export keys. + redis_client.config_set("notify-keyspace-events", "Kl") + # Configure Redis to not run in protected mode so that processes on other + # hosts can connect to it. TODO(rkn): Do this in a more secure way. + redis_client.config_set("protected-mode", "no") + return port + +def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): """Start a global scheduler process. Args: @@ -113,12 +147,14 @@ def start_global_scheduler(redis_address, cleanup=True): cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. """ - p = global_scheduler.start_global_scheduler(redis_address) + p = global_scheduler.start_global_scheduler(redis_address, redirect_output=redirect_output) if cleanup: all_processes.append(p) -def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True): +def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True, redirect_output=False): """Start a local scheduler process. Args: @@ -131,24 +167,28 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, pla cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. Return: The name of the local scheduler socket.
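Once the redis-server process is up, the patch waits for it to accept commands before configuring it, since the server can take a moment to start serving. The wait-then-configure sequence, condensed into a standalone sketch (port and num_retries as in start_redis; requires the redis package):

    import time
    import redis

    redis_client = redis.StrictRedis(host="127.0.0.1", port=port)
    for _ in range(num_retries):
        try:
            redis_client.client_list()  # Any command proves the server is ready.
            break
        except redis.ConnectionError:
            time.sleep(1)
    else:
        raise Exception("The Redis server did not start properly.")
    # "Kl" enables keyspace notifications for list events; disabling protected
    # mode lets clients on other hosts connect, which is insecure and is
    # flagged with a TODO in the patch.
    redis_client.config_set("notify-keyspace-events", "Kl")
    redis_client.config_set("protected-mode", "no")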
""" - local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER) + local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, redirect_output=redirect_output) if cleanup: all_processes.append(p) return local_scheduler_name -def start_objstore(node_ip_address, redis_address, cleanup=True): +def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output=False): """This method starts an object store process. Args: - node_ip_address (str): The ip address of the node running the object store. + node_ip_address (str): The IP address of the node running the object store. redis_address (str): The address of the Redis instance to connect to. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. Return: A tuple of the Plasma store socket name, the Plasma manager socket name, and @@ -164,16 +204,16 @@ def start_objstore(node_ip_address, redis_address, cleanup=True): else: plasma_store_memory = int(system_memory * 0.75) # Start the Plasma store. - plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=plasma_store_memory, use_profiler=RUN_PLASMA_STORE_PROFILER) + plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=plasma_store_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, redirect_output=redirect_output) # Start the plasma manager. - plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER) + plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) if cleanup: all_processes.append(p1) all_processes.append(p2) return plasma_store_name, plasma_manager_name, plasma_manager_port -def start_worker(node_ip_address, object_store_name, object_store_manager_name, local_scheduler_name, redis_port, worker_path, cleanup=True): +def start_worker(node_ip_address, object_store_name, object_store_manager_name, local_scheduler_name, redis_address, worker_path, cleanup=True, redirect_output=False): """This method starts a worker process. Args: @@ -182,12 +222,14 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, object_store_name (str): The name of the object store. object_store_manager_name (str): The name of the object store manager. local_scheduler_name (str): The name of the local scheduler. - redis_port (int): The port that the Redis server is listening on. + redis_address (int): The address that the Redis server is listening on. worker_path (str): The path of the source code which the worker process will run. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. This is True by default. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. 
""" command = ["python", worker_path, @@ -195,12 +237,15 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, "--object-store-name=" + object_store_name, "--object-store-manager-name=" + object_store_manager_name, "--local-scheduler-name=" + local_scheduler_name, - "--redis-port=" + str(redis_port)] - p = subprocess.Popen(command) + "--redis-address=" + str(redis_address)] + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + p = subprocess.Popen(command, stdout=stdout, stderr=stderr) if cleanup: all_processes.append(p) -def start_webui(redis_port, cleanup=True): +def start_webui(redis_port, cleanup=True, redirect_output=False): """This method starts the web interface. Args: @@ -208,23 +253,88 @@ def start_webui(redis_port, cleanup=True): cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by services.cleanup() when the Python process that imported services exits. This is True by default. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. """ executable = "nodejs" if sys.platform == "linux" or sys.platform == "linux2" else "node" command = [executable, os.path.join(os.path.abspath(os.path.dirname(__file__)), "../webui/index.js"), str(redis_port)] with open("/tmp/webui_out.txt", "wb") as out: - p = subprocess.Popen(command, stdout=out) + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else out + stderr = FNULL if redirect_output else None + p = subprocess.Popen(command, stdout=stdout, stderr=stderr) if cleanup: all_processes.append(p) -def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedulers=1, worker_path=None): +def start_ray_node(node_ip_address, redis_address, num_workers=0, num_local_schedulers=1, worker_path=None, cleanup=True, redirect_output=False): + """Start the Ray processes for a single node. + + This assumes that the Ray processes on some master node have already been + started. + + Args: + node_ip_address (str): The IP address of this node. + redis_address (str): The address of the Redis server. + num_workers (int): The number of workers to start. + num_local_schedulers (int): The number of local schedulers to start. This is + also the number of plasma stores and plasma managers to start. + worker_path (str): The path of the source code that will be run by the + worker. + cleanup (bool): If cleanup is true, then the processes started here will be + killed by services.cleanup() when the Python process that called this + method exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. + """ + if worker_path is None: + worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py") + object_store_names = [] + object_store_manager_names = [] + local_scheduler_names = [] + for _ in range(num_local_schedulers): + # Start Plasma. + object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=cleanup, redirect_output=redirect_output) + object_store_names.append(object_store_name) + object_store_manager_names.append(object_store_manager_name) + time.sleep(0.1) + # Start the local scheduler. 
+ plasma_address = "{}:{}".format(node_ip_address, object_store_manager_port) + local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=cleanup, redirect_output=redirect_output) + local_scheduler_names.append(local_scheduler_name) + time.sleep(0.1) + # Aggregate the address information together. + address_info = {"node_ip_address": node_ip_address, + "object_store_names": object_store_names, + "object_store_manager_names": object_store_manager_names, + "local_scheduler_names": local_scheduler_names} + # Start the workers. + for i in range(num_workers): + start_worker(address_info["node_ip_address"], + address_info["object_store_names"][i % num_local_schedulers], + address_info["object_store_manager_names"][i % num_local_schedulers], + address_info["local_scheduler_names"][i % num_local_schedulers], + redis_address, + worker_path, + cleanup=cleanup, + redirect_output=redirect_output) + # Return the addresses of the relevant processes. + return address_info + +def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedulers=1, worker_path=None, cleanup=True, redirect_output=False): """Start Ray in local mode. Args: + node_ip_address (str): The IP address of this node. num_workers (int): The number of workers to start. num_local_schedulers (int): The number of local schedulers to start. This is also the number of plasma stores and plasma managers to start. worker_path (str): The path of the source code that will be run by the worker. + cleanup (bool): If cleanup is true, then the processes started here will be + killed by services.cleanup() when the Python process that called this + method exits. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. Returns: This returns a dictionary of the address information for the processes that @@ -233,28 +343,28 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedu if worker_path is None: worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py") # Start Redis. - redis_port = start_redis(cleanup=True) + redis_port = start_redis(cleanup=cleanup, redirect_output=redirect_output) redis_address = address(node_ip_address, redis_port) time.sleep(0.1) # Start the global scheduler. - start_global_scheduler(redis_address, cleanup=True) + start_global_scheduler(redis_address, cleanup=cleanup, redirect_output=redirect_output) object_store_names = [] object_store_manager_names = [] local_scheduler_names = [] for _ in range(num_local_schedulers): # Start Plasma. - object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True) + object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=cleanup, redirect_output=redirect_output) object_store_names.append(object_store_name) object_store_manager_names.append(object_store_manager_name) time.sleep(0.1) # Start the local scheduler. 
plasma_address = "{}:{}".format(node_ip_address, object_store_manager_port) - local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=True) + local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, object_store_name, object_store_manager_name, plasma_address=plasma_address, cleanup=cleanup, redirect_output=redirect_output) local_scheduler_names.append(local_scheduler_name) time.sleep(0.1) # Aggregate the address information together. address_info = {"node_ip_address": node_ip_address, - "redis_port": redis_port, + "redis_address": redis_address, "object_store_names": object_store_names, "object_store_manager_names": object_store_manager_names, "local_scheduler_names": local_scheduler_names} @@ -264,9 +374,10 @@ def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, num_local_schedu address_info["object_store_names"][i % num_local_schedulers], address_info["object_store_manager_names"][i % num_local_schedulers], address_info["local_scheduler_names"][i % num_local_schedulers], - redis_port, + redis_address, worker_path, - cleanup=True) + cleanup=cleanup, + redirect_output=redirect_output) # Return the addresses of the relevant processes. - start_webui(redis_port) + start_webui(redis_port, cleanup=cleanup, redirect_output=redirect_output) return address_info diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index ae1b420a37994..df6fbfdde8768 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -419,7 +419,16 @@ def put_object(self, objectid, value): # Serialize and put the object in the object store. schema, size, serialized = numbuf_serialize(value) size = size + 4096 * 4 + 8 # The last 8 bytes are for the metadata offset. This is temporary. - buff = self.plasma_client.create(objectid.id(), size, bytearray(schema)) + try: + buff = self.plasma_client.create(objectid.id(), size, bytearray(schema)) + except RuntimeError as e: + if e.args != ("an object with this ID could not be created",): + raise + # The object already exists in the object store, so there is no need to + # add it again. TODO(rkn): We need to compare the hashes and make sure + # that the objects are in fact the same. + print("This object already exists in the object store.") + return data = np.frombuffer(buff.buffer, dtype="byte")[8:] metadata_offset = numbuf.write_to_buffer(serialized, memoryview(data)) np.frombuffer(buff.buffer, dtype="int64", count=1)[0] = metadata_offset @@ -617,7 +626,52 @@ def objectid_custom_deserializer(serialized_obj): register_class(RayGetError) register_class(RayGetArgumentError) -def init(start_ray_local=False, num_workers=None, num_local_schedulers=1, driver_mode=SCRIPT_MODE): +def get_address_info_from_redis_helper(redis_address, node_ip_address): + redis_host, redis_port = redis_address.split(":") + # For this command to work, some other client (on the same machine as Redis) + # must have run "CONFIG SET protected-mode no". + redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + # The client table prefix must be kept in sync with the file + # "src/common/redis_module/ray_redis_module.c" where it is defined. + REDIS_CLIENT_TABLE_PREFIX = "CL:" + client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX)) + # Filter to clients on the same node and do some basic checking. 
+ plasma_managers = [] + local_schedulers = [] + for key in client_keys: + info = redis_client.hgetall(key) + assert b"ray_client_id" in info + assert b"node_ip_address" in info + assert b"client_type" in info + if info[b"node_ip_address"].decode("ascii") == node_ip_address: + if info[b"client_type"].decode("ascii") == "plasma_manager": + plasma_managers.append(info) + elif info[b"client_type"].decode("ascii") == "photon": + local_schedulers.append(info) + # Make sure that we got exactly one plasma manager and one local scheduler. + assert len(plasma_managers) == 1 + assert len(local_schedulers) == 1 + client_info = {"node_ip_address": node_ip_address, + "redis_address": redis_address, + "store_socket_name": plasma_managers[0][b"store_socket_name"].decode("ascii"), + "manager_socket_name": plasma_managers[0][b"manager_socket_name"].decode("ascii"), + "local_scheduler_socket_name": local_schedulers[0][b"local_scheduler_socket_name"].decode("ascii")} + return client_info + +def get_address_info_from_redis(redis_address, node_ip_address, num_retries=10): + counter = 0 + while True: + try: + return get_address_info_from_redis_helper(redis_address, node_ip_address) + except Exception as e: + if counter == num_retries: + raise + # Some of the information may not be in Redis yet, so wait a little bit. + print("Some processes that the driver needs to connect to have not registered with Redis, so retrying.") + time.sleep(1) + counter += 1 + +def init(node_ip_address="127.0.0.1", redis_address=None, start_ray_local=False, object_id_seed=None, num_workers=None, num_local_schedulers=None, driver_mode=SCRIPT_MODE): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -625,44 +679,72 @@ def init(start_ray_local=False, num_workers=None, num_local_schedulers=1, driver with a Ray cluster and attach to the newly started cluster. Args: - start_ray_local (Optional[bool]): If True then this will start a scheduler - an object store, and some workers. If False, this will attach to an - existing Ray cluster. - num_workers (Optional[int]): The number of workers to start if + node_ip_address (str): The IP address of the node that we are on. + redis_address (str): The address of the Redis server to connect to. This + should only be provided if start_ray_local is False. + start_ray_local (bool): If True then this will start Redis, a global + scheduler, a local scheduler, a plasma store, a plasma manager, and some + workers. It will also kill these processes when Python exits. If False, + this will attach to an existing Ray cluster. + object_id_seed (int): Used to seed the deterministic generation of object + IDs. The same value can be used across multiple runs of the same job in + order to generate the object IDs in a consistent manner. However, the same + seed should not be used for different jobs. + num_workers (int): The number of workers to start. This is only provided if start_ray_local is True. - num_local_schedulers (Optional[int]): The number of local schedulers to - start if start_ray_local is True. - driver_mode (Optional[bool]): The mode in which to start the driver. This - should be one of SCRIPT_MODE, PYTHON_MODE, and SILENT_MODE. + num_local_schedulers (int): The number of local schedulers to start. This is + only provided if start_ray_local is True. + driver_mode (int): The mode in which to start the driver. This should be + one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
Returns: - The address of the Redis server. + Address information about the started processes. Raises: Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ check_main_thread() + if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]: + raise Exception("driver_mode must be in [ray.SCRIPT_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].") if driver_mode == PYTHON_MODE: # If starting Ray in PYTHON_MODE, don't start any other processes. - address_info = {} + info = {} elif start_ray_local: # In this case, we launch a scheduler, a new object store, and some workers, # and we connect to them. - if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]: - raise Exception("If start_ray_local=True, then driver_mode must be in [ray.SCRIPT_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].") + if redis_address is not None: + raise Exception("If start_ray_local=True, then redis_address cannot be provided because ray.init will start a new Redis server.") # Use the address 127.0.0.1 in local mode. + node_ip_address = "127.0.0.1" if node_ip_address is None else node_ip_address + # Use 1 worker if num_workers is not provided. num_workers = 1 if num_workers is None else num_workers + # Use 1 local scheduler if num_local_schedulers is not provided. + num_local_schedulers = 1 if num_local_schedulers is None else num_local_schedulers # Start the scheduler, object store, and some workers. These will be killed # by the call to cleanup(), which happens when the Python script exits. - address_info = services.start_ray_local(num_workers=num_workers, num_local_schedulers=num_local_schedulers) + address_info = services.start_ray_local(node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers) + info = {"node_ip_address": node_ip_address, + "redis_address": address_info["redis_address"], + "store_socket_name": address_info["object_store_names"][0], + "manager_socket_name": address_info["object_store_manager_names"][0], + "local_scheduler_socket_name": address_info["local_scheduler_names"][0]} else: - raise Exception("This mode is currently not enabled.") + if redis_address is None: + raise Exception("If start_ray_local=False, then redis_address must be provided.") + if node_ip_address is None: + raise Exception("If start_ray_local=False, then node_ip_address must be provided.") + if num_workers is not None: + raise Exception("If start_ray_local=False, then num_workers must not be provided.") + if num_local_schedulers is not None: + raise Exception("If start_ray_local=False, then num_local_schedulers must not be provided.") + # Get the address info of the processes to connect to from Redis. + info = get_address_info_from_redis(redis_address, node_ip_address) # Connect this driver to Redis, the object store, and the local scheduler. The # corresponding call to disconnect will happen in the call to cleanup() when # the Python script exits. - connect(address_info, driver_mode, worker=global_worker) - return address_info + connect(info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker) + return info def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init. @@ -828,17 +910,23 @@ def import_thread(worker): worker.redis_client.hincrby(worker_info_key, "export_counter", 1) worker.worker_import_counter += 1 -def connect(address_info, mode=WORKER_MODE, worker=global_worker): - """Connect this worker to the scheduler and an object store.
+def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): + """Connect this worker to the local scheduler, to Plasma, and to Redis. Args: - address_info (dict): This contains the entries node_ip_address, - redis_address, object_store_name, object_store_manager_name, and - local_scheduler_name. + info (dict): A dictionary with the address of the Redis server and the + socket names of the plasma store, plasma manager, and local scheduler. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. """ check_main_thread() + # Do some basic checking to make sure we didn't call ray.init twice. + error_message = "Perhaps you called ray.init twice by accident?" + assert not worker.connected, error_message + assert worker.cached_functions_to_run is not None, error_message + assert worker.cached_remote_functions is not None, error_message + assert reusables._cached_reusables is not None, error_message + # Initialize some fields. worker.worker_id = random_string() worker.connected = True worker.set_mode(mode) @@ -847,13 +935,13 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker): if mode == PYTHON_MODE: return # Create a Redis client. - worker.redis_client = redis.StrictRedis(host=address_info["node_ip_address"], port=address_info["redis_port"]) - worker.redis_client.config_set("notify-keyspace-events", "AKE") + redis_host, redis_port = info["redis_address"].split(":") + worker.redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) worker.lock = threading.Lock() # Create an object store client. - worker.plasma_client = plasma.PlasmaClient(address_info["object_store_names"][0], address_info["object_store_manager_names"][0]) + worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) # Create the local scheduler client. - worker.photon_client = photon.PhotonClient(address_info["local_scheduler_names"][0]) + worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"]) # Register the worker with Redis. if mode in [SCRIPT_MODE, SILENT_MODE]: worker.redis_client.rpush("Drivers", worker.worker_id) @@ -861,10 +949,22 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker): worker.redis_client.rpush("Workers", worker.worker_id) else: raise Exception("This code should be unreachable.") - # If this is a driver, set the current task ID to a specific fixed value and - # set the task index to 0. + # If this is a driver, set the current task ID and set the task index to 0. if mode in [SCRIPT_MODE, SILENT_MODE]: - worker.current_task_id = photon.ObjectID("".join(chr(i) for i in range(20))) + # If the user provided an object_id_seed, then set the current task ID + # deterministically based on that seed (without altering the state of the + # user's random number generator). Otherwise, set the current task ID + # randomly to avoid object ID collisions. + numpy_state = np.random.get_state() + if object_id_seed is not None: + np.random.seed(object_id_seed) + else: + # Try to use true randomness. + np.random.seed(None) + worker.current_task_id = photon.ObjectID(np.random.bytes(20)) + # Reset the state of the numpy random number generator. + np.random.set_state(numpy_state) + # Set other fields needed for computing task IDs. worker.task_index = 0 worker.put_index = 0 # If this is a worker, then start a thread to import exports from the driver.
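With these changes a driver can either bootstrap a local cluster or attach to a running one. A hedged usage sketch (the addresses are placeholders; the second form assumes a head node has already started Redis):

    import ray

    # Local development: this starts Redis, the global scheduler, plasma,
    # photon, and some workers, and kills them when Python exits.
    info = ray.init(start_ray_local=True, num_workers=4)

    # Multinode: attach to an existing cluster. The object_id_seed makes
    # object IDs reproducible across reruns of the same job; internally,
    # connect seeds numpy, draws 20 random bytes for the task ID, and then
    # restores the caller's generator state.
    info = ray.init(node_ip_address="172.31.0.2",
                    redis_address="172.31.0.1:6379",
                    object_id_seed=42)

The two calls are alternatives, not a sequence; the new assertions in connect guard against calling ray.init twice in the same process.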
diff --git a/lib/python/ray/workers/default_worker.py b/lib/python/ray/workers/default_worker.py index f3a7349f877f5..e96fe75c2866a 100644 --- a/lib/python/ray/workers/default_worker.py +++ b/lib/python/ray/workers/default_worker.py @@ -2,26 +2,59 @@ from __future__ import division from __future__ import print_function -import sys import argparse import numpy as np +import redis +import traceback import ray parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node") -parser.add_argument("--redis-port", required=True, type=int, help="the port to use for Redis") +parser.add_argument("--redis-address", required=True, type=str, help="the address to use for Redis") parser.add_argument("--object-store-name", required=True, type=str, help="the object store's name") parser.add_argument("--object-store-manager-name", required=True, type=str, help="the object store manager's name") parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name") +def random_string(): + return np.random.bytes(20) + if __name__ == "__main__": args = parser.parse_args() - address_info = {"node_ip_address": args.node_ip_address, - "redis_port": args.redis_port, - "object_store_names": [args.object_store_name], - "object_store_manager_names": [args.object_store_manager_name], - "local_scheduler_names": [args.local_scheduler_name]} - ray.worker.connect(address_info, ray.WORKER_MODE) - - ray.worker.main_loop() + info = {"redis_address": args.redis_address, + "store_socket_name": args.object_store_name, + "manager_socket_name": args.object_store_manager_name, + "local_scheduler_socket_name": args.local_scheduler_name} + ray.worker.connect(info, mode=ray.WORKER_MODE) + + error_explanation = """ +This error is unexpected and should not have happened. Somehow a worker crashed +in an unanticipated way, causing the main_loop to throw an exception, which is +being caught in "lib/python/ray/workers/default_worker.py". +""" + + while True: + try: + # This call to main_loop should never return if things are working. Most + # exceptions that are thrown (e.g., inside the execution of a task) should + # be caught and handled inside of the call to main_loop. If an exception + # is thrown here, then that means that there is some error that we didn't + # anticipate. + ray.worker.main_loop() + except Exception as e: + traceback_str = traceback.format_exc() + error_explanation + error_key = "WorkerError:{}".format(random_string()) + redis_host, redis_port = args.redis_address.split(":") + # For this command to work, some other client (on the same machine as + # Redis) must have run "CONFIG SET protected-mode no". + redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + redis_client.hmset(error_key, {"message": traceback_str, + "note": "This error is unexpected and should not have happened."}) + redis_client.rpush("ErrorKeys", error_key) + # TODO(rkn): Note that if the worker was in the middle of executing a + # task, then any worker or driver that is blocking in a get call and + # waiting for the output of that task will hang. We need to address this. + + # After putting the error message in Redis, this worker will attempt to + # reenter the main loop. TODO(rkn): We should probably reset its state and + # call connect again.
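Because workers now report unexpected crashes through Redis rather than stdout, the tracebacks survive even when output is redirected to /dev/null. A sketch of the read side, assuming the key layout above (the address is a placeholder):

    import redis

    redis_client = redis.StrictRedis(host="172.31.0.1", port=6379)
    for error_key in redis_client.lrange("ErrorKeys", 0, -1):
        error_info = redis_client.hgetall(error_key)
        print(error_info[b"message"].decode("ascii"))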
diff --git a/scripts/start_ray.py b/scripts/start_ray.py new file mode 100644 index 0000000000000..667fcd5316b6c --- /dev/null +++ b/scripts/start_ray.py @@ -0,0 +1,40 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse + +import ray.services as services + +parser = argparse.ArgumentParser(description="Start the Ray processes on this node.") +parser.add_argument("--node-ip-address", required=True, type=str, help="the IP address of this node") +parser.add_argument("--redis-address", required=False, type=str, help="the address to use for Redis") +parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node") +parser.add_argument("--head", action="store_true", help="provide this argument for the head node") + +if __name__ == "__main__": + args = parser.parse_args() + + # Note that we redirect stdout and stderr to /dev/null because otherwise + # attempts to print may cause exceptions if a process is started inside of an + # SSH connection and the SSH connection dies. TODO(rkn): This is a temporary + # fix. We should actually redirect stdout and stderr to Redis in some way. + + if args.head: + # Start Ray on the head node. + if args.redis_address is not None: + raise Exception("If --head is passed in, a Redis server will be started, so a Redis address should not be provided.") + address_info = services.start_ray_local(node_ip_address=args.node_ip_address, + num_workers=args.num_workers, + cleanup=False, + redirect_output=True) + else: + # Start Ray on a non-head node. + if args.redis_address is None: + raise Exception("If --head is not passed in, --redis-address must be provided.") + address_info = services.start_ray_node(node_ip_address=args.node_ip_address, + redis_address=args.redis_address, + num_workers=args.num_workers, + cleanup=False, + redirect_output=True) + print(address_info) diff --git a/scripts/start_ray.sh b/scripts/start_ray.sh new file mode 100755 index 0000000000000..dad9920676bdc --- /dev/null +++ b/scripts/start_ray.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +python "$ROOT_DIR/start_ray.py" "$@" diff --git a/scripts/stop_ray.sh b/scripts/stop_ray.sh new file mode 100755 index 0000000000000..0727d3cb26c7e --- /dev/null +++ b/scripts/stop_ray.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +killall redis-server global_scheduler plasma_store plasma_manager photon_scheduler diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index 9b2d094b62572..79974be6f9d13 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -61,14 +61,25 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, /** * Register a client with Redis. This is called from a client with the command: * - * RAY.CONNECT <client type> <address> <ray client id> <aux address>
+ * RAY.CONNECT <ray client id> <node ip address> <client type> + * <field 1> <value 1> <field 2> <value 2> ... + * + * The command can take an arbitrary number of pairs of field names and values, + * and these will be stored in a hashmap associated with this client. Several + * fields are singled out for special treatment: + * + * address: This is provided by plasma managers and it should be an address + * like "127.0.0.1:1234". It is returned by RAY.GET_CLIENT_ADDRESS so + * that other plasma managers know how to fetch objects. + * aux_address: This is provided by local schedulers and should be the + * address of the plasma manager that the local scheduler is connected + * to. This is published to the "db_clients" channel by the RAY.CONNECT + * command and is used by the global scheduler to determine which plasma + * managers and local schedulers are connected. * - * @param client_type The type of the client (e.g., plasma_manager). - * @param address The address of the client. * @param ray_client_id The db client ID of the client. - * @param aux_address An auxiliary address. This is currently just used by the - * local scheduler to record the address of the plasma manager that it is - * connected to. + * @param node_ip_address The IP address of the node the client is on. + * @param client_type The type of the client (e.g., plasma_manager). * @return OK if the operation was successful. */ int Connect_RedisCommand(RedisModuleCtx *ctx, @@ -95,7 +106,8 @@ int Connect_RedisCommand(RedisModuleCtx *ctx, RedisModule_CreateString(ctx, "aux_address", strlen("aux_address")); RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS, - "node_ip_address", node_ip_address, NULL); + "ray_client_id", ray_client_id, "node_ip_address", + node_ip_address, "client_type", client_type, NULL); for (int i = 4; i < argc; i += 2) { RedisModuleString *key = argv[i]; diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 486d3f22bf615..c9a657eaffd07 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -160,11 +160,19 @@ db_handle *db_connect(const char *db_address, "could not establish synchronous connection to redis " "%s:%d", db_address, db_port); - /* Enable keyspace events. */ - reply = redisCommand(context, "CONFIG SET notify-keyspace-events AKE"); + /* Configure Redis to generate keyspace notifications for list events. This + * should only need to be done once (by whoever started Redis), but since + * Redis may be started in multiple places (e.g., for testing or when starting + * processes by hand), it is easier to do it multiple times. */ + reply = redisCommand(context, "CONFIG SET notify-keyspace-events Kl"); CHECKM(reply != NULL, "db_connect failed on CONFIG SET"); freeReplyObject(reply); - /* Add new client using optimistic locking. */ + /* Also configure Redis to not run in protected mode, so clients on other + * hosts can connect to it. */ + reply = redisCommand(context, "CONFIG SET protected-mode no"); + CHECKM(reply != NULL, "db_connect failed on CONFIG SET"); + freeReplyObject(reply); + /* Create a client ID for this client. */ db_client_id client = globally_unique_id(); /* Construct the argument arrays for RAY.CONNECT.
*/ diff --git a/src/global_scheduler/lib/python/global_scheduler_services.py b/src/global_scheduler/lib/python/global_scheduler_services.py index 86dffff93e5b3..505c0da13f6f9 100644 --- a/src/global_scheduler/lib/python/global_scheduler_services.py +++ b/src/global_scheduler/lib/python/global_scheduler_services.py @@ -6,7 +6,7 @@ import subprocess import time -def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False): +def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False, redirect_output=False): """Start a global scheduler process. Args: @@ -15,6 +15,8 @@ def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the global scheduler should be started inside a profiler. If this is True, use_valgrind must be False. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. Return: The process ID of the global scheduler process. @@ -23,13 +25,16 @@ def start_global_scheduler(redis_address, use_valgrind=False, use_profiler=False raise Exception("Cannot use valgrind and profiler at the same time.") global_scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/global_scheduler") command = [global_scheduler_executable, "-r", redis_address] - if use_valgrind: - pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) - time.sleep(1.0) - else: - pid = subprocess.Popen(command) - time.sleep(0.1) + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + if use_valgrind: + pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout, stderr=stderr) + time.sleep(0.1) return pid diff --git a/src/photon/photon/photon_services.py b/src/photon/photon/photon_services.py index 034647be6ffdb..5aa629cc2ac83 100644 --- a/src/photon/photon/photon_services.py +++ b/src/photon/photon/photon_services.py @@ -10,7 +10,7 @@ def random_name(): return str(random.randint(0, 99999999)) -def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_address=None, node_ip_address="127.0.0.1", redis_address=None, use_valgrind=False, use_profiler=False): +def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_address=None, node_ip_address="127.0.0.1", redis_address=None, use_valgrind=False, use_profiler=False, redirect_output=False): """Start a local scheduler process. Args: @@ -29,6 +29,8 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_ad valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the local scheduler should be started inside a profiler. If this is True, use_valgrind must be False. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. 
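This valgrind/profiler/plain branching is repeated, with the new output redirection, across the global scheduler, photon, and plasma launchers. Its shared shape, condensed into an illustrative helper that is not part of the patch:

    import os
    import subprocess

    VALGRIND_FLAGS = ["--track-origins=yes", "--leak-check=full",
                      "--show-leak-kinds=all", "--error-exitcode=1"]

    def launch_service(command, use_valgrind=False, use_profiler=False,
                       redirect_output=False):
        if use_valgrind and use_profiler:
            raise Exception("Cannot use valgrind and profiler at the same time.")
        if use_valgrind:
            command = ["valgrind"] + VALGRIND_FLAGS + command
        elif use_profiler:
            command = ["valgrind", "--tool=callgrind"] + command
        with open(os.devnull, "w") as FNULL:
            stdout = FNULL if redirect_output else None
            stderr = FNULL if redirect_output else None
            return subprocess.Popen(command, stdout=stdout, stderr=stderr)

Extracting a helper like this would collapse the triplicated block; the patch instead keeps the three copies in sync by hand.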
Return: A tuple of the name of the local scheduler socket and the process ID of the @@ -47,13 +49,16 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, plasma_ad command += ["-r", redis_address] if plasma_address is not None: command += ["-a", plasma_address] - if use_valgrind: - pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) - time.sleep(1.0) - else: - pid = subprocess.Popen(command) - time.sleep(0.1) + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + if use_valgrind: + pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout, stderr=stderr) + time.sleep(0.1) return local_scheduler_name, pid diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 88a9422995711..06d8f000742aa 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -29,6 +29,7 @@ local_scheduler_state *init_local_scheduler( event_loop *loop, const char *redis_addr, int redis_port, + const char *local_scheduler_socket_name, const char *plasma_store_socket_name, const char *plasma_manager_socket_name, const char *plasma_manager_address, @@ -40,19 +41,24 @@ local_scheduler_state *init_local_scheduler( utarray_new(state->workers, &worker_icd); /* Connect to Redis if a Redis address is provided. */ if (redis_addr != NULL) { - int num_args = 0; + int num_args; const char **db_connect_args = NULL; if (plasma_manager_address != NULL) { + num_args = 4; + db_connect_args = malloc(sizeof(char *) * num_args); + db_connect_args[0] = "local_scheduler_socket_name"; + db_connect_args[1] = local_scheduler_socket_name; + db_connect_args[2] = "aux_address"; + db_connect_args[3] = plasma_manager_address; + } else { num_args = 2; db_connect_args = malloc(sizeof(char *) * num_args); - db_connect_args[0] = "aux_address"; - db_connect_args[1] = plasma_manager_address; + db_connect_args[0] = "local_scheduler_socket_name"; + db_connect_args[1] = local_scheduler_socket_name; } state->db = db_connect(redis_addr, redis_port, "photon", node_ip_address, num_args, db_connect_args); - if (num_args != 0) { - free(db_connect_args); - }; + free(db_connect_args); db_attach(state->db, loop, false); } else { state->db = NULL; @@ -295,10 +301,10 @@ void start_server(const char *node_ip_address, bool global_scheduler_exists) { int fd = bind_ipc_sock(socket_name, true); event_loop *loop = event_loop_create(); - g_state = - init_local_scheduler(node_ip_address, loop, redis_addr, redis_port, - plasma_store_socket_name, plasma_manager_socket_name, - plasma_manager_address, global_scheduler_exists); + g_state = init_local_scheduler( + node_ip_address, loop, redis_addr, redis_port, socket_name, + plasma_store_socket_name, plasma_manager_socket_name, + plasma_manager_address, global_scheduler_exists); /* Register a callback for registering new clients. 
*/ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index d08d6d837ea56..b8417c4432a67 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -69,6 +69,7 @@ local_scheduler_state *init_local_scheduler( event_loop *loop, const char *redis_addr, int redis_port, + const char *local_scheduler_socket_name, const char *plasma_manager_socket_name, const char *plasma_store_socket_name, const char *plasma_manager_address, diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index 2cde831dab0a1..fa5ce87b6ecca 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -59,6 +59,7 @@ photon_mock *init_photon_mock() { CHECK(mock->plasma_fd >= 0 && mock->photon_fd >= 0); mock->photon_state = init_local_scheduler( "127.0.0.1", mock->loop, redis_addr, redis_port, + utstring_body(photon_socket_name), utstring_body(plasma_manager_socket_name), utstring_body(plasma_store_socket_name), NULL, false); /* Connect a Photon client. */ diff --git a/src/plasma/plasma/plasma.py b/src/plasma/plasma/plasma.py index f7d66592c5519..853cecb5006ed 100644 --- a/src/plasma/plasma/plasma.py +++ b/src/plasma/plasma/plasma.py @@ -234,7 +234,7 @@ def get_next_notification(self): def random_name(): return str(random.randint(0, 99999999)) -def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False): +def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valgrind=False, use_profiler=False, redirect_output=False): """Start a plasma store process. Args: @@ -242,6 +242,8 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the plasma store should be started inside a profiler. If this is True, use_valgrind must be False. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. 
Return: A tuple of the name of the plasma store socket and the process ID of the @@ -252,25 +254,31 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "plasma_store") plasma_store_name = "/tmp/plasma_store{}".format(random_name()) command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] - if use_valgrind: - pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - time.sleep(1.0) - elif use_profiler: - pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) - time.sleep(1.0) - else: - pid = subprocess.Popen(command) - time.sleep(0.1) + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + if use_valgrind: + pid = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout, stderr=stderr) + time.sleep(0.1) return plasma_store_name, pid -def start_plasma_manager(store_name, redis_address, num_retries=20, use_valgrind=False, run_profiler=False): +def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", num_retries=20, use_valgrind=False, run_profiler=False, redirect_output=False): """Start a plasma manager and return the ports it listens on. Args: store_name (str): The name of the plasma store socket. redis_address (str): The address of the Redis server. + node_ip_address (str): The IP address of the node. use_valgrind (bool): True if the Plasma manager should be started inside of valgrind and False otherwise. + redirect_output (bool): True if stdout and stderr should be redirected to + /dev/null. Returns: A tuple of the Plasma manager socket name, the process ID of the Plasma @@ -291,15 +299,18 @@ def start_plasma_manager(store_name, redis_address, num_retries=20, use_valgrind command = [plasma_manager_executable, "-s", store_name, "-m", plasma_manager_name, - "-h", "127.0.0.1", + "-h", node_ip_address, "-p", str(port), "-r", redis_address] - if use_valgrind: - process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - elif run_profiler: - process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command) - else: - process = subprocess.Popen(command) + with open(os.devnull, "w") as FNULL: + stdout = FNULL if redirect_output else None + stderr = FNULL if redirect_output else None + if use_valgrind: + process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command, stdout=stdout, stderr=stderr) + elif run_profiler: + process = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, stdout=stdout, stderr=stderr) + else: + process = subprocess.Popen(command, stdout=stdout, stderr=stderr) # This sleep is critical. If the plasma_manager fails to start because the # port is already in use, then we need it to fail within 0.1 seconds. time.sleep(0.1)
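End to end, the multinode bring-up that scripts/start_ray.py drives reduces to one call per machine. A sketch under the assumption that the head node's randomly chosen Redis port turned out to be 6379 (in practice the address is read from the head node's printed address_info):

    import ray.services as services

    # On the head node: start Redis, the global scheduler, a plasma store and
    # manager, a photon scheduler, and the workers.
    address_info = services.start_ray_local(node_ip_address="172.31.0.1",
                                            num_workers=10,
                                            cleanup=False,
                                            redirect_output=True)
    print(address_info["redis_address"])

    # On every other node: start plasma, photon, and workers, registering them
    # with the head node's Redis server.
    services.start_ray_node(node_ip_address="172.31.0.2",
                            redis_address="172.31.0.1:6379",
                            num_workers=10,
                            cleanup=False,
                            redirect_output=True)

A driver on any node can then attach with ray.init(node_ip_address=..., redis_address="172.31.0.1:6379"), and scripts/stop_ray.sh tears the per-node processes down.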