Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 12 additions & 28 deletions docs/developer/ray-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

When `roar run` executes a Ray workload, the driver process patches Ray startup/shutdown and injects worker instrumentation through Ray `runtime_env`. That worker instrumentation emits per-task I/O fragments (local file + S3 touches with task metadata) to a detached in-cluster collector actor.

At shutdown, the driver gathers collected fragments/events, merges optional proxy/node-agent logs, deduplicates artifacts, reconstructs task ordering from read/write dependencies, and writes Ray task lineage into `ROAR_PROJECT_DIR/.roar/roar.db`.
At shutdown, the driver flushes the collector actor's fragment streamer. After `ray job submit` returns, `FragmentReconstituter` fetches/decrypts fragment batches from GLaaS and writes Ray task lineage into `ROAR_PROJECT_DIR/.roar/roar.db`.

## 2. Architecture overview

Expand Down Expand Up @@ -37,12 +37,9 @@ flowchart TD

- Patching gate: `tracking_import` patches Ray only when `ROAR_WRAP=1` and `ray` is imported.
- `ray.init` patch (`_patch_ray_init`):
- Loads `[ray]` config (`enabled`, `log_dir`, explicit `pip_install`).
- Loads `[ray]` config (`enabled`, explicit `pip_install`).
- Builds `runtime_env` and `env_vars`.
- Injects worker env vars:
- `ROAR_WORKER=1`
- `ROAR_LOG_DIR=<configured ray.log_dir or ROAR_LOG_DIR>`
- `ROAR_LOG_BACKEND=actor`
- `ROAR_JOB_ID=<generated or supplied>`
- `ROAR_DRIVER_JOB_UID=<driver ROAR_JOB_ID>`
- selected AWS vars passed through when present.
Expand All @@ -57,8 +54,8 @@ flowchart TD
- Sets `runtime_env["worker_process_setup_hook"] = "roar.ray.roar_worker._startup"` and mirrors it via internal env var.
- Sanitizes reserved setup-hook env key for Ray versions that reject manual export (`_sanitize_worker_runtime_env_for_ray`).
- After real `ray.init`, registers pre-shutdown collection and ensures `RoarLogCollectorActor` exists.
- `ray.shutdown` patch (`_patch_ray_shutdown`): collects Ray I/O first (`_collect_ray_io`), then calls real shutdown.
- `_collect_ray_io`: calls `roar.ray.collector.collect(project_dir, log_dir, proxy_logs)` when `ROAR_WRAP=1`.
- `ray.shutdown` patch (`_patch_ray_shutdown`): flushes collector actor fragments first (`_collect_ray_io`), then calls real shutdown.
- `_collect_ray_io`: looks up `roar-log-collector-<job_id>`, calls `flush_to_glaas()`, then kills the detached actor.

### b. `roar-worker` entrypoint (`roar/ray/roar_worker.py`)

Expand All @@ -76,30 +73,22 @@ flowchart TD
- Fragment emission:
- `TaskFragment` includes Ray IDs, function name, timing, exit code, and `reads`/`writes` of `ArtifactRef`.
- Local write hashing is streaming (`blake3` if installed, otherwise `sha256`) via `_TrackedWriteFile`.
- Local path capture is restricted to `/shared/...` (`_should_track_local_path`).
- Local path capture excludes pseudo-filesystems (`/proc`, `/sys`, `/dev`).
- S3 refs use `hash_algorithm="etag"` and size where available.
- `_emit_fragment()` sends snapshots to `RoarLogCollectorActor.append_fragment.remote(fragment.to_dict())`.

### c. `RoarLogCollectorActor` (`roar/ray/actor.py`)

- Detached, named actor (`roar-log-collector-<ROAR_JOB_ID>`, namespace `roar`).
- Aggregation point for worker payloads:
- `append_fragment` / `get_all_fragments` for fragment snapshots.
- `append_batch` / `get_all` for event batches.
- Thin pass-through to `GlaasFragmentStreamer`:
- `append_fragment` forwards fragments into the encrypted GLaaS batch stream.
- `flush_to_glaas` flushes any buffered batches.

### d. `collector.py`

- Driver-side shutdown collector.
- Collection order:
- `_collect_actor_payload()` first (events + fragments from named actor).
- If only fragments are present, can synthesize events (`_events_from_fragments`) and/or write fragments directly (`collect_fragments`).
- Falls back to filesystem logs (`*.jsonl` under `ROAR_LOG_DIR`) when actor data is unavailable.
- Merges optional node proxy logs (`_merge_proxy_logs`).
- Dedup/normalization:
- Event path rollup (`_aggregate_paths`) deduplicates by path and tracks read/write direction.
- Capture method preference is `python < proxy < tracer`.
- Keeps max observed size and best available hash.
- Fragment artifact upsert prefers `artifact_hashes (algorithm,digest)`; otherwise latest artifact by path.
- Fragments-only merge path.
- `collect_fragments(...)` inserts Ray task child jobs + artifact inputs/outputs from decrypted fragment batches.
- Artifact identity prefers `artifact_hashes (algorithm,digest)` and falls back to latest artifact by path when no digest exists.
- Step-number topology (`_assign_step_numbers`):
- Collapses incremental snapshots by `job_uid`.
- Builds DAG from artifact hash dependencies (producer writes hash, consumer reads hash).
Expand All @@ -115,7 +104,7 @@ flowchart TD

### f. `worker.py`

- Complementary worker setup hook (`setup()`) for event-style logging backends (`actor` or `filesystem`).
- Legacy compatibility worker setup hook (`setup()`) that forwards event payloads to the collector actor only.
- Extends coverage beyond `open()` with optional SDK/data patches:
- boto3 S3 ops
- pandas parquet writes
Expand Down Expand Up @@ -165,18 +154,13 @@ sequenceDiagram
| `ROAR_WRAP` | env var | driver (`roar run`) | Enables Ray monkey-patching in `sitecustomize.py` when set to `1`. |
| `ROAR_JOB_ID` | env var | driver + worker env | Ray integration job ID; used in actor naming and task UID derivation. |
| `ROAR_PROJECT_DIR` | env var | driver | Determines where collector writes (`<project>/.roar/roar.db`) and config lookup start dir. |
| `ROAR_LOG_DIR` | env var | driver + worker env | Log directory for filesystem fallback; default `/shared/.roar-logs`. |
| `ROAR_LOG_BACKEND` | env var | worker env | Backend hint (`actor`/`filesystem`); Ray patch sets `actor` for workers. |
| `ROAR_WORKER` | env var | worker env | Marker that process is a Ray worker under roar instrumentation. |
| `ROAR_DRIVER_JOB_UID` | env var | worker env | Parent driver job UID stored in Ray task fragments/jobs. |
| `ray.enabled` | `roar.toml` | `[ray]` | Turns Ray runtime_env injection on/off. |
| `ray.log_dir` | `roar.toml` | `[ray]` | Default worker log directory for Ray collection fallback. |
| `ray.pip_install` | `roar.toml` | `[ray]` | When explicitly enabled, injects current `roar` requirement into `runtime_env.pip`. |
| `ray.actor_attribution` | `roar.toml` | `[ray]` | Fragment boundary mode in worker: `per_call` or `per_actor`. |

## 6. Known limitations / caveats

- `roar_worker` local file capture is limited to `/shared/...` paths.
- Local read events from `open()` do not include content hashes by default.
- S3 hash identity uses ETag; multipart/object semantics can make ETag differ from full-content digest.
- Fragment emission to actor is best-effort; failures are intentionally swallowed to avoid breaking user workloads.
Expand Down
5 changes: 0 additions & 5 deletions docs/end-user/ray-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ User-facing Ray options in `[ray]`:

- `ray.enabled`
- Turn Ray tracing on or off.
- `ray.log_dir`
- Set the worker log directory used for fallback collection.
- `ray.actor_attribution`
- `per_call` (default): attribute by actor method call.
- `per_actor`: group attribution by actor.
Expand All @@ -64,8 +62,6 @@ Helpful environment variables:
- Enables Ray wrapping (normally set automatically by `roar run`).
- `ROAR_PROJECT_DIR`
- Controls where `.roar/roar.db` is created/read.
- `ROAR_LOG_DIR`
- Overrides worker log directory.

## 7. Viewing results

Expand Down Expand Up @@ -98,7 +94,6 @@ ORDER BY j.step_number, j.timestamp, io.kind;

## 8. Known limitations

- Local file capture is strongest for worker-visible shared paths (commonly `/shared`).
- Some read events may not include full content hashes.
- S3 identity is ETag-based, which is not always a full-content digest.
- If cluster/runtime policies block required `runtime_env` behavior, tracing may be partial.
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "roar-cli"
version = "0.2.8"
version = "0.2.9"
description = "Reproducibility and provenance tracker for ML training pipelines"
authors = [
{ name="TReqs Team", email="info@treqs.ai" }
Expand Down Expand Up @@ -81,6 +81,7 @@ include = [
{ path = "LICENSE", format = "wheel" },
{ path = "roar/bin/*", format = "sdist" },
{ path = "roar/bin/*", format = "wheel" },
{ path = "roar_inject.pth", format = "wheel" },
]

[tool.pytest.ini_options]
Expand All @@ -97,6 +98,8 @@ markers = [
"cloud: Tests for cloud storage operations",
"happy_path: Happy path tests for core functionality",
"ray_e2e: Ray end-to-end tests requiring a running Docker cluster",
"ray_contract: User-facing Ray contract tests using `roar run ray job submit ...`",
"ray_diagnostic: Diagnostic Ray tests that intentionally inspect internal runtime details",
]
addopts = "-v --strict-markers -n auto --dist loadfile --ignore=tests/ebpf --ignore=tests/live_glaas --ignore=tests/benchmarks --ignore=tests/integration --ignore=tests/e2e"
timeout = 60
Expand Down
Binary file removed roar/bin/libroar_tracer_preload.so
Binary file not shown.
2 changes: 1 addition & 1 deletion roar/cli/commands/_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def validate_git_clean() -> str:
except (subprocess.CalledProcessError, FileNotFoundError):
# Ray jobs run from extracted working dirs that are not git repos.
# Allow execution there while preserving git checks everywhere else.
if "RAY_JOB_CONFIG_JSON_ENV_VAR" in os.environ:
if "RAY_JOB_ID" in os.environ:
return cwd
raise click.ClickException(
"roar requires the working directory to be inside a git repository."
Expand Down
135 changes: 102 additions & 33 deletions roar/cli/commands/_ray_job_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
from ...glaas_client import GlaasClient
from ...ray.fragment_key import generate_fragment_key, save_key

_ROAR_WORKER_PY_EXECUTABLE = "roar-worker"
_ROAR_WORKER_SETUP_HOOK = "roar.ray.roar_worker._startup"
_ROAR_DRIVER_ENTRYPOINT_MODULE = "roar.ray.driver_entrypoint"
_ROAR_JOB_INSTRUMENTED_ENV_VAR = "ROAR_JOB_INSTRUMENTED"
_ROAR_CLUSTER_GLAAS_URL_ENV = "ROAR_CLUSTER_GLAAS_URL"
_ROAR_CLUSTER_AWS_ENDPOINT_URL_ENV = "ROAR_CLUSTER_AWS_ENDPOINT_URL"


@dataclass(frozen=True)
class RayJobSubmitRewrite:
Expand All @@ -30,6 +37,7 @@ def maybe_rewrite_ray_job_submit(command: list[str]) -> RayJobSubmitRewrite:
entrypoint = list(command[separator_index + 1 :])
if not entrypoint:
return RayJobSubmitRewrite(command=command)
entrypoint = _wrap_entrypoint_for_driver_proxy(entrypoint)

runtime_env_json_arg = _find_runtime_env_json(before_separator)
runtime_env = _load_runtime_env(before_separator, runtime_env_json_arg)
Expand All @@ -38,16 +46,54 @@ def maybe_rewrite_ray_job_submit(command: list[str]) -> RayJobSubmitRewrite:
return RayJobSubmitRewrite(command=command)

merged_pip = _merge_roar_runtime_env_pip(runtime_env.get("pip"))
if merged_pip or ("pip" in runtime_env and merged_pip is not None):
if merged_pip:
runtime_env["pip"] = merged_pip
else:
runtime_env.pop("pip", None)

# py_executable is intentionally NOT set at job level — it would apply to the
# JobSupervisor/driver process which doesn't have roar installed yet (pip runs after
# the supervisor starts). worker_process_setup_hook is sufficient: it runs inside
# each worker process after the runtime env (and pip) is ready.
runtime_env["worker_process_setup_hook"] = _ROAR_WORKER_SETUP_HOOK

env_vars = dict(runtime_env.get("env_vars", {}) or {})
env_vars[_ROAR_JOB_INSTRUMENTED_ENV_VAR] = "1"
env_vars["ROAR_WRAP"] = "1"
env_vars["ROAR_RAY_NODE_AGENTS"] = "1"
# Stable job_id shared by driver + workers for node agent name resolution.
import uuid as _uuid

env_vars["ROAR_JOB_ID"] = _uuid.uuid4().hex[:8]

# Tell the job driver where roar.db lives (CWD inside the Ray job is the
# extracted working_dir, not the original project directory).
roar_dir = os.environ.get("ROAR_PROJECT_DIR", "")
if not roar_dir:
# Use CWD — `roar run` is invoked from the project root.
roar_dir = os.getcwd()
if roar_dir and os.path.isfile(os.path.join(roar_dir, ".roar", "roar.db")):
env_vars["ROAR_PROJECT_DIR"] = roar_dir

# Route S3 traffic through the per-node proxy (port 19191) for I/O capture.
# Save the REAL upstream endpoint (not the roar-run local proxy) so the
# cluster-side proxy can forward to the actual S3 service.
original_endpoint = (
os.environ.get("ROAR_UPSTREAM_S3_ENDPOINT") or os.environ.get("AWS_ENDPOINT_URL") or ""
)
cluster_upstream_endpoint = _resolve_cluster_upstream_s3_endpoint(original_endpoint)
if cluster_upstream_endpoint:
env_vars["ROAR_UPSTREAM_S3_ENDPOINT"] = cluster_upstream_endpoint
env_vars["ROAR_PROXY_PORT"] = "19191"
env_vars["AWS_ENDPOINT_URL"] = "http://127.0.0.1:19191"

fragment_session_id: str | None = None

glaas_url = _resolve_glaas_url()
if glaas_url:
env_vars["GLAAS_URL"] = glaas_url
env_vars["GLAAS_API_URL"] = glaas_url
cluster_glaas_url = _resolve_cluster_glaas_url(glaas_url)
if cluster_glaas_url:
env_vars["GLAAS_URL"] = cluster_glaas_url

key = generate_fragment_key()
try:
Expand All @@ -67,7 +113,6 @@ def maybe_rewrite_ray_job_submit(command: list[str]) -> RayJobSubmitRewrite:
runtime_env.pop("env_vars", None)

before_separator = _store_runtime_env(before_separator, runtime_env, runtime_env_json_arg)
entrypoint = _wrap_entrypoint(entrypoint)
return RayJobSubmitRewrite(
command=[*before_separator, "--", *entrypoint],
session_id=fragment_session_id,
Expand All @@ -84,6 +129,17 @@ def _is_ray_job_submit(command: list[str]) -> bool:
return binary == "ray" and noun in {"job", "jobs"} and verb == "submit"


def _wrap_entrypoint_for_driver_proxy(entrypoint: list[str]) -> list[str]:
if (
len(entrypoint) >= 3
and entrypoint[1] == "-m"
and entrypoint[2] == _ROAR_DRIVER_ENTRYPOINT_MODULE
):
return entrypoint

return ["python", "-m", _ROAR_DRIVER_ENTRYPOINT_MODULE, "--", *entrypoint]


def _find_runtime_env_json(command: list[str]) -> tuple[int, int | None] | None:
for index, arg in enumerate(command):
if arg == "--runtime-env-json":
Expand Down Expand Up @@ -137,27 +193,23 @@ def _store_runtime_env(
return command_out


def _wrap_entrypoint(entrypoint: list[str]) -> list[str]:
if len(entrypoint) >= 2 and Path(entrypoint[0]).name == "roar" and entrypoint[1] == "run":
return entrypoint
return ["roar", "run", *entrypoint]


def _merge_roar_runtime_env_pip(existing_pip: object) -> list[str] | None:
roar_req = _resolve_roar_requirement()
if roar_req is None:
# Local dev mode: vendor wheel present means cluster has roar pre-installed.
# Skip pip injection — preserve existing pip list unchanged.
existing = _coerce_runtime_env_pip(existing_pip)
return existing if existing else None
dependencies = _coerce_runtime_env_pip(existing_pip)
dependencies = [
dependency
for dependency in dependencies
if _requirement_name(dependency) not in {"roar-cli", "roar"}
# Also deduplicate URL-based requirements (e.g. presigned S3 URLs).
# _requirement_name() returns the full URL for these, so the name-based
# filter above never matches them — without this check the URL would
# survive the filter and get appended again, producing duplicates.
and dependency.strip() != roar_req.strip()
]
dependencies.append(roar_req)
return dependencies
# "skip" means roar is already installed on workers (e.g. Docker image with editable install).
if roar_req != "skip":
dependencies.append(roar_req)
return dependencies if dependencies else None


def _coerce_runtime_env_pip(value: object) -> list[str]:
Expand All @@ -183,24 +235,23 @@ def _requirement_name(requirement: str) -> str:
return text.strip().lower()


def _resolve_roar_requirement() -> str | None:
import os
def _resolve_roar_requirement() -> str:
import importlib.metadata as importlib_metadata

wheel_path = Path(os.getcwd()) / "vendor" / "roar-cli.whl"
if wheel_path.exists():
# Local dev mode: vendor wheel exists, cluster has roar pre-installed.
# Signal to skip pip injection entirely.
return None
# Allow overriding the pip requirement — useful for testing unreleased wheels via S3 URL
# without a PyPI publish. Set ROAR_CLUSTER_PIP_REQ=https://... in the environment.
override = os.environ.get("ROAR_CLUSTER_PIP_REQ", "").strip()
if override:
return override

import importlib.metadata as importlib_metadata
try:
version = importlib_metadata.version("roar-cli")
return f"roar-cli=={version}"
except importlib_metadata.PackageNotFoundError:
pass
except Exception:
pass

for package_name in ("roar-cli", "roar"):
try:
return f"{package_name}=={importlib_metadata.version(package_name)}"
except importlib_metadata.PackageNotFoundError:
continue
except Exception:
break
return "roar-cli"


Expand All @@ -209,10 +260,28 @@ def _resolve_glaas_url() -> str | None:

url = get_glaas_url()
if not url:
return None
return "https://api.glaas.ai"
return str(url)


def _resolve_cluster_glaas_url(host_glaas_url: str | None) -> str | None:
override = os.environ.get(_ROAR_CLUSTER_GLAAS_URL_ENV, "").strip()
if override:
return override
if not host_glaas_url:
return None
return str(host_glaas_url)


def _resolve_cluster_upstream_s3_endpoint(host_endpoint: str | None) -> str | None:
override = os.environ.get(_ROAR_CLUSTER_AWS_ENDPOINT_URL_ENV, "").strip()
if override:
return override
if not host_endpoint:
return None
return str(host_endpoint)


def _register_fragment_session(
glaas_url: str,
session_id: str,
Expand Down
2 changes: 0 additions & 2 deletions roar/cli/commands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@
enabled = true
# Inject roar-cli into runtime_env.pip for remote workers
pip_install = true
# Shared log directory for Ray worker I/O capture
log_dir = "/shared/.roar-logs"
# Actor attribution mode for Ray actor methods (per_call | per_actor)
actor_attribution = "per_call"

Expand Down
Loading