In [None]:
pip install gym torch numpy matplotlib


In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import matplotlib.pyplot as plt


In [None]:
# Create the CartPole task and read the problem dimensions from its spaces
env = gym.make("CartPole-v1")
state_size, action_size = env.observation_space.shape[0], env.action_space.n
print(f"State Size: {state_size}, Action Size: {action_size}")


In [None]:
class DQN(nn.Module):
    """Three-layer fully connected network mapping a state to one Q-value per action.

    Args:
        state_size: Number of input features (length of a state vector).
        action_size: Number of outputs (one Q-value per action).
        hidden_size: Width of both hidden layers. Defaults to 24, the value
            that was previously hard-coded.
    """

    def __init__(self, state_size, action_size, hidden_size=24):
        super().__init__()
        # Attribute names fc1/fc2/fc3 are kept so existing state_dicts still load.
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        """Return Q-values for `x`; the output layer has no activation."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Online network (the one passed to the optimizer during training) and a
# second network of identical architecture used as the target network.
model = DQN(state_size, action_size)
target_model = DQN(state_size, action_size)
# Start the target network with exactly the same weights as the online network.
target_model.load_state_dict(model.state_dict())
# Put the target network in evaluation mode.
target_model.eval()


In [None]:
class ReplayBuffer:
    """Bounded FIFO store of experiences with random sampling without replacement."""

    def __init__(self, capacity):
        # A deque with maxlen discards its oldest entry when a new one is
        # appended at full capacity.
        self.buffer = deque(maxlen=capacity)

    def add(self, experience):
        """Append one experience to the end of the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return a list of `batch_size` stored experiences drawn without replacement.

        Raises:
            ValueError: if `batch_size` exceeds the number of stored items.
        """
        # Sampling the deque directly picks the same positions as sampling
        # range(len(...)) and indexing afterwards.
        return random.sample(self.buffer, batch_size)

    def size(self):
        """Return how many experiences are currently stored."""
        return len(self.buffer)

# Replay memory that keeps at most the 10000 most recent transitions
buffer = ReplayBuffer(capacity=10000)


In [None]:
def train_dqn(env, model, target_model, buffer, episodes, batch_size, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, min_epsilon=0.01, learning_rate=1e-3, target_update=10):
    """Train `model` with DQN using epsilon-greedy exploration and experience replay.

    Args:
        env: Environment exposing `reset()`, `step(action)` and
            `action_space.sample()`. Both the 4-tuple `step` API
            (gym < 0.26) and the 5-tuple API (gym >= 0.26 / gymnasium) are
            accepted, as are both `reset()` return conventions.
        model: Online Q-network; its parameters are optimised with Adam.
        target_model: Network used to compute bootstrap targets. It is
            overwritten with `model`'s weights whenever
            `episode % target_update == 0`.
        buffer: Replay memory providing `add(item)`, `sample(k)` and `size()`.
        episodes: Number of episodes to run.
        batch_size: Minibatch size; optimisation starts once the buffer holds
            at least this many transitions.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of taking a random action.
        epsilon_decay: Multiplicative decay applied to epsilon after each episode.
        min_epsilon: Lower bound for epsilon.
        learning_rate: Adam learning rate.
        target_update: Number of episodes between target-network syncs.

    Returns:
        list: Total reward collected in each episode, in episode order.
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    loss_fn = nn.MSELoss()  # built once instead of on every optimisation step
    # Kept under a name distinct from the per-batch reward tensor below, so the
    # minibatch unpacking can no longer clobber the episode history.
    episode_rewards = []

    for episode in range(episodes):
        reset_out = env.reset()
        # gym >= 0.26 returns (observation, info); older versions return the
        # observation alone.
        if isinstance(reset_out, tuple) and len(reset_out) == 2 and isinstance(reset_out[1], dict):
            reset_out = reset_out[0]
        state = torch.as_tensor(reset_out, dtype=torch.float32).unsqueeze(0)
        total_reward = 0

        while True:
            # Epsilon-greedy action selection
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    action = torch.argmax(model(state)).item()

            # Take action and observe the result
            step_out = env.step(action)
            if len(step_out) == 5:
                next_obs, reward, terminated, truncated, _ = step_out
                done = terminated or truncated
            else:
                next_obs, reward, done, _ = step_out
                terminated = done
            next_state = torch.as_tensor(next_obs, dtype=torch.float32).unsqueeze(0)
            # Store `terminated` rather than `done`: a time-limit truncation
            # ends the episode but must not zero the bootstrap value.
            buffer.add((state, int(action), float(reward), next_state, float(terminated)))
            state = next_state
            total_reward += reward

            # Train the model
            if buffer.size() >= batch_size:
                batch = buffer.sample(batch_size)
                states, actions, batch_rewards, next_states, dones = zip(*batch)

                states = torch.cat(states)
                actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1)
                batch_rewards = torch.tensor(batch_rewards, dtype=torch.float32).unsqueeze(1)
                next_states = torch.cat(next_states)
                dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

                current_q = model(states).gather(1, actions)
                # The target is a constant w.r.t. the online network, so no
                # autograd graph is needed for the target network's pass.
                with torch.no_grad():
                    next_q = target_model(next_states).max(1)[0].unsqueeze(1)
                    target_q = batch_rewards + (gamma * next_q * (1 - dones))

                loss = loss_fn(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if done:
                break

        # Update epsilon
        epsilon = max(min_epsilon, epsilon * epsilon_decay)
        episode_rewards.append(total_reward)

        # Update target network
        if episode % target_update == 0:
            target_model.load_state_dict(model.state_dict())

        print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward:.2f}")

    return episode_rewards

# Run 500 training episodes with minibatches of 64 transitions
rewards = train_dqn(
    env,
    model,
    target_model,
    buffer,
    episodes=500,
    batch_size=64,
)


In [None]:
# Greedy rollout of the trained policy for one rendered episode.
reset_out = env.reset()
# gym >= 0.26 returns (observation, info); older versions return the
# observation alone.
if isinstance(reset_out, tuple) and len(reset_out) == 2 and isinstance(reset_out[1], dict):
    reset_out = reset_out[0]
state = torch.FloatTensor(reset_out).unsqueeze(0)
total_reward = 0

try:
    while True:
        with torch.no_grad():
            q_values = model(state)
            action = torch.argmax(q_values).item()
        step_out = env.step(action)
        if len(step_out) == 5:
            # Newer API: episode end is split into terminated / truncated.
            next_state, reward, terminated, truncated, _ = step_out
            done = terminated or truncated
        else:
            next_state, reward, done, _ = step_out
        env.render()
        state = torch.FloatTensor(next_state).unsqueeze(0)
        total_reward += reward
        if done:
            break
finally:
    # Release the environment (and any render window) even if the rollout fails.
    env.close()

print(f"Test Total Reward: {total_reward:.2f}")
