From 31cc8f3690ebbbeffa0cecdfca4712c9cd9dfd0f Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Fri, 10 Oct 2025 15:08:14 -0700 Subject: [PATCH 1/5] add metrics readme --- src/forge/observability/README.md | 295 ++++++++++++++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 src/forge/observability/README.md diff --git a/src/forge/observability/README.md b/src/forge/observability/README.md new file mode 100644 index 000000000..ae8f093a4 --- /dev/null +++ b/src/forge/observability/README.md @@ -0,0 +1,295 @@ +# Observability in Forge + +We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. No boilerplate required-just call, flush, and visualize. Disable with `FORGE_DISABLE_METRICS=true`. + +## Your Superpowers + +### Call `record_metric` from Anywhere + +Simple to use, with no need to pass dictionaries around. 
+ +Full example: +```python +import asyncio +from forge.observability import get_or_create_metric_logger, record_metric, Reduce + +async def main(): + # Setup logger + mlogger = await get_or_create_metric_logger(process_name="Controller") + await mlogger.init_backends.call_one({"console": {"logging_mode": "global_reduce"}}) + + # Have this in any process + def my_fn(number): + record_metric("my_sum_metric", number, Reduce.SUM) # sum(1,2,3) + record_metric("my_max_metric", number, Reduce.MAX) # max(1,2,3) + record_metric("my_mean_metric", number, Reduce.MEAN) # mean(1,2,3) + + # Accumulate metrics + for number in range(1, 4): # 1, 2, 3 + my_fn(number) + + # Flush + await mlogger.flush.call_one(global_step=0) # Flushes and resets metric accumulators + + # Shutdown when done + await mlogger.shutdown.call_one() + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Output: +```bash +=== [GlobalReduce] - METRICS STEP 0 === +my_sum_metric: 6.0 +my_max_metric: 3.0 +my_mean_metric: 2.0 +``` + +### Track Performance: Timing and Memory + +Use `Tracer` for tracking durations and memory usage. Overhead is minimal, and GPU timing is non-blocking. Set `timer="gpu"` for kernel-level precision. Tracer leverages `record_metric` in the backend. + +```python +from forge.observability.perf_tracker import Tracer +import torch + +# ... 
Initialize logger (as shown in previous example) + +def my_fn(): + a, b = torch.randn(1000, 1000, device="cuda"), torch.randn( + 1000, 1000, device="cuda" + ) + + tracer = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu") + tracer.start() + for _ in range(3): + torch.mm(a, b) + tracer.step("my_metric_mm_a_b") + tracer.stop() + +# Accumulate metrics +for _ in range(2): + my_fn() + +await mlogger.flush(global_step=0) # Flush and reset +``` + +Output: +```bash +=== [GlobalReduce] - METRICS STEP 0 === +my_cuda_loop/memory_delta_end_start_avg_gb: 0.015 +my_cuda_loop/memory_peak_max_gb: 0.042 +my_cuda_loop/my_metric_mm_a_b/duration_avg_s: 0.031 +my_cuda_loop/my_metric_mm_a_b/duration_max_s: 0.186 +my_cuda_loop/total_duration_avg_s: 0.094 +my_cuda_loop/total_duration_max_s: 0.187 +``` + +For convenience, you can also use `Tracer` as a context manager or decorator: + +```python +from forge.observability.perf_tracker import trace + +with trace(prefix="train_step", track_memory=True, timer="gpu") as t: + t.step("fwd") + loss = model(x) + t.step("bwd") + loss.backward() +``` + +```python +from forge.observability.perf_tracker import trace + +@trace(prefix="fwd_pass", track_memory=False, timer="cpu") +async def reward_fn(x): # Supports both synchronous and asynchronous functions + return 1.0 if x > 0 else 0.0 +``` + +### Logging Modes + +Defined per backend. You have three options: + +- **global_reduce**: N ranks = 1 charts. Ranks accumulate → controller reduces → 1 entry per flush. Ideal for a single aggregated view (e.g., average loss chart). +- **per_rank_reduce**: N ranks = N charts. Each rank reduces locally → log once per rank per flush. Ideal for per-rank performance debugging (e.g., GPU utilization). +- **per_rank_no_reduce**: N ranks = N charts. Values are logged immediately without reduction. Ideal for real-time streams. + + +Consider an example with an actor running on 2 replicas, each with 2 processes, for a total of 4 ranks. 
We will record the sum of the rank values. For example, rank_0 records 0, and rank_1 records 1. + +```python +import asyncio + +from forge.controller.actor import ForgeActor +from forge.observability import get_or_create_metric_logger, record_metric, Reduce +from monarch.actor import current_rank, endpoint + +# Your distributed actor +class MyActor(ForgeActor): + @endpoint + async def my_fn(self): + rank = current_rank().rank # 0 or 1 per replica + record_metric("my_sum_rank_metric", rank, Reduce.SUM) + +async def main(): + # Setup logger + mlogger = await get_or_create_metric_logger(process_name="Controller") + await mlogger.init_backends.call_one( + {"console": {"logging_mode": "global_reduce"}} # <--- Define logging_mode here + ) + + # Setup actor + service_config = {"procs": 2, "num_replicas": 2, "with_gpus": False} + my_actor = await MyActor.options(**service_config).as_service() + + # Accumulate metrics + for _ in range(2): # 2 steps + await my_actor.my_fn.fanout() + + # Flush + await mlogger.flush.call_one(global_step=0) # Flush and reset + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Output: +```bash +=== [GlobalReduce] - METRICS STEP 0 === +my_sum_rank_metric: 4.0 # (rank_0 + rank_1) * 2 steps * 2 replicas +=============== +``` + +Now, let’s set `"logging_mode": "per_rank_reduce"`: +```bash +=== [MyActor_661W_r0] - METRICS STEP 0 === +my_sum_rank_metric: 0.0 # (rank_0) * 2 steps +=============== +=== [MyActor_661W_r1] - METRICS STEP 0 === +my_sum_rank_metric: 2.0 # (rank_1) * 2 steps +=============== +=== [MyActor_wQ1g_r0] - METRICS STEP 0 === +my_sum_rank_metric: 0.0 # (rank_0) * 2 steps +=============== +=== [MyActor_wQ1g_r1] - METRICS STEP 0 === +my_sum_rank_metric: 2.0 # (rank_1) * 2 steps +=============== +``` + +Finally, with `"logging_mode": "per_rank_no_reduce"` +```bash +[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0 +[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0 +[1] [MyActor-1/2] 2025-10-10 
12:21:09 INFO my_sum_rank_metric: 1 +[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1 +[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0 +[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0 +[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1 +[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1 +``` + +### Using Multiple Backends + +For example, you can log reduced metrics to Weights & Biases while using "per_rank_no_reduce" for debugging logs. We support multiple backends during logger initialization: + +```python +mlogger = await get_or_create_metric_logger(process_name="Controller") +await mlogger.init_backends.call_one({ + "console": {"logging_mode": "per_rank_no_reduce"}, + "wandb": {"logging_mode": "global_reduce"} +}) +``` + +### Adding a New Backend + +Extend `LoggerBackend` for custom logging, such as saving data to JSONL files, sending Slack notifications when a metric hits a threshold, or supporting tools like MLFlow or Grafana. After writing your backend, register it with `forge.observability.metrics.get_logger_backend_class`. + +# TODO: we need a better solution here that doesn't involve commiting to forge +# e.g. register_new_backend_type(my_custom_backend_type) + +```python +class ConsoleBackend(LoggerBackend): + def __init__(self, logger_backend_config: dict[str, Any]) -> None: + super().__init__(logger_backend_config) + + async def init(self, process_name: str | None = None, *args, **kwargs) -> None: + self.process_name = process_name + + async def log_batch(self, metrics: list[Metric], global_step: int, *args, **kwargs) -> None: + # Called on flush + print(self.process_name, metrics) + + def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None: + # Called on `record_metric` if "logging_mode": "per_rank_no_reduce" + print(metric) +``` + +### Adding a New Reduce Type + +Metrics are accumulated each time `record_metric` is called. 
The following example implements the `Reduce.MEAN` accumulator. By tracking `sum` and `count`, it efficiently supports accurate global reduction. Users can extend this by adding custom reduce types, such as `WordCounterAccumulator` or `SampleAccumulator`, and registering them with `forge.observability.metrics.Reduce`. For details on how this is used, see `forge.observability.metrics.MetricCollector`. + +# TODO: we need a better solution here that doesn't involve commiting to forge +# e.g. register_new_reduce_type(my_custom_reduce_type) + +```python +class MeanAccumulator(MetricAccumulator): + def __init__(self, reduction: Reduce) -> None: + super().__init__(reduction) + self.sum = 0.0 + self.count = 0 + + def append(self, value: Any) -> None: + # Called after record_metric(key, value, reduce.TYPE) + v = float(value.item() if hasattr(value, "item") else value) + self.sum += v + self.count += 1 + + def get_value(self) -> float: + return self.sum / self.count if self.count > 0 else 0.0 + + def get_state(self) -> dict[str, Any]: + return {"reduction_type": self.reduction_type.value, "sum": self.sum, "count": self.count} + + @classmethod + def get_reduced_value_from_states(cls, states: list[dict[str, Any]]) -> float: + # Useful for global reduce; called before flush + total_sum = sum(s["sum"] for s in states) + total_count = sum(s["count"] for s in states) + return total_sum / total_count if total_count > 0 else 0.0 + + def reset(self) -> None: + self.sum = 0.0 + self.count = 0 +``` + +### Behind the Scenes + +We have two main requirements: +1. Metrics must be accumulated somewhere. +2. Metrics must be collected from all ranks. + +To address #1, we use a `MetricCollector` per process to store state. For example, with 10 ranks, there are 10 `MetricCollector` instances. Within each rank, `MetricCollector` is a singleton, ensuring the same object is returned after the first call. This eliminates the need to pass dictionaries between functions. 
+ +For example, users can simply write: + +```python +def my_fn(): + record_metric(key, value, reduce) # Calls MetricCollector().push(key, value, reduce) +``` + +This is simpler than: + +```python +def my_fn(my_metrics): + my_metrics[key] = value + return my_metrics +``` + +To address #2, we automatically spawn a `LocalFetcherActor` for each process and register it with the `GlobalLoggingActor`. This allows the `GlobalLoggingActor` to know which actors to call, and each `LocalFetcherActor` can access the local `MetricCollector`. This spawning and registration occurs in `forge.controller.provisioner.py::get_proc_mesh`. + +In summary: +1. One `GlobalLoggingActor` serves as the controller. +2. For each process, `forge.controller.provisioner.py::get_proc_mesh` spawns a `LocalFetcherActor`, so N ranks = N `LocalFetcherActor` instances. These are registered with the `GlobalLoggingActor`. +3. Each rank has a singleton `MetricCollector`, acting as the local storage for metrics. +4. Calling `record_metric(key, value, reduce_type)` stores metrics locally in the `MetricCollector`. +5. When GlobalLoggingActor.flush() -> all LocalFetcherActor.flush() --> MetricCollector.flush() From 36cbfa36e9a9ee4e4cb73e38ae3b169f3af420a1 Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Fri, 10 Oct 2025 15:14:24 -0700 Subject: [PATCH 2/5] formatting --- src/forge/observability/README.md | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/forge/observability/README.md b/src/forge/observability/README.md index ae8f093a4..e150a59d5 100644 --- a/src/forge/observability/README.md +++ b/src/forge/observability/README.md @@ -2,7 +2,7 @@ We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. No boilerplate required-just call, flush, and visualize. 
Disable with `FORGE_DISABLE_METRICS=true`. -## Your Superpowers +## 1. Your Superpowers ### Call `record_metric` from Anywhere @@ -78,12 +78,12 @@ await mlogger.flush(global_step=0) # Flush and reset Output: ```bash === [GlobalReduce] - METRICS STEP 0 === -my_cuda_loop/memory_delta_end_start_avg_gb: 0.015 -my_cuda_loop/memory_peak_max_gb: 0.042 +my_cuda_loop/memory_delta_end_start_avg_gb: 0.015 +my_cuda_loop/memory_peak_max_gb: 0.042 my_cuda_loop/my_metric_mm_a_b/duration_avg_s: 0.031 my_cuda_loop/my_metric_mm_a_b/duration_max_s: 0.186 -my_cuda_loop/total_duration_avg_s: 0.094 -my_cuda_loop/total_duration_max_s: 0.187 +my_cuda_loop/total_duration_avg_s: 0.094 +my_cuda_loop/total_duration_max_s: 0.187 ``` For convenience, you can also use `Tracer` as a context manager or decorator: @@ -101,12 +101,12 @@ with trace(prefix="train_step", track_memory=True, timer="gpu") as t: ```python from forge.observability.perf_tracker import trace -@trace(prefix="fwd_pass", track_memory=False, timer="cpu") +@trace(prefix="my_reward_fn", track_memory=False, timer="cpu") async def reward_fn(x): # Supports both synchronous and asynchronous functions return 1.0 if x > 0 else 0.0 ``` -### Logging Modes +## 2. Logging Modes Defined per backend. You have three options: @@ -188,7 +188,7 @@ Finally, with `"logging_mode": "per_rank_no_reduce"` [1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1 ``` -### Using Multiple Backends +## 3. Using Multiple Backends For example, you can log reduced metrics to Weights & Biases while using "per_rank_no_reduce" for debugging logs. We support multiple backends during logger initialization: @@ -204,8 +204,7 @@ await mlogger.init_backends.call_one({ Extend `LoggerBackend` for custom logging, such as saving data to JSONL files, sending Slack notifications when a metric hits a threshold, or supporting tools like MLFlow or Grafana. After writing your backend, register it with `forge.observability.metrics.get_logger_backend_class`. 
-# TODO: we need a better solution here that doesn't involve commiting to forge -# e.g. register_new_backend_type(my_custom_backend_type) +TODO: we need a better solution here that doesn't involve commiting to forge, e.g. register_new_backend_type(my_custom_backend_type) ```python class ConsoleBackend(LoggerBackend): @@ -224,12 +223,11 @@ class ConsoleBackend(LoggerBackend): print(metric) ``` -### Adding a New Reduce Type +## 4. Adding a New Reduce Type Metrics are accumulated each time `record_metric` is called. The following example implements the `Reduce.MEAN` accumulator. By tracking `sum` and `count`, it efficiently supports accurate global reduction. Users can extend this by adding custom reduce types, such as `WordCounterAccumulator` or `SampleAccumulator`, and registering them with `forge.observability.metrics.Reduce`. For details on how this is used, see `forge.observability.metrics.MetricCollector`. -# TODO: we need a better solution here that doesn't involve commiting to forge -# e.g. register_new_reduce_type(my_custom_reduce_type) +TODO: we need a better solution here that doesn't involve commiting to forge, e.g. register_new_reduce_type(my_custom_reduce_type) ```python class MeanAccumulator(MetricAccumulator): @@ -262,7 +260,7 @@ class MeanAccumulator(MetricAccumulator): self.count = 0 ``` -### Behind the Scenes +## 5. Behind the Scenes We have two main requirements: 1. Metrics must be accumulated somewhere. From ec4971afcfd7742757bc4831fd2f49b469922c97 Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Fri, 10 Oct 2025 15:34:17 -0700 Subject: [PATCH 3/5] updates --- src/forge/observability/README.md | 33 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/forge/observability/README.md b/src/forge/observability/README.md index e150a59d5..440dd691f 100644 --- a/src/forge/observability/README.md +++ b/src/forge/observability/README.md @@ -4,7 +4,7 @@ We aim to make distributed observability effortless. 
You can call `record_metric ## 1. Your Superpowers -### Call `record_metric` from Anywhere +### 1.1 Call `record_metric` from Anywhere Simple to use, with no need to pass dictionaries around. @@ -46,7 +46,7 @@ my_max_metric: 3.0 my_mean_metric: 2.0 ``` -### Track Performance: Timing and Memory +### 1.2 Track Performance: Timing and Memory Use `Tracer` for tracking durations and memory usage. Overhead is minimal, and GPU timing is non-blocking. Set `timer="gpu"` for kernel-level precision. Tracer leverages `record_metric` in the backend. @@ -61,12 +61,12 @@ def my_fn(): 1000, 1000, device="cuda" ) - tracer = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu") - tracer.start() + t = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu") + t.start() for _ in range(3): torch.mm(a, b) - tracer.step("my_metric_mm_a_b") - tracer.stop() + t.step("my_metric_mm_a_b") + t.stop() # Accumulate metrics for _ in range(2): @@ -96,21 +96,16 @@ with trace(prefix="train_step", track_memory=True, timer="gpu") as t: loss = model(x) t.step("bwd") loss.backward() -``` - -```python -from forge.observability.perf_tracker import trace @trace(prefix="my_reward_fn", track_memory=False, timer="cpu") async def reward_fn(x): # Supports both synchronous and asynchronous functions return 1.0 if x > 0 else 0.0 ``` - ## 2. Logging Modes Defined per backend. You have three options: -- **global_reduce**: N ranks = 1 charts. Ranks accumulate → controller reduces → 1 entry per flush. Ideal for a single aggregated view (e.g., average loss chart). +- **global_reduce**: N ranks = 1 charts. Ranks accumulate → Controller reduces → 1 entry per flush. Ideal for a single aggregated view (e.g., average loss chart). - **per_rank_reduce**: N ranks = N charts. Each rank reduces locally → log once per rank per flush. Ideal for per-rank performance debugging (e.g., GPU utilization). - **per_rank_no_reduce**: N ranks = N charts. Values are logged immediately without reduction. 
Ideal for real-time streams. @@ -153,7 +148,7 @@ if __name__ == "__main__": asyncio.run(main()) ``` -Output: +Output : ```bash === [GlobalReduce] - METRICS STEP 0 === my_sum_rank_metric: 4.0 # (rank_0 + rank_1) * 2 steps * 2 replicas @@ -190,7 +185,7 @@ Finally, with `"logging_mode": "per_rank_no_reduce"` ## 3. Using Multiple Backends -For example, you can log reduced metrics to Weights & Biases while using "per_rank_no_reduce" for debugging logs. We support multiple backends during logger initialization: +For example, you can do `global_reduce` with Weights & Biases while using `per_rank_no_reduce` for debugging logs on the console. ```python mlogger = await get_or_create_metric_logger(process_name="Controller") @@ -200,13 +195,14 @@ await mlogger.init_backends.call_one({ }) ``` -### Adding a New Backend +### 3.1 Adding a New Backend Extend `LoggerBackend` for custom logging, such as saving data to JSONL files, sending Slack notifications when a metric hits a threshold, or supporting tools like MLFlow or Grafana. After writing your backend, register it with `forge.observability.metrics.get_logger_backend_class`. TODO: we need a better solution here that doesn't involve commiting to forge, e.g. register_new_backend_type(my_custom_backend_type) ```python +# Example of a custom backend class ConsoleBackend(LoggerBackend): def __init__(self, logger_backend_config: dict[str, Any]) -> None: super().__init__(logger_backend_config) @@ -225,11 +221,12 @@ class ConsoleBackend(LoggerBackend): ## 4. Adding a New Reduce Type -Metrics are accumulated each time `record_metric` is called. The following example implements the `Reduce.MEAN` accumulator. By tracking `sum` and `count`, it efficiently supports accurate global reduction. Users can extend this by adding custom reduce types, such as `WordCounterAccumulator` or `SampleAccumulator`, and registering them with `forge.observability.metrics.Reduce`. 
For details on how this is used, see `forge.observability.metrics.MetricCollector`. +Metrics are accumulated each time `record_metric` is called. The following example implements the `Reduce.MEAN` accumulator. Users can extend this by adding custom reduce types, such as `WordCounterAccumulator` or `SampleAccumulator`, and registering them with `forge.observability.metrics.Reduce`. For details on how this is used, see `forge.observability.metrics.MetricCollector`. TODO: we need a better solution here that doesn't involve commiting to forge, e.g. register_new_reduce_type(my_custom_reduce_type) ```python +# Example of a custom reduce type class MeanAccumulator(MetricAccumulator): def __init__(self, reduction: Reduce) -> None: super().__init__(reduction) @@ -285,6 +282,10 @@ def my_fn(my_metrics): To address #2, we automatically spawn a `LocalFetcherActor` for each process and register it with the `GlobalLoggingActor`. This allows the `GlobalLoggingActor` to know which actors to call, and each `LocalFetcherActor` can access the local `MetricCollector`. This spawning and registration occurs in `forge.controller.provisioner.py::get_proc_mesh`. +So you may ask: "what about the backends"? They live in two places: + a) In each MetricCollector if the backend is marked as per_rank. + b) In the GlobalLoggingActor if the backend is marked as global_reduce. + In summary: 1. One `GlobalLoggingActor` serves as the controller. 2. For each process, `forge.controller.provisioner.py::get_proc_mesh` spawns a `LocalFetcherActor`, so N ranks = N `LocalFetcherActor` instances. These are registered with the `GlobalLoggingActor`. 
From 0177654e6fe6f3b2bc94f15621bb6a98eee30fb8 Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Fri, 10 Oct 2025 15:38:34 -0700 Subject: [PATCH 4/5] update --- src/forge/observability/README.md | 44 ++++++++++++++----------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/forge/observability/README.md b/src/forge/observability/README.md index 440dd691f..ee1e386af 100644 --- a/src/forge/observability/README.md +++ b/src/forge/observability/README.md @@ -1,15 +1,28 @@ # Observability in Forge -We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. No boilerplate required-just call, flush, and visualize. Disable with `FORGE_DISABLE_METRICS=true`. +We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. You can visualize aggregated results globally, per-rank or as a stream. No boilerplate required-just call, flush, and visualize. Disable with `FORGE_DISABLE_METRICS=true`. ## 1. Your Superpowers ### 1.1 Call `record_metric` from Anywhere -Simple to use, with no need to pass dictionaries around. +Simple to use, with no need to pass dictionaries around. For example, users can simply write: -Full example: ```python +def my_fn(): + record_metric(key, value, reduce) +``` + +Instead of: + +```python +def my_fn(my_metrics): + my_metrics[key] = value + return my_metrics +``` + +```python +#full example import asyncio from forge.observability import get_or_create_metric_logger, record_metric, Reduce @@ -57,9 +70,7 @@ import torch # ... 
Initialize logger (as shown in previous example) def my_fn(): - a, b = torch.randn(1000, 1000, device="cuda"), torch.randn( - 1000, 1000, device="cuda" - ) + a, b = torch.randn(1000, 1000, device="cuda"), torch.randn(1000, 1000, device="cuda") t = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu") t.start() @@ -98,7 +109,7 @@ with trace(prefix="train_step", track_memory=True, timer="gpu") as t: loss.backward() @trace(prefix="my_reward_fn", track_memory=False, timer="cpu") -async def reward_fn(x): # Supports both synchronous and asynchronous functions +async def reward_fn(x): # Supports both sync/async functions return 1.0 if x > 0 else 0.0 ``` ## 2. Logging Modes @@ -124,7 +135,7 @@ class MyActor(ForgeActor): @endpoint async def my_fn(self): rank = current_rank().rank # 0 or 1 per replica - record_metric("my_sum_rank_metric", rank, Reduce.SUM) + record_metric("my_sum_rank_metric", rank, Reduce.SUM) # <--- your metric async def main(): # Setup logger @@ -148,7 +159,7 @@ if __name__ == "__main__": asyncio.run(main()) ``` -Output : +Output when `"logging_mode": "global_reduce"` ```bash === [GlobalReduce] - METRICS STEP 0 === my_sum_rank_metric: 4.0 # (rank_0 + rank_1) * 2 steps * 2 replicas @@ -265,21 +276,6 @@ We have two main requirements: To address #1, we use a `MetricCollector` per process to store state. For example, with 10 ranks, there are 10 `MetricCollector` instances. Within each rank, `MetricCollector` is a singleton, ensuring the same object is returned after the first call. This eliminates the need to pass dictionaries between functions. -For example, users can simply write: - -```python -def my_fn(): - record_metric(key, value, reduce) # Calls MetricCollector().push(key, value, reduce) -``` - -This is simpler than: - -```python -def my_fn(my_metrics): - my_metrics[key] = value - return my_metrics -``` - To address #2, we automatically spawn a `LocalFetcherActor` for each process and register it with the `GlobalLoggingActor`. 
This allows the `GlobalLoggingActor` to know which actors to call, and each `LocalFetcherActor` can access the local `MetricCollector`. This spawning and registration occurs in `forge.controller.provisioner.py::get_proc_mesh`. So you may ask: "what about the backends"? They live in two places: From df85ebf7f64a67fa88811e383c72b6aa366c1d0e Mon Sep 17 00:00:00 2001 From: Felipe Mello Date: Fri, 10 Oct 2025 15:50:38 -0700 Subject: [PATCH 5/5] updates --- src/forge/observability/README.md | 38 ++++++++++++++++--------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/forge/observability/README.md b/src/forge/observability/README.md index ee1e386af..c5eb8203a 100644 --- a/src/forge/observability/README.md +++ b/src/forge/observability/README.md @@ -1,6 +1,6 @@ # Observability in Forge -We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. You can visualize aggregated results globally, per-rank or as a stream. No boilerplate required-just call, flush, and visualize. Disable with `FORGE_DISABLE_METRICS=true`. +We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. You can visualize aggregated results globally, per-rank or as a stream. No boilerplate required - just call, flush, and visualize. Disable with `FORGE_DISABLE_METRICS=true`. ## 1. 
Your Superpowers @@ -21,8 +21,8 @@ def my_fn(my_metrics): return my_metrics ``` +Simple example (for a distributed one, see section 2, "Logging Modes"): ```python -#full example import asyncio from forge.observability import get_or_create_metric_logger, record_metric, Reduce @@ -33,7 +33,7 @@ async def main(): # Have this in any process def my_fn(number): - record_metric("my_sum_metric", number, Reduce.SUM) # sum(1,2,3) + record_metric("my_sum_metric", number, Reduce.SUM) # sum(1,2,3) record_metric("my_max_metric", number, Reduce.MAX) # max(1,2,3) record_metric("my_mean_metric", number, Reduce.MEAN) # mean(1,2,3) @@ -42,7 +42,7 @@ async def main(): my_fn(number) # Flush - await mlogger.flush.call_one(global_step=0) # Flushes and resets metric accumulators + await mlogger.flush.call_one(global_step=0) # Shutdown when done await mlogger.shutdown.call_one() @@ -70,13 +70,13 @@ import torch # ... Initialize logger (as shown in previous example) def my_fn(): - a, b = torch.randn(1000, 1000, device="cuda"), torch.randn(1000, 1000, device="cuda") + a = torch.randn(1000, 1000, device="cuda") t = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu") t.start() for _ in range(3): - torch.mm(a, b) - t.step("my_metric_mm_a_b") + torch.mm(a, a) + t.step("my_metric_mm") t.stop() # Accumulate metrics @@ -91,8 +91,8 @@ Output: === [GlobalReduce] - METRICS STEP 0 === my_cuda_loop/memory_delta_end_start_avg_gb: 0.015 my_cuda_loop/memory_peak_max_gb: 0.042 -my_cuda_loop/my_metric_mm_a_b/duration_avg_s: 0.031 -my_cuda_loop/my_metric_mm_a_b/duration_max_s: 0.186 +my_cuda_loop/my_metric_mm/duration_avg_s: 0.031 +my_cuda_loop/my_metric_mm/duration_max_s: 0.186 my_cuda_loop/total_duration_avg_s: 0.094 my_cuda_loop/total_duration_max_s: 0.187 ``` @@ -116,10 +116,9 @@ async def reward_fn(x): # Supports both sync/async functions Defined per backend. You have three options: -- **global_reduce**: N ranks = 1 charts. Ranks accumulate → Controller reduces → 1 entry per flush. 
Ideal for a single aggregated view (e.g., average loss chart). -- **per_rank_reduce**: N ranks = N charts. Each rank reduces locally → log once per rank per flush. Ideal for per-rank performance debugging (e.g., GPU utilization). -- **per_rank_no_reduce**: N ranks = N charts. Values are logged immediately without reduction. Ideal for real-time streams. - +- **global_reduce**: N ranks = 1 chart. Reduces metrics across all ranks. Ideal for a single aggregated view (e.g., average loss chart). +- **per_rank_reduce**: N ranks = N charts. Each rank reduces locally and logs to its own logger. Ideal for per-rank performance debugging (e.g., GPU utilization). +- **per_rank_no_reduce**: N ranks = N charts. Each rank streams to its own logger without reduction. Ideal for real-time streams. Consider an example with an actor running on 2 replicas, each with 2 processes, for a total of 4 ranks. We will record the sum of the rank values. For example, rank_0 records 0, and rank_1 records 1. @@ -162,18 +161,21 @@ if __name__ == "__main__": Output when `"logging_mode": "global_reduce"` ```bash === [GlobalReduce] - METRICS STEP 0 === -my_sum_rank_metric: 4.0 # (rank_0 + rank_1) * 2 steps * 2 replicas +my_sum_rank_metric: 4.0 # (0 + 1) * 2 steps * 2 replicas =============== ``` Now, let’s set `"logging_mode": "per_rank_reduce"`: ```bash +# replica 1 === [MyActor_661W_r0] - METRICS STEP 0 === my_sum_rank_metric: 0.0 # (rank_0) * 2 steps =============== === [MyActor_661W_r1] - METRICS STEP 0 === my_sum_rank_metric: 2.0 # (rank_1) * 2 steps =============== + +# replica 2 === [MyActor_wQ1g_r0] - METRICS STEP 0 === my_sum_rank_metric: 0.0 # (rank_0) * 2 steps =============== @@ -182,7 +184,7 @@ my_sum_rank_metric: 2.0 # (rank_1) * 2 steps =============== ``` -Finally, with `"logging_mode": "per_rank_no_reduce"` +Finally, with `"logging_mode": "per_rank_no_reduce"`, we have a stream with no reduction: ```bash [0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0 [0] [MyActor-0/2] 
2025-10-10 12:21:09 INFO my_sum_rank_metric: 0 @@ -279,12 +281,12 @@ To address #1, we use a `MetricCollector` per process to store state. For exampl To address #2, we automatically spawn a `LocalFetcherActor` for each process and register it with the `GlobalLoggingActor`. This allows the `GlobalLoggingActor` to know which actors to call, and each `LocalFetcherActor` can access the local `MetricCollector`. This spawning and registration occurs in `forge.controller.provisioner.py::get_proc_mesh`. So you may ask: "what about the backends"? They live in two places: - a) In each MetricCollector if the backend is marked as per_rank. - b) In the GlobalLoggingActor if the backend is marked as global_reduce. +- In each `MetricCollector`, if the backend uses a per-rank logging mode (`per_rank_reduce` or `per_rank_no_reduce`). +- In the `GlobalLoggingActor`, if the backend uses the `global_reduce` logging mode. In summary: 1. One `GlobalLoggingActor` serves as the controller. 2. For each process, `forge.controller.provisioner.py::get_proc_mesh` spawns a `LocalFetcherActor`, so N ranks = N `LocalFetcherActor` instances. These are registered with the `GlobalLoggingActor`. -3. Each rank has a singleton `MetricCollector`, acting as the local storage for metrics. +3. Each rank has a singleton `MetricCollector`, holding the accumulated metrics and any per-rank backends. 4. Calling `record_metric(key, value, reduce_type)` stores metrics locally in the `MetricCollector`. 5. When GlobalLoggingActor.flush() -> all LocalFetcherActor.flush() --> MetricCollector.flush()