<a href="https://colab.research.google.com/github/vsoni03/AI-projects/blob/main/Personal_A3C_for_Kung_Fu.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A3C for Kung Fu

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!pip install ale-py
!apt-get install -y swig
!pip install gymnasium[box2d]


### Importing the libraries

In [None]:
import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import ale_py
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium import ObservationWrapper

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
  def __init__(self, action_size):
    # Initialize the parent nn.Module class
    super(Network, self).__init__()
    # Three convolutional layers: 4 stacked input frames, 32 feature maps each
    self.conv1 = torch.nn.Conv2d(in_channels = 4, out_channels = 32, kernel_size = (3,3), stride = 2)
    self.conv2 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3,3), stride = 2)
    # The channel count does not have to grow from layer to layer
    self.conv3 = torch.nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = (3,3), stride = 2)
    # No batch norm layers, because they do not help here
    # Flatten the feature maps before passing them to the linear layers
    self.flatten = torch.nn.Flatten()
    # Fully connected layer: 32 channels * 4 * 4 = 512 inputs for 42x42 frames
    self.fc1 = torch.nn.Linear(512, 128)
    # Two output heads: action values (actor) and state value (critic)

    # Actor head: one output per action
    self.fc2a = torch.nn.Linear(128, action_size)
    # Critic head: a single state value
    self.fc2s = torch.nn.Linear(128, 1)

  def forward(self, state):
    # Forward propagation through the convolutional layers
    x = self.conv1(state)
    x = F.relu(x)
    x = self.conv2(x)
    x = F.relu(x)
    x = self.conv3(x)
    x = F.relu(x)
    # Flatten, then apply the fully connected layer
    x = self.flatten(x)
    x = self.fc1(x)
    x = F.relu(x)
    # Compute both outputs from the shared features
    action_values = self.fc2a(x)
    # One value per state in the batch: shape (batch, 1) -> (batch,)
    # (indexing with [0] would keep only the first state's value)
    state_value = self.fc2s(x).squeeze(-1)
    return action_values, state_value


**Init Function:**
It creates three convolutional layers, each defined by its input channels, output channels, kernel size, and stride. A flatten layer then converts the convolutional output into a format suitable for fully connected layers. The result passes through one fully connected layer, followed by two output heads: one produces the action values and the other the state value.

**Forward Propagation:**
The input state passes through `conv1` followed by a ReLU activation, and the same pattern repeats for `conv2` and `conv3`. The flatten layer converts the resulting feature maps into a one-dimensional vector per sample. That vector passes through the fully connected layer, and the network returns two separate outputs: the action values and the state value.
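The input size of 512 for the first linear layer follows from the convolution settings. The sketch below checks that arithmetic in plain Python, assuming 42x42 input frames, no padding, and no dilation; the helper `conv_out_size` is a hypothetical name introduced only for this illustration.

```python
def conv_out_size(size, kernel_size=3, stride=2, padding=0):
    """Output length along one spatial axis of a convolution (no dilation)."""
    return (size + 2 * padding - kernel_size) // stride + 1

size = 42                      # frame height and width after preprocessing
sizes = [size]
for _ in range(3):             # conv1, conv2, conv3 share kernel 3 and stride 2
    size = conv_out_size(size)
    sizes.append(size)

flattened = 32 * size * size   # 32 output channels times the final feature map
print(sizes)                   # [42, 20, 9, 4]
print(flattened)               # 512
```

If the frame size or the strides change, the first argument of `torch.nn.Linear(512, 128)` would have to change to match.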

## Part 2 - Training the AI

### Setting up the environment

In [None]:
class PreprocessAtari(ObservationWrapper):
  # defines the properties for the preprocessor
  def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4):
    super(PreprocessAtari, self).__init__(env)
    self.img_size = (height, width)
    self.crop = crop
    self.dim_order = dim_order
    self.color = color
    self.frame_stack = n_frames
    n_channels = 3 * n_frames if color else n_frames
    obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
    self.observation_space = Box(0.0, 1.0, obs_shape)
    self.frames = np.zeros(obs_shape, dtype = np.float32)

  # Reset the environment and clear the frame buffer
  def reset(self):
    self.frames = np.zeros_like(self.frames)
    obs, info = self.env.reset()
    self.update_buffer(obs)
    return self.frames, info
  # Preprocess a frame and push it onto the frame stack
  def observation(self, img):
    img = self.crop(img)
    img = cv2.resize(img, self.img_size)
    if not self.color:
      if len(img.shape) == 3 and img.shape[2] == 3:
        # The environment renders RGB frames, so convert from RGB (not BGR)
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    img = img.astype('float32') / 255.
    if self.color:
      self.frames = np.roll(self.frames, shift = -3, axis = 0)
    else:
      self.frames = np.roll(self.frames, shift = -1, axis = 0)
    if self.color:
      self.frames[-3:] = img
    else:
      self.frames[-1] = img
    return self.frames

  def update_buffer(self, obs):
    self.frames = self.observation(obs)

# Create the environment
def make_env():
  env = gym.make("KungFuMasterDeterministic-v0", render_mode = 'rgb_array')
  env = PreprocessAtari(env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)
  return env

env = make_env()

state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("State shape:", state_shape)
print("Number actions:", number_actions)
print("Action names:", env.unwrapped.get_action_meanings())

State shape: (4, 42, 42)
Number actions: 14
Action names: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'DOWNRIGHT', 'DOWNLEFT', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


The output above shows the state shape, the number of actions, and the action names. Each state is a stack of 4 grayscale frames, each 42 by 42 pixels. The agent can choose from 14 actions.
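The stack of 4 frames works like a sliding window over the most recent observations. The wrapper does this with `np.roll` on a NumPy array; the sketch below mirrors the same idea with plain Python lists and string labels instead of images, so `push_frame` and the labels are assumptions made for illustration only.

```python
def push_frame(stack, frame):
    """Drop the oldest frame and append the newest, keeping the stack length fixed."""
    return stack[1:] + [frame]

n_frames = 4
stack = ["empty"] * n_frames        # stands in for the all-zero buffer after reset
for t in range(6):
    stack = push_frame(stack, f"frame{t}")

print(stack)   # ['frame2', 'frame3', 'frame4', 'frame5']
```

Stacking consecutive frames lets the network see motion, which a single still frame cannot show.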

### Initializing the hyperparameters

In [None]:
learning_rate = 1e-4
# how we value future rewards
discount_factor = 0.99
# A3C trains agents in several independent environments in parallel,
# which speeds up learning and helps the algorithm converge much faster
number_enviromnments = 10


### Implementing the A3C class

In [None]:
class Agent():
  def __init__(self, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.action_size = action_size
    # Unlike earlier agents with a local and a target network, only one network is needed
    self.network = Network(action_size).to(self.device)
    # Optimizer that updates the network's weights
    self.optimizer = torch.optim.Adam(self.network.parameters(), lr = learning_rate)

  def act(self, state):
    # Make sure the state is a batch: a 3-dimensional state is a single
    # stack of four frames, so add an extra leading batch dimension
    if state.ndim == 3:
      state = np.expand_dims(state, axis = 0)
    state = torch.tensor(state, dtype = torch.float32, device = self.device)
    # Calling the network invokes its forward method; no gradients are needed to act
    with torch.no_grad():
      action_values, _ = self.network(state)
    # Use softmax sampling instead of an epsilon-greedy strategy to select actions:
    # turn the logits into one probability distribution per state
    policy = F.softmax(action_values, dim = -1)
    # Sample one action per state in the batch
    return np.array([np.random.choice(len(p), p = p) for p in policy.detach().cpu().numpy()])

  def step(self, state, action, reward, next_state, done):
    # A batch of states and the corresponding actions;
    # the first dimension of the state array is the number of observations
    batch_size = state.shape[0]
    state = torch.tensor(state, dtype = torch.float32, device = self.device)
    next_state = torch.tensor(next_state, dtype = torch.float32, device = self.device)
    reward = torch.tensor(reward, dtype = torch.float32, device = self.device)
    done = torch.tensor(done, dtype = torch.bool, device = self.device).to(dtype = torch.float32)
    # Forward propagation for the current and the next states
    action_values, state_value = self.network(state)
    _, next_state_value = self.network(next_state)

    # Bellman equation: one-step target and advantage
    target_state_value = reward + (1 - done) * discount_factor * next_state_value
    advantage = target_state_value - state_value

    # Actor loss with an entropy bonus
    probs = F.softmax(action_values, dim = -1)
    logprobs = F.log_softmax(action_values, dim = -1)
    # Entropy: negative sum of p * log p over the action dimension
    entropy = -torch.sum(probs * logprobs, dim = -1)
    # Select the log probability of the action taken in each sample
    batch_idx = np.arange(batch_size)
    logp_actions = logprobs[batch_idx, action]

    actor_loss = -(logp_actions * advantage.detach()).mean() - 0.001 * entropy.mean()
    # Treat the target as a constant so gradients flow only through state_value
    critic_loss = F.mse_loss(target_state_value.detach(), state_value)
    loss = actor_loss + critic_loss

    # Clear the gradients from the previous update
    self.optimizer.zero_grad()
    # Backpropagate the loss
    loss.backward()
    # Update the weights of the neural network
    self.optimizer.step()






Init Function: It selects the GPU if one is available and falls back to the CPU otherwise. It stores the action size, which is the number of possible actions the agent can take. It also creates the network and the optimizer. The network is the agent's brain; unlike a setup with a local and a target network, only one network is needed here. The optimizer is what trains the agent, since it updates the network's weights.

Act: The method first puts the state into the right format. A state with three dimensions is a single observation (one stack of frames), so an extra leading dimension is added and the model always receives a batch of states. The batch is then converted to a tensor on the GPU or CPU so the neural network can process it. The forward pass returns action values and state values; only the action values are used here. A softmax function turns the action values into probabilities, so higher values get higher probabilities. The action with the highest probability is the most likely to be picked, but it is not always picked. Always picking the highest-valued action would be greedy. An epsilon-greedy strategy explores by picking a random action with a certain probability and otherwise exploits by picking the action with the highest value. Softmax sampling instead gives every action a chance of being selected, with higher-valued actions being more likely.
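To make the softmax sampling concrete, here is a minimal sketch in plain Python rather than PyTorch. The logits are made-up numbers, and `softmax` and `sample_action` are hypothetical helpers written for this illustration, not part of the agent.

```python
import math
import random

def softmax(logits):
    """Convert raw action scores into probabilities that sum to 1."""
    top = max(logits)                               # subtract the max for numerical stability
    exps = [math.exp(x - top) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def sample_action(logits, rng=random):
    """Draw one action index with probability given by the softmax of the logits."""
    probs = softmax(logits)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]

logits = [2.0, 1.0, 0.1]            # made-up scores for three actions
probs = softmax(logits)
rng = random.Random(0)              # seeded so the run is repeatable
counts = [0, 0, 0]
for _ in range(10_000):
    counts[sample_action(logits, rng)] += 1

print([round(p, 3) for p in probs])   # [0.659, 0.242, 0.099]
print(counts)                          # the first action is drawn most often
```

Even the lowest-scoring action is drawn roughly one time in ten here, which is the built-in exploration that a purely greedy choice would not provide.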

Step: The step function trains two parts: the actor, which decides which action to take (the policy), and the critic, which estimates how good a state is (the value network). The batch size is the number of state-action samples processed at the same time, and it is read from `state.shape[0]`. Instead of training on one state-action pair at a time, we train on a batch of them, which helps with learning stability and speeds up training.

The states, next states, and rewards are converted to float tensors on the GPU or CPU. `done` is converted to a boolean tensor and then to float so it can be used in arithmetic. A forward pass returns the action values and the state value for each state, and a second pass returns the value of each next state.

The target state value is computed with the Bellman equation, and subtracting the current state value from it gives the advantage. The advantage is a tensor with one value per state in the batch, not a single value. It tells how much better or worse an action turned out than expected.

For the actor loss, the action values are converted into probabilities and log probabilities. The entropy measures the randomness, or uncertainty, in the agent's action selection and helps balance exploration against exploitation. `batch_idx` is an array of indices, one per sample in the batch, and it is used to pick from `logprobs` the log probability of the action chosen in each sample. The negative sign means that minimizing the loss raises the probability of actions with a positive advantage. The entropy term, weighted by 0.001, is a regularizer that encourages some randomness without too much exploration.

The critic loss is the mean squared error between the target state value and the state value. The total loss is the sum of the actor loss and the critic loss. Finally, the old gradients are cleared so they do not cause incorrect updates, the gradient of the loss is computed, and the weights are updated.
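The quantities in the step function can be traced by hand for a single transition. The sketch below uses plain Python and made-up numbers for the logits, reward, and critic outputs; `td_target` and `log_softmax` are hypothetical helpers for this illustration, and the squared error stands in for `F.mse_loss` on a batch of one.

```python
import math

def td_target(reward, done, next_value, gamma=0.99):
    """One-step Bellman target; the bootstrap term vanishes when the episode is done."""
    return reward + (1 - done) * gamma * next_value

def log_softmax(logits):
    """Log probabilities computed in a numerically stable way."""
    top = max(logits)
    log_z = top + math.log(sum(math.exp(x - top) for x in logits))
    return [x - log_z for x in logits]

logits = [1.0, 0.0, -1.0]                  # made-up actor outputs for one state
action = 0                                 # the action that was actually taken
reward, done = 1.0, 0.0
state_value, next_state_value = 0.5, 0.8   # made-up critic outputs

target = td_target(reward, done, next_state_value)
advantage = target - state_value

logp = log_softmax(logits)
probs = [math.exp(lp) for lp in logp]
entropy = -sum(p * lp for p, lp in zip(probs, logp))

actor_loss = -(logp[action] * advantage) - 0.001 * entropy
critic_loss = (target - state_value) ** 2
loss = actor_loss + critic_loss

print(round(target, 3), round(advantage, 3))   # 1.792 1.292
```

Because the advantage is positive in this example, minimizing the actor loss pushes the log probability of action 0 upward.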

### Initializing the A3C agent

In [None]:
agent = Agent(number_actions)

### Evaluating our A3C agent on a single episode

In [None]:
def evaluate(agent, env, n_episodes = 1):
  episodes_rewards = []
  for _ in range(n_episodes):
    # Reset the environment to get the initial state
    state, _ = env.reset()
    total_reward = 0
    while True:
      # The agent picks an action for the current state
      action = agent.act(state)
      # act returns a batch of actions, so pass the first (and only) one;
      # the environment returns the next state and the reward
      state, reward, terminated, truncated, _ = env.step(action[0])
      total_reward += reward
      # The episode ends on termination or truncation
      if terminated or truncated:
        break
    # Store the total reward of this episode
    episodes_rewards.append(total_reward)
  # Return the list of episode rewards
  return episodes_rewards


Evaluate: The function evaluates an agent in a given environment over a specified number of episodes. It takes an agent, an environment, and the number of episodes, and it collects the total reward of each episode in `episodes_rewards`. At the beginning of each episode it resets the environment, stores the initial state in `state`, and sets the total reward to 0. The inner loop runs until the episode ends: the agent chooses an action based on the current state, and the environment returns the next state and the reward obtained from this step. Because `act` returns an array of actions, one per state in the batch, the first element is passed to the environment as the actual action. The reward is added to the total reward. When the episode is done, the loop exits and the total reward is appended to `episodes_rewards`.
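The same episode loop can be tried without Atari. The sketch below uses a hypothetical stand-in environment, `CountdownEnv`, that follows the Gymnasium convention of returning `(observation, reward, terminated, truncated, info)` from `step`; both the environment and `run_episode` are assumptions made for illustration.

```python
class CountdownEnv:
    """Hypothetical toy environment: pays 1.0 per step and ends after `length` steps."""

    def __init__(self, length=5):
        self.length = length
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t, {}                        # (observation, info)

    def step(self, action):
        self.t += 1
        terminated = self.t >= self.length       # the episode reached its natural end
        truncated = False                        # no time limit in this toy example
        return self.t, 1.0, terminated, truncated, {}

def run_episode(env, policy):
    """Play one episode and return the summed reward."""
    state, _ = env.reset()
    total_reward = 0.0
    while True:
        state, reward, terminated, truncated, _ = env.step(policy(state))
        total_reward += reward
        if terminated or truncated:
            return total_reward

rewards = [run_episode(CountdownEnv(length=5), policy=lambda s: 0) for _ in range(3)]
print(rewards)   # [5.0, 5.0, 5.0]
```

Checking both `terminated` and `truncated` matters in Gymnasium, because an episode cut off by a time limit reports only `truncated`.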

### Testing multiple agents on multiple environments at the same time

In [None]:
class EnvBatch:
  def __init__(self, n_envs = 10):
    self.envs = [make_env() for _ in range(n_envs)]

  def reset(self):
    _states = []
    for env in self.envs:
      # reset returns (state, info); keep only the state
      _states.append(env.reset()[0])
    return np.array(_states)

  # Take one action per environment
  def step(self, actions):
    # Step every environment, then transpose the results so that states,
    # rewards, termination flags, ... each become one NumPy array
    next_states, rewards, terminateds, truncateds, infos = map(np.array, zip(*[env.step(a) for env, a in zip(self.envs, actions)]))
    # An episode is done when it terminated or was truncated
    dones = np.logical_or(terminateds, truncateds)

    # Reset every environment whose episode has finished
    for i in range(len(self.envs)):
      if dones[i]:
        next_states[i] = self.envs[i].reset()[0]
    return next_states, rewards, dones, infos


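This class manages multiple environments at once, so the agent collects experience from all of them at every training step, which speeds up training. It stores a list of `n_envs` environments, each created with the `make_env()` function. Note that the environments are stepped one after another in a single loop rather than in separate processes.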

Reset: It resets all environments and collects their initial states. It loops through the environments, calls `reset` on each one, and keeps the first returned value, which is the state. It converts the collected states into a NumPy array and returns it.

Step: It executes one step in each environment using the corresponding entry of `actions`, a list in which each action belongs to one environment in `self.envs`. A list comprehension pairs each environment with its action and calls `env.step` for each pair. Each call returns a tuple; `zip(*...)` unzips the list of tuples so that similar items are grouped together, and each group is converted into a NumPy array. The method then loops through all environments to check whether an episode has ended. If done is true, the environment is reset and its new initial state replaces the entry in the next states. Finally, it returns the next states, rewards, dones, and infos.
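The unzip-and-reset pattern is easier to see on small hand-written data. In the sketch below, each tuple is a made-up step result in the Gymnasium order `(next_state, reward, terminated, truncated, info)`, with strings standing in for image arrays and plain lists standing in for NumPy arrays.

```python
# Each tuple mimics what one environment returns from step()
step_results = [
    ("s0", 1.0, False, False, {}),
    ("s1", 0.0, True, False, {}),
    ("s2", 2.0, False, True, {}),
]

# zip(*...) transposes a list of tuples into one group per field
next_states, rewards, terminateds, truncateds, infos = map(list, zip(*step_results))
dones = [t or tr for t, tr in zip(terminateds, truncateds)]

# Replace the state of every finished environment with a fresh initial state
for i, done in enumerate(dones):
    if done:
        next_states[i] = f"reset{i}"

print(next_states)  # ['s0', 'reset1', 'reset2']
print(rewards)      # [1.0, 0.0, 2.0]
print(dones)        # [False, True, True]
```

Resetting finished environments immediately keeps the batch the same size at every training step.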

### Training the A3C agent

In [None]:
import tqdm

env_batch = EnvBatch(number_enviromnments)
# Reset all environments to get the initial batch of states
batch_states = env_batch.reset()

with tqdm.trange(0, 3001) as progress_bar:
  for i in progress_bar:
    # Choose one action per environment
    batch_actions = agent.act(batch_states)
    # Step all environments with their actions
    batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions)
    # Scale the rewards down to keep the loss magnitudes small
    batch_rewards *= 0.01
    agent.step(batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)
    batch_states = batch_next_states

    # Evaluate the agent every 1,000 iterations
    if i % 1000 == 0:
      print("Average agent reward:", np.mean(evaluate(agent, env, n_episodes=10)))






  0%|          | 9/3001 [00:31<2:06:21,  2.53s/it] 

Average agent reward: 650.0


 34%|███▎      | 1008/3001 [01:14<30:30,  1.09it/s]

Average agent reward: 540.0


 67%|██████▋   | 2009/3001 [02:00<14:00,  1.18it/s]

Average agent reward: 1050.0


100%|██████████| 3001/3001 [02:36<00:00, 19.13it/s]

Average agent reward: 60.0





A single agent acts in multiple environments at the same time. Some environments may finish an episode earlier than others; they are reset immediately, so training continues without interruption.

## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env):
  state, _ = env.reset()
  done = False
  frames = []
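  while not done:
    frame = env.render()
    frames.append(frame)
    action = agent.act(state)
    state, reward, terminated, truncated, _ = env.step(action[0])
    # Stop recording when the episode terminates or is truncated
    done = terminated or truncated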
  env.close()
  imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, env)

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()

