In [2]:
pip install "gymnasium[classic-control]"


Collecting pygame>=2.1.3 (from gymnasium[classic-control])
  Downloading pygame-2.6.1-cp313-cp313-win_amd64.whl.metadata (13 kB)
Downloading pygame-2.6.1-cp313-cp313-win_amd64.whl (10.6 MB)
   ---------------------------------------- 0.0/10.6 MB ? eta -:--:--
   ---------- ----------------------------- 2.9/10.6 MB 17.1 MB/s eta 0:00:01
   ------------------------- -------------- 6.8/10.6 MB 18.1 MB/s eta 0:00:01
   ---------------------------------------  10.5/10.6 MB 18.3 MB/s eta 0:00:01
   ---------------------------------------- 10.6/10.6 MB 17.2 MB/s eta 0:00:00
Installing collected packages: pygame
Successfully installed pygame-2.6.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
import gymnasium as gym
import time

# Demo: drive CartPole with uniformly random actions to show the raw
# reset()/step() API before any learning is involved.
#
# With render_mode="human" the environment refreshes its window itself on
# every reset()/step(), so no explicit env.render() call is made here.
env = gym.make("CartPole-v1", render_mode="human")

try:
    # every game starts with a reset
    state, info = env.reset()

    # run the game for a short period
    for _ in range(50):
        # choose a random action (0 for push left, 1 for push right)
        action = env.action_space.sample()

        next_state, reward, terminated, truncated, info = env.step(action)

        # note: this logs the *shape* of the observation, not its values
        print(f"State: {state.shape}, Action: {action}, Reward: {reward}")

        # update the state for the next loop
        state = next_state

        # if the game is over, reset it to start a new game
        if terminated or truncated:
            state, info = env.reset()

        time.sleep(0.02)  # slow down for visualization
finally:
    # close the environment window even if the loop raises or the cell is interrupted
    env.close()

State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 0, Reward: 1.0
State: (4,), Action: 1, Rewa

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

class QNetwork(nn.Module):
    """
    Neural network that approximates the Q-value function.

    A fully connected network with two equally wide hidden ReLU layers that
    maps a state vector to one Q-value estimate per action.
    """
    def __init__(self, state_size, action_size, hidden_size=128):
        """
        Initializes the network layers.
        :param state_size: The number of features in the game state (e.g., 4 for CartPole).
        :param action_size: The number of possible actions (e.g., 2 for CartPole).
        :param hidden_size: Width of both hidden layers. Defaults to 128, the
            previously hard-coded value, so existing two-argument callers and
            checkpoints saved with that width keep working.
        """
        super().__init__()
        # The attribute name `network` and the positional Sequential layout are
        # kept as-is: the state_dict keys (network.0.weight, network.2.weight,
        # ...) are derived from them and saved checkpoints rely on those keys.
        self.network = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),  # activation function
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
        )

    def forward(self, state):
        """
        Defines the forward pass of the network.
        :param state: Tensor whose last dimension has `state_size` features.
        :return: Tensor of Q-values with `action_size` entries in the last dimension.
        """
        return self.network(state)

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque, namedtuple

# --- Agent hyperparameters ---
BUFFER_SIZE = 10_000    # capacity of the replay buffer, in stored transitions
BATCH_SIZE = 64         # number of transitions drawn per learning update
GAMMA = 0.99            # discount applied to future rewards
LR = 5e-4               # optimizer learning rate
UPDATE_EVERY = 4        # environment steps between learning updates

# Run on CUDA when PyTorch reports it as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

class DQNAgent():
    """Interacts with and learns from the environment.

    Holds a single Q-network (`qnetwork_local`), an Adam optimizer for it and
    a replay buffer. The same network is used both to pick actions and to
    compute bootstrap targets; there is no separate target network.
    """

    def __init__(self, state_size, action_size):
        """Build the Q-network, optimizer and replay buffer.

        :param state_size: number of features in an observation.
        :param action_size: number of discrete actions.
        """
        self.state_size = state_size
        self.action_size = action_size

        # Q-Network and its optimizer
        self.qnetwork_local = QNetwork(state_size, action_size).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)

        # replay memory
        self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE)
        # step counter, kept modulo UPDATE_EVERY
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and, every UPDATE_EVERY calls, run a learning update.

        :param done: flag stored with the transition; `learn` multiplies the
            bootstrap term by (1 - done), so a truthy value disables bootstrapping
            from `next_state`.
        """
        # save experience in replay memory
        self.memory.add(state, action, reward, next_state, done)

        # learn every UPDATE_EVERY time steps.
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step != 0:
            return

        # Gate on the batch size the buffer will actually sample with, rather
        # than on the module-level constant, so the two cannot drift apart.
        if len(self.memory) > self.memory.batch_size:
            experiences = self.memory.sample()
            self.learn(experiences, GAMMA)

    def act(self, state, eps=0.):
        """Returns an action for the given state using an epsilon-greedy policy.

        :param state: observation as a NumPy array (converted with torch.from_numpy).
        :param eps: exploration probability. With eps=0 the greedy action is
            always returned; with eps=1 a uniformly random action is always returned.
        :return: index of the selected action.
        """
        # Explore first. `random.random()` lies in [0, 1), so `< eps` never
        # explores at eps=0 and always explores at eps=1. Doing this before the
        # forward pass also avoids evaluating the network when its output would
        # be thrown away.
        if random.random() < eps:
            return random.choice(np.arange(self.action_size))

        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.qnetwork_local.eval()  # set network to evaluation mode
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()  # set network back to training mode

        return np.argmax(action_values.cpu().numpy())

    def learn(self, experiences, gamma):
        """Update value parameters using given batch of experience tuples.

        :param experiences: tuple (states, actions, rewards, next_states, dones)
            of tensors; `actions` is used as a gather index along dim 1 and the
            remaining per-sample tensors must broadcast against a (batch, 1) column.
        :param gamma: discount factor applied to the bootstrap term.
        """
        states, actions, rewards, next_states, dones = experiences

        # Targets are constants for the optimizer, so build them without autograd.
        with torch.no_grad():
            # max predicted Q-value for each next state, kept as a column
            q_targets_next = self.qnetwork_local(next_states).max(dim=1, keepdim=True).values
            # target = reward + gamma * Q_next (if not done)
            q_targets = rewards + (gamma * q_targets_next * (1 - dones))

        # Q-value the network currently assigns to the action that was taken
        q_expected = self.qnetwork_local(states).gather(1, actions)

        # compute loss
        loss = F.mse_loss(q_expected, q_targets)

        # minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

# Replay Buffer
class ReplayBuffer:
    """Bounded store of transitions from which random minibatches are drawn."""

    def __init__(self, action_size, buffer_size, batch_size):
        """Create an empty buffer.

        :param action_size: accepted for interface compatibility; not used here.
        :param buffer_size: maximum number of transitions kept; once full, the
            oldest entry is dropped for each new one (deque with maxlen).
        :param batch_size: number of transitions returned by `sample`.
        """
        self.batch_size = batch_size
        self.memory = deque(maxlen=buffer_size)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        self.memory.append(self.experience(state, action, reward, next_state, done))

    def sample(self):
        """Draw `batch_size` transitions without replacement and return them as tensors.

        :return: tuple (states, actions, rewards, next_states, dones); actions
            are int64, everything else float32, all moved to `device`.
        """
        batch = [e for e in random.sample(self.memory, k=self.batch_size) if e is not None]

        def column(field):
            # stack one field of every sampled transition row-wise
            return np.vstack([getattr(e, field) for e in batch])

        states = torch.from_numpy(column("state")).float().to(device)
        actions = torch.from_numpy(column("action")).long().to(device)
        rewards = torch.from_numpy(column("reward")).float().to(device)
        next_states = torch.from_numpy(column("next_state")).float().to(device)
        # cast to uint8 before the tensor conversion, then to float
        dones = torch.from_numpy(column("done").astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [4]:
import gymnasium as gym

# --- Environment and agent ---
# No render_mode is passed here, unlike the demo cells that use "human".
env = gym.make("CartPole-v1")
state_size = env.observation_space.shape[0]   # length of the observation vector
action_size = env.action_space.n              # number of discrete actions
agent = DQNAgent(state_size=state_size, action_size=action_size)

# --- Training schedule ---
# n_episodes: upper bound on training episodes
# max_t:      upper bound on timesteps within one episode
n_episodes, max_t = 2000, 1000
# epsilon-greedy schedule: initial value, floor, multiplicative decay factor
eps_start, eps_end, eps_decay = 1.0, 0.01, 0.995

def train():
    """Train the module-level `agent` on the module-level `env`.

    Runs up to `n_episodes` episodes of at most `max_t` steps each, decaying
    epsilon from `eps_start` towards `eps_end` by `eps_decay` per episode.
    Training stops early, and the network weights are written to
    'checkpoint.pth', once the mean score over a full window of the last 100
    episodes reaches 195.0.

    :return: list with the total reward of every episode that was run.
    """
    scores = []                         # list containing scores from each episode
    scores_window = deque(maxlen=100)   # last 100 scores
    eps = eps_start                     # initialize epsilon

    for i_episode in range(1, n_episodes + 1):
        state, info = env.reset()
        score = 0
        for t in range(max_t):
            action = agent.act(state, eps)
            next_state, reward, terminated, truncated, info = env.step(action)

            # Store only `terminated` as the done flag. The agent zeroes the
            # bootstrap term when done is set; a time-limit truncation is not a
            # terminal state, so its next_state must still be bootstrapped from.
            agent.step(state, action, reward, next_state, terminated)

            state = next_state
            score += reward
            # the episode itself ends on either signal
            if terminated or truncated:
                break

        scores_window.append(score)
        scores.append(score)

        # decrease epsilon
        eps = max(eps_end, eps_decay * eps)

        avg_score = np.mean(scores_window)
        print(f'\rEpisode {i_episode}\tAverage Score: {avg_score:.2f}', end="")
        if i_episode % 100 == 0:
            print(f'\rEpisode {i_episode}\tAverage Score: {avg_score:.2f}')

        # Check if the environment is solved. Require a full window so the
        # average really covers 100 episodes and `i_episode-100` is not negative.
        if len(scores_window) == scores_window.maxlen and avg_score >= 195.0:
            print(f'\nEnvironment solved in {i_episode-100:d} episodes!\tAverage Score: {avg_score:.2f}')
            # save the trained model's weights
            torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')
            break

    return scores

# Run training and release the environment afterwards, even when training
# raises or the cell is interrupted part-way through.
try:
    scores = train()
finally:
    env.close()

Episode 100	Average Score: 18.36
Episode 200	Average Score: 63.03
Episode 242	Average Score: 196.79
Environment solved in 142 episodes!	Average Score: 196.79


In [5]:
# Testing the AI Agent
import time  # imported here so this cell does not rely on another cell having done it

# Step 1: Initialize the Environment and Agent
# With render_mode="human" the environment refreshes its window itself on
# every reset()/step(), so no explicit env.render() call is made below.
env = gym.make("CartPole-v1", render_mode="human")

try:
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    # create an agent
    agent = DQNAgent(state_size=state_size, action_size=action_size)

    # Step 2: Load the Trained Weights
    # map_location lets a checkpoint saved from a GPU run load on a CPU-only
    # machine (and vice versa) by remapping tensors onto the current device.
    agent.qnetwork_local.load_state_dict(torch.load('checkpoint.pth', map_location=device))

    # Step 3: Watch the Smart Agent Play
    num_episodes_to_watch = 10

    for i in range(num_episodes_to_watch):
        # reset the environment to get the initial state
        state, info = env.reset()
        done = False
        episode_reward = 0

        print(f"--- Watching Episode {i+1} ---")

        while not done:
            # choose the best action using the trained network (epsilon=0)
            action = agent.act(state)

            # perform the action in the environment
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # update the state
            state = next_state
            episode_reward += reward

            # add a small delay to make it watchable
            time.sleep(0.02)

        print(f"Score for Episode {i+1}: {episode_reward}")
finally:
    # close the environment window even if loading or playback fails
    env.close()

--- Watching Episode 1 ---
Score for Episode 1: 380.0
--- Watching Episode 2 ---
Score for Episode 2: 345.0
--- Watching Episode 3 ---
Score for Episode 3: 339.0
--- Watching Episode 4 ---
Score for Episode 4: 310.0
--- Watching Episode 5 ---
Score for Episode 5: 324.0
--- Watching Episode 6 ---
Score for Episode 6: 359.0
--- Watching Episode 7 ---
Score for Episode 7: 353.0
--- Watching Episode 8 ---
Score for Episode 8: 389.0
--- Watching Episode 9 ---
Score for Episode 9: 325.0
--- Watching Episode 10 ---
Score for Episode 10: 333.0
