---
title: 'Example Code'
subtitle: '6600 Final Project'
author: Billy McGloin
date: last-modified
date-format: long
format:
  html:
    self-contained: true
    toc: true
    code-overflow: wrap
    code-fold: true
---

# DQN Keras

In [None]:
from keras.layers import Activation, Dense, Conv2D, Flatten
from keras.models import Sequential, load_model
from keras.optimizers import Adam
from keras import backend as KeyError
import numpy as np

class ReplayBuffer(object):
    """Fixed-capacity ring buffer of environment transitions.

    Each transition (state, action, reward, next state, done flag) is kept in
    five parallel NumPy arrays. Once ``max_size`` transitions have been
    written, new ones overwrite the oldest slots.
    """

    def __init__(self, max_size, input_shape):
        """Allocate zero-filled storage.

        Args:
            max_size: Capacity of the buffer, in transitions.
            input_shape: Shape of a single state, as an iterable of ints.
        """
        self.mem_size = max_size
        self.mem_cntr = 0  # Total transitions written so far; never wraps.

        # States and next states share the same per-slot layout.
        per_state = (self.mem_size, *input_shape)
        self.state_memory = np.zeros(per_state, dtype=np.float32)
        self.new_state_memory = np.zeros(per_state, dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.uint8)  # done flags

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, wrapping around when full.

        Args:
            state: State before the action.
            action: Action taken.
            reward: Reward received.
            state_: State after the action.
            done: Terminal flag for the transition.
        """
        slot = self.mem_cntr % self.mem_size  # Oldest slot is reused once full.
        writes = (
            (self.state_memory, state),
            (self.new_state_memory, state_),
            (self.action_memory, action),
            (self.reward_memory, reward),
            (self.terminal_memory, done),
        )
        for storage, value in writes:
            storage[slot] = value

        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` distinct stored transitions uniformly at random.

        Sampling is without replacement, so ``np.random.choice`` raises
        ``ValueError`` if ``batch_size`` exceeds the number of stored
        transitions.

        Returns:
            Tuple ``(states, actions, rewards, new_states, dones)`` of arrays
            whose first axis has length ``batch_size``.
        """
        # Only slots that have actually been written are eligible.
        filled = min(self.mem_cntr, self.mem_size)
        picks = np.random.choice(filled, batch_size, replace=False)

        return (
            self.state_memory[picks],
            self.action_memory[picks],
            self.reward_memory[picks],
            self.new_state_memory[picks],
            self.terminal_memory[picks],
        )

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Flatten

def build_dqn(lr, n_actions, input_dims, fcl_dims):
    """Build and compile a convolutional deep Q-network.

    The network is three ReLU convolutions (32 filters 8x8 stride 4, 64
    filters 4x4 stride 2, 64 filters 3x3 stride 1, all ``channels_first``),
    a flatten, one ReLU dense layer, and a linear dense output with one
    Q-value per action.

    Args:
        lr: Learning rate handed to the Adam optimizer.
        n_actions: Number of possible actions; size of the output layer.
        input_dims: Shape of a single state in channels-first order, e.g.
            ``(frames, height, width)``. It is used unchanged as the input
            shape so the network accepts exactly the states the replay
            buffer stores.
        fcl_dims: Width of the hidden dense layer, either as a scalar or as a
            sequence whose first element is used.

    Returns:
        The compiled model (Adam optimizer, mean-squared-error loss).
    """
    # Callers pass either a bare width (e.g. 512) or a sequence of widths;
    # there is a single hidden layer, so only the first entry is relevant.
    hidden_units = fcl_dims if np.ndim(fcl_dims) == 0 else fcl_dims[0]

    model = Sequential()

    # input_dims already contains the channel (stacked-frame) axis, so it must
    # not be extended with an extra dimension here.
    model.add(Conv2D(filters=32, kernel_size=8, strides=4, activation='relu',
                     input_shape=tuple(input_dims), data_format='channels_first'))
    model.add(Conv2D(filters=64, kernel_size=4, strides=2, activation='relu',
                     data_format='channels_first'))
    model.add(Conv2D(filters=64, kernel_size=3, strides=1, activation='relu',
                     data_format='channels_first'))

    # Flatten the feature maps so they can feed the dense layers.
    model.add(Flatten())
    model.add(Dense(hidden_units, activation='relu'))

    # Linear output: one Q-value per action.
    model.add(Dense(n_actions))

    # The learning rate is passed positionally because the keyword was renamed
    # from `lr` to `learning_rate` between Keras releases.
    model.compile(optimizer=Adam(lr), loss='mean_squared_error')

    return model

In [None]:
import numpy as np

class Agent(object):
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    ``q_eval`` is the network being trained and used to pick actions;
    ``q_next`` is the target network whose weights are periodically copied
    from ``q_eval``.
    """

    def __init__(self, alpha, gamma, n_actions, epsilon, batch_size, replace, input_dims,
                 eps_dec=1e-5, eps_min=0.01, mem_size=1000000,
                 q_eval_fname='q_eval.h5', q_target_fname='q_target.h5'):
        """Set hyperparameters and build the replay buffer and both networks.

        Args:
            alpha: Learning rate passed to ``build_dqn``.
            gamma: Discount factor for future rewards.
            n_actions: Number of discrete actions.
            epsilon: Initial probability of taking a random action.
            batch_size: Number of transitions used per learning step.
            replace: Number of learning steps between target-network syncs;
                0 disables syncing.
            input_dims: Shape of a single state.
            eps_dec: Amount subtracted from epsilon after each learning step.
            eps_min: Lower bound for epsilon.
            mem_size: Capacity of the replay buffer.
            q_eval_fname: File used to save/load the evaluation network.
            q_target_fname: File used to save/load the target network.
        """
        self.action_space = list(range(n_actions))
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_dec = eps_dec
        self.eps_min = eps_min
        self.batch_size = batch_size
        self.replace = replace
        self.q_target_model_file = q_target_fname
        self.q_eval_model_file = q_eval_fname
        self.learn_step = 0  # Learning steps taken; drives target-network syncs.
        self.memory = ReplayBuffer(mem_size, input_dims)
        # The hidden width is passed as a sequence because build_dqn indexes
        # its last argument.
        self.q_eval = build_dqn(alpha, n_actions, input_dims, (512,))
        self.q_next = build_dqn(alpha, n_actions, input_dims, (512,))

    def replace_target_network(self):
        """Copy evaluation weights into the target network every ``replace`` learning steps."""
        if self.replace != 0 and self.learn_step % self.replace == 0:
            self.q_next.set_weights(self.q_eval.get_weights())

    def store_transition(self, state, action, reward, new_state, done):
        """Record one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, observation):
        """Pick an action epsilon-greedily.

        Args:
            observation: A single state (no batch axis).

        Returns:
            A random action with probability ``epsilon``, otherwise the index
            of the largest Q-value predicted by the evaluation network.
        """
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.action_space)

        # Add the batch axis expected by predict(). np.asarray is used because
        # np.array(..., copy=False) raises under NumPy 2 whenever a copy is
        # unavoidable, which is always the case when wrapping in a list.
        state = np.asarray(observation, dtype=np.float32)[np.newaxis, ...]
        q_values = self.q_eval.predict(state)
        return np.argmax(q_values)

    def learn(self):
        """Run one Q-learning update on a sampled batch.

        Does nothing until the replay buffer has received more than
        ``batch_size`` transitions.
        """
        if self.memory.mem_cntr <= self.batch_size:
            return

        state, action, reward, new_state, done = self.memory.sample_buffer(self.batch_size)
        self.replace_target_network()

        q_eval = self.q_eval.predict(state)
        q_next = self.q_next.predict(new_state)

        # The buffer returns done flags as integers; indexing with an integer
        # array selects rows 0/1 instead of masking, so convert to bool first.
        terminal = np.asarray(done, dtype=bool)
        q_next[terminal] = 0.0  # No future reward after a terminal state.

        batch_index = np.arange(self.batch_size)
        # Start from the current predictions so only the taken actions
        # contribute to the loss; copy to avoid aliasing q_eval.
        q_target = q_eval.copy()
        q_target[batch_index, action] = reward + self.gamma * np.max(q_next, axis=1)

        self.q_eval.train_on_batch(state, q_target)
        self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)
        self.learn_step += 1

    def save_models(self):
        """Save both networks to their configured files."""
        self.q_eval.save(self.q_eval_model_file)
        self.q_next.save(self.q_target_model_file)
        print('... saving models ...')

    def load_models(self):
        """Replace both networks with the models stored in their configured files."""
        self.q_eval = load_model(self.q_eval_model_file)
        self.q_next = load_model(self.q_target_model_file)
        print('... loading models ...')

# Utilities

In [None]:
import matplotlib.pyplot as plt

def plotLearning(x, scores, epsilons, filename, window):
    """Plot epsilon and the running-average score on twin y-axes and save the figure.

    Args:
        x: X values (game indices) shared by both curves.
        scores: Per-game scores.
        epsilons: Epsilon value for each entry of ``x``.
        filename: Path the figure is saved to.
        window: Look-back distance for the running average; each point
            averages the scores at indices ``t - window`` through ``t``.
    """
    fig = plt.figure()
    # Two axes occupy the same grid cell; the second has no frame so the
    # first remains visible underneath it.
    eps_axis = fig.add_subplot(111, label="1")
    score_axis = fig.add_subplot(111, label="2", frame_on=False)

    # Left axis: epsilon.
    eps_axis.plot(x, epsilons, color="C0")
    eps_axis.set_xlabel("Game", color="C0")
    eps_axis.set_ylabel("Epsilon", color="C0")
    for which in ('x', 'y'):
        eps_axis.tick_params(axis=which, colors="C0")

    # Trailing mean of the scores, truncated at the start of the series.
    count = len(scores)
    running_avg = np.empty(count)
    for idx in range(count):
        start = max(0, idx - window)
        running_avg[idx] = np.mean(scores[start:idx + 1])

    # Right axis: running-average score, with its own x-axis hidden.
    score_axis.scatter(x, running_avg, color="C1")
    score_axis.xaxis.set_visible(False)
    score_axis.yaxis.tick_right()
    score_axis.set_ylabel('Score', color="C1")
    score_axis.yaxis.set_label_position('right')
    score_axis.tick_params(axis='y', colors="C1")

    plt.savefig(filename)


def plotLearningNoEpsilons(scores, filename, x=None, window=5):
    """Plot the running-average score on the current pyplot figure and save it.

    Args:
        scores: Per-game scores.
        filename: Path the figure is saved to.
        x: X values to plot against; defaults to ``0 .. len(scores) - 1``.
        window: Look-back distance for the running average; each point
            averages the scores at indices ``t - window`` through ``t``.
    """
    count = len(scores)
    running_avg = np.empty(count)
    for idx in range(count):
        # Trailing mean, truncated at the start of the series.
        start = max(0, idx - window)
        running_avg[idx] = np.mean(scores[start:idx + 1])

    if x is None:
        x = list(range(count))

    plt.ylabel('Score')
    plt.xlabel('Game')
    plt.plot(x, running_avg)
    plt.savefig(filename)

In [None]:
import gym

# SkipEnv is a custom wrapper for the environment that repeats the same action a fixed number of times
# and accumulates the rewards over those steps.
class SkipEnv(gym.Wrapper):
    """Repeat each action for up to ``skip`` environment steps and sum the rewards."""

    def __init__(self, env=None, skip=4):
        """Wrap ``env`` and remember how many times each action is repeated."""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Apply ``action`` repeatedly, stopping early if the episode ends.

        Returns:
            ``(obs, total_reward, done, info)`` where ``obs``, ``done`` and
            ``info`` come from the last inner step and ``total_reward`` is the
            sum over all inner steps taken.
        """
        total_reward = 0.0
        done = False
        remaining = self._skip
        while remaining > 0 and not done:
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            remaining -= 1
        return obs, total_reward, done, info

# PreprocessFrame is a wrapper that preprocesses the observation from the environment
# to a simplified format (grayscale and downscaled).
class PreprocessFrame(gym.ObservationWrapper):
    """Convert each frame to an 80x80 single-channel uint8 image."""

    def __init__(self, env=None):
        """Wrap ``env`` and advertise the 80x80x1 uint8 observation space."""
        super().__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0.0, high=255, shape=(80, 80, 1), dtype=np.uint8
        )

    def observation(self, obs):
        """Return the preprocessed version of ``obs``."""
        return PreprocessFrame.process(obs)

    @staticmethod
    def process(frame):
        """Grayscale, crop and downsample one frame.

        Args:
            frame: Array indexed as ``[row, column, channel]`` with at least
                three channels. The crop keeps rows 35-194, so the final
                reshape only succeeds for suitably sized frames (presumably
                210x160 Atari frames; confirm against the environment used).

        Returns:
            A ``(80, 80, 1)`` uint8 array.
        """
        pixels = np.reshape(frame, frame.shape).astype(np.float32)
        # Weighted sum of the first three channels gives a single luminance
        # channel.
        first, second, third = pixels[:, :, 0], pixels[:, :, 1], pixels[:, :, 2]
        gray = 0.299 * first + 0.587 * second + 0.114 * third
        # Keep rows 35..194 and take every second pixel along both axes.
        cropped = gray[35:195:2, ::2]
        return cropped.reshape(80, 80, 1).astype(np.uint8)

# MoveImgChannel is a wrapper that changes the order of the channels in the observation space.
class MoveImgChannel(gym.ObservationWrapper):
    """Move the channel axis of each observation from last to first position."""

    def __init__(self, env):
        """Wrap ``env`` and advertise the channels-first observation space."""
        super().__init__(env)
        old_shape = self.observation_space.shape
        # (rows, cols, channels) -> (channels, rows, cols)
        channels_first = (old_shape[-1], old_shape[0], old_shape[1])
        self.observation_space = gym.spaces.Box(
            low=0.0, high=1.0, shape=channels_first, dtype=np.float32
        )

    def observation(self, observation):
        """Return ``observation`` with axis 2 moved to axis 0."""
        return np.moveaxis(observation, source=2, destination=0)
    
# ScaleFrame is a wrapper that normalizes pixel values in the observation.
class ScaleFrame(gym.ObservationWrapper):
    """Divide every observation by 255 and return it as float32."""

    def observation(self, obs):
        """Return a float32 copy of ``obs`` scaled by 1/255."""
        as_float = np.array(obs).astype(np.float32)
        return as_float / 255.0

# BufferWrapper is a wrapper that stacks multiple observations to create a temporal buffer.
class BufferWrapper(gym.ObservationWrapper):
    """Stack the most recent observations along axis 0.

    The buffer is ordered oldest first: each new observation is written to
    the last position after the existing entries are shifted toward index 0.
    """

    def __init__(self, env, n_steps):
        """Wrap ``env`` and widen the observation space.

        Args:
            env: Environment to wrap.
            n_steps: Repetition factor applied to the wrapped space's bounds
                along axis 0.
        """
        super(BufferWrapper, self).__init__(env)
        wrapped_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            wrapped_space.low.repeat(n_steps, axis=0),
            wrapped_space.high.repeat(n_steps, axis=0),
            dtype=np.float32,
        )

    def reset(self):
        """Reset the environment and restart the buffer from zeros.

        Returns:
            The stacked observation, containing the first frame in the last
            position and zeros elsewhere.
        """
        self.buffer = np.zeros_like(self.observation_space.low, dtype=np.float32)
        return self.observation(self.env.reset())

    def observation(self, observation):
        """Push ``observation`` into the buffer and return the stacked frames.

        The buffer is created in ``reset``, so ``reset`` must be called before
        the first observation is processed.
        """
        # Drop the oldest entry by shifting everything one slot toward index 0.
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        # Return a copy: the internal buffer is overwritten on the next call,
        # which would otherwise silently change observations the caller kept.
        return self.buffer.copy()
    
# make_env is a utility function to create an environment with all the wrappers applied.
def make_env(env_name):
    """Create ``env_name`` and wrap it with the full preprocessing stack.

    Wrappers are applied innermost first: SkipEnv, PreprocessFrame,
    MoveImgChannel, BufferWrapper (4 observations), then ScaleFrame.

    Args:
        env_name: Environment id passed to ``gym.make``.

    Returns:
        The fully wrapped environment.
    """
    env = gym.make(env_name)
    for wrapper in (SkipEnv, PreprocessFrame, MoveImgChannel):
        env = wrapper(env)
    stacked = BufferWrapper(env, 4)
    return ScaleFrame(stacked)