In [54]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import torch.nn.functional as F

In [55]:
num_pairs = 10
dim_trajectory = 2

# Randomly generate some fake "trajectories": one float32 vector of length
# dim_trajectory per trajectory, two trajectories per preference pair.
trajectories = [np.random.randn(dim_trajectory).astype(np.float32)
                for _ in range(2 * num_pairs)]

# Ground-truth "secret" weight that the simulated human uses to rank
# trajectories. With all-ones weights the score is simply the sum of the
# coordinates (a linear score, not an L2 norm). The weight is identical for
# every pair, so it is built once, and it is sized from dim_trajectory so
# that changing the dimension above does not break the dot product below.
secret_weight = np.ones(dim_trajectory, dtype=np.float32)

# Each entry is (tau1, tau2, label) with label = 1 when tau1 is preferred.
pairs = []
for i in range(num_pairs):
    tau1 = trajectories[2*i]
    tau2 = trajectories[2*i + 1]
    val1 = np.dot(tau1, secret_weight)
    val2 = np.dot(tau2, secret_weight)
    label = 1 if val1 > val2 else 0  # τ1 preferred if val1 > val2
    pairs.append((tau1, tau2, label))

class RewardModel(nn.Module):
    """Small MLP that assigns one scalar reward to each input vector.

    Architecture: Linear(input_dim -> 16) -> Tanh -> Linear(16 -> 1).
    """

    def __init__(self, input_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim, 16),
            nn.Tanh(),
            nn.Linear(16, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Map x of shape [batch_size, input_dim] to rewards of shape [batch_size]."""
        scores = self.net(x)
        # Drop the trailing singleton dimension produced by the last layer.
        return scores.squeeze(-1)

# Reward model over trajectory-sized inputs, trained with its own Adam optimizer.
reward_model = RewardModel(dim_trajectory)
optimizer_rm = optim.Adam(params=reward_model.parameters(), lr=1e-3)

def pairwise_loss(tau1, tau2, label, model=None):
    """
    Bradley–Terry pairwise preference loss (mean over the batch).

    tau1, tau2: [batch_size, input_dim]
    label: [batch_size] tensor, 1 means τ1>τ2, 0 means τ2>τ1
    model: optional callable mapping a [batch_size, input_dim] tensor to
           [batch_size] rewards. When omitted, the module-level
           `reward_model` is used, which is the original behavior.

    Bradley–Terry logistic: -log σ(R(τ1) - R(τ2)) for label=1
                            -log σ(R(τ2) - R(τ1)) for label=0

    Returns a scalar tensor.
    """
    if model is None:
        # Looked up at call time so a re-created global model is picked up.
        model = reward_model
    r1 = model(tau1)  # shape [batch_size]
    r2 = model(tau2)
    logits = r1 - r2  # shape [batch_size]
    # label=1 => -log σ(logits)
    # label=0 => -log [1 - σ(logits)] = -log σ(-logits)
    # This is exactly binary cross-entropy on the logits. The functional form
    # avoids constructing a new loss module on every call.
    return F.binary_cross_entropy_with_logits(logits, label.float())

# Fit the reward model on the preference pairs, one pair per gradient step.
num_epochs = 50
for epoch in range(num_epochs):
    random.shuffle(pairs)  # new visiting order each epoch (shuffles in place)
    epoch_loss = 0.0
    for tau1_np, tau2_np, lbl in pairs:
        # Add a leading batch dimension of size 1 to each sample.
        tau1_tensor = torch.from_numpy(tau1_np)[None]  # [1, dim]
        tau2_tensor = torch.from_numpy(tau2_np)[None]  # [1, dim]
        lbl_tensor = torch.tensor([lbl], dtype=torch.float32)  # shape [1]

        optimizer_rm.zero_grad()
        loss = pairwise_loss(tau1_tensor, tau2_tensor, lbl_tensor)
        loss.backward()
        optimizer_rm.step()

        epoch_loss += loss.item()

    # Every 10th epoch, report the loss summed over all pairs.
    epoch_number = epoch + 1
    if epoch_number % 10 == 0:
        print(f"Epoch {epoch_number}, loss={epoch_loss:.3f}")

print("Reward model training complete.\n")

class TinyEnv:
    """
    Toy environment with a 2D state s. The agent picks an action a in
    {0,1,2,...}, but the action is ignored: the next state is drawn at
    random. Each episode runs for a fixed number of steps (`max_steps`).

    reward_fn: optional callable taking a [1, 2] float32 tensor and returning
               a one-element tensor. When omitted, the module-level
               `reward_model` is used, which is the original behavior.
    """
    def __init__(self, reward_fn=None):
        self.max_steps = 5
        self.t = 0
        self.state = np.zeros(2, dtype=np.float32)
        self.reward_fn = reward_fn

    def reset(self):
        """Start a new episode and return the random initial state."""
        self.t = 0
        self.state = np.random.randn(2).astype(np.float32)
        return self.state

    def step(self, action):
        """Advance one step; returns (next_state, reward, done, info)."""
        # For demonstration, random next state (independent of `action`).
        next_state = np.random.randn(2).astype(np.float32)
        self.state = next_state
        self.t += 1
        done = (self.t >= self.max_steps)
        # We'll define the "trajectory" as the next state for reward.
        # In a real example, the trajectory would be the entire sequence, but
        # we show how to just apply Rψ on the next state as a stand-in.
        # The scorer is resolved at call time so a re-created global
        # reward_model is picked up.
        scorer = self.reward_fn if self.reward_fn is not None else reward_model
        r_tensor = torch.from_numpy(next_state).unsqueeze(0)
        # Only the scalar value is needed, so skip building an autograd graph.
        with torch.no_grad():
            reward = scorer(r_tensor).item()
        return next_state, reward, done, {}

# We'll define a very minimal policy network
class PolicyNet(nn.Module):
    """Two-layer MLP policy that outputs unnormalized action logits.

    Architecture: Linear(input_dim -> hidden_dim) -> Tanh -> Linear(hidden_dim -> action_dim).
    """

    def __init__(self, input_dim, hidden_dim, action_dim=3):
        super().__init__()
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, action_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the action logits for x (no softmax is applied here)."""
        action_logits = self.net(x)
        return action_logits

# Policy over the 2-D state with 16 hidden units and 3 action logits, plus its Adam optimizer.
policy = PolicyNet(2, 16, action_dim=3)
optimizer_pi = optim.Adam(params=policy.parameters(), lr=1e-3)

# This is not a full PPO implementation: we skip advantage estimation and
# use a naive policy-gradient (REINFORCE) update to keep it short.
def compute_logprob(logits, action):
    """Return the log-probability of `action` under softmax(`logits`).

    logits: [..., act_dim] unnormalized scores (typically [batch, act_dim]).
    action: integer (int64) tensor of chosen action indices whose shape is
            the shape of `logits` without its last dimension (typically [batch]).

    Returns a tensor with the same shape as `action`.
    """
    log_probs = torch.log_softmax(logits, dim=-1)
    # Index along the last (action) dimension everywhere, matching the
    # softmax above, so any number of leading batch dimensions is supported.
    return torch.gather(log_probs, -1, action.unsqueeze(-1)).squeeze(-1)

def run_episode(env, policy):
    """Roll out one episode by sampling actions from `policy`.

    `env` must provide reset() -> state and step(action) ->
    (next_state, reward, done, info), with states given as numpy arrays.

    Returns four parallel lists, one entry per step:
    states (numpy arrays), actions (ints), rewards, and logprobs (tensors
    that keep their autograd graph for the policy update).
    """
    states, actions, rewards, logprobs = [], [], [], []
    obs = env.reset()
    while True:
        obs_batch = torch.from_numpy(obs).unsqueeze(0).float()
        action_dist = torch.distributions.Categorical(logits=policy(obs_batch))
        sampled = action_dist.sample()  # random sample
        action_id = sampled.item()

        next_obs, reward, finished, _ = env.step(action_id)

        states.append(obs)
        actions.append(action_id)
        rewards.append(reward)
        logprobs.append(action_dist.log_prob(sampled))

        if finished:
            break
        obs = next_obs
    return states, actions, rewards, logprobs

def update_policy(rewards, logprobs, gamma=0.99):
    """Build the REINFORCE loss for one episode.

    rewards:  sequence of per-step scalar rewards.
    logprobs: sequence of per-step log-probability tensors (same length).
    gamma:    discount factor for the returns G_t = r_t + gamma * G_{t+1}.

    Returns sum_t( -logprob_t * G_t ) as a tensor, where the returns are
    standardized (zero mean, unit std) when the episode has at least two
    steps. The caller is expected to call .backward() on the result.
    """
    if len(rewards) == 0:
        # Nothing to learn from. Return a differentiable zero so the caller's
        # .backward() is a harmless no-op instead of failing on a float.
        return torch.zeros((), requires_grad=True)

    # Discounted returns, accumulated back-to-front. Appending and reversing
    # once avoids the quadratic cost of inserting at the head of a list.
    G = 0.0
    returns = []
    for r in reversed(rewards):
        G = r + gamma*G
        returns.append(G)
    returns.reverse()
    returns_t = torch.tensor(returns, dtype=torch.float32)

    # Scale for stable training. The sample std of a single value is NaN,
    # which would poison the loss and the weights, so a one-step episode
    # keeps its raw return.
    if returns_t.numel() > 1:
        returns_t = (returns_t - returns_t.mean()) / (returns_t.std()+1e-8)

    loss = 0.0
    for lp, Gt in zip(logprobs, returns_t):
        loss = loss + (-lp * Gt)
    return loss


Epoch 10, loss=6.811
Epoch 20, loss=5.465
Epoch 30, loss=4.360
Epoch 40, loss=3.480
Epoch 50, loss=2.829
Reward model training complete.



In [57]:
env = TinyEnv()

# Policy-gradient training: one rollout and one optimizer step per iteration.
for iter_i in range(50):
    # gather one "rollout" from environment
    states, actions, rewards, logprobs = run_episode(env, policy)

    loss_pg = update_policy(rewards, logprobs)
    optimizer_pi.zero_grad()
    loss_pg.backward()
    optimizer_pi.step()

    # Report the undiscounted reward of the current rollout every 10th iteration.
    iteration = iter_i + 1
    if iteration % 10 == 0:
        rollout_return = sum(rewards)
        print(f"Iteration {iteration}, total reward of rollout: {rollout_return:.3f}")

print("Done with toy PPO training using the learned reward model.\n")

Iteration 10, total reward of rollout: -3.355
Iteration 20, total reward of rollout: -3.533
Iteration 30, total reward of rollout: 2.175
Iteration 40, total reward of rollout: 2.396
Iteration 50, total reward of rollout: 3.520
Done with toy PPO training using the learned reward model.

