# In [None]:
import gymnasium
from utils import*
import torch
import torch.nn as nn
import numpy as np
from collections import deque
import random
from tqdm import tqdm
import gc
import matplotlib.pyplot as plt

def simulate(agent=None, env=None, epsilon=0, memory=None, render=False, max_ep_len=2000):
    """Play one episode with an epsilon-greedy policy and optionally record it.

    Args:
        agent: Policy object providing ``eval()``, ``get_action(state)`` and
            ``convert_action(action, state)``.
        env: Environment wrapper providing ``reset()``, ``skip_episodes()``,
            ``step()``, the attributes ``ep_len`` and ``real_rew``, and the
            wrapped environment as ``env.env``.
        epsilon: Probability of picking a uniformly random action at each step.
        memory: Optional buffer with a ``collect()`` method; when given, every
            non-terminal transition ``[state, action, reward, next_state]`` is
            stored in it.
        render: When true, render the wrapped environment, print the result
            and close the wrapped environment at the end.
        max_ep_len: The episode is cut off once ``env.ep_len`` exceeds this
            value.

    Returns:
        Tuple ``(score, ep_len)``: ``env.real_rew`` after the episode and the
        last ``env.ep_len`` observed on a non-terminal step.
    """
    agent.eval()  # Inference mode for the whole rollout.
    env.reset()  # Start a fresh episode.

    if render:
        env.env.render()

    # presumably fast-forwards the first 70 frames with a fixed gentle
    # acceleration action -- confirm against env.skip_episodes.
    state, rew, done, info = env.skip_episodes(70, [0, 0.1, 0])
    ep_len = 0

    # Acting never needs gradients; disabling autograd avoids building a
    # graph for every forward pass of the rollout.
    with torch.no_grad():
        while not done:
            # Epsilon-greedy: one Bernoulli(epsilon) draw decides exploration.
            explore = torch.bernoulli(torch.tensor(epsilon).float())
            if explore == 1:
                A = torch.randint(0, 3, (1,))
            else:
                A = agent.get_action(state)

            next_state, rew, done, info = env.step(agent.convert_action(A, state))

            # NOTE(review): the terminal transition is intentionally left out
            # of the buffer here; the stored tuples carry no "done" flag, so
            # it could not be distinguished from an ordinary step.
            if done:
                break

            if memory is not None:
                memory.collect([state, A, rew, next_state])
            state = next_state

            ep_len = env.ep_len
            # Safety cap so a stuck episode cannot run forever.
            if ep_len > max_ep_len:
                break

    # The score is whatever the environment wrapper accumulated in real_rew.
    score = env.real_rew
    if render:
        print("score", score, "ep_len", ep_len)
        env.env.close()

    return score, ep_len


def test_model(agent, env, episodes=1):
    rewards = []
    ep_lens = []
    
    # Simulate multiple episodes to test the agent's performance.
    for i in range(episodes):
        rew, ep_len = simulate(agent, env)
        rewards.append(rew)
        ep_lens.append(ep_len)
        print("Test " + str(i+1) + "/" + str(episodes) + ": reward =", rew, " episode len =", ep_len)
    
    # Print the average reward and episode length across all test episodes.
    print("\nAverage Reward =", sum(rewards) / len(rewards), "Average Ep_len =", sum(ep_lens) / len(ep_lens), "\n")
    return rewards, ep_lens


class ExperienceReplay(object):
    """Fixed-capacity FIFO buffer of ``[state, action, reward, next_state]``."""

    def __init__(self, length):
        # A bounded deque drops the oldest experience once `length` is reached.
        self.experience_replay = deque(maxlen=length)

    def collect(self, experience):
        """Append one ``[state, action, reward, next_state]`` experience."""
        self.experience_replay.append(experience)
        return

    def sample_from_experience(self, sample_size):
        """Draw a random minibatch without replacement.

        Args:
            sample_size: Requested batch size; capped at the number of stored
                experiences.

        Returns:
            Tuple ``(state, action, reward, next_state)`` of float32 tensors
            whose first dimension is the batch size. All four are empty
            tensors when the buffer is empty.
        """
        sample_size = min(sample_size, len(self.experience_replay))
        batch = random.sample(self.experience_replay, sample_size)

        # Stack the frames into a single ndarray before handing them to torch:
        # building a tensor straight from a list of ndarrays converts element
        # by element and is very slow.
        state = torch.as_tensor(np.array([exp[0] for exp in batch])).float()
        action = torch.tensor([exp[1] for exp in batch]).float()
        reward = torch.tensor([exp[2] for exp in batch]).float()
        next_state = torch.as_tensor(np.array([exp[3] for exp in batch])).float()

        return state, action, reward, next_state


class DQN_Network_Simple(nn.Module):
    """Small convolutional Q-network with three discrete steering actions.

    The network consumes single-channel frames. Rows 84-95 of a frame are not
    fed to the convolutions; instead one column of that strip is reduced to a
    scalar "speed" feature that is concatenated to the flattened conv output.
    """

    def __init__(self, gamma=None):
        """Create the layers.

        Args:
            gamma: Discount factor. It is only stored on the module so that
                training code can read ``agent.gamma``.
        """
        super().__init__()

        self.LeakyReLU = nn.LeakyReLU()
        self.conv1 = nn.Conv2d(1, 4, kernel_size=7, stride=4, padding=0)
        self.conv2 = nn.Conv2d(4, 8, kernel_size=3, stride=1, padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # 289 = 288 flattened conv features (for an 84x96 crop) + 1 speed value.
        self.fc1 = nn.Linear(289, 100)
        self.fc2 = nn.Linear(100, 3)
        # The batch-norm layers are not used in forward(); they stay
        # registered so the state_dict keys remain unchanged.
        self.batchnormCNN1 = nn.BatchNorm2d(num_features=4)
        self.batchnormCNN2 = nn.BatchNorm2d(num_features=8)
        self.flatten = nn.Flatten()
        self.gamma = gamma

    def forward(self, x):
        """Compute Q-values for one frame or a batch of frames.

        Args:
            x: A NumPy array or torch tensor shaped ``(H, W)``,
                ``(batch, H, W)`` or ``(batch, 1, H, W)``.

        Returns:
            Tensor of shape ``(batch, 3)`` with one Q-value per action.
        """
        # Tensors are used as they are (keeping autograd history and device);
        # only non-tensor inputs go through NumPy.
        if not torch.is_tensor(x):
            x = torch.from_numpy(np.ascontiguousarray(x))
        x = x.to(device=self.fc1.weight.device, dtype=torch.float32)

        # Bring the input to (batch, channel, H, W).
        if x.dim() == 2:
            x = x.unsqueeze(0).unsqueeze(0)
        elif x.dim() == 3:
            x = x.unsqueeze(1)

        # Scalar speed feature read from column 13 of the bottom strip
        # (presumably the speed bar of the HUD -- confirm against the env).
        subimage = (x[:, :, 84:96, 13:14] - 0.495) * 10
        speed = torch.sum(subimage, dim=(2, 3))
        x = x[:, :, :84, :]

        x = self.LeakyReLU(self.conv1(x))
        x = self.pool(x)
        x = self.LeakyReLU(self.conv2(x))
        x = self.pool(x)
        x = self.flatten(x)

        # Append the speed feature to the image features.
        x = torch.cat((x, speed), dim=1)
        x = self.LeakyReLU(self.fc1(x))
        x = self.fc2(x)

        return x

    def get_action(self, state):
        """Return the greedy action index/indices as a tensor of shape ``(batch,)``."""
        # Action selection is never differentiated, so skip graph building.
        with torch.no_grad():
            qvals = self.forward(state)
        return torch.argmax(qvals, 1)

    def convert_action(self, action, state):
        """Translate a discrete action into ``[steering, gas, brake]``.

        Args:
            action: One-element tensor holding 0 (left), 1 (straight) or
                2 (right).
            state: Single 2-D frame as a NumPy array; the speed feature is
                read from it to decide whether to accelerate.

        Returns:
            List ``[steering, acceleration, 0]``.

        Raises:
            ValueError: If ``action`` is not 0, 1 or 2.
        """
        subimage = (state[84:96, 13:14] - 0.495) * 10
        speed = np.sum(subimage).item()

        # Only accelerate while the speed feature is at or below 2.5.
        accel = 0 if speed > 2.5 else 0.1

        steering_by_action = {0: -0.3, 1: 0, 2: 0.3}
        action = action.item()
        if action not in steering_by_action:
            raise ValueError(f"unknown action index: {action!r}")
        return [steering_by_action[action], accel, 0]


### Training

# In [None]:
def load_memory(new, epsilon, exp_replay_size, initial_size=None):
    """Create a replay buffer and pre-fill it by playing episodes.

    Args:
        new: When false, the acting agent is initialised from the checkpoint
            file "car-racing-dqn.pth"; otherwise it keeps its random weights.
        epsilon: Exploration probability handed to ``simulate``.
        exp_replay_size: Capacity of the replay buffer; also the maximum
            number of episodes played while filling it.
        initial_size: Number of stored experiences at which filling stops.
            Defaults to ``exp_replay_size`` and is capped at it.

    Returns:
        The populated ``ExperienceReplay`` buffer.
    """
    if initial_size is None:
        initial_size = exp_replay_size
    # The buffer never holds more than exp_replay_size items, so a larger
    # target could never be reached and would only force every episode to run.
    initial_size = min(initial_size, exp_replay_size)

    # Create the environment and the model, select render mode (human\rgb_array)
    env = modified_env(gymnasium.make("CarRacing-v2", render_mode="rgb_array").unwrapped)
    agent = DQN_Network_Simple()

    # Load a pre-trained model if it's not a new run
    if not new:
        agent.load_state_dict(torch.load("car-racing-dqn.pth"))

    memory = ExperienceReplay(exp_replay_size)

    try:
        for _ in range(exp_replay_size):
            # simulate() resets the environment itself, so no reset is needed here.
            simulate(agent, env, epsilon=epsilon, memory=memory)
            if len(memory.experience_replay) >= initial_size:
                break
            print(len(memory.experience_replay))  # Progress: current buffer size
    finally:
        # The environment is local to this function; release it when done.
        env.env.close()

    return memory


def plot_results(reward_hist, ep_len_hist):
    """Show reward and episode-length curves side by side in one figure.

    Args:
        reward_hist: Sequence of rewards, drawn in the left panel.
        ep_len_hist: Sequence of episode lengths, drawn in orange in the
            right panel.
    """
    # One entry per panel: (data, line options, y-axis label, panel title).
    panels = (
        (reward_hist, {"label": 'Reward'}, 'Reward', 'Reward Progression'),
        (
            ep_len_hist,
            {"label": 'Episode Length', "color": 'orange'},
            'Length',
            'Episode Length Progression',
        ),
    )

    plt.figure(figsize=(12, 5))
    for position, (series, line_options, y_label, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(series, **line_options)
        plt.xlabel('Episodes')
        plt.ylabel(y_label)
        plt.title(heading)
        plt.legend()

    plt.tight_layout()
    plt.show()


def update(agent, optimizer, loss_func, target_agent, memory, batch_size):
    """Perform one Q-learning gradient step on a sampled minibatch.

    The regression target is ``reward + agent.gamma * max_a Q_target(next_state, a)``
    and is computed without gradients from ``target_agent``.

    Args:
        agent: Online network being trained; must expose ``gamma``.
        optimizer: Optimizer over the parameters of ``agent``.
        loss_func: Callable ``loss_func(input, target)`` returning a scalar
            tensor.
        target_agent: Network used to evaluate the next states.
        memory: Buffer exposing ``sample_from_experience(batch_size)`` that
            returns ``(state, action, reward, next_state)`` tensors.
        batch_size: Number of experiences requested from ``memory``.

    Returns:
        The loss of this step as a float, or ``None`` when the sampled batch
        is empty and no step was taken.
    """
    agent.train()
    target_agent.eval()

    state, action, reward, next_state = memory.sample_from_experience(batch_size)
    if state.numel() == 0:
        # Nothing to learn from yet.
        return None

    # Q-value of the action that was actually taken in each sampled state.
    q_values = agent(state)
    rows = torch.arange(q_values.size(0), device=q_values.device)
    predicted = q_values[rows, action.long()]

    # Bootstrapped target; kept out of the autograd graph.
    with torch.no_grad():
        next_q = torch.max(target_agent(next_state), dim=1).values
        target = reward + agent.gamma * next_q

    # Clear any gradients left over from earlier work before accumulating.
    optimizer.zero_grad()
    # Prediction first, target second: the (input, target) order loss
    # functions expect.
    loss = loss_func(predicted, target)
    loss.backward()
    optimizer.step()

    return loss.item()


def train(new, num_ep, lr_start, epsilon_start, gamma, memory):
    """Train the DQN agent and periodically evaluate and checkpoint it.

    Every episode plays one epsilon-greedy rollout (which also feeds
    ``memory``), performs 30 minibatch updates and then copies the online
    weights into the target network. Learning rate and epsilon decay
    exponentially with the episode number.

    Args:
        new: When false, training resumes from the checkpoint file
            "car-racing-dqn.pth"; otherwise it starts from random weights.
        num_ep: Number of training episodes.
        lr_start: Learning rate of episode 0.
        epsilon_start: Exploration probability of episode 0.
        gamma: Discount factor stored on the agent.
        memory: Replay buffer that rollouts write to and updates sample from.

    Returns:
        Tuple ``(reward_hist, ep_len_hist)`` with one entry per evaluation,
        which happens on every third episode starting with episode 0.
    """
    agent = DQN_Network_Simple(gamma=gamma)

    # Resume from the saved weights if this is not a new run.
    if not new:
        agent.load_state_dict(torch.load("car-racing-dqn.pth"))

    # The histories are created for resumed runs as well; they are appended
    # to and returned regardless of `new`.
    reward_hist = []
    ep_len_hist = []

    # Target network for more stable bootstrapped targets.
    target_agent = DQN_Network_Simple(agent.gamma)
    target_agent.load_state_dict(agent.state_dict())
    env = modified_env(gymnasium.make("CarRacing-v2", render_mode="rgb_array").unwrapped)
    optimizer = torch.optim.SGD(agent.parameters(), lr_start)
    MSELoss = torch.nn.MSELoss()

    try:
        for ep_num in tqdm(range(0, num_ep)):
            # Exponential decay schedules for learning rate and exploration.
            lr = lr_start * (0.99042 ** ep_num)
            epsilon = epsilon_start * (0.99424 ** ep_num)

            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

            # simulate() resets the environment itself and fills `memory`.
            simulate(agent, env, epsilon=epsilon, memory=memory)

            # Several gradient steps per collected episode.
            for _ in range(0, 30):
                update(agent, optimizer, MSELoss, target_agent, memory, batch_size=32)
            target_agent.load_state_dict(agent.state_dict())  # Sync the target network
            gc.collect(generation=2)  # Free memory held by finished rollouts

            if ep_num % 3 == 0:
                # Greedy evaluation run used for the returned histories.
                reward, ep_len = test_model(agent=agent, env=env, episodes=1)
                print("Settings: lr =", lr, "epsilon =", epsilon)
                print("Test Result: reward =", reward[0], "episode length =", ep_len[0])
                reward_hist.append(reward[0])
                ep_len_hist.append(ep_len[0])

            # Checkpoint every 30 episodes and after the last episode, so the
            # file always ends up holding the final weights.
            if ep_num % 30 == 0 or ep_num == num_ep - 1:
                torch.save(agent.state_dict(), "car-racing-dqn.pth")
    finally:
        # The environment is local to this function; release it when done.
        env.env.close()

    return reward_hist, ep_len_hist


# In [None]:
# Fill the replay buffer with purely random play (epsilon=1), train a fresh
# agent on it, then plot the evaluation curves.
memory = load_memory(new=True, epsilon=1, exp_replay_size=500)
reward_hist, ep_len_hist = train(
    new=True,
    num_ep=10,
    lr_start=0.008,
    epsilon_start=0.8,
    gamma=0.94,
    memory=memory,
)
plot_results(reward_hist, ep_len_hist)

# In [None]:
# Redraw the curves from the histories produced by the training cell.
plot_results(reward_hist=reward_hist, ep_len_hist=ep_len_hist)

# In [None]:
# Replay a single rendered episode with the weights stored in the checkpoint.
raw_env = gymnasium.make("CarRacing-v2", render_mode="rgb_array").unwrapped
env = modified_env(raw_env)

agent = DQN_Network_Simple()
saved_weights = torch.load("car-racing-dqn.pth")
agent.load_state_dict(saved_weights)

simulate(agent=agent, env=env, render=True)