In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

In [2]:
class CartPoleEnv:
    """Minimal cart-pole simulation advanced with explicit Euler steps.

    The state is a length-4 numpy array ordered as
    ``[x, x_velocity, theta, theta_velocity]``.

    Attributes:
        g: Gravitational acceleration used in the dynamics.
        m: Pole mass.
        M: Cart mass.
        L: Length term used in the pole dynamics.
        dt: Integration time step.
        state: Current state array.
    """

    def __init__(self):
        self.g = 9.8
        self.m = 0.1
        self.M = 1.0
        self.L = 0.5
        self.dt = 0.02
        # Draws the initial state exactly once, the same way reset() does.
        self.reset()

    def step(self, action):
        """Apply one action and advance the simulation by ``dt``.

        Args:
            action: ``1`` pushes with force +20.0; any other value pushes
                with force -20.0.

        Returns:
            A tuple ``(state, reward, done)``. ``reward`` is 1.0 while the
            updated ``|theta|`` is below pi/12 and -1.0 otherwise; ``done``
            is true once the updated ``|theta|`` reaches pi/6.
        """
        pos, vel, angle, ang_vel = self.state

        if action == 1:
            push = 20.0
        else:
            push = -20.0

        cos_a = np.cos(angle)
        sin_a = np.sin(angle)
        mass_sum = self.M + self.m
        pole_ml = self.m * self.L

        # Shared intermediate term of the two acceleration formulas.
        common = (push + pole_ml * ang_vel ** 2 * sin_a) / mass_sum
        denominator = self.L * (4 / 3 - self.m * cos_a ** 2 / mass_sum)
        ang_acc = (self.g * sin_a - cos_a * common) / denominator
        lin_acc = common - pole_ml * ang_acc * cos_a / mass_sum

        # Positions are advanced with the velocities from *before* this step.
        next_pos = pos + vel * self.dt
        next_vel = vel + lin_acc * self.dt
        next_angle = angle + ang_vel * self.dt
        next_ang_vel = ang_vel + ang_acc * self.dt

        self.state = np.array([next_pos, next_vel, next_angle, next_ang_vel])

        tilt = abs(next_angle)
        reward = 1.0 if tilt < np.pi / 12 else -1.0
        done = tilt >= np.pi / 6
        return self.state, reward, done

    def reset(self):
        """Re-draw the state uniformly from [-0.05, 0.05) and return it."""
        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
        return self.state

In [3]:
class DQN(nn.Module):
    """Fully connected network producing one output value per action.

    Two hidden layers of 24 units with ReLU activations are followed by a
    linear output layer of width ``action_dim``.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 24
        self.fc1 = nn.Linear(state_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_dim)

    def forward(self, x):
        """Run ``x`` through both ReLU hidden layers and the linear output layer."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

In [4]:
class DQNAgent:
    """Epsilon-greedy agent that trains a single ``DQN`` from a replay memory.

    Args:
        state_dim: Size of the state vector fed to the network.
        action_dim: Number of discrete actions (network output width).
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of picking a random action.
        epsilon_min: Floor below which ``epsilon`` is never decayed.
        epsilon_decay: Multiplicative decay applied to ``epsilon`` after each
            ``replay`` call that actually trains.
        batch_size: Number of transitions sampled per ``replay`` call.
        lr: Learning rate passed to Adam.
        memory_size: Maximum number of transitions kept; the oldest entries
            are dropped first.
    """

    def __init__(self, state_dim, action_dim, gamma=0.95, epsilon=1.0,
                 epsilon_min=0.01, epsilon_decay=0.995, batch_size=32,
                 lr=0.001, memory_size=2000):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.memory = deque(maxlen=memory_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.model = DQN(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

    @staticmethod
    def _to_batch(state):
        """Convert one state into a float32 tensor with a leading batch axis."""
        return torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)

    def remember(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy rule.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest network output.

        Returns:
            The chosen action as a Python ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_dim)
        with torch.no_grad():
            q_values = self.model(self._to_batch(state))
        return torch.argmax(q_values).item()

    def replay(self):
        """Train on a random minibatch of stored transitions.

        Does nothing while fewer than ``batch_size`` transitions are stored.
        Otherwise each sampled transition gets its own optimizer step, so the
        targets of later samples see the weights updated by earlier ones.
        Afterwards ``epsilon`` is decayed once and clamped at ``epsilon_min``.
        """
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # The bootstrap value is consumed as a plain float, so no
                # autograd graph is needed for this forward pass.
                with torch.no_grad():
                    next_q = self.model(self._to_batch(next_state))
                target += self.gamma * torch.max(next_q).item()

            # Single forward pass: the detached copy doubles as the target
            # vector, so only the taken action's entry contributes to the loss.
            output = self.model(self._to_batch(state))
            target_f = output.detach().clone()
            target_f[0][action] = target

            self.optimizer.zero_grad()
            loss = self.criterion(output, target_f)
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            # Clamp so the decay cannot step below the configured floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [5]:
SEED = 0
MAX_STEPS_PER_EPISODE = 200
LOG_INTERVAL = 50

# Seed every RNG the environment, agent and network draw from so that a
# Restart & Run All reproduces the same training run. This must happen before
# the agent is built, because the network weights are initialised from torch's RNG.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env = CartPoleEnv()
agent = DQNAgent(state_dim=4, action_dim=2)
episodes = 1000

for episode in range(episodes):
    state = env.reset()
    total_reward = 0
    for step in range(MAX_STEPS_PER_EPISODE):
        action = agent.act(state)
        next_state, reward, done = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        if done:
            break
    # Training happens once per episode, after the rollout has been stored.
    agent.replay()
    if (episode + 1) % LOG_INTERVAL == 0:
        print(f"Episode {episode + 1}/{episodes}, Reward: {total_reward}, Epsilon: {agent.epsilon:.4f}")

Episode 50/1000, Reward: 8.0, Epsilon: 0.7862
Episode 100/1000, Reward: 8.0, Epsilon: 0.6119
Episode 150/1000, Reward: 19.0, Epsilon: 0.4762
Episode 200/1000, Reward: 22.0, Epsilon: 0.3707
Episode 250/1000, Reward: 2.0, Epsilon: 0.2885
Episode 300/1000, Reward: 3.0, Epsilon: 0.2245
Episode 350/1000, Reward: 9.0, Epsilon: 0.1748
Episode 400/1000, Reward: -1.0, Epsilon: 0.1360
Episode 450/1000, Reward: 200.0, Epsilon: 0.1059
Episode 500/1000, Reward: 200.0, Epsilon: 0.0824
Episode 550/1000, Reward: 200.0, Epsilon: 0.0641
Episode 600/1000, Reward: 200.0, Epsilon: 0.0499
Episode 650/1000, Reward: 200.0, Epsilon: 0.0388
Episode 700/1000, Reward: 200.0, Epsilon: 0.0302
Episode 750/1000, Reward: 200.0, Epsilon: 0.0235
Episode 800/1000, Reward: 200.0, Epsilon: 0.0183
Episode 850/1000, Reward: 200.0, Epsilon: 0.0143
Episode 900/1000, Reward: 2.0, Epsilon: 0.0111
Episode 950/1000, Reward: 200.0, Epsilon: 0.0100
Episode 1000/1000, Reward: 200.0, Epsilon: 0.0100


In [7]:
def evaluate(agent, env, episodes=10, max_steps=10_000):
    """Roll out ``agent`` in ``env`` and report the total reward per episode.

    Actions come from ``agent.act``, so whatever exploration that method
    applies is still active during evaluation.

    Args:
        agent: Object exposing ``act(state)``.
        env: Object exposing ``reset()`` and ``step(action)``, where ``step``
            returns ``(state, reward, done)``.
        episodes: Number of episodes to run.
        max_steps: Upper bound on steps per episode. Without it, an episode
            whose ``done`` flag never becomes true would loop forever.

    Returns:
        A list with the total reward of each episode, in order.
    """
    total_rewards = []
    for i in range(episodes):
        state = env.reset()
        total_reward = 0
        for _ in range(max_steps):
            action = agent.act(state)
            state, reward, lose = env.step(action)
            total_reward += reward
            if lose:
                break
        print(f'Episode {i}: {total_reward}')
        total_rewards.append(total_reward)
    return total_rewards

# Run the evaluation rollouts with the agent and environment built in the training cell.
evaluate(agent=agent, env=env)

Episode 0: 397.0
Episode 1: 377.0
Episode 2: 306.0
Episode 3: 291.0
Episode 4: 354.0
Episode 5: 345.0
Episode 6: 343.0
Episode 7: 339.0
Episode 8: 299.0
Episode 9: 289.0
