# Intro to Forge

Forge is a PyTorch-native framework designed for rapid experimentation and large-scale training of Reinforcement Learning (RL) algorithms with Large Language Models (LLMs). It's designed to:
- Express RL algorithms as naturally as pseudocode, while scaling seamlessly across clusters
- Support varying degrees of asynchrony, from fully synchronous/on-policy to fully asynchronous/off-policy training
- Separate infrastructural concerns from algorithmic implementation
- Bias towards composable, reusable components that can be mixed and matched for different RL approaches

Forge is built on top of proven components:
- **[Monarch](https://github.com/meta-pytorch/monarch)** - PyTorch-native single-controller framework
- **[torchtitan](https://github.com/pytorch/torchtitan)** - PyTorch-native large-scale LLM training platform
- **[vLLM](https://github.com/vllm-project/vllm)** - A high-throughput, memory-efficient inference and serving engine for LLMs

Our mission is to accelerate innovation in reinforcement learning by empowering researchers and developers to explore new RL algorithms and infrastructure techniques. Whether you're designing novel training methods or optimizing distributed systems, Forge provides a foundation to build upon.

## Brief Intro to Monarch
Before diving into Forge, we first need to establish its foundation: Monarch, PyTorch's native single-controller framework for distributed execution.

Forge builds many of its abstractions on top of Monarch, so it's worth introducing a few of Monarch's key concepts first. The following sections borrow from Monarch's Getting Started Guide (not public yet).

### Defining an Actor
At its core, Monarch uses [actors](https://en.wikipedia.org/wiki/Actor_model) as a way to create multi-machine programs. Actors are Python objects that expose a number of endpoint functions. These functions can be called by other actors in the system and their responses gathered asynchronously.

```python
import asyncio
from monarch.actor import Actor, endpoint, this_proc

class Counter(Actor):
    def __init__(self, initial_value: int):
        self.value = initial_value

    @endpoint
    def increment(self) -> None:
        self.value += 1

    @endpoint
    def get_value(self) -> int:
        return self.value
```


The `@endpoint` decorator marks the methods of an actor that other actors can call remotely.
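
To make the actor model concrete without any Monarch machinery, here is a minimal sketch in plain `asyncio`. It is a toy, not how Monarch implements actors: the hypothetical `ToyActor` owns a private mailbox, handles one message at a time, and answers each request through a future returned by its (also hypothetical) `ask` method. Because messages are processed in order, the three increments land before the `get_value` request, even though the caller never waits for them.

```python
import asyncio
from typing import Any, Optional


class ToyActor:
    """Hypothetical actor: private state, a mailbox, one message at a time."""

    def __init__(self) -> None:
        self._mailbox: asyncio.Queue = asyncio.Queue()
        self._worker: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Drain the mailbox in a background task.
        self._worker = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        while True:
            method, args, reply = await self._mailbox.get()
            try:
                reply.set_result(getattr(self, method)(*args))
            except Exception as exc:  # hand errors back to the caller
                reply.set_exception(exc)

    def ask(self, method: str, *args: Any) -> "asyncio.Future[Any]":
        # Enqueue a message and immediately return a future for the reply.
        reply = asyncio.get_running_loop().create_future()
        self._mailbox.put_nowait((method, args, reply))
        return reply

    async def stop(self) -> None:
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass


class ToyCounter(ToyActor):
    def __init__(self, initial_value: int) -> None:
        super().__init__()
        self.value = initial_value

    def increment(self) -> None:
        self.value += 1

    def get_value(self) -> int:
        return self.value


async def main() -> int:
    counter = ToyCounter(initial_value=0)
    counter.start()
    pending = [counter.ask("increment") for _ in range(3)]  # not awaited yet
    value = await counter.ask("get_value")
    await asyncio.gather(*pending)
    await counter.stop()
    return value


print(asyncio.run(main()))  # 3
```

Serializing messages through a mailbox is what lets an actor mutate its own state without locks: only one message touches `self.value` at a time.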

### Spawning an Actor in the Local Process

We spawn actors in the current running process like so:

```python
counter: Counter = this_proc().spawn("counter", Counter, initial_value=0)
```

`this_proc()` is a handle to the current process, giving us direct control over where an actor runs. Monarch is deliberately explicit about where code runs, so that you can place work where it executes most efficiently.

### Sending a Simple Message
Once an actor is spawned, we can send messages to the actor:

```python
from monarch.actor import Future

fut: Future[int] = counter.get_value.call_one()

value = await fut

print(f"Counter value: {value}")
```

```
Counter value: 0
```


Here we call the `get_value` endpoint, which returns 0, the current value of the counter. `call_one` is referred to as an "adverb" because it modifies how the results of the endpoint are handled; `call_one` invokes a single actor and returns its value.

Notice that the return value is a `Future[int]`: the message is sent asynchronously, letting the sender do other things before it needs the reply. We can `await` the result.
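
The same send-now, await-later pattern can be seen with a standard-library `asyncio` task. In this sketch, `slow_get_value` is a hypothetical stand-in for a remote endpoint, and the `events` list records the order in which things happen: the caller keeps working until it actually needs the reply.

```python
import asyncio

events: list[str] = []


async def slow_get_value() -> int:
    # Hypothetical stand-in for a remote endpoint that takes a moment to reply.
    await asyncio.sleep(0.01)
    events.append("reply ready")
    return 0


async def main() -> int:
    fut = asyncio.ensure_future(slow_get_value())  # request is in flight
    events.append("doing other work")  # the caller is not blocked
    value = await fut  # wait only when the reply is actually needed
    events.append("got reply")
    return value


print(asyncio.run(main()), events)
```

Running it prints `0 ['doing other work', 'reply ready', 'got reply']`.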

### Multiple Actors at Once
Monarch scales to thousands of machines because it can broadcast a single message to many actors at once, rather than sending many point-to-point messages.

Monarch expresses broadcast communication by organizing actors into a `Mesh`, a multi-dimensional container with named dimensions. An example cluster may have dimensions `{"hosts": 32, "gpus": 8}`. To create a mesh of actors, we create a mesh of processes and spawn an actor on each of them:

```python
from monarch.actor import ProcMesh, this_host

procs: ProcMesh = this_host().spawn_procs(per_host={"gpus": 8})
counters: Counter = procs.spawn("counters", Counter, 0)
```
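
A mesh is essentially a named, multi-dimensional index space. The plain-Python sketch below (hypothetical helpers `mesh_coords` and `flat_rank`, not part of Monarch's API) enumerates the example cluster shape `{"hosts": 32, "gpus": 8}`, which has 32 × 8 = 256 points, and converts a named coordinate into a flat rank. Row-major ordering, with the last dimension varying fastest, is an assumption made for illustration.

```python
from itertools import product
from math import prod


def mesh_coords(shape: dict[str, int]) -> list[dict[str, int]]:
    """Enumerate every point of a named mesh in row-major order (hypothetical helper)."""
    names = list(shape)
    return [dict(zip(names, idx)) for idx in product(*(range(n) for n in shape.values()))]


def flat_rank(coord: dict[str, int], shape: dict[str, int]) -> int:
    """Map named coordinates to a flat rank, last dimension varying fastest."""
    rank = 0
    for name, size in shape.items():
        rank = rank * size + coord[name]
    return rank


shape = {"hosts": 32, "gpus": 8}
coords = mesh_coords(shape)
print(len(coords), prod(shape.values()))  # 256 256
print(flat_rank({"hosts": 2, "gpus": 3}, shape))  # 19
```

Naming the dimensions is what lets a single message target a whole slice of the mesh (for example, every GPU on one host) without listing actors one by one.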

### Broadcasting Messages
Now messages can be sent to all actors in the mesh:

```python
await counters.increment.call()
```

```
ValueMesh({gpus=8})
```

Note that here we use the `call()` adverb. You will see other adverbs in Monarch code as well:
- `call_one()` - invoke a single actor and get its value (what we saw before)
- `choose()` - invoke one randomly selected actor from the mesh and get its value
- `call()` - invoke all actors in an actor mesh and return their values as a `ValueMesh`
- `broadcast()` - send a fire-and-forget message to all actors in an actor mesh
- `stream()` - invoke all actors and return their values as an iterator
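
To pin down what each adverb means, the sketch below models their semantics over a plain Python list of coroutine functions standing in for an actor mesh. It uses only `asyncio` and is not Monarch's implementation: the real `call_one()` returns a `Future` and `call()` produces a `ValueMesh`, while this toy returns plain values and lists. The single-actor check in `call_one` is an assumption of the toy, and `make_actor` is a hypothetical helper.

```python
import asyncio
import random
from typing import Any, AsyncIterator, Awaitable, Callable

# A toy "actor mesh": each element is an async endpoint on one actor.
Endpoint = Callable[[], Awaitable[Any]]


async def call_one(mesh: list[Endpoint]) -> Any:
    # Toy assumption: call_one only makes sense on a mesh of exactly one actor.
    if len(mesh) != 1:
        raise ValueError("call_one expects exactly one actor")
    return await mesh[0]()


async def choose(mesh: list[Endpoint]) -> Any:
    # Invoke one randomly selected actor and return its reply.
    return await random.choice(mesh)()


async def call(mesh: list[Endpoint]) -> list[Any]:
    # Invoke every actor and collect the replies in mesh order.
    return list(await asyncio.gather(*(ep() for ep in mesh)))


def broadcast(mesh: list[Endpoint]) -> list[asyncio.Task]:
    # Fire-and-forget: schedule every call and return without awaiting replies.
    # The tasks are returned only so this demo can let them finish cleanly.
    return [asyncio.ensure_future(ep()) for ep in mesh]


async def stream(mesh: list[Endpoint]) -> AsyncIterator[Any]:
    # Yield replies one at a time, in completion order.
    tasks = [asyncio.ensure_future(ep()) for ep in mesh]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


def make_actor(rank: int) -> Endpoint:
    async def get_rank() -> int:
        await asyncio.sleep(0.01 * (3 - rank))  # higher ranks reply sooner
        return rank
    return get_rank


async def main() -> dict[str, Any]:
    mesh = [make_actor(rank) for rank in range(4)]
    results: dict[str, Any] = {
        "call_one": await call_one(mesh[:1]),
        "choose": await choose(mesh),
        "call": await call(mesh),
    }
    await asyncio.gather(*broadcast(mesh))
    results["stream"] = [value async for value in stream(mesh)]
    return results


results = asyncio.run(main())
print(results)
```

The contrast between `call()` and `stream()` is the useful one: both reach every actor, but `call()` waits for all replies while `stream()` hands each reply over as soon as it arrives.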

There's much more to cover with Monarch, but these foundations provide the building blocks needed to understand how Forge creates its RL-specific services on top of this distributed actor system.

## Forge Services
Forge introduces *Services* - a higher-level abstraction built on top of Monarch actors. Services handle all the operational complexity of managing distributed ActorMeshes: spawning actors across nodes, fault tolerance, load balancing, and intelligent routing.

### Creating a Forge Service
Creating a Forge service requires minimal changes to actors like the one we created above. Replace the `Actor` base class with `ForgeActor`, and change how you spawn the actor:

```python
from monarch.actor import endpoint

from forge.controller import ForgeActor
from forge.controller.service import ServiceConfig, spawn_service

class ForgeCounter(ForgeActor):
    def __init__(self, initial_value: int):
        self.value = initial_value

    @endpoint
    def increment(self) -> int:
        self.value += 1
        return self.value

    @endpoint
    def get_value(self) -> int:
        return self.value

    @endpoint
    async def reset(self):
        self.value = 0

    @endpoint
    def fail(self):
        # Raises on purpose, to simulate an actor failure
        raise ValueError()


counter_service = await spawn_service(
    ServiceConfig(procs_per_replica=1, num_replicas=4),
    ForgeCounter,
    initial_value=0,
)
```

Here, we've created a simple "Counter service" with 4 replicas, each running on 1 process.

### Service Adverbs: Operating at the Replica Level
Services introduce new adverbs that operate at the level of replicas rather than individual actors. Since a replica can span multiple processes, each replica is essentially an ActorMesh in Monarch terms:

```python
# choose() - routes to one replica (load balanced, and which may contain multiple actors)
await counter_service.increment.choose()

# call() - runs on ALL replicas
results = await counter_service.increment.call()

print(results)
```

```
[1, 2, 1, 1]
```



Key distinction:
- Monarch's `choose()` picks a single actor from an `ActorMesh`
- Forge's `choose()` picks a single replica (which could be an entire `ActorMesh` of actors)

This abstraction lets you think in terms of logical compute units (replicas) rather than individual processes.
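
The nesting can be modeled in a few lines of plain `asyncio`. In this hypothetical sketch, `ToyService` and `ToyReplica` are illustrative names rather than Forge classes, and replicas are picked at random as a placeholder for the service's real load-balancing policy. `choose_increment` reaches every actor inside one replica, while `call_increment` reaches every replica and returns one result per replica.

```python
import asyncio
import random
from dataclasses import dataclass, field


@dataclass
class ToyReplica:
    """A hypothetical replica: a group of actors that act as one unit."""

    procs: int
    values: list[int] = field(init=False)

    def __post_init__(self) -> None:
        self.values = [0] * self.procs  # one counter per actor in the replica

    async def increment(self) -> int:
        # A request routed to this replica reaches every actor inside it.
        self.values = [v + 1 for v in self.values]
        return self.values[0]


class ToyService:
    """A hypothetical service holding num_replicas replicas of procs_per_replica actors."""

    def __init__(self, num_replicas: int, procs_per_replica: int) -> None:
        self.replicas = [ToyReplica(procs_per_replica) for _ in range(num_replicas)]

    async def choose_increment(self) -> int:
        # Route to exactly one replica (random here, as a placeholder policy).
        return await random.choice(self.replicas).increment()

    async def call_increment(self) -> list[int]:
        # Run on every replica and return one result per replica.
        return list(await asyncio.gather(*(r.increment() for r in self.replicas)))


async def main() -> list[int]:
    service = ToyService(num_replicas=4, procs_per_replica=2)
    await service.choose_increment()
    return await service.call_increment()


results = asyncio.run(main())
print(results)  # exactly one replica shows 2; the other three show 1
```

With four replicas, one `choose` followed by one `call` leaves exactly one replica at 2 and the rest at 1, the same shape as the `[1, 2, 1, 1]` output above.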

### Load Balancing in Action
Services load-balance requests across replicas automatically:

```python
await counter_service.reset.call()
print("Increment on different replicas:")
for i in range(8):
    result = await counter_service.increment.choose()
    print(f"Call {i}: Counter value = {result}")
```


Notes, things I want to show:
1. New adverbs: `choose()` runs on a **replica**, not an individual actor, and `call()` runs on **all replicas** (this is subject to change, as I acknowledge it's confusing to re-use the adverbs from base Monarch)
2. When you call `choose()` outside a session, it load-balances across replicas in round-robin order
3. You can use sticky sessions with the context manager:
```
with counter_service.session:
    await counter_service.increment.choose()
```
which keeps every call inside the block on the same replica
4. Whenever the service encounters an actor failure, it marks the affected replica as unhealthy and recovers it automatically
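
The routing behaviors in these notes (round robin, sticky sessions, skipping unhealthy replicas) can be sketched as a small router. This is a hypothetical, synchronous toy: `ToyRouter`, `route`, `session()`, and `mark_unhealthy` are illustrative names rather than Forge's API (the notes use `counter_service.session` instead), and the real service also restarts failed replicas, which the sketch only mentions in a comment.

```python
import itertools
from contextlib import contextmanager
from typing import Iterator, Optional


class ToyRouter:
    """Hypothetical router: round robin over healthy replicas, plus sticky sessions."""

    def __init__(self, num_replicas: int) -> None:
        self.healthy = [True] * num_replicas
        self._order = itertools.cycle(range(num_replicas))
        self._in_session = False
        self._pinned: Optional[int] = None

    def _next_healthy(self) -> int:
        # Advance the round-robin cursor, skipping replicas marked unhealthy.
        for _ in range(len(self.healthy)):
            idx = next(self._order)
            if self.healthy[idx]:
                return idx
        raise RuntimeError("no healthy replicas")

    def route(self) -> int:
        if not self._in_session:
            return self._next_healthy()
        # Sticky session: pin the first replica chosen; re-pin only if it fails.
        if self._pinned is None or not self.healthy[self._pinned]:
            self._pinned = self._next_healthy()
        return self._pinned

    @contextmanager
    def session(self) -> Iterator[None]:
        self._in_session, self._pinned = True, None
        try:
            yield
        finally:
            self._in_session, self._pinned = False, None

    def mark_unhealthy(self, idx: int) -> None:
        # A real service would also start recovering this replica in the background.
        self.healthy[idx] = False


router = ToyRouter(num_replicas=4)
first = [router.route() for _ in range(6)]
with router.session():
    sticky = [router.route() for _ in range(3)]
router.mark_unhealthy(3)
after_failure = [router.route() for _ in range(4)]
print(first, sticky, after_failure)
# [0, 1, 2, 3, 0, 1] [2, 2, 2] [0, 1, 2, 0]
```

Sticky sessions matter for stateful workloads: if a replica caches something per conversation, pinning the session keeps later calls on the replica that holds that state.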


from forge.actors.policy import Policy, PolicyConfig, SamplingOverrides, WorkerConfig
from forge.actors.replay_buffer import ReplayBuffer
from forge.controller.actor import ForgeActor
from forge.controller.service import ServiceConfig, shutdown_service, spawn_service
from forge.data.rewards import MathReward, ThinkingReward

from apps.grpo.main import 