In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from PIL import Image

In [None]:
#define the DQN Network
class DQN(nn.Module):
    def __init__(self, action_dim):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(7*7*64, 512)
        self.fc2 = nn.Linear(512, action_dim)
    
    def forward(self, x):
        x = x.squeeze(1)  #Convert from [batch_size, 1, channels, height, width] to [batch_size, channels, height, width]
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = x.view(x.size(0), -1)  #Flatten the tensor
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
#Set up the Space Invaders environment
env = gym.make('SpaceInvaders-v0')
#Number of possible actions
action_dim = env.action_space.n


In [None]:
#Initialize primary and target DQN networks
dqn = DQN(action_dim)
target_dqn = DQN(action_dim)
#Copy weights from the primary network to the target network
target_dqn.load_state_dict(dqn.state_dict())


In [None]:
#Define hyperparameters

#set up the Adam optimizer with a learning rate of 0.00025
optimizer = optim.Adam(dqn.parameters(), lr=0.00025)
#define the loss function 
loss_fn = nn.SmoothL1Loss()

#Discount factor for future rewards
gamma = 0.99
#Initial epsilon (exploration rate)
epsilon = 1.0
#Epsilon decay rate after each episode
epsilon_decay = 0.995
#Exploration progression rate
epsilon_min = 0.1
#Memory buffer to store experiences
memory = []
#Maximum number of experiences to keep in memory
max_memory = 10000
batch_size = 32
episodes = 500

In [None]:
#Function to preprocess the image
def preprocess(image):
    #Convert image to grayscale
    image = Image.fromarray(image).convert('L')
    image = image.resize((84, 84))
    #Convert to numpy array and normalize the pixel values
    image = np.array(image) / 255.0
    return image

In [None]:
#Function to select an action using epsilon-greedy policy
def select_action(state, epsilon):
    # Random action with exploration probability
    if random.random() < epsilon:
        return env.action_space.sample()
    else:
        #Else, rely on exploitation (experience)
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        with torch.no_grad():
            return dqn(state).argmax().item()

In [None]:
#Function to train the DQN using mini-batch from the memory buffer
def train():
    # Skip training if there are not enough experiences
    if len(memory) < batch_size:
        return
    
    #Sample a mini-batch of experiences
    batch = random.sample(memory, batch_size)
    #Unpack the batch
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = zip(*batch)
    
    #Convert batch to PyTorch tensors
    state_batch = torch.FloatTensor(state_batch).to(device)
    action_batch = torch.LongTensor(action_batch).unsqueeze(1).to(device)
    reward_batch = torch.FloatTensor(reward_batch).to(device)
    next_state_batch = torch.FloatTensor(next_state_batch).to(device)
    done_batch = torch.FloatTensor(done_batch).to(device)
    
    #Calculate Q-values for the current state-action pairs
    q_values = dqn(state_batch).gather(1, action_batch)
    #Calculate Q-values for the next state using the target network
    next_q_values = target_dqn(next_state_batch).max(1)[0].unsqueeze(1)
    
    #Compute the target Q-value using the Bellman equation
    expected_q_values = reward_batch + gamma * next_q_values * (1 - done_batch)

    #Calculate the loss between the current Q-value and the target Q-value
    loss = loss_fn(q_values, expected_q_values.detach())
    #Zero the gradients for the optimizer
    optimizer.zero_grad()
    #Backpropagate the loss
    loss.backward()
    #Update the network weights
    optimizer.step()

In [3]:
#GPU Boost
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dqn.to(device)
target_dqn.to(device)

state_buffer = []

#Main training loop
for episode in range(episodes):
    reset_result = env.reset()
    #Handle reset return depending on its type (tuple or single value)
    if isinstance(reset_result, tuple):
        state = reset_result[0]
    else:
        state = reset_result
    
    state_buffer.clear()
    processed_frame = preprocess(state)
    state_buffer = [processed_frame] * 4  #Initialize with 4 frames

    total_reward = 0 
    
    for t in range(10000):  # number of steps per episode
        #Stack the state buffer and reshape it for the model
        stacked_state = np.stack(state_buffer, axis=0)
        stacked_state = np.expand_dims(stacked_state, axis=0)  # Add batch dimension
        
        #Remove the extra dimension
        stacked_state = stacked_state.squeeze(0)

        #Select an action based on the current state and epsilon
        action = select_action(stacked_state, epsilon)
        
        #Perform the action and observe the next state, reward, and done flag
        next_step_result = env.step(action)
        if isinstance(next_step_result, tuple):
            if len(next_step_result) == 5:
                next_state, reward, done, truncated, info = next_step_result
                done = done or truncated
            else:
                next_state, reward, done, info = next_step_result
        else:
            next_state, reward, done, info = next_step_result

        processed_next_frame = preprocess(next_state)
        #Update the state buffer with the new frame
        next_state_buffer = state_buffer[1:] + [processed_next_frame]
        
        #Store the experience in memory
        memory.append((np.stack(state_buffer, axis=0), action, reward, np.stack(next_state_buffer, axis=0), done))
        #Remove the oldest experience if memory exceeds the maximum size
        if len(memory) > max_memory:
            memory.pop(0)
        
        train()
        
        #Update the current state buffer
        state_buffer = next_state_buffer
        total_reward += reward
        if done:
            break
    
    #Decay epsilon to reduce exploration over time
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay
    
    #Update the target network with the primary network weights every 10 episodes
    if episode % 10 == 0:
        target_dqn.load_state_dict(dqn.state_dict())
    
    print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {epsilon:.4f}")
env


  return F.smooth_l1_loss(input, target, reduction=self.reduction, beta=self.beta)


Episode: 0, Total Reward: 125.0, Epsilon: 0.9950
Episode: 1, Total Reward: 260.0, Epsilon: 0.9900
Episode: 2, Total Reward: 125.0, Epsilon: 0.9851
Episode: 3, Total Reward: 105.0, Epsilon: 0.9801
Episode: 4, Total Reward: 210.0, Epsilon: 0.9752
Episode: 5, Total Reward: 290.0, Epsilon: 0.9704
Episode: 6, Total Reward: 180.0, Epsilon: 0.9655
Episode: 7, Total Reward: 50.0, Epsilon: 0.9607
Episode: 8, Total Reward: 270.0, Epsilon: 0.9559
Episode: 9, Total Reward: 240.0, Epsilon: 0.9511
Episode: 10, Total Reward: 80.0, Epsilon: 0.9464
Episode: 11, Total Reward: 180.0, Epsilon: 0.9416
Episode: 12, Total Reward: 285.0, Epsilon: 0.9369
Episode: 13, Total Reward: 45.0, Epsilon: 0.9322
Episode: 14, Total Reward: 35.0, Epsilon: 0.9276
Episode: 15, Total Reward: 505.0, Epsilon: 0.9229
Episode: 16, Total Reward: 135.0, Epsilon: 0.9183
Episode: 17, Total Reward: 135.0, Epsilon: 0.9137
Episode: 18, Total Reward: 165.0, Epsilon: 0.9092
Episode: 19, Total Reward: 155.0, Epsilon: 0.9046
Episode: 20, T

KeyboardInterrupt: 