[xray] Implement timeline and profiling API. #2306

Merged · 25 commits · Jul 5, 2018

Commits
2aeefed
Add profile table and store profiling information there.
robertnishihara Jun 22, 2018
86f4116
Code for dumping timeline.
robertnishihara Jun 22, 2018
c1f21df
Improve color scheme.
robertnishihara Jun 24, 2018
78885b1
Push timeline events on driver only for raylet.
robertnishihara Jun 24, 2018
7a76d97
Improvements to profiling and timeline visualization
robertnishihara Jun 26, 2018
1228516
Some linting
robertnishihara Jun 26, 2018
38c4928
Small fix.
robertnishihara Jun 26, 2018
008fd06
Linting
robertnishihara Jun 26, 2018
748ed71
Propagate node IP address through profiling events.
robertnishihara Jun 27, 2018
c631a83
Fix test.
robertnishihara Jun 27, 2018
dc73eff
object_id.hex() should return byte string in python 2.
robertnishihara Jun 27, 2018
c04c31e
Include gcs.fbs in node_manager.fbs.
robertnishihara Jun 29, 2018
7b9135b
Remove flatbuffer definition duplication.
robertnishihara Jun 29, 2018
eaa1110
Decode to unicode in Python 3 and bytes in Python 2.
robertnishihara Jun 29, 2018
6ec75d6
Minor
robertnishihara Jun 29, 2018
ad50fd9
Submit profile events in a batch. Revert some CMake changes.
robertnishihara Jul 2, 2018
1840bce
Fix
robertnishihara Jul 3, 2018
8e6fde7
Workaround test failure.
robertnishihara Jul 3, 2018
52f435b
Fix linting
robertnishihara Jul 3, 2018
53a3b7d
Linting
robertnishihara Jul 3, 2018
bd1f79b
Don't return anything from chrome_tracing_dump when filename is provi…
robertnishihara Jul 3, 2018
66bba88
Remove some redundancy from profile table.
robertnishihara Jul 4, 2018
daf40f4
Linting
robertnishihara Jul 4, 2018
1ea0b9c
Move TODOs out of docstring.
robertnishihara Jul 4, 2018
be72d8a
Minor
robertnishihara Jul 5, 2018
11 changes: 11 additions & 0 deletions CMakeLists.txt
@@ -55,6 +55,17 @@ enable_testing()
 
 include(ThirdpartyToolchain)
 
+
+# TODO(rkn): Fix all of this. This include is needed for the following
+# reason. The local scheduler depends on tables.cc which depends on
+# node_manager_generated.h which depends on gcs_generated.h. However,
+# the include statement for gcs_generated.h doesn't include the file
+# path, so we include the relevant directory here.
+set(GCS_FBS_OUTPUT_DIRECTORY
+    "${CMAKE_CURRENT_LIST_DIR}/src/ray/gcs/format")
+include_directories(${GCS_FBS_OUTPUT_DIRECTORY})
+
+
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
 include_directories(SYSTEM ${PLASMA_INCLUDE_DIR})
 include_directories("${CMAKE_CURRENT_LIST_DIR}/src/")
1 change: 1 addition & 0 deletions doc/source/conf.py
@@ -48,6 +48,7 @@
     "ray.core.generated.GcsTableEntry",
     "ray.core.generated.HeartbeatTableData",
     "ray.core.generated.ErrorTableData",
+    "ray.core.generated.ProfileTableData",
     "ray.core.generated.ObjectTableData",
     "ray.core.generated.ray.protocol.Task",
     "ray.core.generated.TablePrefix",
4 changes: 2 additions & 2 deletions python/ray/__init__.py
@@ -48,7 +48,7 @@
 
 from ray.local_scheduler import ObjectID, _config  # noqa: E402
 from ray.worker import (error_info, init, connect, disconnect, get, put, wait,
-                        remote, log_event, log_span, flush_log, get_gpu_ids,
+                        remote, profile, flush_profile_data, get_gpu_ids,
                         get_resource_ids, get_webui_url,
                         register_custom_serializer)  # noqa: E402
 from ray.worker import (SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
@@ -65,7 +65,7 @@
 
 __all__ = [
     "error_info", "init", "connect", "disconnect", "get", "put", "wait",
-    "remote", "log_event", "log_span", "flush_log", "actor", "method",
+    "remote", "profile", "flush_profile_data", "actor", "method",
     "get_gpu_ids", "get_resource_ids", "get_webui_url",
     "register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE",
     "SILENT_MODE", "global_state", "ObjectID", "_config", "__version__"
7 changes: 4 additions & 3 deletions python/ray/actor.py
@@ -14,6 +14,7 @@
 import ray.signature as signature
 import ray.worker
 from ray.utils import (
+    decode,
     _random_string,
     check_oversized_pickle,
     is_cython,
@@ -292,10 +293,10 @@ def fetch_and_register_actor(actor_class_key, worker):
         "checkpoint_interval", "actor_method_names"
     ])
 
-    class_name = class_name.decode("ascii")
-    module = module.decode("ascii")
+    class_name = decode(class_name)
+    module = decode(module)
     checkpoint_interval = int(checkpoint_interval)
-    actor_method_names = json.loads(actor_method_names.decode("ascii"))
+    actor_method_names = json.loads(decode(actor_method_names))
 
     # Create a temporary actor with some temporary methods so that if the actor
     # fails to be unpickled, the temporary actor can be used (just to produce
245 changes: 202 additions & 43 deletions python/ray/experimental/state.py

Large diffs are not rendered by default.
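
The timeline API itself lives in this file, whose diff is not rendered. Going only by the commit messages above, chrome_tracing_dump turns the profile table into a Chrome trace. A hedged sketch of the call, assuming it is exposed on ray.global_state and that filename is the keyword referenced in commit bd1f79b:

    # Write the timeline as a Chrome trace. Per commit bd1f79b, nothing is
    # returned when a filename is provided; the events are written to disk,
    # and the file can then be loaded in the chrome://tracing viewer.
    ray.global_state.chrome_tracing_dump(filename="/tmp/timeline.json")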

8 changes: 5 additions & 3 deletions python/ray/gcs_utils.py
@@ -22,6 +22,7 @@
 from ray.core.generated.GcsTableEntry import GcsTableEntry
 from ray.core.generated.ClientTableData import ClientTableData
 from ray.core.generated.ErrorTableData import ErrorTableData
+from ray.core.generated.ProfileTableData import ProfileTableData
 from ray.core.generated.HeartbeatTableData import HeartbeatTableData
 from ray.core.generated.ObjectTableData import ObjectTableData
 from ray.core.generated.ray.protocol.Task import Task
@@ -33,9 +34,9 @@
     "SubscribeToNotificationsReply", "ResultTableReply",
     "TaskExecutionDependencies", "TaskReply", "DriverTableMessage",
     "LocalSchedulerInfoMessage", "SubscribeToDBClientTableReply", "TaskInfo",
-    "GcsTableEntry", "ClientTableData", "ErrorTableData", "HeartbeatTableData",
-    "ObjectTableData", "Task", "TablePrefix", "TablePubsub",
-    "construct_error_message"
+    "GcsTableEntry", "ClientTableData", "ErrorTableData", "ProfileTableData",
+    "HeartbeatTableData", "ObjectTableData", "Task", "TablePrefix",
+    "TablePubsub", "construct_error_message"
 ]
 
 # These prefixes must be kept up-to-date with the definitions in
@@ -53,6 +54,7 @@
 TablePrefix_RAYLET_TASK_string = "RAYLET_TASK"
 TablePrefix_OBJECT_string = "OBJECT"
 TablePrefix_ERROR_INFO_string = "ERROR_INFO"
+TablePrefix_PROFILE_string = "PROFILE"
 
 
 def construct_error_message(error_type, message, timestamp):
3 changes: 2 additions & 1 deletion python/ray/log_monitor.py
@@ -10,6 +10,7 @@
 from ray.services import get_ip_address
 from ray.services import get_port
 from ray.services import logger
+import ray.utils
 
 
 class LogMonitor(object):
@@ -70,7 +71,7 @@ def check_log_files_and_push_updates(self):
             if len(new_lines) > 0:
                 self.log_files[log_filename] += new_lines
                 redis_key = "LOGFILE:{}:{}".format(
-                    self.node_ip_address, log_filename.decode("ascii"))
+                    self.node_ip_address, ray.utils.decode(log_filename))
                 self.redis_client.rpush(redis_key, *new_lines)
 
             # Pass if we already failed to open the log file.
3 changes: 2 additions & 1 deletion python/ray/scripts/scripts.py
@@ -10,6 +10,7 @@
 import ray.services as services
 from ray.autoscaler.commands import (create_or_update_cluster,
                                      teardown_cluster, get_head_node_ip)
+import ray.utils
 
 
 def check_no_existing_redis_clients(node_ip_address, redis_client):
@@ -31,7 +32,7 @@ def check_no_existing_redis_clients(node_ip_address, redis_client):
         if deleted:
             continue
 
-        if info[b"node_ip_address"].decode("ascii") == node_ip_address:
+        if ray.utils.decode(info[b"node_ip_address"]) == node_ip_address:
             raise Exception("This Redis instance is already connected to "
                             "clients with this IP address.")
 
6 changes: 3 additions & 3 deletions python/ray/services.py
@@ -386,7 +386,7 @@ def check_version_info(redis_client):
     if redis_reply is None:
         return
 
-    true_version_info = tuple(json.loads(redis_reply.decode("ascii")))
+    true_version_info = tuple(json.loads(ray.utils.decode(redis_reply)))
     version_info = _compute_version_info()
     if version_info != true_version_info:
         node_ip_address = ray.services.get_node_ip_address()
@@ -776,7 +776,7 @@ def start_ui(redis_address, stdout_file=None, stderr_file=None, cleanup=True):
     new_env["REDIS_ADDRESS"] = redis_address
     # We generate the token used for authentication ourselves to avoid
     # querying the jupyter server.
-    token = binascii.hexlify(os.urandom(24)).decode("ascii")
+    token = ray.utils.decode(binascii.hexlify(os.urandom(24)))
     command = [
         "jupyter", "notebook", "--no-browser", "--port={}".format(port),
         "--NotebookApp.iopub_data_rate_limit=10000000000",
@@ -1373,7 +1373,7 @@ def start_ray_processes(address_info=None,
         redis_client = redis.StrictRedis(
             host=redis_ip_address, port=redis_port)
         redis_shards = redis_client.lrange("RedisShards", start=0, end=-1)
-        redis_shards = [shard.decode("ascii") for shard in redis_shards]
+        redis_shards = [ray.utils.decode(shard) for shard in redis_shards]
         address_info["redis_shards"] = redis_shards
 
     # Start the log monitor, if necessary.
2 changes: 2 additions & 0 deletions python/ray/utils.py
@@ -170,6 +170,8 @@ def random_string():
 
 def decode(byte_str):
     """Make this unicode in Python 3, otherwise leave it as bytes."""
+    if not isinstance(byte_str, bytes):
+        raise ValueError("The argument must be a bytes object.")
     if sys.version_info >= (3, 0):
         return byte_str.decode("ascii")
     else:
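
For reference, a small sketch of what the strengthened decode helper does across interpreter versions; the behavior follows the docstring and the isinstance check added above:

    from ray.utils import decode

    value = decode(b"raylet")
    # Python 3: value == "raylet" (a unicode str).
    # Python 2: the bytes are returned unchanged, per the docstring.

    # The new isinstance check rejects non-bytes input up front. In Python 3,
    # passing a str now raises ValueError rather than failing further along.
    # decode("already-decoded")  # ValueError: The argument must be a bytes object.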