Metric Logging updates 5/N - enable streaming #363
Changes from all commits:
```diff
@@ -23,6 +23,7 @@
     BackendRole,
     get_logger_backend_class,
     LoggerBackend,
+    LoggingMode,
     MetricCollector,
     reduce_metrics_states,
 )
```
```diff
@@ -63,9 +64,9 @@ async def get_or_create_metric_logger(
         mlogger = await get_or_create_metric_logger(process_name="Controller")

         # Initialize logging backends
-        await mlogger.init_backends.call_one({
-            "console": {"reduce_across_ranks": True},
-            "wandb": {"project": "my_project", "reduce_across_ranks": False}
+        await mlogger.init_backends({
+            "console": {"logging_mode": "global_reduce"},
+            "wandb": {"project": "my_project", "logging_mode": "per_rank_reduce"}
         })

         # Initialize services...
```
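Not part of the diff: a minimal sketch of a config exercising the new streaming mode alongside `global_reduce`, using the option names introduced by this PR. The specific backend/project values are illustrative assumptions.

```python
# Sketch only (not from the PR): keys are backend names; values are per-backend configs.
logging_config = {
    # One reduced value per metric per step, logged once in the controller.
    "console": {"logging_mode": "global_reduce"},
    # Every rank streams raw metric values into a single shared wandb run.
    "wandb": {
        "logging_mode": "per_rank_no_reduce",
        "per_rank_share_run": True,
        "project": "my_project",
    },
}
```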
```diff
@@ -165,20 +166,20 @@ async def flush(
     @endpoint
     async def init_backends(
         self,
-        metadata_per_primary_backend: dict[str, dict[str, Any]],
+        metadata_per_controller_backend: dict[str, dict[str, Any]],
         config: dict[str, Any],
         global_step: int = 0,
     ) -> None:
         """Init per-rank logger backends and MetricCollector.

         Args:
-            metadata_per_primary_backend (dict[str, dict[str, Any]]): Metadata from primary backends for shared state.
+            metadata_per_controller_backend (dict[str, dict[str, Any]]): Metadata from controller backends for shared state.
             config (dict[str, Any]): Backend configurations with logging modes and settings.
             global_step (int): Initial step for metrics.
         """
         collector = MetricCollector()
         await collector.init_backends(
-            metadata_per_primary_backend,
+            metadata_per_controller_backend,
             config,
             global_step,
             process_name=self.process_name,
```
```diff
@@ -196,9 +197,10 @@ class GlobalLoggingActor(Actor):
     Supports multiple logging backends (e.g., WandB, TensorBoard, etc.),
     with per-rank and/or global reduction logging modes.

-    If a backend config has flag `reduce_across_ranks=False`, an instance of the backend
-    is initialized per-rank, otherwise it is done once globally.
-
     This GlobalLoggingActor should be spawned once in the controller. A LocalFetcherActor
     is automatically spawned per-rank in `forge.controller.provisioner.py` and registered
     with this actor. The LocalFetcherActor is responsible for instantiating
     the per-rank MetricCollector and working as a bridge between GlobalLoggingActor and processes.

     Flow:
     GlobalLoggingActor.method() -> per-procmesh LocalFetcherActor.method() -> per-rank MetricCollector.method() -> logger
```
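To make the Flow line concrete, a hedged controller-side sketch. The training loop, `logging_config`, and `num_steps` are assumptions; `init_backends.call_one` matches the warning text later in this diff, and `flush.call_one` is assumed to be the matching endpoint invocation.

```python
# Sketch only: controller wiring implied by the Flow line above (inside an async main).
mlogger = await get_or_create_metric_logger(process_name="Controller")
await mlogger.init_backends.call_one(logging_config)

for step in range(num_steps):
    ...  # actors run; each rank's MetricCollector accumulates metrics locally
    # GlobalLoggingActor -> per-procmesh LocalFetcherActor -> per-rank MetricCollector
    await mlogger.flush.call_one(global_step=step)
```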
```diff
@@ -208,57 +210,112 @@ def __init__(self):
         self.fetchers: dict[str, LocalFetcherActor] = {}
         self.config: dict[str, Any] | None = None
         self.global_logger_backends: dict[str, LoggerBackend] = {}
-        self.metadata_per_primary_backend: dict[str, dict[str, Any]] = {}
+        self.metadata_per_controller_backend: dict[str, dict[str, Any]] = {}
+
+    def _validate_backend_config(
+        self, backend_name: str, config: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Validate and normalize backend configuration."""
+        if "logging_mode" not in config:
+            logger.debug(
+                f"logging_mode not provided for backend {backend_name}. Defaulting to global_reduce."
+            )
+
+        mode_str = config.get("logging_mode", "global_reduce")
+        mode = LoggingMode(mode_str)
+
+        # Validate per_rank_share_run configuration
+        share_run = config.get("per_rank_share_run", False)
+        if mode == LoggingMode.GLOBAL_REDUCE and share_run:
+            logger.warning(
+                f"{backend_name}: per_rank_share_run=True is ignored in {mode.value} mode. "
+                "Setting it to False."
+            )
+            share_run = False
+
+        # WandB-specific warning for suboptimal configuration
+        if (
+            backend_name == "wandb"
+            and mode == LoggingMode.PER_RANK_REDUCE
+            and share_run
+        ):
+            logger.warning(
+                "WandB: Using 'per_rank_reduce' with 'per_rank_share_run=True' is not recommended. "
+                "This configuration can lead to confusing metrics where reduced values from multiple ranks "
+                "are written to the same run/step, displaying only one of them. Consider either:\n"
+                "  1. Set 'per_rank_share_run=False' to create separate runs per rank, OR\n"
+                "  2. Use 'per_rank_no_reduce' for real-time streaming to a shared run"
+            )
+
+        return {
+            **config,
+            "logging_mode": mode,
+            "per_rank_share_run": share_run,
+        }

     @endpoint
     async def init_backends(self, config: dict[str, Any]) -> None:
-        """
-        Sets config in global actor, so other actors can get it, then eagerly initializes backend and MetricCollectors
+        """Sets config in global actor, initializes controller backends and eagerly initializes MetricCollectors
         in all registered fetchers.

         A backend is always initialized in the controller (primary backend) and can be used as a logger or as a source
         for metadata to be shared with per-rank backends, e.g. shared run IDs for wandb.

-        The backend instantiation is controlled by the backend config flag `reduce_across_ranks`: if False,
-        a per-rank backend is initialized, i.e. if there are 2 ranks, each will have its own backend,
-        and will log independently, i.e. each rank will have its own run in wandb.
-
-        Else, if True, the GlobalLoggingActor will fetch all local metrics collectors to get their states
-        and reduce them to a single value, which will be logged by the primary backend in this controller.
+        The backend instantiation is controlled by the logging_mode field. Controller backends
+        (instantiated in the controller) can provide metadata to be shared with rank backends,
+        e.g. shared run IDs for WandB. For details on logging modes, see `forge.observability.metrics.LoggingMode`.

         Args:
-            config (dict[str, Any]): Config for metric logging where keys are backend names,
-                e.g. {"console": {"reduce_across_ranks": True}, "wandb": {"reduce_across_ranks": False}}
+            config (dict[str, Any]): Config for metric logging where keys are backend names.
+                Each backend config supports:
+                - logging_mode (str | LoggingMode, default "global_reduce"): One of "global_reduce",
+                  "per_rank_reduce", or "per_rank_no_reduce". Can be specified as a string or LoggingMode enum.
+                - per_rank_share_run (bool, default False): For per-rank modes only. Whether ranks
+                  share a single run/logger instance. Ignored for "global_reduce" mode.
+                - Additional backend-specific options (e.g., "project" for WandB)
+
+        Example:
+            {
+                "console": {"logging_mode": "global_reduce"},
+                "wandb": {
+                    "logging_mode": "per_rank_no_reduce",
+                    "per_rank_share_run": True,
+                    "project": "my_project",
+                }
+            }
+
+        Raises:
+            ValueError: If backend config is invalid or missing required fields.
         """
-        self.config = config
+        self.config = {}

         if FORGE_DISABLE_METRICS.get_value():
             return

+        # Validate and normalize each backend config
+        for backend_name, backend_config in config.items():
+            self.config[backend_name] = self._validate_backend_config(
+                backend_name, backend_config
+            )
+
+        # Initialize backends based on logging mode
+        for backend_name, backend_config in self.config.items():
+            mode = backend_config["logging_mode"]
+
             backend = get_logger_backend_class(backend_name)(backend_config)
             await backend.init(role=BackendRole.GLOBAL, name="global_reduce")

-            # Extract metadata from primary logger to be shared with secondary loggers
-            # and store it
-            reduce_across_ranks = backend_config.get("reduce_across_ranks", True)
-            if not reduce_across_ranks:
-                primary_backend_metadata = (
-                    backend.get_metadata_for_secondary_ranks() or {}
-                )
-                self.metadata_per_primary_backend[
-                    backend_name
-                ] = primary_backend_metadata
+            # Extract metadata from controller logger to be shared with per-rank loggers
+            if mode != LoggingMode.GLOBAL_REDUCE:
+                controller_metadata = backend.get_metadata_for_secondary_ranks() or {}
+                self.metadata_per_controller_backend[backend_name] = controller_metadata

-            # Store global logger backends
-            if reduce_across_ranks:
+            # Store global logger backends for later flush
+            if mode == LoggingMode.GLOBAL_REDUCE:
                 self.global_logger_backends[backend_name] = backend

-        # Eager init collectors on all registered fetchers in parallel, passing primary states and config
+        # Eager init collectors on all registered fetchers in parallel, passing controller states and config
         if self.fetchers:
             tasks = [
                 fetcher.init_backends.call(
-                    self.metadata_per_primary_backend, self.config
+                    self.metadata_per_controller_backend, self.config
                 )
                 for fetcher in self.fetchers.values()
             ]
```

> Inline review comment on the `self._validate_backend_config(` call: "Small thing, but i don't love"
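A hypothetical call showing what the normalization above does. Invoking the private helper directly on a bare `GlobalLoggingActor()` instance is purely for illustration (actors are normally spawned, not constructed like this).

```python
actor = GlobalLoggingActor()
cfg = {"logging_mode": "global_reduce", "per_rank_share_run": True}
normalized = actor._validate_backend_config("console", cfg)

# The string mode is coerced to the enum, and the incompatible flag is
# reset to False (with a warning) rather than raising.
assert normalized["logging_mode"] is LoggingMode.GLOBAL_REDUCE
assert normalized["per_rank_share_run"] is False
```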
```diff
@@ -278,7 +335,7 @@ async def register_fetcher(self, fetcher: LocalFetcherActor, proc_id: str) -> None:
         if self.config:
             logger.debug(f"Initializing new LocalFetcherActor for proc_id={proc_id}")
             await fetcher.init_backends.call(
-                self.metadata_per_primary_backend, self.config
+                self.metadata_per_controller_backend, self.config
             )

     @endpoint
```
```diff
@@ -306,19 +363,21 @@ async def flush(self, global_step: int) -> None:
         config = self.config
         if config is None:
             logger.warning(
-                "GlobalLoggingActor flush() called before init_backends(). "
-                "No backends will be flushed."
+                "Cannot flush collected metrics. GlobalLoggingActor.flush() called before init_backends()."
+                " No backends will be flushed. Please call in your main file:\n"
+                "`mlogger = await get_or_create_metric_logger(process_name='Controller')`\n"
+                "`await mlogger.init_backends.call_one(logging_config)`\n"
             )
             return
-        # if reduce_across_ranks=True, we need to reduce the states from all ranks
-        # and log with the primary backend

+        # Check if need to do reduce and retrieve states from fetchers
         requires_reduce = any(
-            backend_config.get("reduce_across_ranks", True)
+            backend_config["logging_mode"] == LoggingMode.GLOBAL_REDUCE
             for backend_config in config.values()
         )

         logger.debug(
-            f"Global flush for global_step {global_step}: {len(self.fetchers)} fetchers"
+            f"Global flush for global step {global_step}: {len(self.fetchers)} fetchers"
         )

         # Broadcast flush to all fetchers
```
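As a quick check of the gating logic above, a sketch of the opposite case (config shown post-validation, with enum values already normalized):

```python
# Sketch: per-rank-only configs skip the controller-side gather/reduce entirely.
config = {
    "console": {"logging_mode": LoggingMode.PER_RANK_REDUCE},
    "wandb": {"logging_mode": LoggingMode.PER_RANK_NO_REDUCE},
}
requires_reduce = any(
    bc["logging_mode"] == LoggingMode.GLOBAL_REDUCE for bc in config.values()
)
assert requires_reduce is False  # nothing for the controller to reduce
```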
```diff
@@ -331,21 +390,25 @@ async def flush(self, global_step: int) -> None:
             )

         if requires_reduce:
-            # Handle exceptions and extract values from ValueMesh results
-            all_local_states = []
-            for result in results:
-                if isinstance(result, BaseException):
-                    logger.warning(f"Flush failed on a fetcher: {result}")
-                    continue
-
-                # result is a generator that outputs a pair [{'gpus': i/N}, {metric_key1: metric_state1, ...}]
-                for gpu_info, local_metric_state in result.items():
-                    if isinstance(local_metric_state, dict):
-                        all_local_states.append(local_metric_state)
-                    else:
-                        logger.warning(
-                            f"Unexpected result from fetcher. {gpu_info=}, {local_metric_state=}"
-                        )
+            def extract_values_from_valuemesh(results) -> list[dict[str, Any]]:
+                all_local_states = []
+                for result in results:
+                    if isinstance(result, BaseException):
+                        logger.warning(f"Flush failed on a fetcher: {result}")
+                        continue
+
+                    # result is a generator that outputs a pair [{'gpus': i/N}, {metric_key1: metric_state1, ...}]
+                    for gpu_info, local_metric_state in result.items():
+                        if isinstance(local_metric_state, dict):
+                            all_local_states.append(local_metric_state)
+                        else:
+                            logger.warning(
+                                f"Unexpected result from fetcher. {gpu_info=}, {local_metric_state=}"
+                            )
+                return all_local_states
+
+            all_local_states = extract_values_from_valuemesh(results)

             if not all_local_states:
                 logger.warning(f"No states to reduce for global_step {global_step}")

@@ -354,12 +417,9 @@ async def flush(self, global_step: int) -> None:
             # Reduce metrics from states
             reduced_metrics = reduce_metrics_states(all_local_states)

-            # Log to each global logger_backend
-            for (
-                logger_backend_name,
-                logger_backend,
-            ) in self.global_logger_backends.items():
-                await logger_backend.log(reduced_metrics, global_step)
+            # Log to global backends
+            for backend_name, backend in self.global_logger_backends.items():
+                await backend.log_batch(reduced_metrics, global_step)

     @endpoint
     def has_fetcher(self, proc_id: str) -> bool:
```
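For intuition on why `reduce_metrics_states` operates on accumulator states rather than already-reduced values, a worked example. The numbers and state fields below are assumptions for illustration; the real schema lives in `forge.observability.metrics` and is not shown in this diff.

```python
# Two ranks accumulate a mean-reduced "loss" metric:
#   rank 0 state: sum=10.0, count=4   (local mean 2.5)
#   rank 1 state: sum=6.0,  count=2   (local mean 3.0)
# Reducing the states gives (10.0 + 6.0) / (4 + 2) = 2.67,
# whereas averaging the two local means would give (2.5 + 3.0) / 2 = 2.75.
# Merging states preserves the exact global mean a single-rank run would see.
```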
Review discussion:

> Why do we need to duplicate logging_mode across different configs like this? Feels like clunky UX to me.

> This is per backend. You could have scuba logging in streaming mode, console logging global_reduce, and wandb logging per rank. If you have a single backend, you define it only once.

> Discussed offline. I would like to consider some refactors here, but acknowledge that it's not really in the scope of this PR.