# Intro to Forge

Forge is a PyTorch-native framework designed for rapid experimentation and large-scale training of Reinforcement Learning (RL) algorithms with Large Language Models (LLMs). It's designed to:
- Express RL algorithms as naturally as pseudocode, while scaling seamlessly across clusters
- Support varying degrees of asynchrony, from fully synchronous/on-policy to fully asynchronous/off-policy training
- Separate infrastructural concerns from algorithmic implementation
- Bias towards composable, reusable components that can be mixed and matched for different RL approaches

Forge is built on top of proven components:
- **[Monarch](https://github.com/meta-pytorch/monarch)** - PyTorch-native single-controller framework
- **[torchtitan](https://github.com/pytorch/torchtitan)** - PyTorch-native large-scale LLM training platform
- **[vLLM](https://github.com/vllm-project/vllm)** - A high-throughput, memory-efficient inference and serving engine for LLMs

Our mission is to accelerate innovation in reinforcement learning by empowering researchers and developers to explore new RL algorithms and infrastructure techniques. Whether you're designing novel training methods or optimizing distributed systems, Forge provides a foundation to build upon.

## Brief Intro to Monarch
Before diving into Forge, we need to establish its foundation: Monarch, PyTorch's native single-controller framework for distributed execution.

Forge builds many of its abstractions on top of Monarch, so it's worth introducing a few of Monarch's key concepts first. The following sections borrow from Monarch's Getting Started Guide (not yet public).

### Defining an Actor
At its core, Monarch uses [actors](https://en.wikipedia.org/wiki/Actor_model) as a way to create multi-machine programs. Actors are Python objects that expose a number of endpoint functions. These functions can be called by other actors in the system and their responses gathered asynchronously.

```python
import asyncio
from monarch.actor import Actor, endpoint, this_proc

class Counter(Actor):
    def __init__(self, initial_value: int):
        self.value = initial_value

    @endpoint
    def increment(self) -> None:
        self.value += 1

    @endpoint
    def get_value(self) -> int:
        return self.value
```


The decorator `@endpoint` specifies functions of the Actor that can be called remotely from other actors.
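To build intuition for what "actors with endpoints" means, here is a minimal, single-process sketch in plain `asyncio`. It is not how Monarch is implemented: `ToyCounterActor`, `send`, and `run` are hypothetical names. It shows the core property of the actor model: an actor owns its state and handles messages one at a time from a mailbox, while callers immediately get a future back. Run it as a script; inside a notebook like this one, replace `asyncio.run(main())` with `await main()`.

```python
import asyncio


class ToyCounterActor:
    """Hypothetical sketch of the actor model (not Monarch's implementation).

    The actor owns private state and a mailbox. A single loop handles one
    message at a time, so endpoint bodies never race on `self.value`.
    """

    def __init__(self, initial_value: int) -> None:
        self.value = initial_value
        self._mailbox: asyncio.Queue = asyncio.Queue()

    # "Endpoints": plain methods that only the message loop invokes.
    def increment(self) -> None:
        self.value += 1

    def get_value(self) -> int:
        return self.value

    def send(self, endpoint_name: str) -> asyncio.Future:
        """Enqueue a message; the returned future resolves with the reply."""
        reply = asyncio.get_running_loop().create_future()
        self._mailbox.put_nowait((endpoint_name, reply))
        return reply

    async def run(self) -> None:
        """Message loop: pop a message, run the endpoint, answer the future."""
        while True:
            endpoint_name, reply = await self._mailbox.get()
            try:
                reply.set_result(getattr(self, endpoint_name)())
            except Exception as exc:  # forward errors to the caller
                reply.set_exception(exc)


async def main() -> int:
    actor = ToyCounterActor(initial_value=0)
    loop_task = asyncio.create_task(actor.run())
    # Send three increments without waiting, then ask for the value.
    pending = [actor.send("increment") for _ in range(3)]
    value = await actor.send("get_value")
    await asyncio.gather(*pending)
    loop_task.cancel()
    return value


print(asyncio.run(main()))  # 3
```

Because all mutation happens inside the single message loop, the three increments cannot interleave, which is what lets actors avoid explicit locks.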

### Spawning An Actor In The Local Process

We spawn actors in the current running process like so:

```python
counter: Counter = this_proc().spawn("counter", Counter, initial_value=0)
```

`this_proc()` is a handle to the current process, giving us direct control over where an actor runs. Monarch is explicit about where code runs, so you can place work wherever it executes most efficiently.

### Sending A Simple Message
Once an actor is spawned, we can send messages to the actor:

```python
from monarch.actor import Future

fut: Future[int] = counter.get_value.call_one()

value = await fut

print(f"Counter value: {value}")
```

Here we invoke the `get_value` endpoint, which returns 0, the Counter's current value. `call_one` is referred to as an "adverb" because it modifies how the endpoint's results are handled; `call_one` simply invokes a single actor and gets its value.

Notice that the return value is a `Future[int]` - the message is sent asynchronously, letting the sender do other things before it needs the reply. We can `await` on the result.

### Multiple Actors at Once
Monarch scales to thousands of machines because of its ability to broadcast a single message to many actors at once, rather than send many point-to-point messages.

Monarch expresses broadcasted communication by organizing actors into a `Mesh`, a multi-dimensional container with named dimensions. An example cluster may have dimensions `{"hosts": 32, "gpus": 8}`. To create a mesh of actors, we'll create a mesh of processes and spawn actors on them:

```python
from monarch.actor import ProcMesh, this_host

procs: ProcMesh = this_host().spawn_procs(per_host={"gpus": 8})
counters: Counter = procs.spawn("counters", Counter, 0)
```
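A mesh's named dimensions are easiest to picture as row-major coordinates. The toy class below is purely illustrative (`ToyMesh`, `rank_of`, and `points` are hypothetical, not Monarch API) and shows how a `{"hosts": 2, "gpus": 4}` layout could map each named coordinate to a flat rank.

```python
from itertools import product
from typing import Dict, Iterator, Tuple


class ToyMesh:
    """Hypothetical sketch: a multi-dimensional container with named dims.

    Dimension order follows dict insertion order, and ranks are assigned
    in row-major order (the last dimension varies fastest).
    """

    def __init__(self, dims: Dict[str, int]) -> None:
        self.dims = dict(dims)

    @property
    def size(self) -> int:
        total = 1
        for extent in self.dims.values():
            total *= extent
        return total

    def rank_of(self, **coords: int) -> int:
        """Row-major flat index for a named coordinate such as hosts=1, gpus=3."""
        rank = 0
        for name, extent in self.dims.items():
            idx = coords[name]
            if not 0 <= idx < extent:
                raise IndexError(f"{name}={idx} out of range [0, {extent})")
            rank = rank * extent + idx
        return rank

    def points(self) -> Iterator[Tuple[int, Dict[str, int]]]:
        """Yield (rank, coordinates) for every point in the mesh."""
        names = list(self.dims)
        ranges = (range(extent) for extent in self.dims.values())
        for rank, idxs in enumerate(product(*ranges)):
            yield rank, dict(zip(names, idxs))


mesh = ToyMesh({"hosts": 2, "gpus": 4})
print(mesh.size)                      # 8
print(mesh.rank_of(hosts=1, gpus=3))  # 7
```

Broadcasting to "all GPUs on host 0" then amounts to selecting the points whose `hosts` coordinate is 0, which is why named dimensions make group sends cheap to express.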

### Broadcasting Messages
Now messages can be sent to all actors in the mesh:

```python
await counters.increment.call()
```

Note that here we use the `call()` adverb. You will see other adverbs in Monarch code as well:
- `call_one()` - invoke a single actor and get its value (what we saw before)
- `choose()` - randomly invoke a single actor from within a mesh of actors and get its value
- `call()` - invoke all actors in an actor mesh and return their values as a `ValueMesh`
- `broadcast()` - fire-and-forget a message to all actors in an actor mesh
- `stream()` - invoke all actors and return their values as an iterator
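The differences between these adverbs are easiest to see side by side. The sketch below is a hypothetical model of their semantics over a plain list of async functions standing in for actors; `ToyEndpointMesh` and `Endpoint` are invented names, and it does not reproduce Monarch's `ValueMesh` return type or its transport. Inside a notebook, use `await demo()` instead of `asyncio.run(demo())`.

```python
import asyncio
import random
from typing import Any, AsyncIterator, Awaitable, Callable, List, Set

# Hypothetical stand-in for one actor's endpoint: an async zero-arg callable.
Endpoint = Callable[[], Awaitable[Any]]


class ToyEndpointMesh:
    """Toy model of adverb semantics (only their shape; not Monarch's API)."""

    def __init__(self, endpoints: List[Endpoint]) -> None:
        self.endpoints = endpoints
        self._background: Set[asyncio.Task] = set()  # keeps fire-and-forget tasks alive

    async def call_one(self) -> Any:
        # This sketch treats call_one() as valid only for a single-actor mesh.
        if len(self.endpoints) != 1:
            raise ValueError("call_one() expects exactly one actor")
        return await self.endpoints[0]()

    async def choose(self) -> Any:
        # Invoke one randomly selected actor and return its reply.
        return await random.choice(self.endpoints)()

    async def call(self) -> List[Any]:
        # Invoke every actor concurrently; replies come back in actor order.
        return list(await asyncio.gather(*(ep() for ep in self.endpoints)))

    def broadcast(self) -> None:
        # Fire-and-forget: schedule every call and discard the replies.
        for ep in self.endpoints:
            task = asyncio.ensure_future(ep())
            self._background.add(task)
            task.add_done_callback(self._background.discard)

    async def stream(self) -> AsyncIterator[Any]:
        # Yield replies one at a time, in completion order.
        for next_done in asyncio.as_completed([ep() for ep in self.endpoints]):
            yield await next_done


async def demo() -> dict:
    values = [0, 0, 0, 0]

    def make_increment(i: int) -> Endpoint:
        async def increment() -> int:
            values[i] += 1
            return values[i]
        return increment

    async def ping() -> str:
        return "pong"

    mesh = ToyEndpointMesh([make_increment(i) for i in range(4)])
    called = await mesh.call()                            # [1, 1, 1, 1]
    mesh.broadcast()
    await asyncio.sleep(0.01)                             # let fire-and-forget tasks run
    after_broadcast = list(values)                        # [2, 2, 2, 2]
    streamed = sorted([v async for v in mesh.stream()])   # [3, 3, 3, 3]
    chosen = await mesh.choose()                          # 4, from whichever actor was picked
    single = await ToyEndpointMesh([ping]).call_one()     # "pong"
    return {"call": called, "broadcast": after_broadcast,
            "stream": streamed, "choose": chosen, "call_one": single}


print(asyncio.run(demo()))
```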

There's much more to cover with Monarch, but these foundations provide the building blocks needed to understand how Forge creates its RL-specific services on top of this distributed actor system.

```python
await procs.stop()
```

## Forge Services
Forge introduces *Services* - a higher-level abstraction built on top of Monarch actors. Services handle all the operational complexity of managing distributed ActorMeshes: spawning actors across nodes, fault tolerance, load balancing, and intelligent routing.

### Creating a Forge Service
Creating a Forge service requires minimal changes to actors like the ones we created above: replace the `Actor` base class with `ForgeActor`, and change how you spawn the actor:

```python
from forge.controller import ForgeActor
from forge.controller.service import ServiceConfig, spawn_service, shutdown_service
from monarch.actor import endpoint


class ForgeCounter(ForgeActor):
    def __init__(self, initial_value: int):
        self.value = initial_value

    @endpoint
    def increment(self) -> int:
        self.value += 1
        return self.value

    @endpoint
    def get_value(self) -> int:
        return self.value

    @endpoint
    async def reset(self):
        self.value = 0

    @endpoint
    def fail(self):
        raise RuntimeError("I was asked to fail")


counter_service = await spawn_service(
    ServiceConfig(procs_per_replica=1, num_replicas=4),
    ForgeCounter,
    initial_value=0,
)
```

Here, we've created a simple "Counter service" with 4 replicas, each running on 1 process.

### Service Adverbs: Operating at the Replica Level
Services introduce new adverbs that operate at the replica level, not individual actors. Since replicas can be spawned across multiple processes, each replica is essentially an ActorMesh in Monarch terms:

```python
# choose() - routes to one replica (load balanced, and which may contain multiple actors)
await counter_service.increment.choose()

# call() - runs on ALL replicas
results = await counter_service.increment.call()

print(results)
```

Key distinction:
- Monarch's `choose()` picks a single actor from an `ActorMesh`
- Forge's `choose()` picks a single replica (which could be an entire `ActorMesh` of actors)

This abstraction lets you think in terms of logical compute units (replicas) rather than individual processes.

### Load Balancing in Action
Services load-balance `choose()` calls across replicas:

```python
await counter_service.reset.call()
print("Increment on different replicas:")
for i in range(8):
    result = await counter_service.increment.choose()
    print(f"Call {i}: Counter value = {result}")
```

Each replica maintains its own state, and `choose()` spreads requests across replicas, so the counter value you see depends on which replica served each call.
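If the router used simple round-robin (an assumption for illustration; this guide does not specify Forge's routing algorithm), the loop above would print a predictable pattern. The toy model below, with hypothetical names, shows it for 4 replicas and 8 calls:

```python
from typing import List


class ToyRoundRobinService:
    """Hypothetical sketch: route each choose() to the next replica in turn."""

    def __init__(self, num_replicas: int) -> None:
        self.replica_values: List[int] = [0] * num_replicas
        self._next = 0

    def _pick(self) -> int:
        idx = self._next
        self._next = (self._next + 1) % len(self.replica_values)
        return idx

    def increment_choose(self) -> int:
        # Mirrors counter_service.increment.choose(): one replica, its own state.
        idx = self._pick()
        self.replica_values[idx] += 1
        return self.replica_values[idx]

    def reset_call(self) -> None:
        # Mirrors counter_service.reset.call(): every replica is reset.
        self.replica_values = [0] * len(self.replica_values)


service = ToyRoundRobinService(num_replicas=4)
print([service.increment_choose() for _ in range(8)])  # [1, 1, 1, 1, 2, 2, 2, 2]
```

Other policies (least-loaded, random) would produce different sequences; the invariant is only that each replica's counter advances independently.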

### Sticky Session for Stateful Operations
When you need to interact with the same replica consistently:

```python
# Use sticky sessions to stay on one replica
async with counter_service.session():
    await counter_service.reset.choose()
    print(await counter_service.increment.choose())
    print(await counter_service.increment.choose())
    print(await counter_service.increment.choose())

    final_value = await counter_service.get_value.choose()
    print(f"Final value on this replica: {final_value}")
```

Sticky sessions can be crucial for efficiency, e.g. whenever you need to maintain KV cache state across multiple turns.

### Automatic Fault Tolerance
Services automatically handle failures:

```python
# fail() raises RuntimeError inside whichever replica handles the call
try:
    await counter_service.fail.choose()
except Exception as e:
    # Catch broadly: the remote error may reach the caller wrapped in another exception type
    print(f"Replica failed ({e}), but service continues...")

# Subsequent calls automatically route around the failed replica
result = await counter_service.increment.choose()
print(f"Still working: {result}")

# The failed replica will be automatically recovered
```

Behind the scenes: Forge marks unhealthy replicas, routes traffic away from them, and spawns replacements automatically.
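The mark, route-away, and replace flow described above can be sketched as a router that remembers which replicas raised. This is a hypothetical model (`ToyHealthAwareRouter` and `recover` are invented names); Forge's real health checks and respawn logic are not shown in this guide.

```python
from typing import Callable, List, Set


class ToyHealthAwareRouter:
    """Hypothetical sketch: round-robin that skips replicas marked unhealthy."""

    def __init__(self, replicas: List[Callable[[], int]]) -> None:
        self.replicas = replicas
        self.unhealthy: Set[int] = set()
        self._next = 0

    def _pick(self) -> int:
        n = len(self.replicas)
        for _ in range(n):
            idx = self._next
            self._next = (self._next + 1) % n
            if idx not in self.unhealthy:
                return idx
        raise RuntimeError("no healthy replicas")

    def choose(self) -> int:
        idx = self._pick()
        try:
            return self.replicas[idx]()
        except Exception:
            self.unhealthy.add(idx)  # mark it so later calls route away
            raise                    # the failing call still surfaces to the caller

    def recover(self, idx: int, replacement: Callable[[], int]) -> None:
        """Swap in a freshly spawned replica and mark it healthy again."""
        self.replicas[idx] = replacement
        self.unhealthy.discard(idx)


def healthy() -> int:
    return 42


def broken() -> int:
    raise RuntimeError("I was asked to fail")


router = ToyHealthAwareRouter([broken, healthy, healthy])
try:
    router.choose()  # replica 0 fails and is marked unhealthy
except RuntimeError:
    print("Replica failed, but service continues...")
print([router.choose() for _ in range(4)])  # [42, 42, 42, 42], replica 0 skipped
router.recover(0, healthy)
print(router.unhealthy)  # set()
```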

### Why This Matters for RL
These service abstractions solve critical RL infrastructure challenges:

1. Load balancing: Distribute rollouts across policy replicas efficiently
2. Sticky sessions: Maintain state between rollouts and their associated replicas, e.g. KV cache consistency
3. Fault tolerance: Keep training running even when individual nodes fail
4. Operational simplicity: No infrastructure code in your RL algorithms

### Performance: Control Plane vs Data Plane
One important area we haven't covered yet is how Forge separates the **control plane** (service coordination) from the **data plane** (tensor transfers). You might reasonably wonder about performance implications if all data flows through TCP in a service-based architecture.

We're actively developing **TorchStore**, our solution for high-performance tensor storage and retrieval over high-bandwidth, RDMA-capable interconnects. This separation means that while Forge services handle coordination and routing, heavy tensor transfers bypass the service layer entirely.

*TorchStore will be covered in detail before our official release.*
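The control/data plane split comes down to keeping bulk payloads out of coordination messages: the control message carries only a small handle, and the receiver fetches the data from a separate store. The sketch below illustrates that pattern with an in-memory dict; `ToyTensorStore`, `trainer_publish`, and `policy_receive` are hypothetical and are not TorchStore's API, which this guide does not yet document.

```python
import uuid
from typing import Dict, List


class ToyTensorStore:
    """Hypothetical in-memory data plane (a stand-in, NOT TorchStore's API)."""

    def __init__(self) -> None:
        self._blobs: Dict[str, List[float]] = {}

    def put(self, data: List[float]) -> str:
        key = uuid.uuid4().hex  # small handle that can travel on the control plane
        self._blobs[key] = data
        return key

    def get(self, key: str) -> List[float]:
        return self._blobs.pop(key)  # fetch once, then free the slot


store = ToyTensorStore()


def trainer_publish(num_params: int) -> dict:
    weights = [0.0] * num_params  # bulk payload goes to the data plane
    return {"op": "update_weights", "key": store.put(weights)}  # tiny control message


def policy_receive(message: dict) -> int:
    weights = store.get(message["key"])  # pull the payload directly from the store
    return len(weights)


msg = trainer_publish(num_params=1_000)
print(msg["op"], policy_receive(msg))  # update_weights 1000
```

In a real system the store would sit on the fast interconnect and the control message would go through the service layer; the dict here only stands in for that boundary.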


Next, we'll see how these building blocks enable elegant RL algorithm expression.

```python
await shutdown_service(counter_service)
```

## Forge-Native Services
Now let's see the power of this abstraction in action. Forge provides service implementations of common RL components that constitute typical training workloads, for instance:
- Policy: Responsible for generating trajectories and responses
- Trainer: Responsible for updating policy weights
- Reference Model: Responsible for computing reference logprobs to prevent policy drift
- Reward: Responsible for evaluating trajectory quality
- Dataset: Responsible for serving prompts and target answers
- Advantage: Responsible for computing advantages from trajectories


### Building a Synchronous RL Workflow
Let's demonstrate by building a simple on-policy RL workflow. We'll start by spinning up multiple services using a small Qwen model:

```python
import asyncio

from forge.actors.policy import Policy, PolicyConfig, SamplingOverrides, WorkerConfig
from forge.actors.replay_buffer import ReplayBuffer
from forge.controller.actor import ForgeActor
from forge.controller.service import ServiceConfig, shutdown_service, spawn_service
from forge.data.rewards import MathReward, ThinkingReward
from apps.grpo.main import Trainer, RewardActor, ComputeAdvantages, RefModel, DatasetActor, Group, Episode


model = "Qwen/Qwen3-1.7B"
group_size = 1

(
    dataloader,
    policy,
    trainer,
    replay_buffer,
    compute_advantages,
    ref_model,
    reward_actor,
) = await asyncio.gather(
    spawn_service(
        ServiceConfig(procs_per_replica=1, num_replicas=1),
        DatasetActor,
        path="openai/gsm8k",
        config_name="main",
        split="train",
        streaming=True,
    ),
    spawn_service(
        ServiceConfig(procs_per_replica=1, with_gpus=True, num_replicas=1),
        Policy,
        config=PolicyConfig(
            worker_params=WorkerConfig(model=model),
            sampling_params=SamplingOverrides(
                num_samples=group_size, max_tokens=16
            ),
        ),
    ),
    spawn_service(
        ServiceConfig(procs_per_replica=1, with_gpus=True, num_replicas=1),
        Trainer,
        learning_rate=1e-5,
        beta=0.1,
        model_name=model,
    ),
    spawn_service(
        ServiceConfig(procs_per_replica=1, num_replicas=1),
        ReplayBuffer,
        batch_size=2,
        max_policy_age=1,
    ),
    spawn_service(
        ServiceConfig(procs_per_replica=1, num_replicas=1),
        ComputeAdvantages,
        gamma=0.99,
        lambda_=0.95,
    ),
    spawn_service(
        ServiceConfig(procs_per_replica=1, num_replicas=1, with_gpus=True),
        RefModel,
        model_name=model,
    ),
    spawn_service(
        ServiceConfig(procs_per_replica=1, num_replicas=1),
        RewardActor,
        reward_functions=[MathReward(), ThinkingReward()],
    ),
)
```

What's happening here:
- Each service is spawned independently with its own configuration
- Services that need GPUs (`policy`, `trainer`, and `ref_model`) request them with `with_gpus=True`
- All services run concurrently and can be scaled independently
- The same model is used for policy and reference, but they're separate services

Notice what we're not doing:
- Managing CUDA placement across services
- Coordinating distributed training setup
- Handling inter-service communication protocols
- Writing fault tolerance and retry logic

All of this is handled by our Service abstraction.

Let's check that the services indeed work as expected:

```python
prompt = "What is 3 + 5?"
responses = await policy.generate.choose(prompt=prompt)

print(responses)
```

The response quality isn't great (it's only a 1.7B model, capped at 16 tokens!), but the service infrastructure works end to end.

## Building the RL Training Loop
### The Role of RL in Post-Training
One way to view RL in post-training is as a means of aligning a pre-trained base model toward hard-to-define targets. The goal is to "sample" the data that we think will best align the model.

This is the role of "rollouts" - creating the dataset used to update our policy. Rather than training on a static dataset, RL dynamically generates training data by having the current policy interact with the environment.

Let's build a step-by-step synchronous training loop to see how these services work together. The basic RL cycle is:

1. Collect Experience: Get a prompt, generate a response, and evaluate its reward
2. Compute Advantages: Calculate how much better or worse each response was than expected
3. Store Experience: Add the episode to our replay buffer
4. Sample & Train: Get a batch of experiences and update the policy
5. Repeat: Continue this cycle to improve the policy
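Step 2 depends on the algorithm. GRPO, which the full example in `apps/grpo/main.py` implements, is commonly formulated as normalizing each response's reward against the other responses sampled for the same prompt. The sketch below shows that standard formulation (population standard deviation plus a small `eps`); it is not necessarily what the `ComputeAdvantages` service does, and `group_relative_advantages` is a hypothetical name.

```python
import math
from typing import List


def group_relative_advantages(rewards: List[float], eps: float = 1e-6) -> List[float]:
    """Normalize each reward by the mean and std of its group (GRPO-style).

    advantage_i = (r_i - mean(r)) / (std(r) + eps), using the population std.
    """
    n = len(rewards)
    mean = sum(rewards) / n
    std = math.sqrt(sum((r - mean) ** 2 for r in rewards) / n)
    return [(r - mean) / (std + eps) for r in rewards]


# Four responses to one prompt: the best is pushed up, the worst pushed down.
print(group_relative_advantages([1.0, 0.0, 0.5, 0.5]))  # roughly [1.414, -1.414, 0.0, 0.0]
print(group_relative_advantages([1.0]))                 # [0.0]
```

Under this formulation a group of one response always yields an advantage of zero, so a larger `group_size` is what gives group-relative methods a learning signal.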


```python
from apps.grpo.main import Episode, Group


async def simple_rl_step():
    """Execute one complete RL training step"""

    # ===== Generate a rollout =====
    sample = await dataloader.__next__.choose()
    prompt, target = sample["question"], sample["answer"]

    print(f"Prompt: {prompt}")
    print(f"Target: {target}")

    actions = await policy.generate.choose(prompt=prompt)
    print(f"Policy response: {actions[0].text}")

    ref_logprobs = await ref_model.forward.choose(actions[0].token_ids)
    reward = await reward_actor.evaluate_response.choose(
        prompt=prompt,
        response=actions[0].text,
        target=target,
    )
    print(f"Reward: {reward}")

    episode = Episode(
        episode_id=0,
        prompt=prompt,
        target=target,
        policy_version=0,
    )

    episode.add_group(Group(
        response=actions[0].text,
        ref_logprobs=ref_logprobs,
        reward=reward,
    ))

    advantages = await compute_advantages.__call__.choose(episode.groups)
    episode.groups[0].advantage = advantages[0]
    print(f"Advantage: {advantages[0]}")
    await replay_buffer.add.choose(episode)
    print("Episode stored in replay buffer")

    # ===== Train on the batch =====
    batch = await replay_buffer.sample.choose(curr_policy_version=0)
    if batch is not None:
        print("Training on batch...")
        training_result = await trainer.train_step.choose(batch)
        loss = training_result.get("loss", 0.0)
        print(f"Training loss: {loss}")
        return loss
    else:
        print("Not enough data in buffer yet")
        return None


for step in range(10):
    print(f"\n--- RL Step {step + 1} ---")
    loss = await simple_rl_step()
    # Compare against None: a loss of exactly 0.0 is still a completed training step
    if loss is not None:
        print(f"Step {step + 1} complete, loss: {loss:.4f}")
    else:
        print(f"Step {step + 1} complete, building buffer...")
```

**Note**: The model responses aren't great (1.7B parameters + 16 token limit = not exactly o1!), but notice how clean the RL algorithm code is. The power of these abstractions is that you can focus on the algorithm logic while all the distributed coordination happens automatically behind the scenes.
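The loop prints "Not enough data in buffer yet" until the replay buffer can fill a batch. The sketch below models one plausible reading of the `batch_size=2` and `max_policy_age=1` arguments passed to `ReplayBuffer` earlier: `sample` returns `None` until enough fresh episodes exist, and episodes generated more than `max_policy_age` versions behind the current policy are discarded. `ToyReplayBuffer` and `ToyEpisode` are hypothetical; the real `ReplayBuffer` may behave differently.

```python
import random
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyEpisode:
    policy_version: int
    reward: float


class ToyReplayBuffer:
    """Hypothetical sketch of batch_size / max_policy_age semantics."""

    def __init__(self, batch_size: int, max_policy_age: int) -> None:
        self.batch_size = batch_size
        self.max_policy_age = max_policy_age
        self.episodes: List[ToyEpisode] = []

    def add(self, episode: ToyEpisode) -> None:
        self.episodes.append(episode)

    def sample(self, curr_policy_version: int) -> Optional[List[ToyEpisode]]:
        # Evict episodes produced by a policy that is too many versions old.
        self.episodes = [
            ep for ep in self.episodes
            if curr_policy_version - ep.policy_version <= self.max_policy_age
        ]
        if len(self.episodes) < self.batch_size:
            return None  # caller keeps collecting rollouts
        # Draw a random batch without replacement and remove it from the buffer.
        picked = set(random.sample(range(len(self.episodes)), self.batch_size))
        batch = [ep for i, ep in enumerate(self.episodes) if i in picked]
        self.episodes = [ep for i, ep in enumerate(self.episodes) if i not in picked]
        return batch


buffer = ToyReplayBuffer(batch_size=2, max_policy_age=1)
buffer.add(ToyEpisode(policy_version=0, reward=1.0))
print(buffer.sample(curr_policy_version=0))  # None: only one episode buffered
buffer.add(ToyEpisode(policy_version=0, reward=0.0))
print(len(buffer.sample(curr_policy_version=0)))  # 2
```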

TODO - conclude this with trainer->inference weight sync and demonstrate how the response changes

## Next Steps
This simple example demonstrates the core concepts, but for a production-ready implementation, check out our full GRPO (Group Relative Policy Optimization) example at `apps/grpo/main.py`. It includes the complete async training loops, proper logging, model weight synchronization, and all the optimizations needed for large-scale RL training.

```python
await asyncio.gather(
    shutdown_service(policy),
    shutdown_service(trainer),
    shutdown_service(replay_buffer),
    shutdown_service(dataloader),
    shutdown_service(compute_advantages),
    shutdown_service(ref_model),
    shutdown_service(reward_actor),
)
```