# 03 — Reward Design Fundamentals: Real Implementations

> **Purpose:** Implement reward functions for LLM reasoning training. Each component is runnable reference code, most with a short test or demo.

| Component | What It Does |
|-----------|-------------|
| Verifiable Rewards | Binary accuracy + format checking |
| Outcome Reward Model | Learned scalar from (prompt, response) |
| RLAIF Scorer | Extract soft labels from LLM logits |
| Reward Hacking Prevention | Length penalty, KL control |
| Production Stack | Layered reward system |

---

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import re
import numpy as np
from typing import List, Tuple, Optional, Dict

torch.manual_seed(42)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Device: {device}")

Device: cpu


## 1. Verifiable Reward Model (VRM)

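A DeepSeek R1-style rule-based reward: `1.0 × accuracy + 0.2 × format`. Treat the 1.0 / 0.2 weights as illustrative defaults.

**Key insight:** No neural network in the reward loop → no learned scorer for the policy to exploit. Rule-based checks can still be gamed through loose parsing (for example, the last-number fallback in `extract_answer`), so keep the rules strict.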

In [2]:
class VerifiableRewardModel:
    """
    Rule-based reward model for verifiable tasks.
    Based on DeepSeek R1's approach.
    
    Components:
    - Accuracy reward: Binary (correct or not)
    - Format reward: Structure compliance
    """
    
    def __init__(self, accuracy_weight: float = 1.0, format_weight: float = 0.2):
        self.accuracy_weight = accuracy_weight
        self.format_weight = format_weight
        
        # Expected format: <think>...</think><answer>...</answer>
        self.format_pattern = re.compile(
            r'<think>.*?</think>\s*<answer>.*?</answer>',
            re.DOTALL
        )
        
        # Answer extraction pattern
        self.answer_pattern = re.compile(
            r'<answer>(.*?)</answer>',
            re.DOTALL
        )
    
    def extract_answer(self, response: str) -> Optional[str]:
        """Extract the answer from structured response."""
        match = self.answer_pattern.search(response)
        if match:
            return match.group(1).strip()
        
        # Fallback: try to extract last number or boxed answer
        boxed = re.search(r'\\boxed{([^}]+)}', response)
        if boxed:
            return boxed.group(1).strip()
        
        # Last number in response
        numbers = re.findall(r'-?\d+(?:\.\d+)?', response)
        return numbers[-1] if numbers else None
    
    def check_accuracy(self, response: str, ground_truth: str) -> float:
        """Check if extracted answer matches ground truth."""
        predicted = self.extract_answer(response)
        if predicted is None:
            return 0.0
        
        # Normalize for comparison
        predicted = predicted.lower().strip()
        ground_truth = str(ground_truth).lower().strip()
        
        # Exact match
        if predicted == ground_truth:
            return 1.0
        
        # Numeric comparison (handle floating point)
        try:
            if abs(float(predicted) - float(ground_truth)) < 1e-6:
                return 1.0
        except (ValueError, TypeError):
            pass
        
        return 0.0
    
    def check_format(self, response: str) -> float:
        """Check if response follows expected format."""
        if self.format_pattern.search(response):
            return 1.0
        return 0.0
    
    def compute_reward(self, response: str, ground_truth: str) -> Dict[str, float]:
        """
        Compute total reward with breakdown.
        
        Returns:
            Dict with 'accuracy', 'format', and 'total' rewards
        """
        accuracy = self.check_accuracy(response, ground_truth)
        format_score = self.check_format(response)
        
        total = (self.accuracy_weight * accuracy + 
                 self.format_weight * format_score)
        
        return {
            'accuracy': accuracy,
            'format': format_score,
            'total': total
        }

In [3]:
# TEST: Verifiable Reward Model

vrm = VerifiableRewardModel()

test_cases = [
    # Good format, correct answer
    ("<think>2 + 2 = 4</think><answer>4</answer>", "4"),
    # Good format, wrong answer
    ("<think>Let me compute...</think><answer>5</answer>", "4"),
    # Bad format, correct answer (fallback extraction)
    ("The answer is 4.", "4"),
    # LaTeX boxed answer
    ("Therefore, \\boxed{42}", "42"),
    # No answer extractable
    ("I don't know how to solve this.", "4"),
]

print(f"{'Response (truncated)':<50} {'GT':>5} {'Acc':>5} {'Fmt':>5} {'Total':>6}")
print("-" * 75)
for response, gt in test_cases:
    result = vrm.compute_reward(response, gt)
    display = response[:47] + "..." if len(response) > 50 else response
    print(f"{display:<50} {gt:>5} {result['accuracy']:>5.1f} {result['format']:>5.1f} {result['total']:>6.2f}")

Response (truncated)                                  GT   Acc   Fmt  Total
---------------------------------------------------------------------------
<think>2 + 2 = 4</think><answer>4</answer>             4   1.0   1.0   1.20
<think>Let me compute...</think><answer>5</answer>     4   0.0   1.0   0.20
The answer is 4.                                       4   1.0   0.0   1.00
Therefore, \boxed{42}                                 42   1.0   0.0   1.00
I don't know how to solve this.                        4   0.0   0.0   0.00
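
Exact string matching plus a `float()` comparison still misses equivalent answers such as `1,000` vs `1000` or `1/2` vs `0.5`. The following is a minimal sketch of a stricter numeric comparison, assuming answers are plain numbers or simple fractions. `normalize_numeric` and `answers_match` are hypothetical helpers, not part of `VerifiableRewardModel`.

```python
from fractions import Fraction
from typing import Optional


def normalize_numeric(text: str) -> Optional[Fraction]:
    """Parse a numeric answer into an exact Fraction, or return None."""
    # Drop thousands separators, a currency sign, and a trailing period
    cleaned = text.strip().replace(",", "").replace("$", "").rstrip(".")
    try:
        return Fraction(cleaned)  # accepts forms like "42", "-3.5", "1/2"
    except (ValueError, ZeroDivisionError):
        return None


def answers_match(predicted: str, ground_truth: str) -> bool:
    """Compare numerically when both sides parse, else fall back to text."""
    p, g = normalize_numeric(predicted), normalize_numeric(ground_truth)
    if p is not None and g is not None:
        return p == g
    return predicted.strip().lower() == ground_truth.strip().lower()


print(answers_match("1,000", "1000"))
print(answers_match("1/2", "0.5"))
```

Using `Fraction` keeps the comparison exact, so no tolerance threshold is needed for decimal answers.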


## 2. Code Execution Reward

For programming tasks: execute code against test cases.

In [4]:
import subprocess
import sys
import tempfile
import os

class CodeExecutionReward:
    """
    Reward model for code generation tasks.
    Executes code against test cases.
    
    WARNING: runs model-generated code in a plain subprocess with no
    sandbox. Use a container or jail for untrusted code.
    """
    
    def __init__(self, timeout: float = 5.0):
        self.timeout = timeout
    
    def extract_code(self, response: str) -> Optional[str]:
        """Extract Python code from response."""
        # Try to find code blocks
        patterns = [
            r'```python\n(.*?)```',
            r'```\n(.*?)```',
            r'<code>(.*?)</code>',
        ]
        for pattern in patterns:
            match = re.search(pattern, response, re.DOTALL)
            if match:
                return match.group(1).strip()
        return None
    
    def execute_with_tests(self, code: str, test_cases: List[Tuple[str, str]]) -> Dict:
        """
        Execute code and run test cases.
        
        Args:
            code: Python code to execute
            test_cases: List of (input, expected_output) tuples
        
        Returns:
            Dict with 'passed', 'total', 'reward'
        """
        passed = 0
        total = len(test_cases)
        
        for test_input, expected in test_cases:
            # Create test script
            test_script = f"""
{code}

# Run test
result = solution({test_input})
print(result)
"""
            temp_path = None
            try:
                with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
                    f.write(test_script)
                    temp_path = f.name
                
                # sys.executable is the running interpreter; a bare 'python'
                # may be missing from PATH and would silently score zero
                result = subprocess.run(
                    [sys.executable, temp_path],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout
                )
                
                if result.returncode == 0:
                    output = result.stdout.strip()
                    if output == str(expected).strip():
                        passed += 1
            except (subprocess.TimeoutExpired, OSError):
                # A timeout or launch failure counts as a failed test
                pass
            finally:
                # Always remove the temp file, including after a timeout
                if temp_path is not None and os.path.exists(temp_path):
                    os.unlink(temp_path)
        
        return {
            'passed': passed,
            'total': total,
            'reward': passed / total if total > 0 else 0.0
        }
    
    def compute_reward(self, response: str, test_cases: List[Tuple]) -> Dict:
        """Compute reward for code response."""
        code = self.extract_code(response)
        if code is None:
            return {'passed': 0, 'total': len(test_cases), 'reward': 0.0}
        
        return self.execute_with_tests(code, test_cases)
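
Interpolating test inputs into the script text ties the harness to Python literal syntax. The following is a sketch of one alternative, assuming the candidate defines a function named `solution` and that arguments and results are JSON-serializable. `run_solution` and `HARNESS` are hypothetical names, and this is still not a sandbox.

```python
import json
import os
import subprocess
import sys
import tempfile

# Appended to the candidate code: read args from stdin, print the result as JSON
HARNESS = """
import json, sys
args = json.loads(sys.stdin.read())
print(json.dumps(solution(*args)))
"""


def run_solution(code: str, args: list, timeout: float = 5.0):
    """Run solution(*args) in a child interpreter; return the result or None."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "candidate.py")
        with open(path, "w") as f:
            f.write(code + "\n" + HARNESS)
        try:
            proc = subprocess.run(
                [sys.executable, "-I", path],  # -I: isolated mode
                input=json.dumps(args),
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return None
    if proc.returncode != 0:
        return None
    try:
        # The harness prints last, so its output is the final stdout line
        return json.loads(proc.stdout.strip().splitlines()[-1])
    except (ValueError, IndexError):
        return None


print(run_solution("def solution(a, b):\n    return a + b", [2, 3]))
```

Passing data over stdin means the test input is never executed as code, and comparing decoded JSON values avoids mismatches caused by string formatting of the printed result.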

## 3. Outcome Reward Model (ORM)

A neural network that scores (prompt, response) pairs.

In [5]:
class OutcomeRewardModel(nn.Module):
    """
    Learned reward model that scores complete responses.
    
    Architecture:
    - Embed prompt + response tokens
    - Encode with a GRU; the final hidden state is the fixed-size summary
    - Project to scalar reward
    """
    
    def __init__(self, vocab_size: int = 50000, hidden_size: int = 256):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.encoder = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.reward_head = nn.Linear(hidden_size, 1)
    
    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        """
        Compute reward for input sequences.
        
        Args:
            input_ids: (batch, seq_len) token IDs for prompt + response
        
        Returns:
            rewards: (batch,) scalar rewards
        """
        h = self.embed(input_ids)  # (batch, seq_len, hidden)
        _, h_final = self.encoder(h)  # (1, batch, hidden)
        h_final = h_final.squeeze(0)  # (batch, hidden)
        rewards = self.reward_head(h_final).squeeze(-1)  # (batch,)
        return rewards
    
    def compute_preference_loss(self, chosen_ids: torch.Tensor, 
                                 rejected_ids: torch.Tensor) -> torch.Tensor:
        """
        Bradley-Terry preference loss for training.
        
        Args:
            chosen_ids: Token IDs for preferred responses
            rejected_ids: Token IDs for rejected responses
        
        Returns:
            loss: Scalar loss to minimize
        """
        r_chosen = self.forward(chosen_ids)
        r_rejected = self.forward(rejected_ids)
        
        # Log-sigmoid of reward difference
        # Loss: -log(sigmoid(r_chosen - r_rejected))
        loss = -F.logsigmoid(r_chosen - r_rejected).mean()
        return loss

In [6]:
# TEST: ORM forward pass

orm = OutcomeRewardModel(vocab_size=1000, hidden_size=64).to(device)

# Simulate batch of (prompt + response) sequences
batch_size = 4
seq_len = 100
input_ids = torch.randint(0, 1000, (batch_size, seq_len), device=device)

rewards = orm(input_ids)
print(f"Input shape: {input_ids.shape}")
print(f"Output rewards: {rewards.detach().cpu().numpy()}")
print(f"Reward shape: {rewards.shape}")

Input shape: torch.Size([4, 100])
Output rewards: [0.2161513  0.23310584 0.133057   0.15649924]
Reward shape: torch.Size([4])
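
To make the Bradley-Terry loss in `compute_preference_loss` concrete, here is a small pure-Python sketch of the same formula for a single preference pair. The helper names are hypothetical; the math mirrors `-F.logsigmoid(r_chosen - r_rejected)`.

```python
import math


def softplus(x: float) -> float:
    """Numerically stable log(1 + exp(x))."""
    return max(x, 0.0) + math.log1p(math.exp(-abs(x)))


def bradley_terry_loss(r_chosen: float, r_rejected: float) -> float:
    """-log(sigmoid(r_chosen - r_rejected)) for one preference pair."""
    return softplus(-(r_chosen - r_rejected))


def preference_probability(r_chosen: float, r_rejected: float) -> float:
    """P(chosen beats rejected) under the Bradley-Terry model."""
    return math.exp(-bradley_terry_loss(r_chosen, r_rejected))


for margin in (-2.0, 0.0, 2.0):
    loss = bradley_terry_loss(margin, 0.0)
    prob = preference_probability(margin, 0.0)
    print(f"margin={margin:+.1f}  loss={loss:.3f}  P(chosen)={prob:.3f}")
```

At zero margin the loss is log 2 ≈ 0.693 and the implied preference probability is 0.5. Only the difference between the two rewards matters, so the absolute scale of a trained reward model's outputs is arbitrary.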


## 4. RLAIF: AI Feedback with Soft Labels

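Use an LLM judge to score responses on a 1-10 scale, reading the full score distribution from its next-token logits instead of sampling a single score.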

In [7]:
class RLAIFScorer:
    """
    RLAIF/d-RLAIF scorer using LLM logits.
    
    Key techniques:
    - Soft labels (probability distribution over scores)
    - Position bias mitigation (average both orderings)
    """
    
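    def __init__(self, score_tokens: Optional[List[str]] = None):
        # Score labels 1-10; map these to actual tokenizer IDs in practice.
        # Some tokenizers split '10' into several tokens, so check that each
        # label is a single token before indexing logits with its ID.
        self.score_tokens = score_tokens or ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']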
    
    def compute_expected_score(self, logits: torch.Tensor, 
                                score_token_ids: List[int]) -> float:
        """
        Compute expected score from logits.
        
        Args:
            logits: (vocab_size,) logits for next token
            score_token_ids: Token IDs for '1', '2', ..., '10'
        
        Returns:
            Expected score in [1, 10]
        """
        # Extract logits for score tokens
        score_logits = logits[score_token_ids]
        
        # Convert to probabilities
        probs = F.softmax(score_logits, dim=0)
        
        # Compute expected score: sum(i * P(i))
        scores = torch.arange(1, len(score_token_ids) + 1, dtype=torch.float, device=logits.device)
        expected = (scores * probs).sum()
        
        return expected.item()
    
    def normalize_score(self, score: float, min_val: float = 1.0, 
                        max_val: float = 10.0) -> float:
        """
        Normalize score to [-1, 1] for RL.
        """
        return 2 * (score - min_val) / (max_val - min_val) - 1
    
    def mitigate_position_bias(self, score_a_first: float, 
                                score_b_first: float) -> float:
        """
        Mitigate position bias by averaging both orderings.
        
        When comparing A vs B, run the judge twice:
        - First inference: A presented first → score_a_first
        - Second inference: B presented first → score_b_first
        Both scores must refer to the same response, so that any
        preference for a position cancels in the average.
        """
        # The two judge calls happen upstream; this only averages them
        return (score_a_first + score_b_first) / 2

In [8]:
# DEMONSTRATION: Soft label extraction

def demo_soft_labels():
    """Show how soft labels are extracted from LLM logits."""
    
    # Simulate LLM logits for next token (vocab size 100)
    vocab_size = 100
    logits = torch.randn(vocab_size)
    
    # Assume tokens 0-9 correspond to scores '1'-'10'
    score_token_ids = list(range(10))
    
    # Make the model "prefer" scores 7-8
    logits[6] = 3.0  # Score 7
    logits[7] = 2.5  # Score 8
    
    scorer = RLAIFScorer()
    expected = scorer.compute_expected_score(logits, score_token_ids)
    normalized = scorer.normalize_score(expected)
    
    print("RLAIF Soft Label Extraction:")
    print(f"  Raw logits (scores 1-10): {logits[:10].numpy().round(2)}")
    probs = F.softmax(logits[:10], dim=0).numpy()
    print(f"  Probabilities: {probs.round(3)}")
    print(f"  Expected score: {expected:.2f}")
    print(f"  Normalized reward: {normalized:.3f}")

demo_soft_labels()

RLAIF Soft Label Extraction:
  Raw logits (scores 1-10): [-1.07 -0.09 -0.82 -0.41  0.48 -0.83  3.    2.5   0.17  0.66]
  Probabilities: [0.009 0.023 0.011 0.017 0.041 0.011 0.505 0.306 0.03  0.049]
  Expected score: 7.16
  Normalized reward: 0.369
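
The "average both orderings" idea is easiest to see for pairwise judging. Below is a minimal sketch, assuming the judge is asked which of two responses is better and answers with the label `1` or `2`. The function names and the two-logit interface are assumptions for illustration.

```python
import math
from typing import Tuple


def two_way_softmax(logit_1: float, logit_2: float) -> Tuple[float, float]:
    """Softmax over the judge's logits for the labels '1' and '2'."""
    m = max(logit_1, logit_2)
    e1, e2 = math.exp(logit_1 - m), math.exp(logit_2 - m)
    return e1 / (e1 + e2), e2 / (e1 + e2)


def debiased_preference(logits_a_first: Tuple[float, float],
                        logits_b_first: Tuple[float, float]) -> float:
    """
    Soft preference for response A, averaged over both presentation orders.

    logits_a_first: (logit '1', logit '2') when A is shown as response 1
    logits_b_first: (logit '1', logit '2') when B is shown as response 1
    """
    p_a_when_first, _ = two_way_softmax(*logits_a_first)   # A carries label '1'
    _, p_a_when_second = two_way_softmax(*logits_b_first)  # A carries label '2'
    return 0.5 * (p_a_when_first + p_a_when_second)


# A judge that favors whatever is shown first, regardless of content
print(debiased_preference((1.0, 0.0), (1.0, 0.0)))
# A judge that favors A in both orderings
print(debiased_preference((2.0, 0.0), (0.0, 2.0)))
```

A judge with pure position bias ends up at a preference of 0.5, which is the intended behavior: the averaged label carries no signal when the only thing the judge responded to was ordering.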


## 5. Reward Hacking Prevention

### 5.1 Length Penalty

In [9]:
class LengthPenalizedReward:
    """
    Prevent length hacking by penalizing excessive verbosity.
    
    Methods (responses at or below target_length are never penalized):
    - Linear penalty: reward - alpha * max(0, length - target)
    - Quadratic penalty: reward - alpha * max(0, length - target)^2
    - Ratio penalty: reward * min(1, target_length / actual_length)
    """
    
    def __init__(self, method: str = 'ratio', target_length: int = 500, alpha: float = 0.001):
        self.method = method
        self.target_length = target_length
        self.alpha = alpha
    
    def apply_penalty(self, reward: float, response_length: int) -> float:
        """
        Apply length penalty to reward.
        
        Args:
            reward: Raw reward score
            response_length: Number of tokens in response
        
        Returns:
            Penalized reward
        """
        if self.method == 'linear':
            # Simple linear penalty
            penalty = self.alpha * max(0, response_length - self.target_length)
            return reward - penalty
        
        elif self.method == 'quadratic':
            # Quadratic penalty for deviation from target
            diff = response_length - self.target_length
            penalty = self.alpha * diff ** 2 if diff > 0 else 0
            return reward - penalty
        
        elif self.method == 'ratio':
            # Scale reward by efficiency ratio
            if response_length <= self.target_length:
                return reward
            ratio = self.target_length / response_length
            # Multiplying a negative reward by ratio < 1 would raise it,
            # so divide instead to keep the penalty a penalty
            return reward * ratio if reward > 0 else reward / ratio
        
        return reward

In [10]:
# TEST: Length penalty effects

lp = LengthPenalizedReward(method='ratio', target_length=500)

print("Length Penalty Demo (target=500 tokens):")
print(f"{'Length':>10} {'Raw Reward':>12} {'Penalized':>12} {'Reduction':>12}")
print("-" * 50)

for length in [200, 500, 1000, 2000, 5000]:
    raw = 1.0
    penalized = lp.apply_penalty(raw, length)
    reduction = (1 - penalized / raw) * 100
    print(f"{length:>10} {raw:>12.2f} {penalized:>12.2f} {reduction:>11.1f}%")

Length Penalty Demo (target=500 tokens):
    Length   Raw Reward    Penalized    Reduction
--------------------------------------------------
       200         1.00         1.00         0.0%
       500         1.00         1.00         0.0%
      1000         1.00         0.50        50.0%
      2000         1.00         0.25        75.0%
      5000         1.00         0.10        90.0%
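
Multiplicative length scaling interacts with the sign of the reward: multiplying a negative reward by a ratio below 1 moves it toward zero, so a long wrong answer would score better than a short one. A minimal sketch of the difference, using the hypothetical helpers `naive_ratio_penalty` and `sign_aware_ratio_penalty`:

```python
def naive_ratio_penalty(reward: float, length: int, target: int) -> float:
    """Scale the reward by target/length once the response exceeds target."""
    if length <= target:
        return reward
    return reward * (target / length)


def sign_aware_ratio_penalty(reward: float, length: int, target: int) -> float:
    """Shrink positive rewards and amplify negative ones for long responses."""
    if length <= target:
        return reward
    ratio = target / length
    return reward * ratio if reward > 0 else reward / ratio


for reward in (1.0, -1.0):
    naive = naive_ratio_penalty(reward, length=1000, target=500)
    aware = sign_aware_ratio_penalty(reward, length=1000, target=500)
    print(f"raw={reward:+.1f}  naive={naive:+.2f}  sign-aware={aware:+.2f}")
```

This matters as soon as rewards can go negative, for example after normalizing judge scores to [-1, 1] or subtracting a KL term before the length penalty is applied.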


### 5.2 Adaptive KL Controller

In [11]:
class AdaptiveKLController:
    """
    Adaptive KL penalty coefficient.
    
    Adjusts beta (KL coefficient) to keep KL divergence
    near a target value. Inspired by the adaptive KL controller of
    Ziegler et al. (2019), but driven by absolute rather than relative error.
    
    If KL > target: increase beta (penalize more)
    If KL < target: decrease beta (allow more exploration)
    """
    
    def __init__(self, init_beta: float = 0.1, target_kl: float = 6.0,
                 horizon: int = 10000):
        self.beta = init_beta
        self.target_kl = target_kl
        self.horizon = horizon  # Steps to adjust over
    
    def update(self, current_kl: float) -> float:
        """
        Update beta based on current KL divergence.
        
        Args:
            current_kl: Measured KL divergence
        
        Returns:
            Updated beta value
        """
        # Proportional control
        error = current_kl - self.target_kl
        adjustment = 1 + error / self.horizon
        
        # Clamp adjustment to prevent instability
        adjustment = max(0.5, min(2.0, adjustment))
        
        self.beta *= adjustment
        
        # Clamp beta to reasonable range
        self.beta = max(0.001, min(1.0, self.beta))
        
        return self.beta
    
    def compute_kl_penalty(self, log_probs: torch.Tensor, 
                           ref_log_probs: torch.Tensor) -> torch.Tensor:
        """
        Compute KL penalty for given log probabilities.
        
        KL(policy || reference) ≈ E[log(policy) - log(reference)]
        
        Args:
            log_probs: (batch, seq_len) current policy log probs
            ref_log_probs: (batch, seq_len) reference model log probs
        
        Returns:
            kl_penalty: (batch,) per-sequence KL penalty
        """
        # Assumes no padding; mask padded positions before averaging otherwise
        kl_per_token = log_probs - ref_log_probs  # (batch, seq_len)
        kl_per_sequence = kl_per_token.mean(dim=1)  # (batch,)
        
        return self.beta * kl_per_sequence

In [12]:
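# TEST: Adaptive KL controller
# With horizon=10000, each update scales beta by 1 + (kl - target) / 10000,
# at most about 0.04% here, so beta prints as 0.1000 on every step.
# Pass a smaller horizon to see it move.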

kl_ctrl = AdaptiveKLController(init_beta=0.1, target_kl=6.0)

print("Adaptive KL Controller Demo:")
print(f"{'Step':>6} {'Current KL':>12} {'Beta':>10} {'Action':>15}")
print("-" * 50)

# Simulate KL divergence over training
simulated_kl = [3.0, 4.5, 7.0, 10.0, 8.0, 6.5, 6.0, 5.5, 6.0, 6.2]

for step, kl in enumerate(simulated_kl):
    old_beta = kl_ctrl.beta
    new_beta = kl_ctrl.update(kl)
    
    if kl > kl_ctrl.target_kl:
        action = "↑ penalize more"
    elif kl < kl_ctrl.target_kl:
        action = "↓ relax penalty"
    else:
        action = "→ maintain"
    
    print(f"{step:>6} {kl:>12.1f} {new_beta:>10.4f} {action:>15}")

Adaptive KL Controller Demo:
  Step   Current KL       Beta          Action
--------------------------------------------------
     0          3.0     0.1000 ↓ relax penalty
     1          4.5     0.1000 ↓ relax penalty
     2          7.0     0.1000 ↑ penalize more
     3         10.0     0.1000 ↑ penalize more
     4          8.0     0.1000 ↑ penalize more
     5          6.5     0.1000 ↑ penalize more
     6          6.0     0.1000      → maintain
     7          5.5     0.1000 ↓ relax penalty
     8          6.0     0.1000      → maintain
     9          6.2     0.1000 ↑ penalize more
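
For comparison, here is a sketch of a proportional update that uses clipped relative error, in the style of the controller described by Ziegler et al. (2019). The function name, the `n_steps` argument (samples processed since the last update), and the values used in the loop are assumptions for illustration.

```python
def relative_error_update(beta: float, current_kl: float, target_kl: float,
                          n_steps: int, horizon: int, clip: float = 0.2) -> float:
    """One proportional update of the KL coefficient using clipped relative error."""
    # Relative error is unit-free, so the same clip works for any target_kl
    rel_error = max(-clip, min(clip, current_kl / target_kl - 1.0))
    return beta * (1.0 + rel_error * n_steps / horizon)


beta = 0.1
for kl in [3.0, 4.5, 7.0, 10.0, 8.0, 6.5, 6.0]:
    # Assumed: 256 samples are processed between consecutive updates
    beta = relative_error_update(beta, kl, target_kl=6.0, n_steps=256, horizon=10000)
    print(f"KL={kl:>5.1f}  beta={beta:.4f}")
```

Clipping the relative error bounds how much a single noisy KL estimate can move beta, and scaling by `n_steps / horizon` ties the adjustment speed to the amount of data seen.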


## 6. Production Reward Stack

Combine all components into a layered reward system.

In [13]:
class ProductionRewardStack:
    """
    Three-layer reward architecture:
    
    Layer 1 (Foundation): Verifiable rewards (rule-based, hard to hack)
    Layer 2 (Quality): Learned preferences (RLAIF/ORM)
    Layer 3 (Weighting): Dynamic combination based on training phase
    
    Plus: KL penalty and length penalty
    """
    
    def __init__(self, 
                 vrm: VerifiableRewardModel,
                 orm: Optional[OutcomeRewardModel] = None,
                 kl_controller: Optional[AdaptiveKLController] = None,
                 length_penalty: Optional[LengthPenalizedReward] = None,
                 verifiable_weight: float = 0.9,
                 quality_weight: float = 0.1):
        
        self.vrm = vrm
        self.orm = orm
        self.kl_controller = kl_controller or AdaptiveKLController()
        self.length_penalty = length_penalty or LengthPenalizedReward()
        
        self.verifiable_weight = verifiable_weight
        self.quality_weight = quality_weight
    
    def compute_reward(self, 
                       response: str,
                       ground_truth: str,
                       response_ids: Optional[torch.Tensor] = None,
                       log_probs: Optional[torch.Tensor] = None,
                       ref_log_probs: Optional[torch.Tensor] = None) -> Dict:
        """
        Compute full reward stack.
        
        Returns:
            Dict with all reward components and total
        """
        result = {}
        
        # Layer 1: Verifiable rewards
        vrm_result = self.vrm.compute_reward(response, ground_truth)
        result['verifiable'] = vrm_result['total']
        result['accuracy'] = vrm_result['accuracy']
        result['format'] = vrm_result['format']
        
        # Layer 2: Quality rewards (if ORM provided)
        # .item() assumes one sequence per call: response_ids is (1, seq_len)
        if self.orm is not None and response_ids is not None:
            with torch.no_grad():
                quality = self.orm(response_ids).item()
            result['quality'] = quality
        else:
            result['quality'] = 0.0
        
        # Dynamic weighting
        combined = (self.verifiable_weight * result['verifiable'] + 
                   self.quality_weight * result['quality'])
        
        # Apply length penalty
        response_length = len(response.split())  # Whitespace word count, a cheap proxy for token count
        combined = self.length_penalty.apply_penalty(combined, response_length)
        result['length_penalized'] = combined
        
        # KL penalty (if log probs provided)
        if log_probs is not None and ref_log_probs is not None:
            kl_penalty = self.kl_controller.compute_kl_penalty(
                log_probs, ref_log_probs
            )
            result['kl_penalty'] = kl_penalty.item()
            combined = combined - result['kl_penalty']
        else:
            result['kl_penalty'] = 0.0
        
        result['total'] = combined
        return result
    
    def adjust_weights(self, training_progress: float):
        """
        Dynamically adjust layer weights based on training progress.
        
        Early training: verifiable weight 0.9, quality weight 0.1
        Late training: verifiable weight 0.7, quality weight 0.3
        
        Args:
            training_progress: Float in [0, 1], 0=start, 1=end
        """
        # Linear interpolation
        early_v = 0.9
        late_v = 0.7
        
        self.verifiable_weight = early_v - (early_v - late_v) * training_progress
        self.quality_weight = 1.0 - self.verifiable_weight

In [14]:
# TEST: Production Reward Stack

# Initialize components
vrm = VerifiableRewardModel()
lp = LengthPenalizedReward(target_length=100)  # Short target for demo
kl = AdaptiveKLController()

stack = ProductionRewardStack(
    vrm=vrm,
    length_penalty=lp,
    kl_controller=kl,
    verifiable_weight=0.9,
    quality_weight=0.1
)

# Test responses
test_cases = [
    ("<think>2+2=4</think><answer>4</answer>", "4"),
    ("<think>" + "Let me think... " * 50 + "</think><answer>4</answer>", "4"),  # Verbose
    ("<think>Wrong calculation</think><answer>5</answer>", "4"),  # Wrong answer
]

print("Production Reward Stack Demo:")
print(f"{'Case':>10} {'Accuracy':>10} {'Format':>8} {'Length':>8} {'Total':>8}")
print("-" * 50)

for i, (response, gt) in enumerate(test_cases):
    result = stack.compute_reward(response, gt)
    print(f"{i+1:>10} {result['accuracy']:>10.1f} {result['format']:>8.1f} "
          f"{result['length_penalized']:>8.2f} {result['total']:>8.2f}")

Production Reward Stack Demo:
      Case   Accuracy   Format   Length    Total
--------------------------------------------------
         1        1.0      1.0     1.08     1.08
         2        1.0      1.0     0.72     0.72
         3        0.0      1.0     0.18     0.18
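
The linear schedule in `adjust_weights` can be tabulated on its own. A minimal standalone sketch, where `layer_weights` is a hypothetical helper and the clamping of `progress` is an addition the method itself does not perform:

```python
from typing import Tuple


def layer_weights(progress: float, early_v: float = 0.9,
                  late_v: float = 0.7) -> Tuple[float, float]:
    """Linearly interpolate the verifiable weight; quality gets the remainder."""
    progress = max(0.0, min(1.0, progress))  # guard against out-of-range input
    verifiable = early_v - (early_v - late_v) * progress
    return verifiable, 1.0 - verifiable


for progress in (0.0, 0.25, 0.5, 0.75, 1.0):
    v, q = layer_weights(progress)
    print(f"progress={progress:.2f}  verifiable={v:.2f}  quality={q:.2f}")
```

Halfway through training the split is 0.8 / 0.2, and the two weights always sum to 1, so the scale of the combined reward stays comparable across training.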


## 7. Summary: Reward Design Principles

**Implementation Hierarchy:**

```python
# 1. Start with verifiable rewards (if applicable)
vrm = VerifiableRewardModel()

# 2. Add quality signals for subjective aspects
orm = OutcomeRewardModel()  # RLAIFScorer is an alternative signal, but needs its own wiring into the stack

# 3. Prevent hacking
length_penalty = LengthPenalizedReward()
kl_controller = AdaptiveKLController()

# 4. Combine into production stack
stack = ProductionRewardStack(vrm, orm, kl_controller, length_penalty)
```

**Key Principles:**

1. **Verifiable > Learned** — Ground in objective truth when possible
2. **Simple > Complex** — DeepSeek R1 uses just 2 signals
3. **Prevent hacking** — KL penalty + length penalty are essential
4. **Dynamic weighting** — Trust verifiable more early in training

---
**Tier 1 Complete!** You now have the foundational understanding for production RL training of reasoning models.