Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dashboard] Select port in dashboard #13763

Merged
merged 14 commits into from
Feb 24, 2021
3 changes: 2 additions & 1 deletion cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ static void StartRayNode(int redis_port, std::string redis_password,
int node_manager_port) {
std::vector<std::string> cmdargs(
{"ray", "start", "--head", "--port", std::to_string(redis_port), "--redis-password",
redis_password, "--node-manager-port", std::to_string(node_manager_port)});
redis_password, "--node-manager-port", std::to_string(node_manager_port),
"--include-dashboard", "false"});
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
sleep(5);
Expand Down
2 changes: 1 addition & 1 deletion dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def _check_parent():
await runner.setup()
site = aiohttp.web.TCPSite(runner, self.ip, 0)
await site.start()
http_host, http_port = site._server.sockets[0].getsockname()
http_host, http_port, *_ = site._server.sockets[0].getsockname()
logger.info("Dashboard agent http address: %s:%s", http_host,
http_port)

Expand Down
1 change: 0 additions & 1 deletion dashboard/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
CONNECT_REDIS_INTERNAL_SECONDS = 2
PURGE_DATA_INTERVAL_SECONDS = 60 * 10
ORGANIZE_DATA_INTERVAL_SECONDS = 2
REDIS_KEY_DASHBOARD = "dashboard"
REDIS_KEY_DASHBOARD_RPC = "dashboard_rpc"
REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress"
REPORT_METRICS_TIMEOUT_SECONDS = 2
Expand Down
50 changes: 38 additions & 12 deletions dashboard/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import sys

try:
import aiohttp.web
except ImportError:
print("The dashboard requires aiohttp to run.")
import sys
sys.exit(1)
# Set an exit code different from throwing an exception.
sys.exit(2)

import argparse
import asyncio
Expand All @@ -30,12 +32,16 @@
routes = dashboard_utils.ClassMethodRouteTable


class FrontendNotFoundError(OSError):
    """Raised when the dashboard frontend build directory is missing.

    Subclasses OSError so callers that already catch OSError (and inspect
    ``errno``) keep working, while callers that care specifically about
    the missing frontend can catch this narrower type.
    """


def setup_static_dir():
build_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "client", "build")
module_name = os.path.basename(os.path.dirname(__file__))
if not os.path.isdir(build_dir):
raise OSError(
raise FrontendNotFoundError(
errno.ENOENT, "Dashboard build directory not found. If installing "
"from source, please follow the additional steps "
"required to build the dashboard"
Expand All @@ -59,6 +65,7 @@ class Dashboard:
Args:
host(str): Host address of dashboard aiohttp server.
port(int): Port number of dashboard aiohttp server.
port_retries(int): The retry times to select a valid port.
redis_address(str): GCS address of a Ray cluster
redis_password(str): Redis password to access GCS
log_dir(str): Log directory of dashboard.
Expand All @@ -67,19 +74,30 @@ class Dashboard:
def __init__(self,
host,
port,
port_retries,
redis_address,
redis_password=None,
log_dir=None):
self.dashboard_head = dashboard_head.DashboardHead(
http_host=host,
http_port=port,
http_port_retries=port_retries,
redis_address=redis_address,
redis_password=redis_password,
log_dir=log_dir)

# Setup Dashboard Routes
build_dir = setup_static_dir()
logger.info("Setup static dir for dashboard: %s", build_dir)
try:
build_dir = setup_static_dir()
logger.info("Setup static dir for dashboard: %s", build_dir)
except FrontendNotFoundError as ex:
# Do not raise FrontendNotFoundError on Windows: the frontend is not
# built there because of NPM incompatibilities with Windows.
# Please refer to ci.sh::build_dashboard_front_end()
if sys.platform in ["win32", "cygwin"]:
logger.warning(ex)
else:
raise ex
dashboard_utils.ClassMethodRouteTable.bind(self)

@routes.get("/")
Expand All @@ -101,9 +119,7 @@ async def run(self):


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=("Parse Redis server for the "
"dashboard to connect to."))
parser = argparse.ArgumentParser(description="Ray dashboard.")
parser.add_argument(
"--host",
required=True,
Expand All @@ -114,6 +130,12 @@ async def run(self):
required=True,
type=int,
help="The port to use for the HTTP server.")
parser.add_argument(
"--port-retries",
required=False,
type=int,
default=0,
help="The retry times to select a valid port.")
parser.add_argument(
"--redis-address",
required=True,
Expand Down Expand Up @@ -187,11 +209,14 @@ async def run(self):
dashboard = Dashboard(
args.host,
args.port,
args.port_retries,
args.redis_address,
redis_password=args.redis_password,
log_dir=args.log_dir)
service_discovery = PrometheusServiceDiscoveryWriter(
args.redis_address, args.redis_password, args.temp_dir)
# Mark as a daemon so the dashboard does not hang at exit.
service_discovery.daemon = True
service_discovery.start()
loop = asyncio.get_event_loop()
loop.run_until_complete(dashboard.run())
Expand All @@ -200,12 +225,13 @@ async def run(self):
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = ("The dashboard on node {} failed with the following "
"error:\n{}".format(platform.uname()[1], traceback_str))
message = f"The dashboard on node {platform.uname()[1]} " \
f"failed with the following " \
f"error:\n{traceback_str}"
ray.utils.push_error_to_driver_through_redis(
redis_client, ray_constants.DASHBOARD_DIED_ERROR, message)
if isinstance(e, OSError) and e.errno == errno.ENOENT:
if isinstance(e, FrontendNotFoundError):
logger.warning(message)
else:
logger.exception(message)
logger.error(message)
raise e
41 changes: 27 additions & 14 deletions dashboard/head.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import ray._private.services
import ray.new_dashboard.consts as dashboard_consts
import ray.new_dashboard.utils as dashboard_utils
from ray import ray_constants
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_service_pb2_grpc
from ray.new_dashboard.datacenter import DataSource, DataOrganizer
Expand All @@ -28,14 +29,15 @@ def gcs_node_info_to_dict(message):


class DashboardHead:
def __init__(self, http_host, http_port, redis_address, redis_password,
log_dir):
def __init__(self, http_host, http_port, http_port_retries, redis_address,
redis_password, log_dir):
# NodeInfoGcsService
self._gcs_node_info_stub = None
self._gcs_rpc_error_counter = 0
# Public attributes are accessible for all head modules.
self.http_host = http_host
self.http_port = http_port
self.http_port_retries = http_port_retries
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password
self.log_dir = log_dir
Expand All @@ -47,8 +49,6 @@ def __init__(self, http_host, http_port, redis_address, redis_password,
self.grpc_port = self.server.add_insecure_port("[::]:0")
logger.info("Dashboard head grpc address: %s:%s", self.ip,
self.grpc_port)
logger.info("Dashboard head http address: %s:%s", self.http_host,
self.http_port)

async def _get_nodes(self):
"""Read the client table.
Expand Down Expand Up @@ -177,13 +177,6 @@ async def run(self):
# Start a grpc asyncio server.
await self.server.start()

# Write the dashboard head port to redis.
await self.aioredis_client.set(dashboard_consts.REDIS_KEY_DASHBOARD,
self.ip + ":" + str(self.http_port))
await self.aioredis_client.set(
dashboard_consts.REDIS_KEY_DASHBOARD_RPC,
self.ip + ":" + str(self.grpc_port))

async def _async_notify():
"""Notify signals from queue."""
while True:
Expand All @@ -198,8 +191,29 @@ async def _async_notify():
# Http server should be initialized after all modules loaded.
app = aiohttp.web.Application()
app.add_routes(routes=routes.bound_routes())
web_server = aiohttp.web._run_app(
app, host=self.http_host, port=self.http_port)

runner = aiohttp.web.AppRunner(app)
await runner.setup()
for i in range(1 + self.http_port_retries):
try:
site = aiohttp.web.TCPSite(runner, self.http_host,
self.http_port)
await site.start()
break
except OSError as e:
self.http_port += 1
logger.warning("Try to use port %s: %s", self.http_port, e)
else:
raise Exception("Failed to find a valid port for dashboard.")
http_host, http_port, *_ = site._server.sockets[0].getsockname()
logger.info("Dashboard head http address: %s:%s", http_host, http_port)

# Write the dashboard head port to redis.
await self.aioredis_client.set(ray_constants.REDIS_KEY_DASHBOARD,
f"{http_host}:{http_port}")
await self.aioredis_client.set(
dashboard_consts.REDIS_KEY_DASHBOARD_RPC,
f"{self.ip}:{self.grpc_port}")

# Dump registered http routes.
dump_routes = [
Expand All @@ -216,7 +230,6 @@ async def _async_notify():
_async_notify(),
DataOrganizer.purge(),
DataOrganizer.organize(),
web_server,
]
await asyncio.gather(*concurrent_tasks,
*(m.run(self.server) for m in modules))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ def check_node_details():
# Workers information should be in the detailed payload
assert "workers" in node
assert "logCount" in node
assert node["logCount"] == 2
# Two lines printed by ActorWithObjs
# One line printed by autoscaler: monitor.py:118 -- Monitor: Started
assert node["logCount"] > 2
print(node["workers"])
assert len(node["workers"]) == 2
assert node["workers"][0]["logCount"] == 1
Expand Down
50 changes: 49 additions & 1 deletion dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import logging
import asyncio
import subprocess
import collections

import numpy as np
Expand All @@ -21,6 +22,7 @@
wait_until_succeeded_without_exception)
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS_LEGACY,
DEBUG_AUTOSCALING_ERROR)
from ray.new_dashboard import dashboard
import ray.new_dashboard.consts as dashboard_consts
import ray.new_dashboard.utils as dashboard_utils
import ray.new_dashboard.modules
Expand Down Expand Up @@ -144,7 +146,7 @@ def _search_agent(processes):

# Check redis keys are set.
logger.info("Check redis keys are set.")
dashboard_address = client.get(dashboard_consts.REDIS_KEY_DASHBOARD)
dashboard_address = client.get(ray_constants.REDIS_KEY_DASHBOARD)
assert dashboard_address is not None
dashboard_rpc_address = client.get(
dashboard_consts.REDIS_KEY_DASHBOARD_RPC)
Expand Down Expand Up @@ -604,5 +606,51 @@ def test_http_proxy(enable_test_module, set_http_proxy, shutdown_only):
raise Exception("Timed out while testing.")


def test_dashboard_port_conflict(ray_start_with_dashboard):
    """Check dashboard startup when the requested HTTP port is occupied.

    A dashboard is already serving on the port reported in ``webui_url``.
    The test first launches a second dashboard on the same port without
    retries and expects it to exit with a failure status. It then launches
    one with ``--port-retries=10`` and polls Redis until the address stored
    under ``REDIS_KEY_DASHBOARD`` carries a port greater than the occupied
    one.

    Args:
        ray_start_with_dashboard: Fixture providing the address info of a
            running cluster; ``webui_url`` and ``redis_address`` are read.

    Raises:
        Exception: If no higher port shows up in Redis before the timeout.
        subprocess.TimeoutExpired: If the conflicting dashboard does not
            exit within 5 seconds.
    """
    address_info = ray_start_with_dashboard
    assert wait_until_server_available(address_info["webui_url"]) is True
    address = address_info["redis_address"].split(":")
    assert len(address) == 2

    client = redis.StrictRedis(
        host=address[0],
        port=int(address[1]),
        password=ray_constants.REDIS_DEFAULT_PASSWORD)

    host, port = address_info["webui_url"].split(":")
    occupied_port = int(port)
    temp_dir = "/tmp/ray"
    log_dir = "/tmp/ray/session_latest/logs"
    dashboard_cmd = [
        sys.executable, dashboard.__file__, f"--host={host}", f"--port={port}",
        f"--temp-dir={temp_dir}", f"--log-dir={log_dir}",
        f"--redis-address={address[0]}:{address[1]}",
        f"--redis-password={ray_constants.REDIS_DEFAULT_PASSWORD}"
    ]
    logger.info("The dashboard should be exit: %s", dashboard_cmd)
    conflicting = subprocess.Popen(dashboard_cmd)
    try:
        exit_code = conflicting.wait(5)
    except subprocess.TimeoutExpired:
        # Do not leave the child behind when it unexpectedly keeps running.
        conflicting.kill()
        conflicting.wait()
        raise
    assert exit_code != 0, (
        "Dashboard started on an occupied port without retries "
        "should exit with a failure status.")

    dashboard_cmd.append("--port-retries=10")
    retrying = subprocess.Popen(dashboard_cmd)
    try:
        timeout_seconds = 10
        deadline = time.time() + timeout_seconds
        while True:
            time.sleep(1)
            dashboard_url = client.get(ray_constants.REDIS_KEY_DASHBOARD)
            if dashboard_url:
                new_port = int(dashboard_url.split(b":")[-1])
                if new_port > occupied_port:
                    break
                # The key still holds the first dashboard's address until
                # the retrying dashboard has bound a port and overwritten it.
                logger.info("Retry because port %s is not greater than %s",
                            new_port, occupied_port)
            if time.time() > deadline:
                raise Exception("Timed out while testing.")
    finally:
        # Always reap the retrying dashboard so it does not outlive the test.
        retrying.terminate()
        try:
            retrying.wait(5)
        except subprocess.TimeoutExpired:
            retrying.kill()
            retrying.wait()


if __name__ == "__main__":
    # Run this module's tests verbosely and propagate pytest's exit status.
    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)
Loading