# Training Reward Models

**Loss functions, optimization, and best practices**

## The Ranking Loss

The core training objective is the **Bradley-Terry ranking loss**:

$$\mathcal{L}_{\text{RM}} = -\mathbb{E}_{(x, y_w, y_l) \sim D} \left[ \log \sigma(r_\theta(x, y_w) - r_\theta(x, y_l)) \right]$$

where:
- $x$ = prompt
- $y_w$ = chosen (winner) response
- $y_l$ = rejected (loser) response
- $r_\theta(x, y)$ = reward model score
- $\sigma(z) = \frac{1}{1 + e^{-z}}$ = sigmoid function

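**Goal:** Maximize the modeled probability that the chosen response is preferred, $P(y_w \succ y_l \mid x) = \sigma(r_\theta(x, y_w) - r_\theta(x, y_l))$. Minimizing $\mathcal{L}_{\text{RM}}$ is equivalent to maximizing the log of this probability over the preference dataset $D$.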

In [1]:
import torch
import torch.nn.functional as F

def compute_ranking_loss(
    chosen_rewards: torch.Tensor,
    rejected_rewards: torch.Tensor,
    margin: float = 0.0
) -> torch.Tensor:
    """
    Pairwise (Bradley-Terry style) ranking loss, averaged over all pairs.

    For each pair the loss is
        -log(sigmoid(chosen_reward - rejected_reward - margin))

    Args:
        chosen_rewards: Scores of the preferred responses,
            expected shape (batch_size,)
        rejected_rewards: Scores of the dispreferred responses,
            expected shape (batch_size,)
        margin: Value subtracted from the reward gap before the sigmoid,
            so a pair only reaches low loss once the gap exceeds it

    Returns:
        Scalar tensor holding the mean per-pair loss
    """
    # How far the chosen response outscores the rejected one
    reward_gap = chosen_rewards - rejected_rewards
    shifted_gap = reward_gap - margin

    # softplus(-z) == log(1 + exp(-z)) == -log(sigmoid(z)); using softplus
    # avoids taking log of a sigmoid that has underflowed to zero.
    per_pair_loss = F.softplus(torch.neg(shifted_gap))

    return torch.mean(per_pair_loss)

# Worked example: four preference pairs; the last pair is ranked the wrong way
batch_size = 4
rejected_rewards = torch.tensor([1.0, 1.0, 2.0, 1.0])
chosen_rewards = torch.tensor([2.0, 1.5, 3.0, 0.5])

loss = compute_ranking_loss(chosen_rewards, rejected_rewards)
print(f"Ranking loss: {loss.item():.4f}")

# Accuracy = fraction of pairs where the chosen response scores strictly higher
is_ranked_correctly = torch.gt(chosen_rewards, rejected_rewards)
accuracy = torch.mean(is_ranked_correctly.float())
print(f"Accuracy: {accuracy.item():.2%}")

Ranking loss: 0.5187
Accuracy: 75.00%


## Training Metrics

| Metric | What It Measures | Target |
|--------|------------------|--------|
| **Loss** | Preference prediction quality | Decreasing |
| **Accuracy** | % of pairs ranked correctly | > 70% |
| **Mean Margin** | Average reward difference | Positive, increasing |

In [2]:
def compute_ranking_loss_with_metrics(
    chosen_rewards: torch.Tensor,
    rejected_rewards: torch.Tensor,
    margin: float = 0.0
) -> dict:
    """
    Compute the ranking loss together with monitoring statistics.

    Args:
        chosen_rewards: Scores of the preferred responses
        rejected_rewards: Scores of the dispreferred responses
        margin: Forwarded to compute_ranking_loss; it affects only the
            "loss" entry, not the accuracy or margin statistics

    Returns:
        Dict of tensors with keys "loss", "accuracy",
        "mean_chosen_reward", "mean_rejected_reward" and "mean_margin"
    """
    # Raw (un-margined) gap between chosen and rejected scores
    reward_gap = chosen_rewards - rejected_rewards

    return {
        "loss": compute_ranking_loss(chosen_rewards, rejected_rewards, margin),
        # Share of pairs where the chosen response scores strictly higher
        "accuracy": torch.gt(chosen_rewards, rejected_rewards).float().mean(),
        "mean_chosen_reward": torch.mean(chosen_rewards),
        "mean_rejected_reward": torch.mean(rejected_rewards),
        "mean_margin": torch.mean(reward_gap),
    }

# Report every metric for the example batch, four decimals each
metrics = compute_ranking_loss_with_metrics(chosen_rewards, rejected_rewards)
print("Training metrics:")
for metric_name, metric_value in metrics.items():
    scalar = metric_value.item()
    print(f"  {metric_name}: {scalar:.4f}")

Training metrics:
  loss: 0.5187
  accuracy: 0.7500
  mean_chosen_reward: 1.7500
  mean_rejected_reward: 1.2500
  mean_margin: 0.5000


## Complete Training Loop

In [3]:
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import AutoModel, AutoTokenizer, get_linear_schedule_with_warmup
from tqdm import tqdm

class RewardModel(nn.Module):
    """Reward model for predicting human preferences.

    Wraps a base model with a small value head (dropout + linear layer) that
    maps the hidden state of each sequence's last real token to one scalar.

    Args:
        base_model: Callable module invoked with ``input_ids``,
            ``attention_mask`` and ``output_hidden_states=True``; its output
            must expose ``last_hidden_state``.
        hidden_size: Size of the last dimension of ``last_hidden_state``
            (input width of the linear value head).
    """

    def __init__(self, base_model, hidden_size):
        super().__init__()
        self.base_model = base_model
        self.value_head = nn.Sequential(
            nn.Dropout(0.1),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, input_ids, attention_mask):
        """Alias for :meth:`get_rewards` so that ``model(...)`` works."""
        return self.get_rewards(input_ids, attention_mask)

    def get_rewards(self, input_ids, attention_mask):
        """Return one scalar reward per sequence.

        Args:
            input_ids: Token ids passed through to the base model.
            attention_mask: Mask with non-zero entries at real tokens and
                zeros at padding. Either padding side is supported.

        Returns:
            Tensor with one reward per row of the batch.
        """
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )
        hidden_states = outputs.last_hidden_state

        # Locate the last real token of every sequence. Counting mask entries
        # (mask.sum() - 1) is only correct for right padding; taking the
        # largest position whose mask is set also handles left padding.
        mask = attention_mask.to(device=hidden_states.device, dtype=torch.long)
        positions = torch.arange(mask.shape[1], device=mask.device)
        # A row with an all-zero mask falls back to position 0.
        last_token_idx = (mask * positions).argmax(dim=1)

        batch_idx = torch.arange(
            hidden_states.shape[0], device=hidden_states.device
        )
        last_hidden = hidden_states[batch_idx, last_token_idx]

        return self.value_head(last_hidden).squeeze(-1)

In [4]:
def _evaluate_reward_model(model, eval_loader, device):
    """Return pair-weighted mean loss/accuracy on eval_loader (None if empty)."""
    was_training = model.training
    model.eval()
    loss_sum, accuracy_sum, num_pairs = 0.0, 0.0, 0

    try:
        with torch.no_grad():
            for batch in eval_loader:
                batch = {k: v.to(device) for k, v in batch.items()}
                chosen_rewards = model.get_rewards(
                    batch['chosen_input_ids'],
                    batch['chosen_attention_mask']
                )
                rejected_rewards = model.get_rewards(
                    batch['rejected_input_ids'],
                    batch['rejected_attention_mask']
                )
                metrics = compute_ranking_loss_with_metrics(
                    chosen_rewards, rejected_rewards
                )
                # Weight by pair count so a short final batch is not
                # over-represented in the average.
                pairs_in_batch = chosen_rewards.numel()
                loss_sum += metrics['loss'].item() * pairs_in_batch
                accuracy_sum += metrics['accuracy'].item() * pairs_in_batch
                num_pairs += pairs_in_batch
    finally:
        # Hand the model back in whatever mode the caller had it in
        model.train(was_training)

    if num_pairs == 0:
        return None
    return {
        'loss': loss_sum / num_pairs,
        'accuracy': accuracy_sum / num_pairs,
    }


def train_reward_model(model, train_loader, eval_loader, config, device):
    """Complete reward model training loop.

    Args:
        model: Object exposing ``get_rewards(input_ids, attention_mask)``,
            ``parameters()``, ``train()`` and ``eval()``.
        train_loader: Sized iterable of dict batches with the keys
            'chosen_input_ids', 'chosen_attention_mask',
            'rejected_input_ids' and 'rejected_attention_mask'.
        eval_loader: Iterable of batches in the same format, evaluated after
            every epoch. Pass None to skip evaluation.
        config: Dict with required keys 'learning_rate', 'num_epochs' and
            'warmup_steps'. Optional keys: 'gradient_accumulation_steps'
            (default 1) and 'max_grad_norm' (default 1.0).
        device: Device every batch tensor is moved to. The model itself is
            not moved; the caller is expected to have placed it already.

    Returns:
        The same model instance, after training.
    """
    accum_steps = max(1, int(config.get('gradient_accumulation_steps', 1)))
    max_grad_norm = config.get('max_grad_norm', 1.0)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config['learning_rate'],
        weight_decay=0.01
    )

    num_batches = len(train_loader)
    # The scheduler advances once per optimizer update, not once per batch
    updates_per_epoch = (num_batches + accum_steps - 1) // accum_steps
    total_steps = updates_per_epoch * config['num_epochs']
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config['warmup_steps'],
        num_training_steps=total_steps
    )

    # Size of the final, shorter accumulation window (0 if batches divide evenly)
    tail_size = num_batches % accum_steps
    best_eval_accuracy = 0.0

    for epoch in range(config['num_epochs']):
        # Re-enter training mode every epoch; evaluation may have left it
        model.train()
        optimizer.zero_grad()
        epoch_metrics = {'loss': 0, 'accuracy': 0}

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}")

        for step, batch in enumerate(progress_bar, start=1):
            # Move batch to device
            batch = {k: v.to(device) for k, v in batch.items()}

            # Forward pass for chosen responses
            chosen_rewards = model.get_rewards(
                batch['chosen_input_ids'],
                batch['chosen_attention_mask']
            )

            # Forward pass for rejected responses
            rejected_rewards = model.get_rewards(
                batch['rejected_input_ids'],
                batch['rejected_attention_mask']
            )

            # Compute loss and metrics
            metrics = compute_ranking_loss_with_metrics(
                chosen_rewards, rejected_rewards
            )
            loss = metrics['loss']

            # Scale so the accumulated gradient is the mean over the window
            # rather than the sum; the last window of an epoch may be shorter.
            in_tail = step > num_batches - tail_size
            window_size = tail_size if in_tail else accum_steps
            (loss / window_size).backward()

            if step % accum_steps == 0 or step == num_batches:
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(
                    model.parameters(), max_grad_norm
                )

                optimizer.step()
                scheduler.step()
                optimizer.zero_grad()

            # Update metrics (unscaled loss, so logs do not depend on accum_steps)
            epoch_metrics['loss'] += loss.item()
            epoch_metrics['accuracy'] += metrics['accuracy'].item()

            progress_bar.set_postfix({
                'loss': f"{loss.item():.4f}",
                'acc': f"{metrics['accuracy'].item():.2%}"
            })

        # End of epoch (guard against an empty loader)
        avg_loss = epoch_metrics['loss'] / max(num_batches, 1)
        avg_acc = epoch_metrics['accuracy'] / max(num_batches, 1)
        print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.2%}")

        if eval_loader is not None:
            eval_metrics = _evaluate_reward_model(model, eval_loader, device)
            if eval_metrics is not None:
                best_eval_accuracy = max(
                    best_eval_accuracy, eval_metrics['accuracy']
                )
                print(
                    f"Epoch {epoch+1} - Eval Loss: {eval_metrics['loss']:.4f}, "
                    f"Eval Accuracy: {eval_metrics['accuracy']:.2%} "
                    f"(best: {best_eval_accuracy:.2%})"
                )

    return model

## Key Hyperparameters

| Parameter | Typical Value | Notes |
|-----------|---------------|-------|
| Learning rate | 1e-5 | Much lower than SFT |
| Batch size | 4 | Small, because each sample needs two forward passes (chosen and rejected sequences) |
| Epochs | 1 | Avoid overfitting |
| Gradient accumulation | 4 | Effective batch = 16 |

In [5]:
# Hyperparameters for reward model training (values from the table above)
config = dict(
    learning_rate=1e-5,
    batch_size=4,
    num_epochs=1,
    warmup_steps=100,
    gradient_accumulation_steps=4,
    max_grad_norm=1.0,
)

# Echo the configuration, one "name: value" line per setting
print("Reward Model Training Configuration:")
for setting_name, setting_value in config.items():
    print(f"  {setting_name}: {setting_value}")

Reward Model Training Configuration:
  learning_rate: 1e-05
  batch_size: 4
  num_epochs: 1
  warmup_steps: 100
  gradient_accumulation_steps: 4
  max_grad_norm: 1.0


## Common Training Issues

### Low Accuracy (< 60%)
- Model not learning preferences
- Try: Lower learning rate, check data quality

### Overfitting
- Training accuracy >> eval accuracy
- Try: Fewer epochs, more dropout, freeze base model

### Training Instability
- Loss spikes, NaN values
- Try: Lower learning rate, gradient clipping, longer warmup

## Next Steps

Now that we can train reward models, let's learn how to evaluate them properly.