In [None]:
%%writefile MountainCar-DQN-a.py

import argparse
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import random
from itertools import count
import os

def parse_args():
    """Build the command-line parser for the random-agent script and parse it.

    Returns:
        argparse.Namespace with ``environment``, ``episodes``, ``mean_n``,
        ``seed`` and ``render`` attributes.
    """
    parser = argparse.ArgumentParser(
        description="Running with different episodic counts and mean rewards"
    )
    # (flag, type, default, help) for every option that carries a value.
    valued_options = (
        ("--environment", str, "MountainCar-v0", "Gym environment id"),
        ("--episodes", int, 100, "Number of episodes to run"),
        ("--mean_n", int, 5, "n for rolling mean plot"),
        ("--seed", int, None, "Random seed (optional)"),
    )
    for flag, value_type, default, help_text in valued_options:
        parser.add_argument(flag, type=value_type, default=default, help=help_text)
    # The only boolean switch: present -> True, absent -> False.
    parser.add_argument(
        "--render",
        action="store_true",
        help="Render environment (slows down execution)",
    )
    return parser.parse_args()


def safe_reset(env):
    """Reset ``env`` and hand back only the observation.

    A non-empty tuple returned by ``env.reset()`` is reduced to its first
    element; any other return value is passed through unchanged.
    """
    result = env.reset()
    is_nonempty_tuple = isinstance(result, tuple) and len(result) > 0
    return result[0] if is_nonempty_tuple else result

def safe_step(env, action):
    """Step ``env`` and normalise the result to ``(obs, reward, done, info)``.

    A 4-element result is returned as-is. A 5-element result is treated as
    ``(obs, reward, terminated, truncated, info)`` and the two flags are
    merged with ``or``. Any other length raises ``RuntimeError``.
    """
    result = env.step(action)
    n_fields = len(result)
    if n_fields not in (4, 5):
        raise RuntimeError("Unexpected step output format: len = {}".format(n_fields))
    if n_fields == 5:
        obs, reward, terminated, truncated, info = result
        return obs, reward, terminated or truncated, info
    obs, reward, done, info = result
    return obs, reward, done, info

def get_state_tensor(obs):
    """Copy an observation into a new float32 NumPy array.

    Despite the name, the result is a ``numpy.ndarray``, not a torch tensor.
    """
    state_array = np.array(obs, dtype=np.float32)
    return state_array

def select_random_action(action_space):
    """Draw one action by calling ``action_space.sample()`` exactly once."""
    sampled_action = action_space.sample()
    return sampled_action

def plot_results(rewards_mean, steps, best_rewards_mean, env_name, file_name,
                 action_scatter, mean_n=None):
    """Create and save a 2-panel figure: reward curves and sampled actions.

    Args:
        rewards_mean: Rolling-mean reward values, one per entry in ``steps``.
        steps: Cumulative environment step counts used as the x axis.
        best_rewards_mean: Running maximum of ``rewards_mean``.
        env_name: Environment id, used only in titles.
        file_name: Path the figure is written to.
        action_scatter: Sequence of ``(position, velocity, action)`` samples;
            may be empty, in which case the right panel stays blank.
        mean_n: Optional rolling-window length (in episodes) shown in the
            legend. When omitted a generic label is used.

    The figure is saved, shown with ``plt.show()`` and then closed.
    """
    fig = plt.figure(figsize=(12, 5))

    # The window size is not derivable from the data; the number of plotted
    # points (len(steps)) is unrelated to it, so never use that as the label.
    if mean_n is not None:
        mean_label = f"{mean_n}-episode rolling mean"
    else:
        mean_label = "Rolling mean reward"

    ax1 = fig.add_subplot(121)
    ax1.plot(steps, rewards_mean, label=mean_label)
    ax1.plot(steps, best_rewards_mean, label="Best mean reward")
    ax1.grid(True)
    ax1.set_xlabel("Total environment steps")
    ax1.set_ylabel("Reward (higher is better)")
    ax1.legend()
    ax1.set_title(f"Performance of random agent on {env_name}")

    ax2 = fig.add_subplot(122)
    if len(action_scatter) > 0:
        arr = np.array(action_scatter)
        X = arr[:, 0].astype(float)
        Y = arr[:, 1].astype(float)
        Z = arr[:, 2].astype(int)
        # color map for the 3 discrete actions; any other action id falls
        # back to gray instead of raising KeyError
        cmap = {0: 'lime', 1: 'red', 2: 'blue'}
        colors = [cmap.get(int(a), 'gray') for a in Z]
        ax2.scatter(X, Y, c=colors, s=12, alpha=0.7)
        action_names = ['Left (0)', 'No-Op (1)', 'Right (2)']
        # legend patches
        legend_recs = [mpatches.Rectangle((0, 0), 1, 1, fc=cmap[i]) for i in range(3)]
        ax2.legend(legend_recs, action_names, loc='best')
    ax2.set_title("Random agent action choices (sampled states)")
    ax2.set_xlabel("Position")
    ax2.set_ylabel("Velocity")

    fig.suptitle(f"{env_name} - Random Agent Analysis")
    # leave head-room for the suptitle
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    fig.savefig(file_name, dpi=200)
    print(f"Saved plot to {file_name}")
    plt.show()
    plt.close(fig)

def run_random_agent(env_id, episodes, mean_n, seed=None, render=False):
    """Run a random policy on ``env_id``, plot the results and summarise them.

    Actions come from ``select_random_action(env.action_space)``.

    Args:
        env_id: Environment id passed to ``gym.make``.
        episodes: Number of episodes to run.
        mean_n: Window length, in episodes, of the rolling mean reward.
        seed: Optional seed. Besides NumPy and ``random`` it is applied to
            the environment (``env.reset(seed=...)``) and to the action
            space, so sampled actions are reproducible too.
        render: When true the environment is created with
            ``render_mode="human"``.

    Returns:
        dict with ``total_episodes``, ``mean_n``, ``final_mean_reward``,
        ``best_mean_reward``, ``success_count`` and ``total_steps``. The two
        mean entries are ``None`` when fewer than ``mean_n`` episodes ran.
    """
    if seed is not None:
        np.random.seed(seed)
        random.seed(seed)

    # Gymnasium only draws frames when a render mode is chosen at creation
    # time; calling env.render() on an env made without one does nothing.
    make_kwargs = {"render_mode": "human"} if render else {}
    env = gym.make(env_id, **make_kwargs)

    episode_rewards = []
    best_reward = -float('inf')
    rewards_mean = []
    best_rewards_mean = []
    steps = []

    total_steps = 0
    success_count = 0

    action_scatter = []  # store (pos, vel, action) samples for plotting
    # Sampling stride for the scatter plot; loop-invariant, so computed once.
    sample_stride = max(1, 100 // max(1, episodes))

    try:
        if seed is not None:
            # Seed the environment's own RNG and the action-space sampler;
            # the global NumPy / random seeds above do not reach either.
            env.reset(seed=seed)
            env.action_space.seed(seed)

        print("Environment:", env_id)
        print("Observation space:", env.observation_space)
        print("Action space:", env.action_space)

        for ep in range(episodes):
            obs = safe_reset(env)
            state = get_state_tensor(obs)
            total_reward = 0.0

            for t in count():
                action = select_random_action(env.action_space)
                if len(action_scatter) < 2000 and total_steps % sample_stride == 0:
                    action_scatter.append((state[0], state[1], action))

                obs, reward, done, info = safe_step(env, action)
                state = get_state_tensor(obs)
                total_reward += reward
                total_steps += 1

                if done or t >= 10000:  # safety cap
                    # success condition: position >= 0.5 at termination
                    try:
                        pos = state[0]
                    except (IndexError, TypeError):
                        # observation has no indexable first component
                        pos = None
                    if pos is not None and pos >= 0.5:
                        success_count += 1
                    break

            episode_rewards.append(total_reward)

            if len(episode_rewards) >= mean_n:
                present_mean = float(np.mean(episode_rewards[-mean_n:]))
                rewards_mean.append(present_mean)
                best_reward = max(present_mean, best_reward)
                best_rewards_mean.append(best_reward)
                steps.append(total_steps)

            print(f"Episode {ep+1}/{episodes} | Reward = {total_reward:.2f} | Successes so far = {success_count}")
    finally:
        # Release the environment even if an episode raised.
        env.close()

    # Namespaced ids contain "/", which would be read as a directory.
    safe_env_id = env_id.replace("/", "_")
    fn = f"{safe_env_id}_random_{episodes}ep_mean{mean_n}.png"
    plot_results(rewards_mean, steps, best_rewards_mean, env_id, fn, action_scatter)

    # Summary
    summary = {
        "total_episodes": episodes,
        "mean_n": mean_n,
        "final_mean_reward": rewards_mean[-1] if rewards_mean else None,
        "best_mean_reward": best_rewards_mean[-1] if best_rewards_mean else None,
        "success_count": success_count,
        "total_steps": total_steps
    }
    return summary

if __name__ == "__main__":
    # Script entry point: parse the CLI, run the random agent, then print
    # every summary field on its own line under a banner.
    args = parse_args()
    summary = run_random_agent(
        args.environment,
        args.episodes,
        args.mean_n,
        args.seed,
        args.render,
    )
    rule = "=" * 10
    print()
    print(rule + "Summary" + rule)
    for key in summary:
        print(f"{key}: {summary[key]}")


Writing MountainCar-DQN-a.py


In [None]:
# !python3 MountainCar-DQN-a.py --environment "MountainCar-v0" --episodes 100 --mean_n 5 --seed 303
# run the python CMD via CLI


Environment: MountainCar-v0
Observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
Action space: Discrete(3)
Episode 1/100 | Reward = -200.00 | Successes so far = 0
Episode 2/100 | Reward = -200.00 | Successes so far = 0
Episode 3/100 | Reward = -200.00 | Successes so far = 0
Episode 4/100 | Reward = -200.00 | Successes so far = 0
Episode 5/100 | Reward = -200.00 | Successes so far = 0
Episode 6/100 | Reward = -200.00 | Successes so far = 0
Episode 7/100 | Reward = -200.00 | Successes so far = 0
Episode 8/100 | Reward = -200.00 | Successes so far = 0
Episode 9/100 | Reward = -200.00 | Successes so far = 0
Episode 10/100 | Reward = -200.00 | Successes so far = 0
Episode 11/100 | Reward = -200.00 | Successes so far = 0
Episode 12/100 | Reward = -200.00 | Successes so far = 0
Episode 13/100 | Reward = -200.00 | Successes so far = 0
Episode 14/100 | Reward = -200.00 | Successes so far = 0
Episode 15/100 | Reward = -200.00 | Successes so far = 0
Episode 16/100 | Reward = -200.00 

In [None]:
%%writefile MountainCar-DQN-b.py

import math
import random
from itertools import count
from collections import namedtuple, deque
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import argparse
import pandas as pd

#################################
# Argument Parsing
#################################
parser = argparse.ArgumentParser(description="DQN for MountainCar-v0")
# Every option carries a value, so a (flag, type, default) table registers
# them all in the same order as before.
for _flag, _value_type, _default in (
    ("--environment", str, "MountainCar-v0"),
    ("--num_episodes", int, 200),
    ("--batch", int, 64),
    ("--gamma", float, 0.99),
    ("--learning_rate", float, 1e-3),
    ("--mean_n", int, 5),
    ("--seed", int, 0),
):
    parser.add_argument(_flag, type=_value_type, default=_default)
# Parsed at import time; the rest of the script reads its settings from args.
args = parser.parse_args()

###############
# Setup
###############
# Create the environment and seed it through its first reset.
env = gym.make(args.environment)
env.reset(seed=args.seed)
# Apply the same seed to NumPy, Python's random module and PyTorch, in that order.
for _seed_fn in (np.random.seed, random.seed, torch.manual_seed):
    _seed_fn(args.seed)
# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

######################
# DQN Model Definition
######################
class DQN(nn.Module):
    """Fully connected Q-network with two ReLU hidden layers.

    Maps a batch of ``state_dim``-sized inputs to ``action_dim`` outputs,
    one value per action.
    """

    def __init__(self, state_dim=2, hidden_dim=200, action_dim=3):
        super().__init__()
        layer_shapes = (
            (state_dim, hidden_dim),
            (hidden_dim, hidden_dim),
            (hidden_dim, action_dim),
        )
        layers = []
        for in_features, out_features in layer_shapes:
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.ReLU())
        layers.pop()  # the output layer is linear: drop the trailing ReLU
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input batch ``x``."""
        q_values = self.net(x)
        return q_values

####################
# Replay Memory
####################
# One stored experience step; next_state is whatever the caller pushes.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory:
    """Bounded FIFO buffer of ``Transition`` records with random sampling."""

    def __init__(self, capacity=10000):
        # A deque with maxlen discards its oldest entry once it is full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition built from ``(state, action, next_state, reward)``."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        picked = random.sample(self.memory, batch_size)
        return picked

    def __len__(self):
        """Number of transitions currently held."""
        return len(self.memory)

#####################
# Helper Functions
#####################
def get_state(obs):
    """Turn an observation into a float32 tensor on ``device`` with a new leading axis of size 1."""
    obs_tensor = torch.tensor(obs, dtype=torch.float32, device=device)
    # Indexing with None prepends the axis, same as unsqueeze(0).
    return obs_tensor[None]

def select_action(state, eps_threshold):
    """Pick an action ε-greedily and return it as a ``(1, 1)`` long tensor.

    A single ``random.random()`` draw decides the branch: when it is at most
    ``eps_threshold`` a random action index in ``range(n_actions)`` is used;
    otherwise the argmax of ``policy_net(state)`` is used.
    """
    explore = random.random() <= eps_threshold
    if explore:
        random_index = random.randrange(n_actions)
        return torch.tensor([[random_index]], device=device, dtype=torch.long)
    with torch.no_grad():
        greedy_index = policy_net(state).argmax(dim=1)
    return greedy_index.view(1, 1)

def optimize_model():
    """Run one DQN gradient step on a random minibatch from ``memory``.

    Does nothing until ``memory`` holds at least ``batch_size`` transitions.
    The regression target is ``reward + gamma * max_a target_net(next_state)``
    where the bootstrap term is zero for transitions whose ``next_state`` is
    ``None``. Gradients of ``policy_net`` are clipped to norm 1.0 before the
    optimizer step.
    """
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    # Transpose: list of Transition -> one Transition of per-field tuples.
    batch = Transition(*zip(*transitions))

    state_batch = torch.cat(batch.state).to(dtype=torch.float32, device=device)
    action_batch = torch.cat(batch.action).to(dtype=torch.long, device=device)
    reward_batch = torch.cat(batch.reward).to(dtype=torch.float32, device=device)

    non_final_mask = torch.tensor(
        [s is not None for s in batch.next_state],
        device=device, dtype=torch.bool
    )
    non_final_next_states = [s for s in batch.next_state if s is not None]

    # Q(s_t, a); squeeze only the action axis so a batch of one keeps shape
    # (1,) and still matches the target's shape.
    state_action_values = policy_net(state_batch).gather(1, action_batch).squeeze(1)

    # V(s_{t+1}); stays zero where next_state is None.
    next_state_values = torch.zeros(batch_size, device=device, dtype=torch.float32)
    # torch.cat rejects an empty list, so skip the target network entirely
    # when every sampled transition has next_state None.
    if non_final_next_states:
        next_states = torch.cat(non_final_next_states).to(dtype=torch.float32, device=device)
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(next_states).max(1)[0]

    # expected Q values
    expected_state_action_values = (next_state_values * gamma) + reward_batch

    # loss
    loss = F.mse_loss(state_action_values, expected_state_action_values)

    # Backpropagation
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
    optimizer.step()

##################
# Training Loop
###################
# Hyper-parameters taken from the command line.
n_actions = env.action_space.n
batch_size = args.batch
gamma = args.gamma
num_episodes = args.num_episodes
lr = args.learning_rate
mean_n = args.mean_n

# Policy network is trained; target network is a periodically refreshed copy.
policy_net = DQN().to(device)
target_net = DQN().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=lr)
memory = ReplayMemory(10000)

# Epsilon decays exponentially in the global step count from eps_start to eps_end.
eps_start, eps_end, eps_decay = 1.0, 0.02, 800
target_update = 10      # copy policy -> target every this many episodes
initial_memory = 1000   # environment steps collected before any optimisation
total_steps = 0

rewards_list, mean_rewards, best_mean_rewards, steps_list = [], [], [], []
best_mean = -float("inf")
successes = 0

for i_episode in range(num_episodes):
    obs, _ = env.reset()
    state = get_state(obs)
    total_reward = 0.0

    while True:
        eps_threshold = eps_end + (eps_start - eps_end) * math.exp(-1. * total_steps / eps_decay)
        action = select_action(state, eps_threshold)
        obs_next, reward, terminated, truncated, _ = env.step(action.item())
        done = terminated or truncated

        # --- Reward shaping: ---
        # Original reward is always -1 until goal is reached
        # We shape it to encourage progress towards the goal
        shaped_reward = obs_next[0] + 0.5 + abs(obs_next[1]*10)

        # Additional reward for reaching the goal
        if obs_next[0] >= 0.5:
            shaped_reward += 100 # Large bonus for success

        # Convert to tensor; the explicit float32 keeps the dtype independent
        # of NumPy's scalar promotion rules.
        reward_tensor = torch.tensor([float(shaped_reward)], dtype=torch.float32, device=device)

        # Only a genuine termination ends the return. A time-limit truncation
        # still has a successor state, so it is stored with next_state set and
        # the target keeps bootstrapping from it.
        next_state = None if terminated else get_state(obs_next)
        memory.push(state, action, next_state, reward_tensor)

        state = next_state
        total_reward += reward # Keep track of the TRUE environment reward
        total_steps += 1

        if total_steps > initial_memory:
            optimize_model()

        if done:
            if obs_next[0] >= 0.5:
                successes += 1
            break

    rewards_list.append(total_reward)

    # Record a rolling mean as soon as a full window of mean_n episodes exists.
    if len(rewards_list) >= mean_n:
        mean_r = np.mean(rewards_list[-mean_n:])
        mean_rewards.append(mean_r)
        best_mean = max(best_mean, mean_r)
        best_mean_rewards.append(best_mean)
        steps_list.append(total_steps)

    if i_episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())

    print(f"Episode {i_episode+1}/{num_episodes} | Reward: {total_reward:.2f} | "
          f"Success: {successes} | epsilon={eps_threshold:.3f}")

###############
# Plotting
###############
# Read the observation bounds before releasing the environment; they define
# the region of state space the policy map is drawn over.
obs_low = env.observation_space.low
obs_high = env.observation_space.high
env.close()

file_name = f"{args.environment}_DQN_{num_episodes}_episodes_{args.batch}batch.png"

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Performance Plot
# Curves (and their legend) are drawn only when at least one rolling mean
# was recorded; an empty legend would only produce a warning.
if steps_list and mean_rewards:
    ax1.plot(steps_list, mean_rewards, label=f"{mean_n}-episode mean")
    ax1.plot(steps_list, best_mean_rewards, label="Best mean reward")
    ax1.legend()
ax1.set_xlabel("Steps")
ax1.set_ylabel("Reward")
ax1.set_title(f"DQN Performance on {args.environment}")
ax1.grid(True)

# Policy map: a 50x50 grid over the environment's own observation bounds
# (component 0 on the x axis, component 1 on the y axis), so every evaluated
# point is a state the environment can produce.
X = np.linspace(obs_low[0], obs_high[0], 50)
Y = np.linspace(obs_low[1], obs_high[1], 50)
X, Y = np.meshgrid(X, Y)
X, Y = X.flatten(), Y.flatten()

# Using policy_net to determine the action for each state
states_to_evaluate = torch.tensor(np.column_stack((X, Y)), dtype=torch.float32, device=device)
with torch.no_grad():
    Z = policy_net(states_to_evaluate).argmax(dim=1).cpu().numpy()  # index of max Q-value

colors = ['lime', 'red', 'blue'] # 0: left, 1: no-op, 2: right
ax2.scatter(X, Y, c=[colors[z] for z in Z], s=1, alpha=0.7)
ax2.set_xlabel("Position")
ax2.set_ylabel("Velocity")
ax2.set_title("Trained DQN Action Choices")
legend_recs = [mpatches.Patch(color=colors[i], label=f"Action {i}") for i in range(3)]
ax2.legend(handles=legend_recs)
ax2.set_xlim([obs_low[0], obs_high[0]])
ax2.set_ylim([obs_low[1], obs_high[1]])


plt.tight_layout()
plt.savefig(file_name, dpi=200)
print(f"Saved plot to {file_name}")
# Free the figure once it is on disk.
plt.close(fig)


Writing MountainCar-DQN-b.py


In [None]:
# !python3 MountainCar-DQN-b.py --environment="MountainCar-v0" --num_episodes=100 --batch=8 --gamma=0.9 --learning_rate=1e-4 --mean_n=5


Episode 1/100 | Reward: -200.00 | Success: 0 | ε=0.784
Episode 2/100 | Reward: -200.00 | Success: 0 | ε=0.615
Episode 3/100 | Reward: -200.00 | Success: 0 | ε=0.483
Episode 4/100 | Reward: -200.00 | Success: 0 | ε=0.381
Episode 5/100 | Reward: -200.00 | Success: 0 | ε=0.301
Episode 6/100 | Reward: -200.00 | Success: 0 | ε=0.239
Episode 7/100 | Reward: -200.00 | Success: 0 | ε=0.191
Episode 8/100 | Reward: -200.00 | Success: 0 | ε=0.153
Episode 9/100 | Reward: -200.00 | Success: 0 | ε=0.123
Episode 10/100 | Reward: -200.00 | Success: 0 | ε=0.101
Episode 11/100 | Reward: -200.00 | Success: 0 | ε=0.083
Episode 12/100 | Reward: -200.00 | Success: 0 | ε=0.069
Episode 13/100 | Reward: -200.00 | Success: 0 | ε=0.058
Episode 14/100 | Reward: -200.00 | Success: 0 | ε=0.050
Episode 15/100 | Reward: -200.00 | Success: 0 | ε=0.043
Episode 16/100 | Reward: -200.00 | Success: 0 | ε=0.038
Episode 17/100 | Reward: -200.00 | Success: 0 | ε=0.034
Episode 18/100 | Reward: -200.00 | Success: 0 | ε=0.031
E

In [None]:
%%writefile MountainCar-DQN-c.py
import subprocess
import matplotlib.pyplot as plt
import math
import random
from itertools import count
from collections import namedtuple, deque
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import argparse
import pandas as pd

# Fixed settings shared by every run of the sweep; each one is forwarded
# below as a command-line flag to MountainCar-DQN-b.py.
environment = "MountainCar-v0"
num_episodes = 100

learning_rate = 1e-4
gamma = 0.9
mean_n = 5

# Batch sizes to compare: one subprocess run is launched per entry.
batch_sizes = [8, 16, 32, 64]

# Store results
# NOTE(review): presumably keyed by batch size; the code that fills this
# dict is not visible in this part of the file - confirm.
batch_rewards = {}

for b in batch_sizes:
    print(f"\nRunning with batch size = {b}")
    result = subprocess.run(
        [
            "python3",
            "MountainCar-DQN-b.py",
            "--environment", environment,
            "--num_episodes", str(num_episodes),
            "--batch", str(b),
            "--learning_rate", str(learning_rate),
            "--gamma", str(gamma),
            "--mean_n", str(mean_n)
        ],
        capture_output=True, text=True
    )
