# A3C for Kung Fu

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip3 install gymnasium
!pip3 install "gymnasium[atari, accept-rom-license]"
!sudo apt-get install -y swig
!pip3 install gym[box2d]

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1
Collecting shimmy[atari]<1.0,>=0.1.0 (from gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl (25 kB)
Collecting autorom[accept-rom-license]~=0.4.2 (from gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.4.2->gymnasium[accept-rom-license,atari])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m11.0 

### Importing the libraries

In [None]:
import math
import random
import tqdm
import torch.optim as optim
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Network(nn.Module):
    """Convolutional actor-critic network with a shared trunk and two heads.

    The trunk is three 3x3, stride-2 convolutions followed by one hidden
    linear layer. Two linear heads read that hidden layer: ``fc2a`` produces
    one score per action and ``fc2s`` produces a single scalar state value.

    Args:
        action_size: Number of outputs of the action head.
    """

    def __init__(self, action_size):
        super().__init__()
        # Convolutional trunk: 4 input channels, 32 feature maps per layer.
        self.conv1 = nn.Conv2d(4, 32, kernel_size=3, stride=2)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, stride=2)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=3, stride=2)

        # Dense part. 512 = 32 maps * 4 * 4, which is what the three unpadded
        # stride-2 convolutions leave of a 42x42 input (42 -> 20 -> 9 -> 4).
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(512, 128)
        self.fc2a = nn.Linear(128, action_size)  # action head
        self.fc2s = nn.Linear(128, 1)            # state-value head

    def forward(self, state):
        """Return ``(action_values, state_value)`` for a batch of states.

        Args:
            state: Float tensor shaped (batch, 4, H, W); H and W must reduce
                to 4x4 after the convolutions so that ``fc1`` receives 512
                features.

        Returns:
            Tuple of the action-head output, shaped (batch, action_size), and
            the value-head output, shaped (batch, 1).
        """
        features = state
        for conv in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(conv(features))

        hidden = torch.relu(self.fc1(self.flatten(features)))

        return self.fc2a(hidden), self.fc2s(hidden)


## Part 2 - Training the AI

### Setting up the environment

In [None]:
import cv2
import gymnasium as gym
import numpy as np
from gymnasium import ObservationWrapper
from gymnasium.spaces import Box

class PreprocessAtari(ObservationWrapper):
    """Observation wrapper that resizes, optionally grayscales, rescales to
    [0, 1] and stacks the most recent frames.

    The wrapper keeps a rolling buffer ``self.frames`` holding the last
    ``n_frames`` processed frames. Each call to :meth:`observation` drops the
    oldest frame and appends the new one along the channel axis.

    Args:
        env: Environment to wrap; its observations are images (H, W[, 3]).
        height: Output frame height in pixels.
        width: Output frame width in pixels.
        crop: Callable applied to the raw image before resizing.
        dim_order: ``'pytorch'`` for channel-first (C, H, W) observations or
            ``'tensorflow'`` for channel-last (H, W, C) observations.
        color: Keep three colour channels per frame instead of grayscale.
        n_frames: Number of frames kept in the stack.
    """

    def __init__(self, env, height=42, width=42, crop=lambda img: img, dim_order='pytorch', color=False, n_frames=4):
        super(PreprocessAtari, self).__init__(env)
        self.img_size = (height, width)
        self.crop = crop
        self.dim_order = dim_order
        self.color = color
        self.frame_stack = n_frames
        n_channels = 3 * n_frames if color else n_frames
        obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
        self.observation_space = Box(0.0, 1.0, obs_shape)
        self.frames = np.zeros(obs_shape, dtype=np.float32)

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(stacked_frames, info)``.

        Keyword arguments (e.g. ``seed``/``options``) are forwarded to the
        wrapped environment so the wrapper honours the standard reset
        signature. The frame buffer is cleared first, so the returned stack
        holds zeros followed by the first processed frame.
        """
        self.frames = np.zeros_like(self.frames)
        obs, info = self.env.reset(**kwargs)
        self._update_buffer(obs)
        return self.frames, info

    def observation(self, img):
        """Process one raw frame, push it onto the stack and return the stack."""
        img = self.crop(img)
        height, width = self.img_size
        # cv2.resize takes dsize as (width, height), the reverse of img_size.
        img = cv2.resize(img, (width, height))
        if not self.color and img.ndim == 3 and img.shape[2] == 3:
            # Gymnasium image observations are RGB, so use the RGB weights
            # (the BGR constant would swap the red and blue contributions).
            img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        img = img.astype('float32') / 255.

        # Bring the frame to (H, W, C) so both layouts share one code path.
        if img.ndim == 2:
            img = img[:, :, np.newaxis]
        channels_per_frame = 3 if self.color else 1

        if self.dim_order == 'pytorch':
            # Channel-first buffer: roll and write along axis 0.
            self.frames = np.roll(self.frames, shift=-channels_per_frame, axis=0)
            self.frames[-channels_per_frame:] = np.moveaxis(img, -1, 0)
        else:
            # Channel-last buffer: roll and write along the last axis.
            self.frames = np.roll(self.frames, shift=-channels_per_frame, axis=-1)
            self.frames[..., -channels_per_frame:] = img
        # np.roll returned a fresh array, so earlier returned stacks are not mutated.
        return self.frames

    def _update_buffer(self, obs):
        """Feed ``obs`` through :meth:`observation` and store the new stack."""
        self.frames = self.observation(obs)

def make_env():
    """Build one Kung Fu Master environment wrapped with frame preprocessing.

    Returns:
        A ``PreprocessAtari``-wrapped environment that emits four stacked
        42x42 grayscale frames in channel-first order and renders RGB arrays.
    """
    base_env = gym.make("KungFuMasterDeterministic-v0", render_mode='rgb_array')
    return PreprocessAtari(
        base_env,
        height=42,
        width=42,
        crop=lambda img: img,
        dim_order='pytorch',
        color=False,
        n_frames=4,
    )

# Single environment used for inspection, evaluation and video recording.
env = make_env()

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("State shape:", state_shape)
print("Number of actions:", number_actions)
# `unwrapped` reaches the base Atari environment however many wrappers
# gym.make() stacks on top, instead of hard-coding the depth as `env.env.env`.
print("Action names:", env.unwrapped.get_action_meanings())


  logger.deprecation(


State shape: (4, 42, 42)
Number of actions: 14
Action names: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'DOWNRIGHT', 'DOWNLEFT', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


  logger.warn(


### Initializing the hyperparameters

In [None]:
learning_rate = 1e-4  # Step size passed to the Adam optimizer in Agent
discount_factor = 0.99  # Weight of the bootstrapped next-state value in the TD target
number_environments = 10  # Number of environment copies stepped together in one training batch

### Implementing the A3C class

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

class Agent():
    """Actor-critic agent that owns one ``Network`` and its Adam optimizer.

    ``act`` samples actions from the softmax policy and ``step`` performs one
    one-step advantage actor-critic update on a batch of transitions. The
    module-level ``learning_rate`` and ``discount_factor`` are read when the
    agent is built and updated, respectively.

    Args:
        action_size: Number of discrete actions (size of the policy head).
    """

    def __init__(self, action_size):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.action_size = action_size
        self.network = Network(action_size).to(self.device)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=learning_rate)

    def act(self, state):
        """Sample one action per state from the current policy.

        Args:
            state: A single state (3-D) or a batch of states (4-D).

        Returns:
            1-D NumPy integer array with one sampled action index per state.
        """
        if state.ndim == 3:
            # Add the batch axis by indexing; wrapping in a Python list makes
            # torch build the tensor element by element (slow-path warning).
            state = state[None]
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device)
        with torch.no_grad():  # pure inference, no graph needed
            action_values, _ = self.network(state)
            policy = F.softmax(action_values, dim=-1)
        return np.array([np.random.choice(len(p), p=p) for p in policy.cpu().numpy()])

    def step(self, state, action, reward, next_state, done):
        """Run one gradient update on a batch of transitions.

        Args:
            state: Batch of states, shaped (batch, ...).
            action: Integer action indices, shaped (batch,).
            reward: Rewards, shaped (batch,).
            next_state: Batch of successor states, same shape as ``state``.
            done: Boolean episode-end flags, shaped (batch,).
        """
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device)
        next_state = torch.as_tensor(next_state, dtype=torch.float32, device=self.device)
        reward = torch.as_tensor(reward, dtype=torch.float32, device=self.device)
        done = torch.as_tensor(done, dtype=torch.bool, device=self.device).to(dtype=torch.float32)
        action = torch.as_tensor(action, dtype=torch.long, device=self.device)

        action_values, state_value = self.network(state)
        # The value head returns (batch, 1). Drop the trailing axis so it lines
        # up with the (batch,) reward/done vectors; otherwise the arithmetic
        # below broadcasts to (batch, batch) and mixes transitions together.
        state_value = state_value.squeeze(-1)
        with torch.no_grad():  # the bootstrap target is treated as a constant
            _, next_state_value = self.network(next_state)
            next_state_value = next_state_value.squeeze(-1)

        target_state_value = reward + discount_factor * next_state_value * (1 - done)
        advantage = target_state_value - state_value

        probs = F.softmax(action_values, dim=-1)
        logprobs = F.log_softmax(action_values, dim=-1)
        entropy = -torch.sum(probs * logprobs, dim=-1)
        # Log-probability of the action actually taken in each transition.
        logp_actions = logprobs.gather(1, action.unsqueeze(1)).squeeze(1)

        # Policy-gradient term plus a small entropy bonus that rewards exploration.
        actor_loss = -(logp_actions * advantage.detach()).mean() - 0.001 * entropy.mean()
        critic_loss = F.mse_loss(state_value, target_state_value.detach())
        total_loss = actor_loss + critic_loss
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()


### Initializing the A3C agent

In [None]:
# One agent (network + optimizer) sized to the environment's discrete action count.
agent = Agent(number_actions)

### Evaluating our A3C agent on a certain number of episodes

In [None]:
def evaluate(agent, env, num_episodes=1):
    """Play full episodes with the agent's sampled policy and total the rewards.

    Args:
        agent: Object whose ``act(state)`` returns an indexable of actions;
            the first entry is used.
        env: Environment following the Gymnasium API, i.e. ``reset()`` returns
            ``(state, info)`` and ``step()`` returns
            ``(state, reward, terminated, truncated, info)``.
        num_episodes: Number of episodes to play.

    Returns:
        List with the undiscounted total reward of each episode.
    """
    episode_rewards = []
    for _ in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        while True:
            action = agent.act(state)
            state, reward, terminated, truncated, _ = env.step(action[0])
            total_reward += reward
            # An episode ends on either flag; checking only `terminated`
            # would keep stepping an environment that was cut off by a limit.
            if terminated or truncated:
                break
        episode_rewards.append(total_reward)
    return episode_rewards


  and should_run_async(code)


### Managing multiple environments simultaneously

In [None]:
class EnvBatch:
    """Steps several independent environments in lock-step.

    Environments are created with ``make_env()`` and stepped sequentially in
    the calling process. An environment whose episode ends is reset
    immediately, so the returned batch always contains valid states.

    Args:
        num_envs: Number of environments to create.
    """

    def __init__(self, num_envs=10):
        self.envs = [make_env() for _ in range(num_envs)]

    def reset(self):
        """Reset every environment and return the stacked initial states."""
        return np.array([env.reset()[0] for env in self.envs])

    def step(self, actions):
        """Apply one action per environment.

        Args:
            actions: Iterable with one action for each environment.

        Returns:
            Tuple ``(next_states, rewards, dones, infos)`` of NumPy arrays with
            one entry per environment. ``dones`` is True where the episode
            terminated or was truncated; for those entries ``next_states``
            already holds the first state of the following episode.
        """
        next_states, rewards, dones, infos = [], [], [], []
        for env, action in zip(self.envs, actions):
            # Gymnasium order is (obs, reward, terminated, truncated, info).
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            if done:
                next_state, _ = env.reset()
            next_states.append(next_state)
            rewards.append(reward)
            dones.append(done)
            infos.append(info)
        return (
            np.array(next_states),
            # Force a float array so callers can rescale rewards in place even
            # when an environment reports integer rewards.
            np.array(rewards, dtype=np.float64),
            np.array(dones, dtype=bool),
            np.array(infos, dtype=object),
        )


### Training the A3C agent

In [None]:
# Build the batch of training environments and collect their initial states.
env_batch = EnvBatch(number_environments)
batch_states = env_batch.reset()

with tqdm.trange(0, 3001) as progress_bar:
    for i in progress_bar:
        # Sample one action per environment and advance every environment once.
        batch_actions = agent.act(batch_states)
        batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions)

        # Shrink the rewards before they enter the loss.
        batch_rewards *= 0.01

        # One gradient update on the whole batch, then move to the next states.
        agent.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)
        batch_states = batch_next_states

        # Every 1000 iterations, report the mean unscaled reward over 10
        # evaluation episodes played on the separate `env`.
        if i % 1000 != 0:
            continue
        evaluation_rewards = evaluate(agent, env, num_episodes = 10)
        print("Average agent reward: ", np.mean(evaluation_rewards))

  critic_loss = F.mse_loss(target_state_value.detach(), state_value)
  state = torch.tensor(state, dtype=torch.float32, device=self.device)
  0%|          | 8/3001 [00:30<2:20:30,  2.82s/it] 

Average agent reward:  340.0


 34%|███▎      | 1008/3001 [01:19<36:00,  1.08s/it]

Average agent reward:  740.0


 67%|██████▋   | 2008/3001 [02:15<18:55,  1.14s/it]

Average agent reward:  1610.0


100%|██████████| 3001/3001 [03:02<00:00, 16.48it/s]

Average agent reward:  850.0





## Part 3 - Visualizing the results

In [None]:
def show_video_of_model(agent, env):
    """Play one episode with the agent and save the rendered frames as a video.

    The frame is rendered before each action, the environment is closed when
    the episode ends, and the frames are written to ``video.mp4`` at 30 fps.

    Args:
        agent: Object whose ``act(state)`` returns an indexable of actions;
            the first entry is used.
        env: Gymnasium-style environment supporting ``render()``.
    """
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frames.append(env.render())
        action = agent.act(state)
        state, _, terminated, truncated, _ = env.step(action[0])
        # Stop on either end-of-episode flag; the third element of the step
        # tuple alone misses episodes that are cut off by a limit.
        done = terminated or truncated
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained agent; this also closes `env`.
show_video_of_model(agent, env)

def show_video():
    """Embed the first ``*.mp4`` found in the working directory in the notebook.

    The file is base64-encoded into an inline HTML ``<video>`` element. If no
    mp4 file exists, a message is printed instead.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only access is enough, and the context manager closes the
        # handle (the previous 'r+b' open was never closed).
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Display the recorded episode inline.
show_video()

  logger.warn(
  self.pid = _posixsubprocess.fork_exec(
