Imports

In [None]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
from OPPO import OPPO_update

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def OPPO_update(policy,
                optimizer,
                env,
                baseline=None,
                n_episodes=1000,
                max_t=1000,
                gamma=1.0,
                print_every=100,
                early_stop=False,
                seed=42,
                target_score=None,
                env_name="CartPole-v0",
                display_every=False):
    """Train ``policy`` with a Monte-Carlo policy-gradient update, one episode per step.

    For every episode the policy is rolled out in ``env``, discounted
    rewards-to-go are computed, an optional baseline is subtracted from them,
    and the loss ``sum_t -log_prob_t * (G_t - b(s_t))`` is minimised with
    ``optimizer``.

    Args:
        policy: Object exposing ``act(state) -> (action, log_prob, extra)`` and
            ``state_dict()``; ``log_prob`` must be a tensor attached to the
            policy's autograd graph.
        optimizer: Optimizer stepping the policy parameters.
        env: Environment using the ``reset() -> state`` /
            ``step(action) -> (state, reward, done, info)`` API.
        baseline: Optional callable ``baseline(state) -> scalar``. If it also
            has a ``fit(states, returns)`` method, that is called once per
            episode before the baseline is evaluated.
        n_episodes: Maximum number of training episodes.
        max_t: Maximum number of steps per episode.
        gamma: Discount factor used for the rewards-to-go.
        print_every: Logging period in episodes; also the window length of the
            running average used for logging and for the target checks.
        early_stop: Accepted for interface compatibility; not used here.
        seed: Seed forwarded to ``set_seed``; also part of checkpoint names.
        target_score: If given, training stops once the running average
            reaches it (and a checkpoint is written).
        env_name: Used only to build the checkpoint file names.
        display_every: Accepted for interface compatibility; not used here.

    Returns:
        List with the undiscounted total reward of every episode played.
    """
    import os  # local import: only needed to create the checkpoint directory

    set_seed(seed, env)
    checkpoint_dir = "./Policies"
    checkpoint_reached = False
    scores_deque = deque(maxlen=print_every)
    scores = []

    for e in range(1, n_episodes + 1):
        saved_log_probs = []
        rewards = []
        states = []

        # ---- Roll out one episode with the current policy ----
        state = env.reset()
        for step_in_episode in range(max_t):
            states.append(state)
            action, log_prob, _ = policy.act(state)
            saved_log_probs.append(log_prob)

            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break

        total_R = sum(rewards)
        scores.append(total_R)
        scores_deque.append(total_R)

        # ---- Discounted rewards-to-go: G_t = r_t + gamma * G_{t+1} ----
        # Single backward pass (O(T)) instead of re-summing the tail for every t.
        rewards_to_go = [0.0] * len(rewards)
        running_return = 0.0
        for t in reversed(range(len(rewards))):
            running_return = rewards[t] + gamma * running_return
            rewards_to_go[t] = running_return

        # ======> Fit baseline here
        if baseline is not None and hasattr(baseline, 'fit'):
            baseline.fit(states, rewards_to_go)

        # The baseline is a constant w.r.t. the policy parameters. Evaluating it
        # under no_grad keeps the policy-loss backward pass from accumulating
        # gradients in a learnable baseline's parameters.
        if baseline is None:
            baseline_values = [0] * len(states)
        else:
            with torch.no_grad():
                baseline_values = [baseline(s) for s in states]

        # Compute policy loss
        policy_loss_terms = [
            -log_prob * (G - b)
            for log_prob, G, b in zip(saved_log_probs, rewards_to_go, baseline_values)
        ]
        policy_loss = torch.stack(policy_loss_terms).sum()

        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        running_avg = float(np.mean(scores_deque))

        # Logging
        if e % print_every == 0:
            avg_score = running_avg
            print(f"Episode {e} \t Average Score over last {print_every}: {avg_score:.1f}")

        # NOTE(review): for a negative target (e.g. -110) ``target_score / 2`` is
        # a *higher* bar than ``target_score`` itself, so the loop below normally
        # breaks before this intermediate checkpoint is ever written. Confirm
        # what the "halfway" criterion should be for negative-reward tasks.
        if not checkpoint_reached and target_score is not None and running_avg >= target_score / 2:
            print(f"Saving pi2 checkpoint at episode {e}...")
            os.makedirs(checkpoint_dir, exist_ok=True)  # torch.save does not create directories
            torch.save(policy.state_dict(), f"./Policies/pi2_ref_{env_name}_seed_{seed}.pth")
            checkpoint_reached = True

        if target_score is not None and running_avg >= target_score:
            print(f"Target score reached at episode {e}!")
            os.makedirs(checkpoint_dir, exist_ok=True)
            torch.save(policy.state_dict(), f"./Policies/pi1_ref_{env_name}_seed_{seed}.pth")
            break

    return scores


def set_seed(seed, env):
    """Seed every random source used by training with the same ``seed``.

    Seeds, in order: PyTorch, NumPy's global RNG, Python's ``random`` module,
    the environment, and the environment's action space.
    """
    # Global library RNGs first, in the same order as before.
    for seed_global_rng in (torch.manual_seed, np.random.seed, random.seed):
        seed_global_rng(seed)

    # Then the environment and its action-space sampler.
    env.seed(seed)
    env.action_space.seed(seed)




class PolicyNetwork(nn.Module):
    """Two-layer MLP mapping a state to a categorical distribution over actions.

    Architecture: Linear(state_size, hidden_size) -> ReLU ->
    Linear(hidden_size, action_size) -> Softmax over the last dimension.
    """

    def __init__(self, state_size=2, action_size=3, hidden_size=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
            nn.Softmax(dim=-1)
        )

    def forward(self, state):
        """Return action probabilities for ``state``.

        Args:
            state: NumPy array, tensor, or nested sequence of numbers whose
                last dimension is ``state_size``.

        Returns:
            Tensor of probabilities over actions (last dimension sums to 1).
        """
        # Put the input on the device the weights actually live on instead of
        # relying on a notebook-level global, so the module works wherever it
        # has been moved. ``as_tensor`` also accepts lists/tuples/tensors,
        # which ``torch.from_numpy`` rejects.
        weight = self.net[0].weight
        state = torch.as_tensor(state, dtype=torch.float32, device=weight.device)
        return self.net(state)

    def act(self, state):
        """Sample an action for ``state``.

        Returns:
            Tuple ``(action, log_prob, probs)``: the sampled action as a Python
            int, its log-probability as a tensor still attached to the
            autograd graph, and the full probability vector as a detached
            NumPy array.
        """
        probs = self.forward(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action), probs.detach().cpu().numpy()



class LinearBaseline(nn.Module):
    """Linear state-value baseline ``b(s) = w . s + bias`` fitted by least squares."""

    def __init__(self, state_dim):
        super().__init__()
        self.linear = nn.Linear(state_dim, 1, bias=True)

    def forward(self, state):
        """Evaluate the baseline on ``state``.

        Non-tensor inputs are converted to float32 on the module's own device;
        tensors are passed through unchanged. The trailing size-1 output
        dimension is squeezed away.
        """
        if not isinstance(state, torch.Tensor):
            state = torch.as_tensor(
                state, dtype=torch.float32, device=self.linear.weight.device
            )
        return self.linear(state).squeeze()

    def fit(self, states, returns):
        """
        Fit linear model: return = w·state + b using least-squares.

        Args:
            states: Sequence of N state vectors (each of length ``state_dim``).
            returns: Sequence of N regression targets.

        The solution of the normal equations overwrites the layer's weight and
        bias in place. If the input is empty nothing happens; if the normal
        equations cannot be solved a message is printed and the parameters are
        left unchanged.
        """
        if len(states) == 0:
            return  # nothing to fit on; keep current parameters

        target_device = self.linear.weight.device
        # Stack a list of arrays into one ndarray first: building a tensor
        # directly from a list of ndarrays is very slow.
        if not isinstance(states, torch.Tensor):
            states = np.asarray(states, dtype=np.float32)
        if not isinstance(returns, torch.Tensor):
            returns = np.asarray(returns, dtype=np.float32)
        states = torch.as_tensor(states, dtype=torch.float32, device=target_device)
        returns = torch.as_tensor(returns, dtype=torch.float32, device=target_device)

        # Add bias term manually (for least squares)
        ones = torch.ones(states.shape[0], 1, device=target_device)
        X = torch.cat([states, ones], dim=1)  # [N, D+1]
        y = returns.view(-1, 1)  # [N, 1]

        # Least squares solution: w = (X^T X)^-1 X^T y
        try:
            weights = torch.linalg.solve(X.T @ X, X.T @ y)  # [D+1, 1]
        except RuntimeError:
            print("Least squares failed due to singular matrix.")
            return

        # Update model parameters in place. copy_ keeps the registered
        # parameter shapes ([1, D] and [1]); assigning a squeezed tensor to
        # ``.data`` would silently turn the bias into a 0-dim tensor.
        with torch.no_grad():
            self.linear.weight.copy_(weights[:-1].T)
            self.linear.bias.copy_(weights[-1])

class PositionBasedBaseline:
    """Hand-crafted baseline that grows linearly with the car's position.

    The value is ``scale * (position - min_position)``. With the default
    ``min_position=-1.2`` this maps positions in [-1.2, 0.5] to
    [0, ~1.7 * scale], i.e. larger values further to the right.

    Args:
        scale: Multiplier applied to the shifted position.
        min_position: Position that maps to a baseline value of 0.
    """

    def __init__(self, scale=10.0, min_position=-1.2):
        self.scale = scale  # Encourages stronger incentives to go right
        self.min_position = min_position

    def __call__(self, state):
        position = state[0]  # position is the first state variable
        # Shift so that min_position maps to 0, then scale. The signed position
        # is used (no abs) so the value increases monotonically to the right,
        # matching the documented [0, ~1.7 * scale] range.
        return self.scale * (position - self.min_position)






In [9]:
# Experiment: MountainCar-v0 trained with the least-squares linear baseline.
env = gym.make("MountainCar-v0")
state_dim, action_dim = env.observation_space.shape[0], env.action_space.n

# Policy and baseline both live on the notebook-wide device.
policy = PolicyNetwork(state_dim, action_dim).to(device)
baseline = LinearBaseline(state_dim).to(device)

# Only the policy parameters are optimised; the baseline is refit in closed form.
optimizer = optim.Adam(policy.parameters(), lr=3e-2)

scores = OPPO_update(
    policy=policy, optimizer=optimizer, env=env, baseline=baseline,
    n_episodes=1000, max_t=300, gamma=0.89,
    print_every=50, early_stop=True, seed=32,
    target_score=-110,  # around the solved threshold for MountainCar
    env_name="MountainCar-v0", display_every=False,
)





KeyboardInterrupt: 

In [None]:
from OPPO import OPPO_update

class EscapePitBaseline:
    """Baseline that is large when the car sits near the pit and barely moves.

    The value is ``scale * in_pit * low_speed`` where

    * ``in_pit = exp(-(pos - pit_center)^2 / (2 * pit_width^2))`` is a
      Gaussian-shaped bump centred on the pit, and
    * ``low_speed = exp(-|vel| / speed_thresh)`` decays with the speed
      (magnitude of the velocity, independent of its sign).

    Args:
        pit_center: Position of the centre of the pit.
        pit_width: Width parameter of the Gaussian bump.
        speed_thresh: Decay constant of the low-speed term.
        scale: Multiplier applied to the product of both terms.
    """

    def __init__(self, pit_center=-0.5, pit_width=0.2, speed_thresh=0.01, scale=10.0):
        self.pit_center = pit_center
        self.pit_width = pit_width
        self.speed_thresh = speed_thresh
        self.scale = scale

    def __call__(self, state):
        pos, vel = state
        # Close to the pit?
        in_pit = np.exp(-((pos - self.pit_center) ** 2) / (2 * self.pit_width ** 2))
        # Not moving much (low speed, not velocity sign!)
        low_speed = np.exp(-abs(vel) / self.speed_thresh)

        # Product is high only when both hold: in the pit AND nearly stationary.
        stuck_penalty = in_pit * low_speed

        return self.scale * stuck_penalty

# Experiment: MountainCar with a 600-step time limit and the hand-crafted
# "escape the pit" baseline.
#
# NOTE(review): this cell starts with `from OPPO import OPPO_update`, which
# replaces the OPPO_update function defined earlier in this notebook with the
# one from the OPPO module. Confirm which implementation is meant to run here.
baseline = EscapePitBaseline(scale=10.0)

# Register a MountainCar variant with a longer episode limit. Guarded so the
# cell can be re-run: Gym versions that reject duplicate ids raise
# gym.error.Error on the second registration.
try:
    gym.envs.register(
        id='MountainCarMyEasyVersion-v0',
        entry_point='gym.envs.classic_control:MountainCarEnv',
        max_episode_steps=600,      # MountainCar-v0 uses 200
        reward_threshold=-110.0,
    )
except gym.error.Error:
    pass  # already registered by a previous run of this cell
env = gym.make('MountainCarMyEasyVersion-v0')

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy = PolicyNetwork(state_dim, action_dim).to(device)

optimizer = optim.Adam(policy.parameters(), lr=4e-3)

# NOTE(review): env_name is still "MountainCar-v0" although the environment is
# the custom 600-step variant; with the same seed, checkpoints written by this
# run use the same file names as the MountainCar-v0 experiment. Confirm whether
# that is intended.
scores = OPPO_update(
    policy=policy,
    optimizer=optimizer,
    env=env,
    baseline=baseline,
    n_episodes=1000,
    max_t=600,
    gamma=0.89,
    print_every=50,
    early_stop=True,
    seed=32,
    target_score=-110,   # around the solved threshold for MountainCar
    env_name="MountainCar-v0",
    display_every=True
)




Episode 50 	 Average Score over the last 50 episodes: -600.0
