In [17]:
import ale_py
# if using gymnasium
import shimmy
import gymnasium as gym
import torch
from torch import nn
from torch.utils.data import DataLoader, random_split, Dataset
from torch.nn import functional as F
from torchvision.datasets import MNIST
from torchvision import datasets, transforms
import torchvision.transforms.functional as TF
import os
import random
import copy
import time
import pickle
import numpy as np
from torch.distributions import Categorical
from torchviz import make_dot, make_dot_from_trace

1. Initialize model

In [18]:
class NeuralNetwork(nn.Module):
    """Actor-critic network with a shared convolutional trunk.

    The trunk consumes a stack of 4 image channels; two linear heads sit on
    top of its flattened output:

    * ``actor_head``  -> probability distribution over 5 actions
    * ``critic_head`` -> scalar state-value estimate

    The linear heads expect 2304 input features, which is what the trunk
    produces for 84x84 inputs (84 -> 42 -> 21 -> 20 -> 6, i.e. 64 * 6 * 6).
    """

    def __init__(self):
        super().__init__()
        # NOTE: ``self.training`` is deliberately left alone. It is
        # nn.Module's train/eval flag (toggled by .train()/.eval()) and must
        # not double as a "batched input" switch; forward() inspects the
        # tensor rank instead.
        self.convStack = torch.nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=5, stride=1, padding=2),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=2, stride=1, padding=0),
            nn.MaxPool2d(kernel_size=3, stride=3),
            nn.ReLU(),  # 64x6x6 for an 84x84 input
        )

        self.actor_head = torch.nn.Sequential(
            nn.Linear(2304, 100),
            nn.ReLU(),
            nn.Linear(100, 5),
            # Explicit dim: normalise over the action axis for both the
            # unbatched (5,) and batched (B, 5) cases. Omitting it is
            # deprecated and emits a warning.
            nn.Softmax(dim=-1),
        )

        self.critic_head = torch.nn.Sequential(
            nn.Linear(2304, 25),
            nn.ReLU(),
            nn.Linear(25, 1),
        )

    def forward(self, x):
        """Run the trunk and both heads.

        Args:
            x: Either a single observation of shape (4, H, W) or a batch of
                shape (B, 4, H, W).

        Returns:
            tuple: ``(action_probs, value)``. For a single observation the
            shapes are (5,) and (1,); for a batch they are (B, 5) and (B, 1).
        """
        features = self.convStack(x)
        # 3-D features come from an unbatched observation and collapse into
        # one vector; otherwise the leading batch axis is preserved.
        start_dim = 0 if features.dim() == 3 else 1
        features = torch.flatten(features, start_dim=start_dim)
        return self.actor_head(features), self.critic_head(features)

    def calculate_loss(self, G, V, A):
        """Compute the summed actor and critic losses for one trajectory.

        Args:
            G: Return targets, shape (T, 1).
            V: Critic value estimates, shape (T, 1).
            A: Log-probabilities of the actions taken, shape (T,).

        Returns:
            tuple: ``(loss_actor, loss_critic)`` as scalar tensors.
        """
        # The advantage only weights the policy gradient. Detaching it keeps
        # the actor loss from back-propagating into the critic; the critic is
        # trained solely through the regression loss below.
        advantage = (G - V).detach()
        loss_actor = -torch.mul(A.unsqueeze(dim=1), advantage).sum()
        loss_critic = F.smooth_l1_loss(V, G, reduction='sum')
        return loss_actor, loss_critic
        
        
        

In [21]:
class ActorCritic:
    """
    Episode buffer and optimiser wrapper around a ``NeuralNetwork``.

    Per-step log-probabilities, value estimates and rewards are accumulated
    with ``save_data``; ``train_step`` then performs one gradient update on
    the whole recorded trajectory and ``reset`` clears the buffers.
    """

    def __init__(self, gamma, lr=0.001):
        """
        Initializes the ActorCritic object.

        Args:
            gamma (float): The discount factor for calculating expected returns.
            lr (float): Learning rate passed to the Adam optimizer.
        """
        self.gamma = gamma
        self.values = []
        self.actions = []
        self.rewards = []
        self.device = (
            "cuda"
            if torch.cuda.is_available()
            else "mps"
            if torch.backends.mps.is_available()
            else "cpu"
        )
        print(f"Using {self.device} device")
        self.nn_model = NeuralNetwork().to(self.device)
        self.optimizer = torch.optim.Adam(self.nn_model.parameters(), lr=lr)

    def act(self, state):
        """
        Samples an action from the policy for the given state.

        Args:
            state: The current state, either a tensor or anything
                ``numpy.asarray`` can turn into one array (e.g. a list of
                equally-shaped frames).

        Returns:
            tuple: ``(action, log_prob, value)`` where ``action`` is a Python
            int, ``log_prob`` the log-probability tensor of that action and
            ``value`` the critic's estimate for the state.
        """
        if isinstance(state, torch.Tensor):
            state = state.to(device=self.device, dtype=torch.float32)
        else:
            # Collapse to one ndarray first: building a tensor straight from
            # a list of ndarrays is very slow and triggers a warning.
            state = torch.as_tensor(
                np.asarray(state, dtype=np.float32), device=self.device
            )
        action_probs, value = self.nn_model(state)
        m = Categorical(action_probs)
        action = m.sample()
        log_prob = m.log_prob(action)
        return action.item(), log_prob, value

    def train_step(self):
        """
        Performs a single gradient update on the recorded trajectory.

        Does nothing when no steps have been recorded, since there is
        nothing to stack and no loss to compute.
        """
        if not self.rewards:
            return
        V = torch.stack(self.values).to(self.device)
        A = torch.stack(self.actions).to(self.device)
        G = self.expected_return().to(self.device)
        self.optimizer.zero_grad()
        actor_loss, critic_loss = self.nn_model.calculate_loss(G, V, A)
        loss = actor_loss + critic_loss
        loss.backward()
        self.optimizer.step()

    def save_data(self, action_prob, value, reward):
        """
        Records the data from a single environment step.

        Args:
            action_prob: The log probability of the selected action.
            value: The predicted value for the state.
            reward: The reward received for the action.
        """
        self.actions.append(action_prob)
        self.values.append(value)
        self.rewards.append(reward)

    def reset(self):
        """
        Clears the values, actions, and rewards buffers.
        """
        self.values = []
        self.actions = []
        self.rewards = []

    def expected_return(self):
        """
        Calculates the normalized discounted returns for the recorded rewards.

        Returns:
            torch.Tensor: float32 tensor of shape (T, 1) holding the
            discounted return of every step, standardized to zero mean and
            (for T > 1) unit standard deviation. An empty buffer yields a
            tensor of shape (0, 1).
        """
        running = 0.0
        returns = []
        # Accumulate back-to-front, then reverse once; inserting at index 0
        # on every step would make this quadratic in the episode length.
        for reward in reversed(self.rewards):
            running = reward + self.gamma * running
            returns.append(running)
        returns.reverse()

        g = torch.tensor(returns, dtype=torch.float32).unsqueeze(dim=1)
        if g.numel() == 0:
            return g
        eps = np.finfo(np.float32).eps.item()
        # The sample standard deviation of a single value is NaN, which
        # would poison the loss; treat it as zero spread instead.
        std = g.std() if g.numel() > 1 else g.new_zeros(())
        return (g - g.mean()) / (std + eps)

In [22]:
# Train the agent on Pacman: one gradient update per episode.
env = gym.make("ALE/Pacman-v5", render_mode="rgb_array")
# Preprocess the environment: 84x84 grayscale frames scaled to [0, 1].
env = gym.wrappers.AtariPreprocessing(
    env,
    screen_size=84,
    grayscale_obs=True,
    frame_skip=1,
    noop_max=30,
    scale_obs=True,
)
torch.manual_seed(543)
max_episodes = 10000
max_steps = 9999
model = ActorCritic(gamma=0.99)

# try/finally so the emulator is released even when training is interrupted
# from the notebook (KeyboardInterrupt).
try:
    for episode in range(max_episodes):
        state, _ = env.reset()
        episode_reward = 0
        states = []  # sliding window of the 4 most recent frames
        for step in range(1, max_steps):
            if step <= 4:
                # Warm-up: act randomly until the frame window is full.
                action = env.action_space.sample()
                state, reward, terminated, truncated, _ = env.step(action)
                states.append(state)
            else:
                action, action_prob, value = model.act(states)
                state, reward, terminated, truncated, _ = env.step(action)
                states.pop(0)
                states.append(state)
                model.save_data(action_prob, value, reward)
            episode_reward += reward
            # An episode is over when it terminates OR is truncated;
            # stepping past either without a reset is invalid.
            if terminated or truncated:
                # No reset here: the next outer iteration resets the env.
                print(f"Episode {episode} ended. Episode reward: {episode_reward} ")
                break
            if step == max_steps - 1:
                print("Max steps reached.")
        # Skip the update if the episode ended during warm-up and nothing
        # was recorded.
        if model.rewards:
            model.train_step()
        model.reset()
finally:
    env.close()

Using cpu device
Episode 0 ended. Episode reward: 9.0 


KeyboardInterrupt: 

In [None]:
# Persist only the network weights (state dict), not the optimizer or the
# episode buffers held by the ActorCritic wrapper.
torch.save(
    obj=model.nn_model.state_dict(),
    f="ac_model.pt",
)