
# Player vs Creator — Adversarial RL (PPO, PyTorch)

We study a two-agent adversarial problem in which a Player must reach a goal while a Creator generates obstacles that impede progress. Both agents are trained with proximal policy optimization (PPO) in a continuous 2D environment. The world includes static circles, axis-aligned rectangles, and moving circles; collisions are elastic so the Player never stalls. A curriculum on obstacle count, observation/return normalization, learning-rate annealing, and KL-based early stopping stabilize training. We report success rate and final-distance metrics and visualize optimization dynamics with comprehensive plots. An interactive demo renders episodes produced by the Creator and navigated by the Player.

In [8]:

!pip install -q gradio imageio
import os, math, time, random, json
from dataclasses import dataclass
import numpy as np
import imageio.v2 as imageio
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal

# Choose the compute device once; every tensor built below is placed on it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Seed each global RNG the notebook uses (Python, NumPy, PyTorch CPU/GPU).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)


Device: cpu


## Environment
**State space.** 2D continuous world \([{-}1,1]^2\). Observations for the Player: \([x,y,\dot x,\dot y],\) goal location, vector-to-goal, and \(K\) obstacle descriptors; for the Creator: start and goal.  

**Action space.** Player: continuous acceleration \(\mathbf{a}\in[-1,1]^2\) (tanh-squashed). Creator: per-obstacle vector \([\text{type},x,y,s_1,s_2,v_x,v_y]\) for \(K\) slots; type∈{circle, rect, moving-circle}.  

**Physics.** Semi-implicit integration with damping and speed cap; walls and obstacles reflect velocity (elastic), ensuring non-stopping contact.  

**Rewards.** Player: shaped by progress toward goal plus terminal success bonus and small step cost. Creator: reward if the Player fails plus a distance-proportional term; penalties for illegal placements (out of bounds or near start/goal).


In [9]:

# @title Environment (circles, rectangles, moving circles)
@dataclass
class LevelConfig:
    """Constants describing the world, the Player's physics and obstacle sizes."""

    # --- world -----------------------------------------------------------
    # Lower/upper bound of the square world (the same range on both axes).
    world_min: float = -1.0
    world_max: float = 1.0
    # Number of obstacle slots the Creator emits parameters for.
    max_obstacles: int = 6

    # --- start / goal ----------------------------------------------------
    # The episode ends once the Player is within goal_r of the goal.
    goal_r: float = 0.08
    # start_r and min_clearance define the keep-out zone used when the
    # Creator is penalised for placing obstacles near the start or goal.
    start_r: float = 0.08
    min_clearance: float = 0.05

    # --- player dynamics -------------------------------------------------
    max_steps: int = 220          # step cap after which an episode ends
    dt: float = 0.08              # integration time step
    vmax: float = 0.9             # cap on the Player's speed
    accel_max: float = 0.8        # scale applied to the clipped action
    damping: float = 0.03         # fraction of velocity removed each step
    collision_eps: float = 1e-4   # offset used when pushing out of a contact

    # --- obstacle size / speed ranges -------------------------------------
    # Creator outputs in [-1, 1] are mapped linearly onto these ranges.
    circle_r_min: float = 0.06
    circle_r_max: float = 0.25
    rect_w_min: float = 0.10
    rect_w_max: float = 0.45
    rect_h_min: float = 0.10
    rect_h_max: float = 0.45
    # Per-axis velocity scale for moving circles.
    obs_vmax: float = 0.7

class PlayerCreatorEnvVec:
    """Batch of N independent 2D levels stepped in lock-step (NumPy, CPU).

    Each level has a start, a goal and ``max_obstacles`` obstacle slots that
    are filled from a Creator action. Only the first ``active_k`` slots take
    part in collisions, obstacle motion, penalties and rendering; the Player
    observation still lists every slot.

    Per-slot parameter layout in ``obs_params[i, j]``:
    ``[type_code, x, y, s1, s2, vx, vy]`` where ``s1`` is the radius (circles)
    or width (rectangles) and ``s2`` is the rectangle height.
    """

    TYPE_CIRCLE = 0
    TYPE_RECT   = 1
    TYPE_MCIRC  = 2

    def __init__(self, num_envs=32, level_cfg: "LevelConfig | None" = None, active_obstacles=None):
        """Create ``num_envs`` levels and place every Player on its start.

        Args:
            num_envs: number of parallel levels.
            level_cfg: level constants; a fresh ``LevelConfig()`` is created
                when omitted so instances never share one mutable default.
            active_obstacles: number of obstacle slots in play; defaults to
                ``min(3, max_obstacles)``.
        """
        self.N = num_envs
        # Build the default here rather than in the signature: a default
        # evaluated at definition time would be shared by every environment.
        self.cfg = level_cfg if level_cfg is not None else LevelConfig()
        self.low, self.high = self.cfg.world_min, self.cfg.world_max
        self.max_obs = self.cfg.max_obstacles
        self.active_k = int(active_obstacles) if active_obstacles is not None else min(3, self.max_obs)

        # Player obs: pos(2) + vel(2) + goal(2) + vec-to-goal(2) + 7 per slot.
        self.dim_obs_player = 8 + 7*self.max_obs
        # Creator obs: start(2) + goal(2).
        self.dim_obs_creator = 4

        self.reset_levels()
        self.reset_players()

    def sample_start_goal(self, rng):
        """Rejection-sample a start/goal pair more than 0.8 apart.

        Both points are drawn uniformly from the world shrunk by 0.2 on every
        side, using the NumPy ``Generator`` passed as ``rng``.
        """
        while True:
            s = rng.uniform(self.low+0.2, self.high-0.2, size=(2,))
            g = rng.uniform(self.low+0.2, self.high-0.2, size=(2,))
            if np.linalg.norm(s - g) > 0.8:
                return s, g

    def reset_levels(self, seeds=None):
        """Resample every start/goal pair and clear all obstacle slots.

        Args:
            seeds: seed material forwarded to ``np.random.default_rng``
                (``None``, an int, or a sequence of ints). The same value
                reproduces the same starts and goals; ``None`` draws fresh
                entropy.
        """
        rng = np.random.default_rng(seeds)
        self.starts = np.zeros((self.N, 2), dtype=np.float32)
        self.goals  = np.zeros((self.N, 2), dtype=np.float32)
        for i in range(self.N):
            s, g = self.sample_start_goal(rng)
            self.starts[i] = s
            self.goals[i]  = g
        self.obs_params = np.zeros((self.N, self.max_obs, 7), dtype=np.float32)
        self.obs_type   = np.zeros((self.N, self.max_obs), dtype=np.int32)
        self.n_obs      = np.full((self.N,), self.active_k, dtype=np.int32)

    def reset_players(self):
        """Put every Player back on its start with zero velocity and clear step/done state."""
        self.pos  = self.starts.copy()
        self.vel  = np.zeros_like(self.pos, dtype=np.float32)
        self.t    = np.zeros(self.N, dtype=np.int32)
        self.done = np.zeros(self.N, dtype=np.bool_)

    def _decode_set_obstacle(self, i, j, vec):
        """Decode one 7-vector in [-1, 1] into slot ``j`` of level ``i``.

        ``vec[0]`` selects the type by thirds of [-1, 1] (circle, rectangle,
        moving circle); the remaining entries are mapped linearly onto the
        world bounds and the configured size/velocity ranges.
        """
        v = np.clip(vec, -1.0, 1.0)
        tc = v[0]
        if tc < -1/3:
            typ = self.TYPE_CIRCLE
        elif tc < 1/3:
            typ = self.TYPE_RECT
        else:
            typ = self.TYPE_MCIRC
        x = self.low + (v[1] + 1.0) * 0.5 * (self.high - self.low)
        y = self.low + (v[2] + 1.0) * 0.5 * (self.high - self.low)
        if typ == self.TYPE_CIRCLE:
            r = self.cfg.circle_r_min + (v[3] + 1.0) * 0.5 * (self.cfg.circle_r_max - self.cfg.circle_r_min)
            s1, s2, vx, vy = r, 0.0, 0.0, 0.0
        elif typ == self.TYPE_RECT:
            w = self.cfg.rect_w_min + (v[3] + 1.0) * 0.5 * (self.cfg.rect_w_max - self.cfg.rect_w_min)
            h = self.cfg.rect_h_min + (v[4] + 1.0) * 0.5 * (self.cfg.rect_h_max - self.cfg.rect_h_min)
            s1, s2, vx, vy = w, h, 0.0, 0.0
        else:
            r = self.cfg.circle_r_min + (v[3] + 1.0) * 0.5 * (self.cfg.circle_r_max - self.cfg.circle_r_min)
            vx = v[5] * self.cfg.obs_vmax
            vy = v[6] * self.cfg.obs_vmax
            s1, s2 = r, 0.0
        self.obs_type[i, j]      = typ
        # Slot 0 keeps the raw type code so it can be exposed in the observation.
        self.obs_params[i, j, :] = np.array([tc, x, y, s1, s2, vx, vy], dtype=np.float32)

    def set_obstacles_from_creator_action(self, a_creator):
        """Fill every obstacle slot from a Creator action.

        Args:
            a_creator: tensor or array reshapeable to ``(N, max_obstacles, 7)``;
                values are clipped to [-1, 1] before decoding.
        """
        if isinstance(a_creator, torch.Tensor):
            a_creator = a_creator.detach().cpu().numpy()
        a_creator = np.clip(a_creator, -1.0, 1.0).reshape(self.N, self.max_obs, 7)
        for i in range(self.N):
            for j in range(self.max_obs):
                self._decode_set_obstacle(i, j, a_creator[i, j])

    def _reflect_velocity(self, v, n):
        """Return ``v`` mirrored about the unit normal ``n`` (``v - 2 (v.n) n``)."""
        dot = v[0]*n[0] + v[1]*n[1]
        return np.array([v[0] - 2*dot*n[0], v[1] - 2*dot*n[1]], dtype=np.float32)

    def _move_moving_obstacles(self):
        """Advance active moving circles by one ``dt``, bouncing them off the walls."""
        for i in range(self.N):
            for j in range(min(self.active_k, self.max_obs)):
                if self.obs_type[i, j] == self.TYPE_MCIRC:
                    _, x, y, r, _, vx, vy = self.obs_params[i, j]
                    x_new = x + vx * self.cfg.dt
                    y_new = y + vy * self.cfg.dt
                    # On wall contact flip that velocity component and clamp
                    # the centre so the whole circle is back inside.
                    if x_new - r < self.low or x_new + r > self.high:
                        vx = -vx
                        x_new = np.clip(x_new, self.low + r + self.cfg.collision_eps, self.high - r - self.cfg.collision_eps)
                    if y_new - r < self.low or y_new + r > self.high:
                        vy = -vy
                        y_new = np.clip(y_new, self.low + r + self.cfg.collision_eps, self.high - r - self.cfg.collision_eps)
                    self.obs_params[i, j, 1] = x_new
                    self.obs_params[i, j, 2] = y_new
                    self.obs_params[i, j, 5] = vx
                    self.obs_params[i, j, 6] = vy

    def _collide_circle(self, p, v, cx, cy, r):
        """Resolve a point-vs-circle contact.

        If ``p`` lies inside the circle it is moved just outside the boundary
        along the outward normal and ``v`` is reflected; otherwise both are
        returned unchanged.
        """
        dx, dy = p[0]-cx, p[1]-cy
        dist = math.sqrt(dx*dx + dy*dy) + 1e-8  # epsilon avoids division by zero
        if dist < r:
            n = np.array([dx/dist, dy/dist], dtype=np.float32)
            p = np.array([cx + n[0]*(r + self.cfg.collision_eps),
                          cy + n[1]*(r + self.cfg.collision_eps)], dtype=np.float32)
            v = self._reflect_velocity(v, n)
        return p, v

    def _collide_rect(self, p, v, cx, cy, w, h):
        """Resolve a point-vs-axis-aligned-rectangle contact.

        A point strictly inside the rectangle is pushed out through the
        nearest edge and ``v`` is reflected about that edge's normal.
        """
        left, right = cx - w/2, cx + w/2
        bottom, top = cy - h/2, cy + h/2
        inside = (left < p[0] < right) and (bottom < p[1] < top)
        if inside:
            pen_left   = abs(p[0] - left)
            pen_right  = abs(right - p[0])
            pen_bottom = abs(p[1] - bottom)
            pen_top    = abs(top - p[1])
            mins = [pen_left, pen_right, pen_bottom, pen_top]
            idx = int(np.argmin(mins))  # edge with the smallest penetration
            if idx == 0:
                n = np.array([-1.0, 0.0], dtype=np.float32)
                p = np.array([left - self.cfg.collision_eps, p[1]], dtype=np.float32)
            elif idx == 1:
                n = np.array([ 1.0, 0.0], dtype=np.float32)
                p = np.array([right + self.cfg.collision_eps, p[1]], dtype=np.float32)
            elif idx == 2:
                n = np.array([0.0, -1.0], dtype=np.float32)
                p = np.array([p[0], bottom - self.cfg.collision_eps], dtype=np.float32)
            else:
                n = np.array([0.0,  1.0], dtype=np.float32)
                p = np.array([p[0], top + self.cfg.collision_eps], dtype=np.float32)
            v = self._reflect_velocity(v, n)
        return p, v

    def _step_single(self, i, a):
        """Advance the Player of level ``i`` by one step with action ``a``.

        Finished levels are left untouched. The velocity is damped and
        accelerated first, capped at ``vmax``, and then used for the position
        update; walls and active obstacles reflect the velocity.
        """
        if self.done[i]: return
        a = np.clip(a, -1.0, 1.0) * self.cfg.accel_max
        self.vel[i] = (1.0 - self.cfg.damping)*self.vel[i] + a * self.cfg.dt
        sp = np.linalg.norm(self.vel[i]) + 1e-8
        if sp > self.cfg.vmax:
            self.vel[i] *= (self.cfg.vmax / sp)
        new_pos = self.pos[i] + self.vel[i] * self.cfg.dt

        # World walls: clamp just inside and point the velocity inwards.
        for k in (0,1):
            if new_pos[k] < self.low:
                new_pos[k] = self.low + self.cfg.collision_eps
                self.vel[i][k] = abs(self.vel[i][k])
            elif new_pos[k] > self.high:
                new_pos[k] = self.high - self.cfg.collision_eps
                self.vel[i][k] = -abs(self.vel[i][k])

        for j in range(min(self.active_k, self.max_obs)):
            typ = self.obs_type[i, j]
            _, x, y, s1, s2, vx, vy = self.obs_params[i, j]
            if typ == self.TYPE_CIRCLE or typ == self.TYPE_MCIRC:
                new_pos, self.vel[i] = self._collide_circle(new_pos, self.vel[i], x, y, s1)
            else:
                new_pos, self.vel[i] = self._collide_rect(new_pos, self.vel[i], x, y, s1, s2)

        self.pos[i] = new_pos
        self.t[i]  += 1
        d_goal = np.linalg.norm(self.pos[i] - self.goals[i])
        # An episode ends on reaching the goal or on hitting the step cap.
        if d_goal <= self.cfg.goal_r or self.t[i] >= self.cfg.max_steps:
            self.done[i] = True

    def step(self, actions_player):
        """Move the obstacles, then step every Player with its row of ``actions_player``.

        Args:
            actions_player: tensor or array indexable as ``actions_player[i]``
                giving a 2-vector per level.
        """
        self._move_moving_obstacles()
        if isinstance(actions_player, torch.Tensor):
            actions_player = actions_player.detach().cpu().numpy()
        for i in range(self.N):
            self._step_single(i, actions_player[i])

    def get_player_obs(self):
        """Return the Player observation as a float32 tensor of shape ``(N, dim_obs_player)``.

        Layout: position, velocity, goal, vector-to-goal, then for every slot
        ``[x, y, s1, s2, vx, vy, type_code]``.
        """
        p = self.obs_params.copy()
        p = p[:, :, [1,2,3,4,5,6,0]]  # move the type code to the end of each slot
        flat = p.reshape(self.N, -1)
        vec_to_goal = self.goals - self.pos
        base = np.concatenate([self.pos, self.vel, self.goals, vec_to_goal], axis=1)
        obs = np.concatenate([base, flat], axis=1)
        return torch.as_tensor(obs, dtype=torch.float32, device=device)

    def get_creator_obs(self):
        """Return the Creator observation ``[start, goal]`` as a float32 tensor of shape ``(N, 4)``."""
        obs = np.concatenate([self.starts, self.goals], axis=1)
        return torch.as_tensor(obs, dtype=torch.float32, device=device)

    def compute_rewards(self, prev_dists):
        """Compute the Player's per-step reward from the change in goal distance.

        Args:
            prev_dists: goal distances before the step, shape ``(N,)``.

        Returns:
            ``(rewards, new_dists)``: reward is ``1.5 * progress - 0.005`` plus
            ``1.0`` whenever the Player is currently within ``goal_r``.
        """
        new_dists = np.linalg.norm(self.pos - self.goals, axis=1)
        progress = prev_dists - new_dists
        r = 1.5*progress - 0.005
        success = (new_dists <= self.cfg.goal_r)
        r = r + (success.astype(np.float32)*1.0)
        return r.astype(np.float32), new_dists

    def episode_creator_rewards(self):
        """Compute the Creator's episode reward for every level.

        Reward = 1 if the Player is not at the goal, plus half the final
        distance normalised by the world width, minus 0.1 for each violation
        among the active obstacles: extending outside the world, being too
        close to the start, being too close to the goal, or (moving circles)
        ``|vx| + |vy|`` exceeding ``obs_vmax``.
        """
        final_d = np.linalg.norm(self.pos - self.goals, axis=1)
        success = (final_d <= self.cfg.goal_r)
        base = (1.0 - success.astype(np.float32))
        dist_term = (final_d / (self.high - self.low)) * 0.5
        penalty = np.zeros(self.N, dtype=np.float32)
        for i in range(self.N):
            for j in range(min(self.active_k, self.max_obs)):
                typ = self.obs_type[i, j]
                tc, x, y, s1, s2, vx, vy = self.obs_params[i, j]
                # Out-of-bounds check on the obstacle's full extent.
                if typ == self.TYPE_CIRCLE or typ == self.TYPE_MCIRC:
                    if not (self.low <= x-s1 and x+s1 <= self.high and self.low <= y-s1 and y+s1 <= self.high):
                        penalty[i] += 0.1
                else:
                    if not (self.low <= x-s1/2 and x+s1/2 <= self.high and self.low <= y-s2/2 and y+s2/2 <= self.high):
                        penalty[i] += 0.1
                # Keep-out zones around start and goal; rectangles use half
                # of their shorter side as the effective radius.
                if typ == self.TYPE_RECT:
                    if np.linalg.norm(np.array([x,y]) - self.starts[i]) < (self.cfg.start_r + 0.5*min(s1,s2) + self.cfg.min_clearance):
                        penalty[i] += 0.1
                    if np.linalg.norm(np.array([x,y]) - self.goals[i]) < (self.cfg.goal_r + 0.5*min(s1,s2) + self.cfg.min_clearance):
                        penalty[i] += 0.1
                else:
                    if np.linalg.norm(np.array([x,y]) - self.starts[i]) < (self.cfg.start_r + s1 + self.cfg.min_clearance):
                        penalty[i] += 0.1
                    if np.linalg.norm(np.array([x,y]) - self.goals[i]) < (self.cfg.goal_r + s1 + self.cfg.min_clearance):
                        penalty[i] += 0.1
                if typ == self.TYPE_MCIRC and (abs(vx) + abs(vy)) > 1.0*self.cfg.obs_vmax:
                    penalty[i] += 0.1
        rew = base + dist_term - penalty
        return rew.astype(np.float32)

    def render_frame(self, i, figsize=4, path=None):
        """Draw level ``i`` with matplotlib.

        Args:
            i: level index.
            figsize: side length of the square figure.
            path: when given, the figure is saved there and closed; otherwise
                it is shown with ``plt.show()``.
        """
        fig, ax = plt.subplots(figsize=(figsize, figsize))
        ax.set_xlim(self.low, self.high); ax.set_ylim(self.low, self.high)
        ax.set_aspect('equal'); ax.set_xticks([]); ax.set_yticks([])
        ax.plot([self.low, self.low, self.high, self.high, self.low],
                [self.low, self.high, self.high, self.low, self.low])
        for j in range(min(self.active_k, self.max_obs)):
            typ = self.obs_type[i, j]
            _, x, y, s1, s2, vx, vy = self.obs_params[i, j]
            if typ == self.TYPE_CIRCLE:
                circ = plt.Circle((x,y), s1, fill=False, linewidth=2)
                ax.add_patch(circ)
            elif typ == self.TYPE_RECT:
                rect = plt.Rectangle((x - s1/2, y - s2/2), s1, s2, fill=False, linewidth=2)
                ax.add_patch(rect)
            else:
                # Moving circle: outline plus an arrow along its velocity.
                circ = plt.Circle((x,y), s1, fill=False, linewidth=2)
                ax.add_patch(circ)
                ax.arrow(x, y, vx*0.1, vy*0.1, head_width=0.03, length_includes_head=True)
        ax.add_patch(plt.Circle(tuple(self.starts[i]), self.cfg.start_r, color='green', alpha=0.4))
        ax.add_patch(plt.Circle(tuple(self.goals[i]),  self.cfg.goal_r,  color='red',   alpha=0.4))
        ax.add_patch(plt.Circle(tuple(self.pos[i]), 0.02, color='black'))
        if path is not None:
            fig.savefig(path, bbox_inches='tight', pad_inches=0.05)
            plt.close(fig)
        else:
            plt.show()


## PPO & Utilities (Running Normalization, Adaptive KL, LR Anneal)

Both agents use PPO with shared design choices: orthogonal initialization, layer-normalized MLP backbones, entropy regularization, gradient clipping, AdamW, generalized advantage estimation (GAE-λ), observation and return normalization, linear learning-rate annealing, and adaptive KL early-stop per epoch. We alternate updates: Creator samples a level once per episode batch; the Player collects rollouts on that batch.


In [10]:

def orthogonal_init(layer, gain=math.sqrt(2)):
    """Orthogonally initialise an ``nn.Linear`` in place; ignore any other module.

    The weight matrix receives an orthogonal init scaled by ``gain`` and the
    bias, when the layer has one, is set to zero.
    """
    if not isinstance(layer, nn.Linear):
        return
    nn.init.orthogonal_(layer.weight, gain=gain)
    bias = layer.bias
    if bias is not None:
        nn.init.zeros_(bias)

class MLP(nn.Module):
    """Feed-forward trunk: ``depth`` x (Linear -> [LayerNorm] -> activation), then a linear head.

    Every hidden Linear is orthogonally initialised with the default gain;
    the head uses gain 1.0.
    """

    def __init__(self, in_dim, hid=128, out_dim=64, depth=2, act=nn.Tanh, layer_norm=True):
        super().__init__()
        modules = []
        width = in_dim
        for _ in range(depth):
            linear = nn.Linear(width, hid)
            orthogonal_init(linear)
            modules.append(linear)
            if layer_norm:
                modules.append(nn.LayerNorm(hid))
            modules.append(act())
            width = hid
        self.backbone = nn.Sequential(*modules)
        # With depth == 0 the head maps straight from in_dim.
        self.head = nn.Linear(width, out_dim)
        orthogonal_init(self.head, gain=1.0)

    def forward(self, x):
        """Run ``x`` through the backbone and project it with the head."""
        features = self.backbone(x)
        return self.head(features)

class ActorCriticGaussian(nn.Module):
    """Actor-critic with a shared MLP body and a diagonal Gaussian policy.

    The policy mean and the state value are linear heads on the shared
    features; the log standard deviation is a learned, state-independent
    parameter vector.
    """

    def __init__(self, obs_dim, act_dim, hidden=128):
        super().__init__()
        self.body = MLP(obs_dim, hid=hidden, out_dim=hidden, depth=2, layer_norm=True)
        self.mu = nn.Linear(hidden, act_dim)
        self.v  = nn.Linear(hidden, 1)
        self.log_std = nn.Parameter(torch.zeros(act_dim))
        # Small gain on the mean head keeps initial actions close to zero.
        orthogonal_init(self.mu, gain=0.01)
        orthogonal_init(self.v,  gain=1.0)

    def forward(self, obs):
        """Return ``(mean, std, value)`` for a batch of observations."""
        features = self.body(obs)
        mean = self.mu(features)
        value = self.v(features).squeeze(-1)
        return mean, self.log_std.exp(), value

    def act(self, obs):
        """Sample an action; return ``(action, log_prob, value)``.

        The log-probability is summed over the action dimensions.
        """
        mean, std, value = self(obs)
        policy = Normal(mean, std)
        action = policy.rsample()
        return action, policy.log_prob(action).sum(-1), value

    def evaluate(self, obs, actions):
        """Score given actions; return ``(log_prob, entropy, value)``.

        Log-probability and entropy are both summed over the action dimensions.
        """
        mean, std, value = self(obs)
        policy = Normal(mean, std)
        log_prob = policy.log_prob(actions).sum(-1)
        return log_prob, policy.entropy().sum(-1), value

@dataclass
class PPOConfig:
    """Hyper-parameters for one PPO learner."""

    # Advantage estimation.
    gamma: float = 0.99            # discount factor
    lam: float = 0.95              # GAE lambda
    # Clipped surrogate objective.
    clip_ratio: float = 0.2
    # Optimiser.
    lr: float = 3e-4
    weight_decay: float = 1e-4
    # Loss weights and gradient-norm clip.
    ent_coef: float = 0.01
    vf_coef: float = 0.5
    max_grad_norm: float = 0.5
    # Update schedule.
    update_epochs: int = 4
    minibatch_size: int = 1024
    target_kl: float = 0.02        # mean KL above which remaining epochs are skipped

class RunningMeanStd:
    """Running mean and variance of a stream of samples, merged batch by batch.

    ``shape`` is the shape of ONE sample: ``()`` for scalars, an int ``d`` or
    a tuple for vectors/arrays. ``mean`` and ``var`` always keep that shape.
    """

    def __init__(self, shape):
        if isinstance(shape, (int, np.integer)):
            shape = (int(shape),)
        self.shape = tuple(shape)
        self.mean = np.zeros(self.shape, dtype=np.float32)
        self.var  = np.ones(self.shape, dtype=np.float32)
        # Tiny prior count so the first batch dominates without dividing by zero.
        self.count = 1e-4

    def update(self, x):
        """Fold a batch of samples into the running statistics.

        Args:
            x: array-like holding one or more samples. For a scalar tracker
                any shape is accepted and flattened into a batch of scalars;
                otherwise the trailing dimensions must equal ``shape`` (a
                single un-batched sample is allowed). Empty input is a no-op.

        Raises:
            ValueError: if the trailing dimensions of ``x`` do not match.
        """
        x = np.asarray(x, dtype=np.float32)
        if x.size == 0:
            return  # nothing to merge; avoids NaN from the mean of an empty batch
        if self.shape == ():
            x = x.reshape(-1)
        else:
            k = len(self.shape)
            if x.ndim < k or tuple(x.shape[x.ndim - k:]) != self.shape:
                raise ValueError(
                    f"expected samples with trailing shape {self.shape}, got array of shape {x.shape}"
                )
            x = x.reshape((-1,) + self.shape)
        batch_mean = x.mean(axis=0)
        batch_var  = x.var(axis=0)
        batch_count = x.shape[0]
        # Parallel-variance merge of (mean, var, count) with the batch statistics.
        delta = batch_mean - self.mean
        tot_count = self.count + batch_count
        new_mean = self.mean + delta * batch_count / tot_count
        m_a = self.var * self.count
        m_b = batch_var * batch_count
        M2 = m_a + m_b + delta**2 * self.count * batch_count / tot_count
        new_var = M2 / tot_count
        # Re-wrap so scalar trackers keep 0-d float32 arrays rather than NumPy scalars.
        self.mean = np.asarray(new_mean, dtype=np.float32)
        self.var = np.asarray(new_var, dtype=np.float32)
        self.count = tot_count

    def normalize(self, x):
        """Return ``(x - mean) / (sqrt(var) + 1e-8)`` using the current statistics."""
        return (x - self.mean) / (np.sqrt(self.var) + 1e-8)

def compute_gae(rewards, values, dones, gamma, lam):
    """Generalised advantage estimation over a ``(T, N)`` rollout.

    Args:
        rewards: ``(T, N)`` rewards.
        values: value estimates with one more row than ``rewards``; the last
            row is the bootstrap value.
        dones: done flags indexed like ``values``. The transition at step
            ``t`` is cut off by the flag that FOLLOWS it: the last row for
            the final step, ``dones[t + 1]`` for every earlier step.
        gamma: discount factor.
        lam: GAE lambda.

    Returns:
        ``(advantages, returns)``, both ``(T, N)``; returns are advantages
        plus ``values[:-1]``.
    """
    horizon, n_envs = rewards.shape
    advantages = np.zeros((horizon, n_envs), dtype=np.float32)
    running = np.zeros((n_envs,), dtype=np.float32)
    # Successor quantities, carried backwards through time.
    succ_value = values[-1]
    succ_alive = 1.0 - dones[-1]
    for step in range(horizon - 1, -1, -1):
        td_error = rewards[step] + gamma * succ_value * succ_alive - values[step]
        running = td_error + gamma * lam * succ_alive * running
        advantages[step] = running
        succ_value = values[step]
        succ_alive = 1.0 - dones[step]
    returns = advantages + values[:-1]
    return advantages, returns


## Training: Smarter PPO + Simple Curriculum

In [11]:

class Trainer:
    """Alternating PPO training of the Player and the Creator.

    Every update the Creator emits one obstacle layout per level, the Player
    collects a fixed-length rollout on those levels, and then both networks
    are updated with PPO. Player observations are normalised with running
    statistics; the Player's critic is trained on normalised returns.
    """

    def __init__(self,
                 num_envs=32,
                 steps_per_env=128,
                 total_updates=400,
                 player_hidden=128,
                 creator_hidden=128,
                 save_dir=None,
                 init_active_obstacles=3):
        """Build the environment, both actor-critics, optimisers and normalisers.

        Args:
            num_envs: number of parallel levels.
            steps_per_env: rollout length per update.
            total_updates: number of training updates (also the LR anneal horizon).
            player_hidden / creator_hidden: hidden width of each network.
            save_dir: checkpoint directory; defaults to ``/content`` when it
                exists, else the current directory.
            init_active_obstacles: obstacle slots in play at the start.
        """
        self.level_cfg = LevelConfig()
        self.env = PlayerCreatorEnvVec(num_envs=num_envs, level_cfg=self.level_cfg, active_obstacles=init_active_obstacles)
        self.num_envs = num_envs
        self.steps_per_env = steps_per_env
        self.total_updates = total_updates

        self.obs_p_dim = self.env.dim_obs_player
        self.obs_c_dim = self.env.dim_obs_creator
        self.player = ActorCriticGaussian(self.obs_p_dim, act_dim=2, hidden=player_hidden).to(device)
        self.creator = ActorCriticGaussian(self.obs_c_dim, act_dim=self.level_cfg.max_obstacles*7, hidden=creator_hidden).to(device)

        self.cfg_p = PPOConfig(ent_coef=0.01, target_kl=0.02)
        self.cfg_c = PPOConfig(lr=3e-4, ent_coef=0.02, target_kl=0.02)

        self.opt_p = torch.optim.AdamW(self.player.parameters(), lr=self.cfg_p.lr, weight_decay=self.cfg_p.weight_decay)
        self.opt_c = torch.optim.AdamW(self.creator.parameters(), lr=self.cfg_c.lr, weight_decay=self.cfg_c.weight_decay)

        self.save_dir = save_dir or ("/content" if os.path.exists("/content") else ".")
        os.makedirs(self.save_dir, exist_ok=True)

        self.best_eval = -1e9

        self.obs_rms = RunningMeanStd(self.obs_p_dim)
        self.ret_rms = RunningMeanStd(())

        self.base_lr_p = self.cfg_p.lr
        self.base_lr_c = self.cfg_c.lr

    def _anneal_lrs(self, upd):
        """Linearly decay both learning rates; ``upd`` is the 1-based update index."""
        frac = 1.0 - (upd - 1) / max(1, self.total_updates)
        for g in self.opt_p.param_groups:
            g['lr'] = self.base_lr_p * frac
        for g in self.opt_c.param_groups:
            g['lr'] = self.base_lr_c * frac

    def _normalize_obs(self, obs, update=False):
        """Normalise a batch of Player observations with the running statistics.

        Args:
            obs: observation tensor with the batch on the first axis.
            update: when True the statistics absorb this batch first.
        """
        obs_np = obs.detach().cpu().numpy().reshape(obs.shape[0], -1)
        if update:
            self.obs_rms.update(obs_np)
        return torch.as_tensor(self.obs_rms.normalize(obs_np), device=device, dtype=torch.float32)

    def rollout(self):
        """Let the Creator build the levels, then collect ``steps_per_env`` Player steps.

        Returns:
            ``{"player": {...}, "creator": {...}}``. Player buffers are
            time-major ``(T, N, ...)``:

            * ``obs`` raw observations, ``obs_norm`` the normalised
              observations the policy actually saw when sampling;
            * ``act`` (pre-tanh), ``logp``, ``rew``;
            * ``val`` and ``done`` have ``T + 1`` rows: ``done[t]`` says
              whether the state BEFORE step ``t`` was already terminal
              (``done[0]`` is 0) and ``done[T]`` is the final flag, which is
              the layout ``compute_gae`` reads;
            * ``valid`` is 1 where the env was still running before step ``t``.
        """
        # NOTE(review): starts/goals are only sampled by the env constructor
        # (reset_levels is not called here), so the training levels keep the
        # same start/goal pairs across rollouts - confirm this is intended.
        obs_c = self.env.get_creator_obs()
        with torch.no_grad():
            a_c, logp_c, v_c = self.creator.act(obs_c)
        self.env.set_obstacles_from_creator_action(torch.tanh(a_c))

        T = self.steps_per_env
        obs_buf      = torch.zeros((T, self.num_envs, self.obs_p_dim), dtype=torch.float32, device=device)
        obs_norm_buf = torch.zeros((T, self.num_envs, self.obs_p_dim), dtype=torch.float32, device=device)
        act_buf      = torch.zeros((T, self.num_envs, 2), dtype=torch.float32, device=device)
        logp_buf     = torch.zeros((T, self.num_envs), dtype=torch.float32, device=device)
        val_buf      = torch.zeros((T+1, self.num_envs), dtype=torch.float32, device=device)
        rew_buf      = torch.zeros((T, self.num_envs), dtype=torch.float32, device=device)
        done_buf     = torch.zeros((T+1, self.num_envs), dtype=torch.float32, device=device)
        valid_buf    = torch.zeros((T, self.num_envs), dtype=torch.float32, device=device)

        self.env.reset_players()
        prev_dists = np.linalg.norm(self.env.pos - self.env.goals, axis=1)

        for t in range(T):
            # Envs that finished earlier no longer move; remember which
            # transitions are real so the frozen ones are not trained on.
            alive = ~self.env.done
            obs_p = self.env.get_player_obs()
            obs_norm = self._normalize_obs(obs_p, update=True)
            with torch.no_grad():
                a_p, logp_p, v_p = self.player.act(obs_norm)
            a_clip = torch.tanh(a_p)
            self.env.step(a_clip)
            rewards, new_dists = self.env.compute_rewards(prev_dists)
            prev_dists = new_dists
            # compute_rewards re-pays the success bonus for as long as the
            # Player sits on the goal; keep only the reward of live steps.
            rewards = rewards * alive.astype(np.float32)

            obs_buf[t] = obs_p
            obs_norm_buf[t] = obs_norm
            act_buf[t] = a_p
            logp_buf[t] = logp_p
            val_buf[t] = v_p
            rew_buf[t] = torch.as_tensor(rewards, device=device)
            valid_buf[t] = torch.as_tensor(alive.astype(np.float32), device=device)
            # The flag produced by step t belongs to the NEXT state, so it is
            # stored at t + 1; storing it at t would make compute_gae cut the
            # episode one step early and block the terminal reward from
            # propagating back.
            done_buf[t+1] = torch.as_tensor(self.env.done.astype(np.float32), device=device)

            if np.all(self.env.done):
                # Pad the unused tail; these rows have valid == 0.
                for tp in range(t+1, T):
                    obs_buf[tp] = obs_buf[t]
                    obs_norm_buf[tp] = obs_norm_buf[t]
                    act_buf[tp] = act_buf[t]
                    logp_buf[tp] = logp_buf[t]
                    rew_buf[tp] = 0.0
                    done_buf[tp+1] = 1.0
                break

        # Bootstrap value of the state reached after the last step.
        obs_norm = self._normalize_obs(self.env.get_player_obs())
        with torch.no_grad():
            _, _, v_last = self.player(obs_norm)
        val_buf[T] = v_last

        c_rewards = self.env.episode_creator_rewards()
        c_batch = dict(obs=obs_c.detach(),
                       act=a_c.detach(),
                       logp=logp_c.detach(),
                       val=v_c.detach(),
                       rew=torch.as_tensor(c_rewards, device=device))
        return {"player": dict(obs=obs_buf, obs_norm=obs_norm_buf, act=act_buf, logp=logp_buf,
                               val=val_buf, rew=rew_buf, done=done_buf, valid=valid_buf),
                "creator": c_batch}

    def _ppo_epochs(self, ac, opt, obs, act, old_logp, adv, ret, cfg):
        """Run the clipped-PPO minibatch epochs shared by both learners.

        All tensors are flat over samples. Advantages are standardised over
        the whole batch (skipped for a single sample, whose std is undefined).
        After any epoch whose mean approximate KL exceeds ``cfg.target_kl``
        the remaining epochs are skipped.
        """
        if obs.shape[0] == 0:
            return
        if adv.numel() > 1:
            adv = (adv - adv.mean()) / (adv.std() + 1e-8)
        inds = np.arange(obs.shape[0])
        for _ in range(cfg.update_epochs):
            np.random.shuffle(inds)
            approx_kl_accum, num_mb = 0.0, 0
            for start in range(0, len(inds), cfg.minibatch_size):
                mb = inds[start:start+cfg.minibatch_size]
                logp_new, entropy, v_pred = ac.evaluate(obs[mb], act[mb])
                ratio = torch.exp(logp_new - old_logp[mb])
                surr1 = ratio * adv[mb]
                surr2 = torch.clamp(ratio, 1.0 - cfg.clip_ratio, 1.0 + cfg.clip_ratio) * adv[mb]
                policy_loss = -torch.min(surr1, surr2).mean()
                v_loss = F.mse_loss(v_pred, ret[mb])
                loss = policy_loss + cfg.vf_coef*v_loss - cfg.ent_coef*entropy.mean()
                opt.zero_grad(set_to_none=True)
                loss.backward()
                nn.utils.clip_grad_norm_(ac.parameters(), cfg.max_grad_norm)
                opt.step()
                approx_kl = (old_logp[mb] - logp_new).mean().item()
                approx_kl_accum += approx_kl; num_mb += 1
            if num_mb > 0 and (approx_kl_accum/num_mb) > cfg.target_kl:
                break

    def _ppo_update(self, ac: ActorCriticGaussian, opt, batch, cfg: PPOConfig, normalize_obs=False):
        """PPO update from a time-major rollout batch.

        Args:
            ac, opt: network and its optimiser.
            batch: dict with ``obs``, ``act``, ``logp``, ``adv``, ``ret`` and
                optionally ``valid``; samples whose ``valid`` flag is 0 are
                dropped before the update.
            cfg: PPO hyper-parameters.
            normalize_obs: when True, ``obs`` is treated as raw: the running
                statistics absorb it and it is normalised here. Leave False
                when ``obs`` already holds the inputs the policy saw while
                sampling, so old and new log-probs share the same inputs.
        """
        obs = batch["obs"].reshape(-1, batch["obs"].shape[-1])
        act = batch["act"].reshape(-1, batch["act"].shape[-1])
        old_logp = batch["logp"].reshape(-1)
        adv = batch["adv"].reshape(-1)
        ret = batch["ret"].reshape(-1)
        valid = batch.get("valid")
        if valid is not None:
            keep = valid.reshape(-1) > 0
            obs, act, old_logp, adv, ret = obs[keep], act[keep], old_logp[keep], adv[keep], ret[keep]
        if normalize_obs:
            self.obs_rms.update(obs.detach().cpu().numpy())
            obs = torch.as_tensor(self.obs_rms.normalize(obs.detach().cpu().numpy()), device=device, dtype=torch.float32)
        self._ppo_epochs(ac, opt, obs, act, old_logp, adv, ret, cfg)

    def _ppo_update_creator(self, ac: ActorCriticGaussian, opt, batch, cfg: PPOConfig):
        """PPO update for the Creator's one-step episodes.

        The advantage is ``reward - value`` and the value target is the reward.
        """
        rew = batch["rew"]
        adv = (rew - batch["val"]).detach()
        ret = rew.detach()
        self._ppo_epochs(ac, opt, batch["obs"], batch["act"], batch["logp"], adv, ret, cfg)

    def evaluate(self, episodes=16):
        """Play ``episodes`` fresh levels with the current (sampling) policies.

        Returns:
            ``(success_rate, mean_final_distance)``.
        """
        env = PlayerCreatorEnvVec(num_envs=episodes, level_cfg=self.level_cfg, active_obstacles=self.env.active_k)
        with torch.no_grad():
            c_obs = env.get_creator_obs()
            a_c, _, _ = self.creator.act(c_obs)
            env.set_obstacles_from_creator_action(torch.tanh(a_c))
        env.reset_players()
        for t in range(self.level_cfg.max_steps):
            obs_n = self._normalize_obs(env.get_player_obs())
            with torch.no_grad():
                a, _, _ = self.player.act(obs_n)
            env.step(torch.tanh(a))
            if np.all(env.done):
                break  # finished Players no longer move, so nothing can change
        final_d = np.linalg.norm(env.pos - env.goals, axis=1)
        successes = (final_d <= self.level_cfg.goal_r).astype(np.int32)
        return successes.mean(), final_d.mean()

    def train(self, log_every=10, eval_every=20, save_name="player_creator_ppo_v2"):
        """Run ``total_updates`` rounds of rollout + Player update + Creator update.

        Args:
            log_every: print policy entropies every this many updates.
            eval_every: evaluate, adjust the obstacle curriculum and
                checkpoint on a new best score every this many updates.
            save_name: filename prefix for the checkpoints and the meta JSON.
        """
        for upd in range(1, self.total_updates+1):
            self._anneal_lrs(upd)
            rollout = self.rollout()
            P = rollout["player"]

            # The critic is trained on normalised returns, so map its
            # predictions back to reward scale before combining them with raw
            # rewards in the TD residuals.
            ret_std = np.sqrt(self.ret_rms.var) + 1e-8
            values = (P["val"].cpu().numpy() * ret_std + self.ret_rms.mean).astype(np.float32)
            adv, ret = compute_gae(
                rewards = P["rew"].cpu().numpy(),
                values  = values,
                dones   = P["done"].cpu().numpy(),
                gamma=self.cfg_p.gamma, lam=self.cfg_p.lam
            )
            valid_np = P["valid"].cpu().numpy() > 0
            if valid_np.any():
                self.ret_rms.update(ret[valid_np].reshape(-1, 1))
            ret_norm = self.ret_rms.normalize(ret).astype(np.float32)
            player_batch = dict(obs=P["obs_norm"], act=P["act"], logp=P["logp"],
                                adv=torch.as_tensor(adv, device=device),
                                ret=torch.as_tensor(ret_norm, device=device),
                                valid=P["valid"])
            # obs_norm already holds the inputs used when sampling, so the
            # update must not normalise them a second time.
            self._ppo_update(self.player, self.opt_p, player_batch, self.cfg_p, normalize_obs=False)

            C = rollout["creator"]
            creator_batch = dict(obs=C["obs"], act=C["act"], logp=C["logp"], rew=C["rew"], val=C["val"])
            self._ppo_update_creator(self.creator, self.opt_c, creator_batch, self.cfg_c)

            if upd % log_every == 0:
                with torch.no_grad():
                    p_ent = Normal(torch.zeros_like(self.player.log_std), torch.exp(self.player.log_std)).entropy().sum().item()
                    c_ent = Normal(torch.zeros_like(self.creator.log_std), torch.exp(self.creator.log_std)).entropy().sum().item()
                print(f"[Update {upd}] PlayerEnt {p_ent:.2f} | CreatorEnt {c_ent:.2f} | active_obs={self.env.active_k}")

            if upd % eval_every == 0:
                succ, fdist = self.evaluate(episodes=16)
                score = succ*1.0 - fdist
                print(f"  Eval -> success_rate={succ:.2f}, final_dist={fdist:.3f}")
                # Curriculum: add an obstacle when the Player does well,
                # remove one (down to 2) when it struggles.
                if succ > 0.70 and self.env.active_k < self.level_cfg.max_obstacles:
                    self.env.active_k += 1; self.env.n_obs[:] = self.env.active_k
                    print(f"  📈 Curriculum: increased active obstacles to {self.env.active_k}")
                elif succ < 0.30 and self.env.active_k > 2:
                    self.env.active_k -= 1; self.env.n_obs[:] = self.env.active_k
                    print(f"  📉 Curriculum: decreased active obstacles to {self.env.active_k}")
                if score > self.best_eval:
                    self.best_eval = score
                    torch.save(self.player.state_dict(), os.path.join(self.save_dir, f"{save_name}_player.pt"))
                    torch.save(self.creator.state_dict(), os.path.join(self.save_dir, f"{save_name}_creator.pt"))
                    # The Player only works on inputs normalised with these
                    # statistics, so they are stored next to the weights.
                    meta = {"best_score": float(self.best_eval), "update": upd, "active_k": self.env.active_k,
                            "obs_mean": np.asarray(self.obs_rms.mean).tolist(),
                            "obs_var": np.asarray(self.obs_rms.var).tolist()}
                    with open(os.path.join(self.save_dir, f"{save_name}_meta.json"), "w") as f:
                        json.dump(meta, f)
                    print("  ✅ Saved new best checkpoints.")


## (Optional) Quick Tiny Run

In [12]:

if __name__ == "__main__":
    # Short smoke run: 20 updates on 16 levels, evaluating every 10 updates.
    trainer = Trainer(
        num_envs=16,
        steps_per_env=96,
        total_updates=20,
        init_active_obstacles=3,
    )
    trainer.train(
        log_every=5,
        eval_every=10,
        save_name="player_creator_ppo_v2",
    )
    print("Tiny run complete. Best score so far:", trainer.best_eval)


[Update 5] PlayerEnt 2.84 | CreatorEnt 59.58 | active_obs=3
[Update 10] PlayerEnt 2.84 | CreatorEnt 59.57 | active_obs=3
  Eval -> success_rate=0.06, final_dist=0.796
  📉 Curriculum: decreased active obstacles to 2
  ✅ Saved new best checkpoints.
[Update 15] PlayerEnt 2.84 | CreatorEnt 59.58 | active_obs=2
[Update 20] PlayerEnt 2.84 | CreatorEnt 59.58 | active_obs=2
  Eval -> success_rate=0.19, final_dist=0.541
  ✅ Saved new best checkpoints.
Tiny run complete. Best score so far: -0.35351258516311646


## Sim Helper + GIF Export

In [13]:

def simulate_and_gif(player, creator, level_cfg=None, seed=0, steps=220, figsize=4, tmpdir=".", fname="episode_v2.gif", active_k=4, obs_stats=None):
    """Roll out one Creator-built episode with the Player and save it as a GIF.

    The Creator acts once to place obstacles, the players are reset, and the
    Player is then stepped until the episode is done or the step budget is
    spent. One frame is rendered before every step, plus a final frame when
    the episode terminates.

    Args:
        player: Policy exposing ``act(obs)``; the first returned item is the
            pre-tanh action.
        creator: Policy exposing ``act(obs)``; its first returned item is the
            pre-tanh obstacle layout.
        level_cfg: Level configuration; ``LevelConfig()`` is used when omitted.
        seed: Seed applied to ``random``, NumPy and torch (cast to ``int``).
        steps: Upper bound on simulated steps, additionally capped by
            ``level_cfg.max_steps``.
        figsize: Forwarded to ``env.render_frame``.
        tmpdir: Directory that receives the GIF (created if missing).
        fname: File name of the GIF inside ``tmpdir``.
        active_k: Number of active obstacles passed to the environment.
        obs_stats: Optional ``(mean, std)`` pair used to standardise Player
            observations. When omitted, raw observations are fed to the
            Player.

    Returns:
        Path of the written GIF.

    Raises:
        ValueError: If the step budget is below one, so no frame is produced.
    """
    import tempfile  # local import: only this helper needs it

    level_cfg = level_cfg or LevelConfig()
    # UI widgets may hand over floats; seeding and range() require ints.
    seed = int(seed)
    n_steps = int(min(steps, level_cfg.max_steps))
    if n_steps < 1:
        raise ValueError("steps must be at least 1 to render an episode")

    # Seed BEFORE building the environment so that anything the constructor
    # samples is governed by `seed` as well (constructor not visible here).
    random.seed(seed); np.random.seed(seed); torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    env = PlayerCreatorEnvVec(num_envs=1, level_cfg=level_cfg, active_obstacles=active_k)
    with torch.no_grad():
        c_obs = env.get_creator_obs()
        a_c, _, _ = creator.act(c_obs)
        env.set_obstacles_from_creator_action(torch.tanh(a_c))
    env.reset_players()
    prev_d = np.linalg.norm(env.pos - env.goals, axis=1)

    obs_mean = obs_std = None
    if obs_stats is not None:
        obs_mean = torch.as_tensor(np.asarray(obs_stats[0]), device=device, dtype=torch.float32)
        obs_std = torch.as_tensor(np.asarray(obs_stats[1]), device=device, dtype=torch.float32) + 1e-8

    os.makedirs(tmpdir, exist_ok=True)
    frames = []
    # Frames go to a private scratch directory that is removed afterwards, so
    # repeated or concurrent calls neither litter `tmpdir` nor clobber each other.
    with tempfile.TemporaryDirectory(prefix="frames_", dir=tmpdir) as frame_dir:

        def grab_frame(idx):
            """Render env 0 to a PNG and append the decoded image to `frames`."""
            path = os.path.join(frame_dir, f"frame_{idx:04d}.png")
            env.render_frame(0, figsize=figsize, path=path)
            frames.append(imageio.imread(path))

        for t in range(n_steps):
            grab_frame(t)
            obs = env.get_player_obs().detach().to(device=device, dtype=torch.float32)
            # NOTE: do not standardise with statistics of this batch. With a
            # single environment the batch mean equals the observation itself
            # (assuming one row per env), which would zero out every input.
            if obs_mean is not None:
                obs = (obs - obs_mean) / obs_std
            with torch.no_grad():
                a, _, _ = player.act(obs)
            env.step(torch.tanh(a))
            _, prev_d = env.compute_rewards(prev_d)
            if env.done[0]:
                grab_frame(t + 1)  # show the terminal state as the last frame
                break

    gif_path = os.path.join(tmpdir, fname)
    imageio.mimsave(gif_path, frames, duration=0.06)
    return gif_path


## Gradio Demo

In [15]:

import os, json
import gradio as gr

# Checkpoints are looked up in /content when that directory exists, otherwise
# in the current working directory.
BASE_DIR = "." if not os.path.exists("/content") else "/content"
PLAYER_CKPT = os.path.join(BASE_DIR, "player_creator_ppo_v2_player.pt")
CREATOR_CKPT = os.path.join(BASE_DIR, "player_creator_ppo_v2_creator.pt")

# Single-env instance used here to read the network input/output sizes.
tmp_env = PlayerCreatorEnvVec(active_obstacles=4, level_cfg=LevelConfig(), num_envs=1)
_player_in = tmp_env.dim_obs_player
_creator_in = tmp_env.dim_obs_creator
_creator_out = tmp_env.max_obs * 7  # seven action values per obstacle slot

player_demo = ActorCriticGaussian(_player_in, 2, hidden=128).to(device)
creator_demo = ActorCriticGaussian(_creator_in, _creator_out, hidden=128).to(device)

def load_if_exists():
    """Load both demo checkpoints when present and return a status message.

    Returns:
        A human-readable string: success, "not found", or a description of the
        load failure. A failure is reported instead of raised so that a corrupt
        or incompatible checkpoint does not stop the rest of the cell (and the
        UI built after it) from running.
    """
    if not (os.path.exists(PLAYER_CKPT) and os.path.exists(CREATOR_CKPT)):
        return "No checkpoints found yet. Run some training above first."
    try:
        # Read both files before touching either model, so an unreadable
        # second file cannot leave only one of the two networks updated.
        player_state = torch.load(PLAYER_CKPT, map_location=device)
        creator_state = torch.load(CREATOR_CKPT, map_location=device)
        player_demo.load_state_dict(player_state)
        creator_demo.load_state_dict(creator_state)
    except Exception as e:  # notebook-level boundary: report, don't crash
        return f"Checkpoints found but could not be loaded: {e}"
    return "Loaded saved checkpoints."


status_msg = load_if_exists()

def run_demo(seed: int, steps: int, figsize: float, active_obs: int):
    """Gradio callback: build fresh networks, load checkpoints, render a GIF.

    Args:
        seed: Random seed for the episode.
        steps: Step budget; also used as ``max_steps`` of the level config.
        figsize: Figure size forwarded to the renderer.
        active_obs: Number of active obstacles.

    Returns:
        ``(gif_path, info)`` where ``info`` summarises the settings. When a
        checkpoint is missing or cannot be loaded, a warning is appended so
        the viewer knows the episode may not come from the trained weights.
    """
    # Slider values may arrive as floats; normalise once so seeding, range()
    # and the caption all see the same integers.
    seed, steps, active_obs = int(seed), int(steps), int(active_obs)

    cfg = LevelConfig(max_obstacles=6, max_steps=steps)
    env_probe = PlayerCreatorEnvVec(1, cfg, active_obstacles=active_obs)
    local_player = ActorCriticGaussian(env_probe.dim_obs_player, 2, hidden=128).to(device)
    local_creator = ActorCriticGaussian(env_probe.dim_obs_creator, env_probe.max_obs*7, hidden=128).to(device)

    # Load each checkpoint independently: a failure for one network must not
    # prevent the other from being loaded.
    notes = []
    for label, net, ckpt in (
        ("player", local_player, PLAYER_CKPT),
        ("creator", local_creator, CREATOR_CKPT),
    ):
        if not os.path.exists(ckpt):
            notes.append(f"{label}: no checkpoint found")
            continue
        try:
            net.load_state_dict(torch.load(ckpt, map_location=device), strict=False)
        except Exception as e:  # best-effort demo: keep going, but say so
            print("Warn: loading checkpoints:", e)
            notes.append(f"{label}: checkpoint could not be loaded")

    gif_path = simulate_and_gif(
        local_player,
        local_creator,
        level_cfg=cfg,
        seed=seed,
        steps=steps,
        figsize=figsize,
        tmpdir=BASE_DIR,
        fname="episode_v2.gif",
        active_k=active_obs,
    )
    info = f"Seed {seed} • Steps {steps} • Active Obstacles {active_obs}"
    if notes:
        info += " • ⚠ " + "; ".join(notes)
    return gif_path, info

# Demo UI: four sliders feed run_demo, whose two return values fill the GIF
# viewer and the info textbox. Components appear in the order they are created.
with gr.Blocks() as demo:
    gr.Markdown("### Player vs Creator — v2 Demo (circles, rectangles, moving circles)")
    # Checkpoint status string computed by load_if_exists() when the cell ran.
    gr.Markdown(status_msg)
    with gr.Row():
        seed = gr.Slider(0, 9999, step=1, value=0, label="Random Seed")
        steps = gr.Slider(100, 400, step=10, value=220, label="Max Steps")
    with gr.Row():
        # Upper bound 6 matches max_obstacles=6 used by run_demo's LevelConfig.
        active_obs = gr.Slider(1, 6, step=1, value=4, label="Active Obstacles (Curriculum)")
        figsize = gr.Slider(3.0, 6.0, step=0.5, value=4.0, label="Figure Size (inches)")
    run_btn = gr.Button("Generate Episode")
    out_img = gr.Image(type="filepath", label="Episode GIF")
    out_txt = gr.Textbox(label="Episode Info", interactive=False)
    # `inputs` order must match run_demo's positional parameters
    # (seed, steps, figsize, active_obs); `outputs` match its returned pair.
    run_btn.click(run_demo, inputs=[seed, steps, figsize, active_obs], outputs=[out_img, out_txt])
demo.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://745b84821204536ac0.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


