In [8]:
# Mount Google Drive at /content/drive; the training cell writes its plots and
# model files beneath this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
import random
import numpy
import gym
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import namedtuple
import math
import matplotlib.pyplot as plt

In [10]:
"""
In this file, you may edit the hyperparameters used for different environments.

memory_size: Maximum size of the replay memory.
n_episodes: Number of episodes to train for.
batch_size: Batch size used for training DQN.
target_update_frequency: How often to update the target network.
train_frequency: How often to train the DQN.
gamma: Discount factor.
lr: Learning rate used for optimizer.
eps_start: Starting value for epsilon (linear annealing).
eps_end: Final value for epsilon (linear annealing).
anneal_length: How many steps to anneal epsilon for.
n_actions: The number of actions can easily be accessed with env.action_space.n, but we do
    some manual engineering to account for the fact that Pong has duplicate actions.
"""

# CartPole-v0 hyperparameters; each key is described in the docstring above.
CartPole = dict(
    memory_size=50_000,
    n_episodes=10_000,
    batch_size=32,
    target_update_frequency=100,
    train_frequency=1,
    gamma=0.95,
    lr=0.0001,
    eps_start=1.0,
    eps_end=0.05,
    anneal_length=10_000,
    n_actions=2,
)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cpu" if not torch.cuda.is_available() else "cuda")

def preprocess(obs, env):
    """Turn a raw observation into a float tensor placed on ``device``.

    Args:
        obs: Raw observation accepted by ``torch.tensor``.
        env: Name of the environment the observation came from.

    Returns:
        The observation as a float tensor on ``device``.

    Raises:
        ValueError: If no preprocessing is defined for ``env``.
    """
    supported_envs = ('CartPole-v0',)
    if env not in supported_envs:
        raise ValueError('Please add necessary observation preprocessing instructions to preprocess() in utils.py.')
    as_tensor = torch.tensor(obs, device=device)
    return as_tensor.float()


In [11]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Store Transitions
class ReplayMemory:
    """Fixed-capacity ring buffer of (obs, action, next_obs, reward) transitions."""

    def __init__(self, capacity):
        self.capacity = capacity
        # Stored transitions; grows until it holds `capacity` entries.
        self.memory = []
        # Index of the slot the next push() writes to.
        self.position = 0

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def push(self, obs, action, next_obs, reward):
        """Store one transition, overwriting the oldest one once the buffer is full."""
        transition = (obs, action, next_obs, reward)
        still_growing = len(self.memory) < self.capacity
        if still_growing:
            # Reserve a slot; it is filled by the assignment below.
            self.memory.append(None)

        self.memory[self.position] = transition
        # Advance the write index, wrapping around at `capacity`.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """
        Draws batch_size distinct stored transitions at random and regroups them
        field by field, returning a tuple
            (obs, action, next_obs, reward)
        in which every element is itself a tuple of length batch_size.
        """
        picked = random.sample(self.memory, batch_size)
        per_field = zip(*picked)
        return tuple(per_field)


# Legacy module-level step counter. DQN keeps its own exploration counter in
# ``self.steps_done``; this name is retained only so that code which still
# refers to it keeps working.
steps_done = 0


class DQN(nn.Module):
    """Fully connected Q-network with epsilon-greedy action selection.

    A 4-dimensional observation is mapped to one Q-value per action through a
    single hidden layer of 256 ReLU units.

    Args:
        env_config: Hyperparameter dict providing the keys ``batch_size``,
            ``gamma``, ``eps_start``, ``eps_end``, ``anneal_length`` and
            ``n_actions``.
    """

    def __init__(self, env_config):
        super(DQN, self).__init__()

        # Save hyperparameters needed in the DQN class.
        self.batch_size = env_config["batch_size"]
        self.gamma = env_config["gamma"]
        self.eps_start = env_config["eps_start"]
        self.eps_end = env_config["eps_end"]
        self.anneal_length = env_config["anneal_length"]
        self.n_actions = env_config["n_actions"]
        # Number of exploratory act() calls so far; drives the epsilon schedule.
        self.steps_done = 0

        self.fc1 = nn.Linear(4, 256)
        self.fc2 = nn.Linear(256, self.n_actions)

        self.relu = nn.ReLU()
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Returns the Q-values of every action for a batch of observations."""
        x = self.relu(self.fc1(x))
        x = self.fc2(x)

        return x

    def epsilon(self):
        """Returns the current exploration rate.

        Epsilon is annealed linearly from ``eps_start`` to ``eps_end`` over the
        first ``anneal_length`` exploratory calls to act() and stays at
        ``eps_end`` afterwards.
        """
        if self.anneal_length <= 0 or self.steps_done >= self.anneal_length:
            return self.eps_end
        progress = self.steps_done / self.anneal_length
        return self.eps_start + progress * (self.eps_end - self.eps_start)

    def act(self, observation, exploit=False):
        """Selects one action per observation with an epsilon-greedy strategy.

        Args:
            observation: Tensor of shape [batch, 4].
            exploit: If True, always return the greedy action and leave the
                exploration schedule untouched.

        Returns:
            Integer tensor of shape [batch] on the same device as
            ``observation``. For a batch of one, ``.item()`` yields the action.
        """
        # Action selection never needs gradients, so skip building a graph.
        with torch.no_grad():
            greedy_actions = self.forward(observation).argmax(dim=1)

        if exploit:
            return greedy_actions

        # One schedule step per call, regardless of the batch size.
        eps_threshold = self.epsilon()
        self.steps_done += 1

        chosen = [
            random.randrange(self.n_actions) if random.random() < eps_threshold else greedy
            for greedy in greedy_actions.tolist()
        ]
        return torch.tensor(chosen, dtype=torch.long, device=observation.device)


def optimize(dqn, target_dqn, memory, optimizer):
    """Runs one gradient step of DQN on a minibatch drawn from the replay memory.

    A transition is treated as terminal when its ``next_obs`` is not a
    ``torch.Tensor`` (the training loop stores the raw NumPy observation for
    the final step of an episode); terminal transitions bootstrap from zero.

    Args:
        dqn: Online Q-network; must expose ``batch_size`` and ``gamma``.
        target_dqn: Target Q-network used to evaluate next states.
        memory: Replay memory supporting ``len()`` and ``sample(batch_size)``,
            where ``sample`` returns ``(obs, action, next_obs, reward)`` tuples.
        optimizer: Optimizer over the parameters of ``dqn``.

    Returns:
        The scalar loss as a float, or None when the memory holds fewer than
        ``dqn.batch_size`` transitions and no update was performed.
    """
    # If we don't have enough transitions stored yet, we don't train.
    if len(memory) < dqn.batch_size:
        return None

    # Work on whatever device the online network lives on.
    model_device = next(dqn.parameters()).device

    obs, actions, next_obs, rewards = memory.sample(dqn.batch_size)

    is_non_final = [isinstance(s, torch.Tensor) for s in next_obs]
    non_final_mask = torch.tensor(is_non_final, device=model_device, dtype=torch.bool)

    state_batch = torch.cat(obs).to(model_device)
    action_batch = torch.cat(actions).to(model_device)

    # Q(s, a) for the actions that were actually taken. squeeze(1) rather than
    # squeeze() keeps the batch dimension even when batch_size == 1.
    q_values = dqn(state_batch).gather(1, action_batch.unsqueeze(1)).squeeze(1)

    # Align the reward dtype with the network output so the loss never mixes
    # float32 and float64.
    reward_batch = torch.cat(rewards).to(device=model_device, dtype=q_values.dtype)

    # max_a' Q_target(s', a') for non-terminal next states, zero otherwise.
    next_state_values = torch.zeros(dqn.batch_size, device=model_device, dtype=q_values.dtype)
    if any(is_non_final):
        # Guarded because torch.cat raises on an empty list, which happens when
        # every sampled transition is terminal.
        non_final_next_states = torch.cat(
            [s for s in next_obs if isinstance(s, torch.Tensor)]
        ).to(model_device)
        with torch.no_grad():
            next_state_values[non_final_mask] = target_dqn(non_final_next_states).max(1)[0]

    # Compute the expected Q values.
    q_value_targets = (next_state_values * dqn.gamma) + reward_batch

    # Compute loss.
    loss = F.mse_loss(q_values, q_value_targets)

    # Perform gradient descent.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.item()


In [12]:
def evaluate_policy(dqn, env, env_config, args, n_episodes, render=False, verbose=False):
    """Plays {n_episodes} greedy episodes and returns the mean episode return.

    Args:
        dqn: Agent whose ``act(obs, exploit=True)`` picks the actions.
        env: Environment exposing ``reset()``, ``step()`` and ``render()``.
        env_config: Accepted for interface compatibility; not read here.
        args: Dict whose ``"env"`` entry names the environment for preprocess().
        n_episodes: Number of episodes to play.
        render: If True, render the environment before every step.
        verbose: If True, print the return of every finished episode.

    Returns:
        The sum of all episode returns divided by ``n_episodes``.
    """
    returns_sum = 0

    for episode_idx in range(n_episodes):
        state = preprocess(env.reset(), env=args["env"]).unsqueeze(0)
        score = 0
        finished = False

        while not finished:
            if render:
                env.render()

            # Greedy action for the single observation in the batch.
            chosen = dqn.act(state, exploit=True).item()
            raw_obs, reward, finished, info = env.step(chosen)

            state = preprocess(raw_obs, env=args["env"]).unsqueeze(0)
            score += reward

        returns_sum += score

        if verbose:
            print(f'Finished episode {episode_idx + 1} with a total return of {score}')

    return returns_sum / n_episodes

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

args = { "evaluate_freq": 25, "evaluation_episodes":5, "env":'CartPole-v0' }

# Directory on the mounted Drive that receives the plots and the saved model.
OUTPUT_DIR = '/content/drive/My Drive/Reinforcement_Learning_Atari/models'
# A learning-curve figure is written once every this many evaluations.
PLOT_EVERY_N_EVALUATIONS = 40

# Hyperparameter configurations for different environments. See config.py.
ENV_CONFIGS = {
    'CartPole-v0': CartPole
}


# Initialize environment and config.
# NOTE(review): `.unwrapped` strips gym's wrappers, presumably including the
# episode time limit, so a policy that never fails may never finish an
# episode - confirm this is intended.
env = gym.make(args["env"]).unwrapped
env_config = ENV_CONFIGS[args["env"]]

# Common prefix of every file written by this run.
run_tag = f'{args["env"]}_model_tuf_{env_config["target_update_frequency"]}'


def save_return_plot(history, episode, keep_open=False):
    """Saves the evaluation learning curve to OUTPUT_DIR.

    Args:
        history: Mean evaluation returns, one entry per evaluation.
        episode: Episode index used in the file name.
        keep_open: If True, leave the figure open so the notebook displays it;
            otherwise close it after saving to release its memory.
    """
    fig, ax = plt.subplots()
    ax.plot(range(0, len(history) * args["evaluate_freq"], args["evaluate_freq"]), history)
    ax.axhline(y=200, color='r', linestyle='-')
    ax.set_xlabel("Episode")
    ax.set_ylabel("Mean return")
    ax.set_title("Mean return over episodes")
    fig.savefig(f'{OUTPUT_DIR}/{run_tag}_episode_{episode}.png')
    if not keep_open:
        plt.close(fig)


# Initialize deep Q-networks.
dqn = DQN(env_config=env_config).to(device)
target_dqn = DQN(env_config=env_config).to(device)
target_dqn.load_state_dict(dqn.state_dict())
target_dqn.eval()

# Create replay memory.
memory = ReplayMemory(env_config['memory_size'])

# Initialize optimizer used for training the DQN. We use Adam rather than RMSProp.
optimizer = torch.optim.Adam(dqn.parameters(), lr=env_config['lr'])

# Keep track of best evaluation mean return achieved so far.
best_mean_return = -float("Inf")

mean_return_history = []

# Environment steps taken across all episodes; the training and target-update
# frequencies are expressed in steps, not episodes.
total_steps = 0

for episode in range(env_config['n_episodes']):
    done = False

    obs = preprocess(env.reset(), env=args["env"]).unsqueeze(0)

    while not done:

        action = dqn.act(obs)

        # Act in the true environment.
        raw_next_obs, reward, done, info = env.step(action.item())
        total_steps += 1

        # Preprocess the incoming observation. On the final step the raw
        # (non-tensor) observation is stored on purpose: optimize() recognises
        # terminal transitions by next_obs not being a tensor.
        if done:
            next_obs = raw_next_obs
        else:
            next_obs = preprocess(raw_next_obs, env=args["env"]).unsqueeze(0)

        # Add the transition to the replay memory.
        reward = torch.tensor([reward], device=device, dtype=torch.float32)
        memory.push(obs, action, next_obs, reward)
        obs = next_obs

        # Train the online network every env_config["train_frequency"] steps.
        if total_steps % env_config["train_frequency"] == 0:
            optimize(dqn, target_dqn, memory, optimizer)

        # Sync the target network every env_config["target_update_frequency"] steps.
        if total_steps % env_config["target_update_frequency"] == 0:
            target_dqn.load_state_dict(dqn.state_dict())

    # Evaluate the current agent.
    if episode % args["evaluate_freq"] == 0:
        mean_return = evaluate_policy(dqn, env, env_config, args, n_episodes=args["evaluation_episodes"])
        # Clip so that a single very long episode does not dominate the plot.
        mean_return_history.append(min(mean_return, 500))

        if episode % (args["evaluate_freq"] * PLOT_EVERY_N_EVALUATIONS) == 0:
            save_return_plot(mean_return_history, episode)

        print(f'Episode {episode}/{env_config["n_episodes"]}: {mean_return}')

        # Save current agent if it has the best performance so far.
        if mean_return >= best_mean_return:
            best_mean_return = mean_return

            print('Best performance so far! Saving model.')
            # Serialise the network itself; previously only the literal text
            # 'dqn' was written to the file.
            torch.save(dqn, f'{OUTPUT_DIR}/{run_tag}.pt')

# Final learning curve; left open so the notebook shows it below the cell.
save_return_plot(mean_return_history, episode, keep_open=True)

# Close environment after training is completed.
env.close()
