Skip to content

Commit

Permalink
orca: remove hardcode redis_password from master branch (#5154)
Browse files Browse the repository at this point in the history
  • Loading branch information
liu-shaojun committed Dec 20, 2022
1 parent d8e4877 commit 5577a86
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pyzoo/zoo/orca/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def init_orca_context(cluster_mode="local", cores=2, memory="2g", num_nodes=1,
raise ValueError("cluster_mode can only be local, yarn-client, standalone or spark-submit, "
"but got: %s".format(cluster_mode))
ray_args = {}
for key in ["redis_port", "password", "object_store_memory", "verbose", "env",
for key in ["redis_port", "redis_password", "object_store_memory", "verbose", "env",
"extra_params", "num_ray_nodes", "ray_node_cpu_cores", "include_webui"]:
if key in kwargs:
ray_args[key] = kwargs[key]
Expand Down
3 changes: 2 additions & 1 deletion pyzoo/zoo/orca/data/ray_xshards.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ def init_ray_if_not(redis_address, redis_password):
if not ray.is_initialized():
init_params = dict(
address=redis_address,
_redis_password=redis_password,
ignore_reinit_error=True
)
if redis_password:
init_params["_redis_password"] = redis_password
if version.parse(ray.__version__) >= version.parse("1.4.0"):
init_params["namespace"] = "az"
ray.init(**init_params)
Expand Down
57 changes: 34 additions & 23 deletions pyzoo/zoo/ray/raycontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ def _prepare_env(self):
return modified_env

def __init__(self, python_loc, redis_port, ray_node_cpu_cores,
password, object_store_memory, verbose=False, env=None,
redis_password, object_store_memory, verbose=False, env=None,
include_webui=False,
extra_params=None):
"""object_store_memory: integer in bytes"""
self.env = env
self.python_loc = python_loc
self.redis_port = redis_port
self.password = password
self.redis_password = redis_password
self.ray_node_cpu_cores = ray_node_cpu_cores
self.ray_exec = self._get_ray_exec()
self.object_store_memory = object_store_memory
Expand Down Expand Up @@ -149,17 +149,20 @@ def _enrich_command(command, object_store_memory, extra_params):
if object_store_memory:
command = command + " --object-store-memory {}".format(str(object_store_memory))
if extra_params:
for pair in extra_params.items():
command = command + " --{} {}".format(pair[0], pair[1])
for k, v in extra_params.items():
kw = k.replace("_", "-")
command = command + " --{} {}".format(kw, v)
return command

def _gen_master_command(self):
webui = "true" if self.include_webui else "false"
command = "{} start --head " \
"--include-dashboard {} --dashboard-host 0.0.0.0 --port {} " \
"--redis-password {} --num-cpus {}". \
format(self.ray_exec, webui, self.redis_port, self.password,
"--num-cpus {}". \
format(self.ray_exec, webui, self.redis_port,
self.ray_node_cpu_cores)
if self.redis_password:
command = command + " --redis-password {}".format(self.redis_password)
if self.labels:
command = command + " " + self.labels
return RayServiceFuncGenerator._enrich_command(command=command,
Expand All @@ -169,13 +172,15 @@ def _gen_master_command(self):
@staticmethod
def _get_raylet_command(redis_address,
ray_exec,
password,
redis_password,
ray_node_cpu_cores,
labels="",
object_store_memory=None,
extra_params=None):
command = "{} start --address {} --redis-password {} --num-cpus {}".format(
ray_exec, redis_address, password, ray_node_cpu_cores)
command = "{} start --address {} --num-cpus {}".format(
ray_exec, redis_address, ray_node_cpu_cores)
if redis_password:
command = command + " --redis-password {}".format(redis_password)
if labels:
command = command + " " + labels
return RayServiceFuncGenerator._enrich_command(command=command,
Expand Down Expand Up @@ -258,7 +263,7 @@ def _start_raylets(iter):
command=RayServiceFuncGenerator._get_raylet_command(
redis_address=redis_address,
ray_exec=self.ray_exec,
password=self.password,
redis_password=self.redis_password,
ray_node_cpu_cores=self.ray_node_cpu_cores,
labels=self.labels,
object_store_memory=self.object_store_memory,
Expand Down Expand Up @@ -308,7 +313,7 @@ def _start_ray_services(iter):
command=RayServiceFuncGenerator._get_raylet_command(
redis_address=redis_address,
ray_exec=self.ray_exec,
password=self.password,
redis_password=self.redis_password,
ray_node_cpu_cores=self.ray_node_cpu_cores,
labels=self.labels,
object_store_memory=self.object_store_memory,
Expand All @@ -325,7 +330,7 @@ def _start_ray_services(iter):
class RayContext(object):
_active_ray_context = None

def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None,
def __init__(self, sc, redis_port=None, redis_password=None, object_store_memory=None,
verbose=False, env=None, extra_params=None, include_webui=True,
num_ray_nodes=None, ray_node_cpu_cores=None):
"""
Expand All @@ -343,7 +348,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
:param sc: An instance of SparkContext.
:param redis_port: The redis port for the ray head node. Default is None.
The value would be randomly picked if not specified.
:param password: The password for redis. Default to be "123456" if not specified.
:param redis_password: The password for redis. Default to be None if not specified.
:param object_store_memory: The memory size for ray object_store in string.
This can be specified in bytes(b), kilobytes(k), megabytes(m) or gigabytes(g).
For example, "50b", "100k", "250m", "30g".
Expand Down Expand Up @@ -371,7 +376,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
self.initialized = False
self.is_local = is_local(sc)
self.verbose = verbose
self.redis_password = password
self.redis_password = redis_password
self.object_store_memory = resource_to_bytes(object_store_memory)
self.ray_processesMonitor = None
self.env = env
Expand Down Expand Up @@ -437,7 +442,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
python_loc=self.python_loc,
redis_port=self.redis_port,
ray_node_cpu_cores=self.ray_node_cpu_cores,
password=self.redis_password,
redis_password=self.redis_password,
object_store_memory=self.object_store_memory,
verbose=self.verbose,
env=self.env,
Expand Down Expand Up @@ -501,6 +506,14 @@ def _get_spark_local_cores(self):
else:
return int(local_symbol)

def _update_extra_params(self, extra_params):
kwargs = {}
if extra_params is not None:
for k, v in extra_params.items():
kw = k.replace("-", "_")
kwargs[kw] = v
return kwargs

def init(self, driver_cores=0):
"""
Initiate the ray cluster.
Expand All @@ -519,19 +532,16 @@ def init(self, driver_cores=0):
if self.env:
os.environ.update(self.env)
import ray
kwargs = {}
if self.extra_params is not None:
for k, v in self.extra_params.items():
kw = k.replace("-", "_")
kwargs[kw] = v
kwargs = self._update_extra_params(self.extra_params)
init_params = dict(
num_cpus=self.ray_node_cpu_cores,
_redis_password=self.redis_password,
object_store_memory=self.object_store_memory,
include_dashboard=self.include_webui,
dashboard_host="0.0.0.0",
)
init_params.update(kwargs)
if self.redis_password:
init_params["_redis_password"] = self.redis_password
if version.parse(ray.__version__) >= version.parse("1.4.0"):
init_params["namespace"] = "az"
self._address_info = ray.init(**init_params)
Expand Down Expand Up @@ -595,7 +605,7 @@ def _start_restricted_worker(self, num_cores, node_ip_address, redis_address):
command = RayServiceFuncGenerator._get_raylet_command(
redis_address=redis_address,
ray_exec="ray",
password=self.redis_password,
redis_password=self.redis_password,
ray_node_cpu_cores=num_cores,
object_store_memory=self.object_store_memory,
extra_params=extra_param)
Expand All @@ -617,9 +627,10 @@ def _start_driver(self, num_cores, redis_address):
ray.shutdown()
init_params = dict(
address=redis_address,
_redis_password=self.ray_service.password,
_node_ip_address=node_ip
)
if self.redis_password:
init_params["_redis_password"] = self.redis_password
if version.parse(ray.__version__) >= version.parse("1.4.0"):
init_params["namespace"] = "az"
return ray.init(**init_params)

0 comments on commit 5577a86

Please sign in to comment.