In [3]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# --- Hyperparameters ---
ENV_NAME = "CartPole-v1"     # Gym environment id handed to gym.make
MAX_EPISODES = 500           # upper bound on the number of training episodes
MAX_STEPS = 200              # per-episode step cap enforced by the training loop
BATCH_SIZE = 64              # transitions sampled for each gradient update
GAMMA = 0.99                 # discount factor used in the TD target
LR = 0.001                   # learning rate of the Adam optimizer
EPS_START = 1.0              # initial exploration rate
EPS_END = 0.01               # floor for the exploration rate
EPS_DECAY = 0.995            # multiplicative epsilon decay, applied once per episode
TARGET_UPDATE_FREQ = 30      # episodes between target-network syncs
REPLAY_BUFFER_SIZE = 10_000  # capacity of the replay buffer
MIN_REPLAY_SIZE = 1_000      # random transitions gathered before training starts

# Seed each RNG this notebook draws from: Python, NumPy and PyTorch.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

class QNetwork(nn.Module):
    """
    Simple MLP that takes the state as input and outputs Q-values for each action.

    Args:
        state_dim: Size of the input state vector.
        action_dim: Number of discrete actions (size of the output layer).
        hidden_dim: Width of the single hidden layer. Defaults to 128, the
            value that used to be hard-coded, so existing callers are unaffected.
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # The attribute name `net` and the layer order are kept so state_dict
        # keys (net.0.*, net.2.*) stay compatible with load_state_dict callers.
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )

    def forward(self, x):
        """Return the Q-values for `x`; the last dimension has size `action_dim`."""
        return self.net(x)


class ReplayBuffer:
    """
    Bounded FIFO store of (state, action, reward, next_state, done) transitions.

    Backed by a deque with `maxlen=capacity`, so once the buffer is full each
    push drops the oldest transition.
    """
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append a single transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """
        Draw `batch_size` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of NumPy arrays (states, actions, rewards, next_states,
            dones) with dtypes float32, int64, float32, float32 and uint8.

        Raises:
            ValueError: propagated from `random.sample` when `batch_size`
                exceeds the number of stored transitions.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, next_states, dones = zip(*picked)

        state_arr = np.array(states, dtype=np.float32)
        action_arr = np.array(actions, dtype=np.int64)
        reward_arr = np.array(rewards, dtype=np.float32)
        next_state_arr = np.array(next_states, dtype=np.float32)
        done_arr = np.array(dones, dtype=np.uint8)
        return (state_arr, action_arr, reward_arr, next_state_arr, done_arr)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)


def select_action(state, q_network, epsilon, env):
    """
    Choose an action with an epsilon-greedy policy.

    With probability `epsilon` a random action is drawn from
    `env.action_space`; otherwise the index of the largest Q-value that
    `q_network` predicts for `state` is returned.
    """
    explore = random.random() < epsilon
    if explore:
        return env.action_space.sample()

    # Greedy branch: add a leading batch dimension, then take the best action.
    with torch.no_grad():
        batched_state = torch.FloatTensor(state).unsqueeze(0)
        q_row = q_network(batched_state)
    best_action = q_row.argmax(dim=1)
    return best_action.item()


def compute_td_loss(batch, q_network, target_network, optimizer):
    """
    Perform one optimization step on the TD error of a minibatch.

    `batch` is a (states, actions, rewards, next_states, dones) tuple. Each
    target is reward + GAMMA * max_a Q_target(next_state, a), with the
    bootstrap term multiplied by zero wherever done == 1. The MSE between
    those targets and Q(state, action) from `q_network` is back-propagated
    and `optimizer` is stepped. Returns the loss as a Python float.
    """
    states, actions, rewards, next_states, dones = batch

    state_batch = torch.FloatTensor(states)
    next_state_batch = torch.FloatTensor(next_states)
    # Per-transition scalars get a trailing axis so they line up with the
    # keepdim=True max below and with gather's index argument.
    action_col = torch.LongTensor(actions).unsqueeze(-1)
    reward_col = torch.FloatTensor(rewards).unsqueeze(-1)
    done_col = torch.FloatTensor(dones).unsqueeze(-1)  # 0 or 1

    # Bootstrap value comes from the target network; no gradient flows through it.
    with torch.no_grad():
        best_next_q, _ = target_network(next_state_batch).max(dim=1, keepdim=True)
    not_done = 1 - done_col
    td_target = reward_col + GAMMA * best_next_q * not_done

    # Q-values of the online network for the actions that were actually taken.
    predicted_q = q_network(state_batch).gather(1, action_col)

    criterion = nn.MSELoss()
    loss = criterion(predicted_q, td_target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.item()




In [4]:

env = gym.make(ENV_NAME)
# Seed the action-space sampler as well; the global random/NumPy/PyTorch seeds
# do not control the environment's own random number generators.
env.action_space.seed(seed)

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Initialize Q-network and target network
q_network = QNetwork(state_dim, action_dim)
target_network = QNetwork(state_dim, action_dim)
target_network.load_state_dict(q_network.state_dict())  # same initial weights

optimizer = optim.Adam(q_network.parameters(), lr=LR)
replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

epsilon = EPS_START
all_rewards = []

# try/finally guarantees the environment is closed even when training raises
# or the cell is interrupted.
try:
    # Pre-fill replay buffer with random actions.
    # Seeding the first reset makes the environment's episode stream
    # reproducible; later resets continue from the same generator.
    obs, info = env.reset(seed=seed)  # new API: reset returns (obs, info)
    for _ in range(MIN_REPLAY_SIZE):
        action = env.action_space.sample()
        next_obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # Store `terminated`, not `done`: a time-limit truncation is not a
        # terminal state, so the TD target must still bootstrap from next_obs.
        replay_buffer.push(obs, action, reward, next_obs, terminated)

        obs = next_obs
        if done:
            obs, info = env.reset()

    # Main training loop
    for episode in range(MAX_EPISODES):
        obs, info = env.reset()
        episode_reward = 0

        for step in range(MAX_STEPS):
            action = select_action(obs, q_network, epsilon, env)
            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            episode_reward += reward
            # Only a true termination masks the bootstrap term in the TD target.
            replay_buffer.push(obs, action, reward, next_obs, terminated)

            obs = next_obs

            # Sample from replay buffer and update network
            batch = replay_buffer.sample(BATCH_SIZE)
            loss = compute_td_loss(batch, q_network, target_network, optimizer)

            if done:
                break

        # Decay epsilon once per episode
        epsilon = max(EPS_END, epsilon * EPS_DECAY)

        # Update target network periodically (frequency is counted in episodes)
        if (episode + 1) % TARGET_UPDATE_FREQ == 0:
            target_network.load_state_dict(q_network.state_dict())

        all_rewards.append(episode_reward)
        print(f"Episode {episode+1}, Reward: {episode_reward}, Epsilon: {epsilon:.3f}")

        # Early stopping once the 100-episode average reward reaches 195.
        # Note: 195 is the CartPole-v0 threshold; CartPole-v1's registered
        # threshold is 475, which cannot be reached here because MAX_STEPS
        # caps every episode at 200 reward.
        if len(all_rewards) >= 100 and np.mean(all_rewards[-100:]) >= 195:
            print(f"Solved in {episode+1} episodes!")
            break
finally:
    env.close()

Episode 1, Reward: 10.0, Epsilon: 0.010
Episode 2, Reward: 9.0, Epsilon: 0.010
Episode 3, Reward: 10.0, Epsilon: 0.010
Episode 4, Reward: 10.0, Epsilon: 0.010
Episode 5, Reward: 11.0, Epsilon: 0.010
Episode 6, Reward: 10.0, Epsilon: 0.010
Episode 7, Reward: 9.0, Epsilon: 0.010
Episode 8, Reward: 12.0, Epsilon: 0.010
Episode 9, Reward: 10.0, Epsilon: 0.010
Episode 10, Reward: 10.0, Epsilon: 0.010
Episode 11, Reward: 9.0, Epsilon: 0.010
Episode 12, Reward: 10.0, Epsilon: 0.010
Episode 13, Reward: 9.0, Epsilon: 0.010
Episode 14, Reward: 10.0, Epsilon: 0.010
Episode 15, Reward: 9.0, Epsilon: 0.010
Episode 16, Reward: 11.0, Epsilon: 0.010
Episode 17, Reward: 9.0, Epsilon: 0.010
Episode 18, Reward: 10.0, Epsilon: 0.010
Episode 19, Reward: 9.0, Epsilon: 0.010
Episode 20, Reward: 9.0, Epsilon: 0.010
Episode 21, Reward: 10.0, Epsilon: 0.010
Episode 22, Reward: 10.0, Epsilon: 0.010
Episode 23, Reward: 9.0, Epsilon: 0.010
Episode 24, Reward: 8.0, Epsilon: 0.010
Episode 25, Reward: 9.0, Epsilon: 0

In [None]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# --- Hyperparameters ---
ENV_NAME = "CartPole-v1"     # Gym environment id handed to gym.make
MAX_EPISODES = 500           # upper bound on the number of training episodes
MAX_STEPS = 200              # per-episode step cap enforced by the training loop
BATCH_SIZE = 64              # transitions sampled for each gradient update
GAMMA = 0.99                 # discount factor used in the TD target
LR = 0.001                   # learning rate of the Adam optimizer
EPS_START = 1.0              # initial exploration rate
EPS_END = 0.01               # floor for the exploration rate
EPS_DECAY = 0.995            # multiplicative epsilon decay, applied once per episode
TARGET_UPDATE_FREQ = 30      # episodes between target-network syncs
REPLAY_BUFFER_SIZE = 10_000  # capacity of the replay buffer
MIN_REPLAY_SIZE = 1_000      # random transitions gathered before training starts

# Seed each RNG this notebook draws from: Python, NumPy and PyTorch.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

class QNetwork(nn.Module):
    """
    Simple MLP that takes the state as input and outputs Q-values for each action.

    Args:
        state_dim: Size of the input state vector.
        action_dim: Number of discrete actions (size of the output layer).
        hidden_dim: Width of the single hidden layer. Defaults to 128, the
            value that used to be hard-coded, so existing callers are unaffected.
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # The attribute name `net` and the layer order are kept so state_dict
        # keys (net.0.*, net.2.*) stay compatible with load_state_dict callers.
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )

    def forward(self, x):
        """Return the Q-values for `x`; the last dimension has size `action_dim`."""
        return self.net(x)


class ReplayBuffer:
    """
    Bounded FIFO store of (state, action, reward, next_state, done) transitions.

    Backed by a deque with `maxlen=capacity`, so once the buffer is full each
    push drops the oldest transition.
    """
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append a single transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """
        Draw `batch_size` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of NumPy arrays (states, actions, rewards, next_states,
            dones) with dtypes float32, int64, float32, float32 and uint8.

        Raises:
            ValueError: propagated from `random.sample` when `batch_size`
                exceeds the number of stored transitions.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, next_states, dones = zip(*picked)

        state_arr = np.array(states, dtype=np.float32)
        action_arr = np.array(actions, dtype=np.int64)
        reward_arr = np.array(rewards, dtype=np.float32)
        next_state_arr = np.array(next_states, dtype=np.float32)
        done_arr = np.array(dones, dtype=np.uint8)
        return (state_arr, action_arr, reward_arr, next_state_arr, done_arr)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)


def select_action(state, q_network, epsilon, env):
    """
    Choose an action with an epsilon-greedy policy.

    With probability `epsilon` a random action is drawn from
    `env.action_space`; otherwise the index of the largest Q-value that
    `q_network` predicts for `state` is returned.
    """
    explore = random.random() < epsilon
    if explore:
        return env.action_space.sample()

    # Greedy branch: add a leading batch dimension, then take the best action.
    with torch.no_grad():
        batched_state = torch.FloatTensor(state).unsqueeze(0)
        q_row = q_network(batched_state)
    best_action = q_row.argmax(dim=1)
    return best_action.item()


def compute_td_loss(batch, q_network, target_network, optimizer):
    """
    Perform one optimization step on the TD error of a minibatch.

    `batch` is a (states, actions, rewards, next_states, dones) tuple. Each
    target is reward + GAMMA * max_a Q_target(next_state, a), with the
    bootstrap term multiplied by zero wherever done == 1. The MSE between
    those targets and Q(state, action) from `q_network` is back-propagated
    and `optimizer` is stepped. Returns the loss as a Python float.
    """
    states, actions, rewards, next_states, dones = batch

    state_batch = torch.FloatTensor(states)
    next_state_batch = torch.FloatTensor(next_states)
    # Per-transition scalars get a trailing axis so they line up with the
    # keepdim=True max below and with gather's index argument.
    action_col = torch.LongTensor(actions).unsqueeze(-1)
    reward_col = torch.FloatTensor(rewards).unsqueeze(-1)
    done_col = torch.FloatTensor(dones).unsqueeze(-1)  # 0 or 1

    # Bootstrap value comes from the target network; no gradient flows through it.
    with torch.no_grad():
        best_next_q, _ = target_network(next_state_batch).max(dim=1, keepdim=True)
    not_done = 1 - done_col
    td_target = reward_col + GAMMA * best_next_q * not_done

    # Q-values of the online network for the actions that were actually taken.
    predicted_q = q_network(state_batch).gather(1, action_col)

    criterion = nn.MSELoss()
    loss = criterion(predicted_q, td_target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.item()




In [None]:

env = gym.make(ENV_NAME)
# Seed the action-space sampler as well; the global random/NumPy/PyTorch seeds
# do not control the environment's own random number generators.
env.action_space.seed(seed)

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Initialize Q-network and target network
q_network = QNetwork(state_dim, action_dim)
target_network = QNetwork(state_dim, action_dim)
target_network.load_state_dict(q_network.state_dict())  # same initial weights

optimizer = optim.Adam(q_network.parameters(), lr=LR)
replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

epsilon = EPS_START
all_rewards = []

# try/finally guarantees the environment is closed even when training raises
# or the cell is interrupted.
try:
    # Pre-fill replay buffer with random actions.
    # Seeding the first reset makes the environment's episode stream
    # reproducible; later resets continue from the same generator.
    obs, info = env.reset(seed=seed)  # new API: reset returns (obs, info)
    for _ in range(MIN_REPLAY_SIZE):
        action = env.action_space.sample()
        next_obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # Store `terminated`, not `done`: a time-limit truncation is not a
        # terminal state, so the TD target must still bootstrap from next_obs.
        replay_buffer.push(obs, action, reward, next_obs, terminated)

        obs = next_obs
        if done:
            obs, info = env.reset()

    # Main training loop
    for episode in range(MAX_EPISODES):
        obs, info = env.reset()
        episode_reward = 0

        for step in range(MAX_STEPS):
            action = select_action(obs, q_network, epsilon, env)
            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            episode_reward += reward
            # Only a true termination masks the bootstrap term in the TD target.
            replay_buffer.push(obs, action, reward, next_obs, terminated)

            obs = next_obs

            # Sample from replay buffer and update network
            batch = replay_buffer.sample(BATCH_SIZE)
            loss = compute_td_loss(batch, q_network, target_network, optimizer)

            if done:
                break

        # Decay epsilon once per episode
        epsilon = max(EPS_END, epsilon * EPS_DECAY)

        # Update target network periodically (frequency is counted in episodes)
        if (episode + 1) % TARGET_UPDATE_FREQ == 0:
            target_network.load_state_dict(q_network.state_dict())

        all_rewards.append(episode_reward)
        print(f"Episode {episode+1}, Reward: {episode_reward}, Epsilon: {epsilon:.3f}")

        # Early stopping once the 100-episode average reward reaches 195.
        # Note: 195 is the CartPole-v0 threshold; CartPole-v1's registered
        # threshold is 475, which cannot be reached here because MAX_STEPS
        # caps every episode at 200 reward.
        if len(all_rewards) >= 100 and np.mean(all_rewards[-100:]) >= 195:
            print(f"Solved in {episode+1} episodes!")
            break
finally:
    env.close()