REINFORCE with Custom Reward Shaping
Author: Darren Wu

In [None]:
import gymnasium as gym
import math
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from collections import defaultdict

In [None]:
# Seed handed to set_seed() and to env.reset() so runs are repeatable.
SEED = 42
# Discount factor used when accumulating returns in update_policy().
GAMMA = 0.99
# Default learning rate for the Adam optimizer created in train_env().
LR = 1e-3
# Default number of training episodes for train_env().
NUM_EPISODES = 1000
# train_env() prints a progress line every PRINT_EVERY episodes.
PRINT_EVERY = 50
# Device on which the policy networks and training tensors are placed.
DEVICE = torch.device("cpu")

In [None]:
# Compatibility shim: re-create the `np.bool8` alias on NumPy builds that no
# longer define it, so code that still references the alias does not fail
# with an AttributeError.
# NOTE(review): presumably required by the installed gym/gymnasium version — confirm.
if not hasattr(np, "bool8"):
    np.bool8 = np.bool_

In [None]:
# Reward-shaping configuration read by shape_reward() / run_episode().
# The __main__ experiment loop overwrites the ENABLE_* flags and *_WEIGHT values
# through globals() before each run.

# Penalty applied when the action differs from the previous step's action
# (flat penalty for discrete actions, scaled by the L2 distance otherwise).
ENABLE_ACTION_CHANGE_PENALTY = True
ACTION_CHANGE_PENALTY_WEIGHT = 0.001

# Cost proportional to the sum of absolute velocity components of the state.
ENABLE_STATE_DEPENDENT_COST = True
STATE_DEPENDENT_COST_WEIGHT = 0.001

# Count-based bonus: EXPLORATION_BONUS_WEIGHT / (1 + visit count of the
# discretized state); EXPLORATION_GRID_SIZE is the discretization cell width.
ENABLE_EXPLORATION_BONUS = True
EXPLORATION_BONUS_WEIGHT = 0.01
EXPLORATION_GRID_SIZE = 0.1

# Default width of the hidden layers in both policy networks.
HIDDEN_SIZE = 64

In [None]:
def set_seed(seed):
    """Seed NumPy, Python's `random` module, and PyTorch with the same value."""
    seeders = (np.random.seed, random.seed, torch.manual_seed)
    for apply_seed in seeders:
        apply_seed(seed)

def get_discrete_state_key(state, grid_size=None):
    """
    Discretize a continuous state into a hashable grid-cell key.

    Used by the exploration bonus to count visits in continuous state spaces.

    Args:
        state: 1-D array-like of state features.
        grid_size: Width of one grid cell. When None, the module-level
            EXPLORATION_GRID_SIZE is looked up at call time.

    Returns:
        A tuple holding one integer cell index per state feature.
    """
    if grid_size is None:
        grid_size = EXPLORATION_GRID_SIZE
    # Use floor rather than integer truncation: truncation rounds toward zero,
    # which would merge (-grid_size, 0) and [0, grid_size) into one
    # double-width cell around the origin.
    cell_indices = np.floor(np.asarray(state) / grid_size).astype(int)
    return tuple(cell_indices)

# policy networks
class DiscretePolicyNetwork(nn.Module):
    """Two-hidden-layer MLP that scores each discrete action (e.g., CartPole)."""

    def __init__(self, state_dim, action_dim, hidden_size=HIDDEN_SIZE):
        super().__init__()
        layers = [
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the unnormalized action scores (logits) for input `x`."""
        logits = self.net(x)
        return logits

    def act(self, state):
        """
        Draw one action from the categorical distribution defined by the logits.

        Returns a pair: the sampled action converted with `.item()` and the
        log-probability tensor of that action under the current policy.
        """
        policy_dist = torch.distributions.Categorical(logits=self.forward(state))
        sampled = policy_dist.sample()
        sampled_log_prob = policy_dist.log_prob(sampled)
        return sampled.item(), sampled_log_prob


class ContinuousPolicyNetwork(nn.Module):
    """
    A simple Gaussian MLP policy for continuous action spaces
    (e.g., LunarLanderContinuous).

    The MLP outputs the mean of each action dimension. The log standard
    deviation is a trainable parameter that does not depend on the state and
    starts at zero (std = 1).
    """
    def __init__(self, state_dim, action_dim, hidden_size=HIDDEN_SIZE):
        super(ContinuousPolicyNetwork, self).__init__()
        self.fc_mean = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_dim)
        )
        # log_std is kept as a trainable parameter shared across all states.
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, x):
        """Return (mean, std) of the per-dimension Normal distribution for `x`."""
        mean = self.fc_mean(x)
        std = torch.exp(self.log_std)
        return mean, std

    def act(self, state):
        """
        Sample a continuous action from Normal(mean, std).

        Returns:
            action_np: the sampled action flattened to a 1-D NumPy array
                (this drops the batch dimension when the batch size is 1).
            log_prob: tensor with the log-probability of the sample, summed
                over the action dimensions.
        """
        mean, std = self.forward(state)
        dist = torch.distributions.Normal(mean, std)
        action_sample = dist.sample()   # shape: (1, action_dim) if batch_size=1
        log_prob = dist.log_prob(action_sample).sum(dim=-1)

        # Tensor.numpy() only works on CPU tensors that do not require grad;
        # detach and move to host memory first so the policy also works when
        # it lives on an accelerator.
        action_np = action_sample.detach().cpu().numpy().flatten()

        return action_np, log_prob

# reward shaping function
def shape_reward(env_name, state, action, prev_action, raw_reward, state_visit_count):
    """
    Return `raw_reward` adjusted by the shaping terms that are switched on:

      1. a penalty for changing the action relative to the previous step,
      2. a cost on the velocity components of the state (chosen by env name),
      3. a count-based exploration bonus.

    Each term is controlled by the module-level ENABLE_* flag and *_WEIGHT value.
    """
    total = raw_reward

    # (1) Smoothness: penalize switching actions between consecutive steps.
    if ENABLE_ACTION_CHANGE_PENALTY and prev_action is not None:
        integer_types = (int, np.integer)
        both_discrete = (
            isinstance(action, integer_types)
            and isinstance(prev_action, integer_types)
        )
        if not both_discrete:
            # Continuous actions: scale the penalty by how far the action moved.
            total -= ACTION_CHANGE_PENALTY_WEIGHT * np.linalg.norm(action - prev_action)
        elif action != prev_action:
            # Discrete actions: flat penalty for any switch.
            total -= ACTION_CHANGE_PENALTY_WEIGHT

    # (2) State cost: sum of absolute velocities at environment-specific indices.
    if ENABLE_STATE_DEPENDENT_COST:
        if "CartPole" in env_name:
            # state[1] = x_dot, state[3] = theta_dot
            velocity_indices = (1, 3)
        elif "LunarLander" in env_name:
            # state = [x, y, x_dot, y_dot, theta, theta_dot, left_contact, right_contact]
            velocity_indices = (2, 3)
        else:
            velocity_indices = None

        if velocity_indices is not None:
            first, second = velocity_indices
            velocity_cost = abs(state[first]) + abs(state[second])
            total -= STATE_DEPENDENT_COST_WEIGHT * velocity_cost

    # (3) Exploration: the bonus shrinks as the visit count grows.
    if ENABLE_EXPLORATION_BONUS:
        total += EXPLORATION_BONUS_WEIGHT / (1.0 + state_visit_count)

    return total

# episode trajectory
def run_episode(env, policy, is_discrete=True):
    """
    Run a single episode with the given policy and record the trajectory.

    Args:
        env: Environment exposing the Gymnasium-style API (`reset` returns
            (obs, info); `step` returns a 5-tuple).
        policy: Object whose `act(state_tensor)` returns (action, log_prob).
        is_discrete: Kept for backward compatibility; both policy classes
            expose the same `act` interface, so it is not needed here.

    Returns:
        (states, actions, log_probs, rewards) lists covering the whole
        episode. Each reward is the environment reward plus the shaping terms.
    """
    # Seed the environment only on its first reset. Passing the same integer
    # seed on every reset would restart the environment's RNG each time and
    # make every episode begin from the identical initial state.
    base_env = env.unwrapped
    if getattr(base_env, "_reinforce_seeded", False):
        state, _ = env.reset()
    else:
        state, _ = env.reset(seed=SEED)
        base_env._reinforce_seeded = True

    # The env id selects the state-dependent cost; it does not change mid-episode.
    spec = base_env.spec
    env_id = spec.id if spec is not None else ""

    states = []
    actions = []
    log_probs = []
    rewards = []

    prev_action = None

    # Per-episode visit counts of discretized states for the exploration bonus.
    visit_counts = defaultdict(int)

    done = False
    while not done:
        state_t = torch.FloatTensor(state).unsqueeze(0).to(DEVICE)

        # Discrete and continuous policies share the same act() interface.
        action, log_prob = policy.act(state_t)

        # Only count visits when the exploration bonus is on; shape_reward
        # ignores the count otherwise.
        if ENABLE_EXPLORATION_BONUS:
            disc_state_key = get_discrete_state_key(state)
            visit_counts[disc_state_key] += 1
            visit_count = visit_counts[disc_state_key]
        else:
            visit_count = 0

        # Always compute the shaping terms. Gating this call on the
        # exploration flag would silently drop the action-change penalty and
        # the state cost whenever the exploration bonus is disabled.
        shaped_reward = shape_reward(
            env_id,
            state,
            action,
            prev_action,
            raw_reward=0.0,  # the environment reward is added below
            state_visit_count=visit_count
        )

        # Step environment; the episode ends on termination OR truncation
        # (e.g. the time limit), so both flags must be honored.
        next_state, raw_reward, terminated, truncated, info = env.step(action)
        done = bool(terminated or truncated)

        # Add the environment's native reward to the shaped reward
        total_reward = shaped_reward + raw_reward

        states.append(state)
        actions.append(action)
        log_probs.append(log_prob)
        rewards.append(total_reward)

        state = next_state
        prev_action = action

    return states, actions, log_probs, rewards

# discount returns + update policy
def update_policy(policy, optimizer, log_probs, rewards, gamma=None):
    """
    Perform one REINFORCE update from a finished episode.

    Args:
        policy: The policy being trained (its parameters are updated through
            `optimizer`; it is not accessed directly here).
        optimizer: Optimizer holding the policy parameters.
        log_probs: Per-step log-probability tensors of the actions taken.
        rewards: Per-step (shaped) rewards, same length as `log_probs`.
        gamma: Discount factor. When None, the module-level GAMMA is used.

    Returns:
        The scalar policy loss as a Python float (0.0 for an empty episode,
        in which case no optimizer step is taken).
    """
    if gamma is None:
        gamma = GAMMA

    # Nothing to learn from an empty trajectory (torch.stack would raise).
    if len(log_probs) == 0 or len(rewards) == 0:
        return 0.0

    # Discounted returns, accumulated back-to-front and then reversed
    # (append + reverse avoids the quadratic cost of insert(0, ...)).
    discounted = []
    running_return = 0.0
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        discounted.append(running_return)
    discounted.reverse()

    # Keep the returns on the same device as the log-probs they multiply.
    returns = torch.tensor(discounted, dtype=torch.float, device=log_probs[0].device)

    # Normalize returns for stability. The sample standard deviation of a
    # single value is NaN, which would poison the parameters, so a one-step
    # episode keeps its raw return.
    if returns.numel() > 1:
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

    # REINFORCE objective: minimize -sum_t log pi(a_t | s_t) * G_t
    policy_loss = torch.stack(
        [-log_prob * step_return for log_prob, step_return in zip(log_probs, returns)]
    ).sum()

    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

    return policy_loss.item()

# train loop
def train_env(env_name="CartPole-v1",
              num_episodes=NUM_EPISODES,
              lr=LR,
              hidden_size=HIDDEN_SIZE):
    """
    Train a REINFORCE policy on one environment.

    Detects whether the action space is discrete or continuous and builds
    the matching policy network.

    Args:
        env_name: Environment id passed to `gym.make`.
        num_episodes: Number of training episodes.
        lr: Learning rate for the Adam optimizer.
        hidden_size: Width of the policy network's hidden layers.

    Returns:
        (episode_rewards, smoothed_rewards, losses): per-episode shaped
        return, its running mean over the last 10 episodes, and the
        per-episode policy loss.
    """
    env = gym.make(env_name)
    # Close the environment even if training raises, so its resources
    # are always released.
    try:
        set_seed(SEED)

        state_dim = env.observation_space.shape[0]
        # A discrete action space has an empty shape tuple.
        is_discrete = (len(env.action_space.shape) == 0)

        if is_discrete:
            action_dim = env.action_space.n
            policy = DiscretePolicyNetwork(state_dim, action_dim, hidden_size).to(DEVICE)
        else:
            action_dim = env.action_space.shape[0]
            print("action_dim =", action_dim)
            policy = ContinuousPolicyNetwork(state_dim, action_dim, hidden_size).to(DEVICE)

        optimizer = optim.Adam(policy.parameters(), lr=lr)

        episode_rewards = []
        smoothed_rewards = []
        losses = []

        for ep in range(num_episodes):
            states, actions, log_probs, rewards = run_episode(env, policy, is_discrete)
            ep_reward = sum(rewards)
            loss = update_policy(policy, optimizer, log_probs, rewards)

            episode_rewards.append(ep_reward)
            losses.append(loss)

            # Running average over the last 10 episodes; the slice simply
            # returns fewer items while fewer than 10 episodes exist.
            smoothed_rewards.append(np.mean(episode_rewards[-10:]))

            if (ep + 1) % PRINT_EVERY == 0:
                print(f"Env: {env_name} | Episode: {ep+1} | Return: {ep_reward:.2f} | Avg(10): {smoothed_rewards[-1]:.2f}")
    finally:
        env.close()

    return episode_rewards, smoothed_rewards, losses


In [None]:
if __name__ == "__main__":

    def _make_config(name, action_change=False, state_cost=False, exploration=False):
        """Build one shaping configuration; a disabled term gets weight 0.0."""
        return {
            "name": name,
            "ENABLE_ACTION_CHANGE_PENALTY": action_change,
            "ACTION_CHANGE_PENALTY_WEIGHT": 0.001 if action_change else 0.0,
            "ENABLE_STATE_DEPENDENT_COST": state_cost,
            "STATE_DEPENDENT_COST_WEIGHT": 0.001 if state_cost else 0.0,
            "ENABLE_EXPLORATION_BONUS": exploration,
            "EXPLORATION_BONUS_WEIGHT": 0.01 if exploration else 0.0,
        }

    # Ablation grid: no shaping, each term on its own, then all three together.
    shaping_configs = [
        _make_config("baseline"),
        _make_config("action_change_only", action_change=True),
        _make_config("state_dependent_only", state_cost=True),
        _make_config("exploration_only", exploration=True),
        _make_config("combined", action_change=True, state_cost=True, exploration=True),
    ]

    N_EPISODES_CP = 300
    N_EPISODES_LL = 300

    results_cartpole = {}
    results_lunar = {}

    # (label used in messages, environment id, episode budget, result store)
    experiments = (
        ("CartPole", "CartPole-v1", N_EPISODES_CP, results_cartpole),
        ("LunarLander", "LunarLanderContinuous-v3", N_EPISODES_LL, results_lunar),
    )

    for config in shaping_configs:

        # Publish this configuration as module-level settings, which is where
        # the shaping code reads its flags and weights from.
        module_settings = globals()
        for setting, value in config.items():
            if setting != "name":
                module_settings[setting] = value

        for env_label, env_id, episode_budget, result_store in experiments:
            print(f"\n==== Running {env_label} with config: {config['name']} ====")
            # Stored as the (returns, smoothed, losses) tuple from train_env.
            result_store[config["name"]] = train_env(
                env_name=env_id,
                num_episodes=episode_budget,
                lr=LR,
                hidden_size=HIDDEN_SIZE
            )

    import matplotlib.pyplot as plt

    # One figure per (environment, metric); every figure overlays all configs.
    # The position of a panel matches the index of its series in the result tuple.
    panels = (
        ("Episode Returns (Multiple Shaping Configs)", "Episode Return (Shaped)", "raw"),
        ("Smoothed Returns", "Smoothed Return", "smoothed"),
        ("Policy Loss", "Loss", "loss"),
    )

    for env_label, _, episode_budget, result_store in experiments:
        episode_axis = range(episode_budget)
        for series_index, (title_suffix, y_label, tag) in enumerate(panels):
            plt.figure(figsize=(10,6))
            plt.title(f"{env_label} - {title_suffix}")
            for cfg_name, series in result_store.items():
                plt.plot(episode_axis, series[series_index], label=f"{cfg_name} ({tag})")
            plt.xlabel("Episode")
            plt.ylabel(y_label)
            plt.legend()
            plt.show()



==== Running CartPole with config: baseline ====
Env: CartPole-v1 | Episode: 50 | Return: 18.00 | Avg(10): 20.80
Env: CartPole-v1 | Episode: 100 | Return: 38.00 | Avg(10): 27.30
Env: CartPole-v1 | Episode: 150 | Return: 60.00 | Avg(10): 90.50
Env: CartPole-v1 | Episode: 200 | Return: 166.00 | Avg(10): 198.20


In [None]:
# Install SWIG first, then Gymnasium together with its Box2D extra.
# NOTE(review): presumably SWIG is needed to build box2d-py from its source
# archive (the log below downloads a .tar.gz) — confirm.
!pip install swig
!pip install "gymnasium[box2d]"

Collecting swig
  Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.6/1.9 MB[0m [31m17.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.9/1.9 MB[0m [31m36.9 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m25.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.0
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Prepar

In [None]:
# Alternative setup: system build tools via apt, then the legacy `gym` 0.25.2
# package installed without its dependencies, followed by Box2D/pygame and
# the remaining dependencies installed by hand.
# NOTE(review): the training code imports `gymnasium`, not `gym` — confirm
# this cell is still needed.
!apt-get update
!apt-get install -y swig build-essential python3-dev
!pip install --upgrade pip setuptools wheel
!pip install --no-deps "gym==0.25.2"
!pip install "box2d-py==2.3.5" "pygame>=2.3.0"
!pip install "numpy>=1.18.0" "cloudpickle>=1.2.0" "gym_notices>=0.0.4"


Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading



In [None]:
# Smoke test: confirm the Box2D-backed LunarLander environment can be
# created, reset, and stepped once.
# Uses gymnasium and the same environment id as the training code. A plain
# `import gym` would rebind the name `gym` to the legacy package, and the
# (obs, info) reset / 5-tuple step unpacking below follows the Gymnasium API.
import gymnasium as gym

env = gym.make("LunarLanderContinuous-v3")
try:
    obs, info = env.reset()
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    print("Success:", obs.shape)
finally:
    # Release the environment's resources even if the check fails.
    env.close()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,381 kB]
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,684 kB]
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 http://archive.ubuntu.com/ubuntu jammy-up