In [20]:
import gym
import torch
from collections import deque
import random

import numpy as np

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [21]:
# Define the neural network model
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Define the DQN agent
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)
        self.batch_size = 64
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.model = DQN(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            with torch.no_grad():
                state = torch.FloatTensor(state).unsqueeze(0)
                q_values = self.model(state)
                return torch.argmax(q_values).item()

    def replay(self):
        # if memory is less than batch_size, do nothing
        batch_size = self.batch_size
        if len(self.memory) < batch_size:
            return
        # Select a random batch of experiences from the memory
        minibatch = random.sample(self.memory, batch_size)

        # Loop through each experience in the minibatch
        for state, action, reward, next_state, done in minibatch:
            # convert state, next_state to tensors
            state = torch.FloatTensor(state)
            next_state = torch.FloatTensor(next_state)
            action = torch.LongTensor([action])
            reward = torch.FloatTensor([reward])
            done = torch.FloatTensor([done])

            # If the episode has ended, the target is just the reward
            target = reward

            # If the episode has not ended, calculate the future discounted reward
            # Bellman equation: Q(s,a) = r + gamma * max Q(s',a')
            # reward = reward + gamma * (max Q value of the next state)
            if not done:
                target = reward + self.gamma * torch.max(self.model(next_state))

            # Get the predicted Q values
            #current_prediction = self.model(state)[action]
            # Get the predicted Q values
            current_prediction = self.model(state).gather(1, action.unsqueeze(-1))
            # Calculate the loss function between the predicted Q values and the target
            loss = F.mse_loss(current_prediction, target)

            # Zero the gradients
            self.optimizer.zero_grad()
            # Backpropagate the loss (calculate the gradients)
            loss.backward()
            # Update the weights (adam step)
            self.optimizer.step()


        # If epsilon is greater than epsilon_minimum, decay it
        if self.epsilon > self.epsilon_min:
            # epsilon = epsilon * decay
            self.epsilon *= self.epsilon_decay


In [22]:
# Create the environment
env = gym.make('MountainCar-v0')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Create the DQN agent
agent = DQNAgent(state_size, action_size)


In [23]:
# Train the agent
num_episodes = 1000
for episode in range(num_episodes):
    state = env.reset()
    state = np.reshape(state[0], [1, state[0].shape[0]])
    done = False
    total_reward = 0

    while not done:
        action = agent.act(state)
        next_state, reward, done, _, info = env.step(action)
        next_state = np.reshape(next_state, [1, next_state.shape[0]])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

    agent.replay()

    print(f"Episode: {episode+1}, Total Reward: {total_reward}")

  if not isinstance(terminated, (bool, np.bool8)):
  loss = F.mse_loss(current_prediction, target)


Episode: 1, Total Reward: -4844.0
Episode: 2, Total Reward: -35559.0
Episode: 3, Total Reward: -7068.0
Episode: 4, Total Reward: -68569.0
Episode: 5, Total Reward: -61958.0
Episode: 6, Total Reward: -26423.0
Episode: 7, Total Reward: -9480.0
Episode: 8, Total Reward: -105199.0
Episode: 9, Total Reward: -1275.0
Episode: 10, Total Reward: -222038.0
Episode: 11, Total Reward: -69785.0
Episode: 12, Total Reward: -160119.0
Episode: 13, Total Reward: -18231.0
Episode: 14, Total Reward: -15366.0
Episode: 15, Total Reward: -283996.0
Episode: 16, Total Reward: -16175.0
Episode: 17, Total Reward: -30889.0
Episode: 18, Total Reward: -140800.0
Episode: 19, Total Reward: -108009.0
Episode: 20, Total Reward: -1152366.0
Episode: 21, Total Reward: -126441.0
Episode: 22, Total Reward: -76751.0
Episode: 23, Total Reward: -31184.0
Episode: 24, Total Reward: -273106.0
Episode: 25, Total Reward: -22043.0
Episode: 26, Total Reward: -344877.0
Episode: 27, Total Reward: -79995.0
Episode: 28, Total Reward: -73

KeyboardInterrupt: 

In [None]:
# Test the agent
state = env.reset()
state = np.reshape(state[0], [1, state[0].shape[0]])
done = False
total_reward = 0

while not done:
    action = agent.act(state)
    next_state, reward, done, _, info = env.step(action)
    next_state = np.reshape(next_state, [1, next_state.shape[0]])
    state = next_state
    total_reward += reward


In [None]:
print(f"Test Total Reward: {total_reward}")