# A3C for Kung Fu

https://ale.farama.org/environments/kung_fu_master/

Observation space is still image

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [1]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!pip install ale-py
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 34 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 1s (843 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 126102 files and directories currently installed.)
Preparing to unpack .../swig4.0_4.0.2-1ubuntu1_amd64.deb ...
Unpacking swig4.0 (4.0.2-1ubuntu1) ...
Selecting previously unselected package swig.
Preparing to unpack .../swig_4.0.2-1ubunt

### Importing the libraries

In [1]:
import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import ale_py
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium import ObservationWrapper

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

Architecture will have conv layers and fully connected layers
The critic won't be included in this, it will come in the A3C algorithm in part 2

In [2]:
class Network(nn.Module):

  def __init__(self, action_size):
    super(Network, self).__init__()
    self.conv1 = torch.nn.Conv2d(in_channels=4,  out_channels=32, kernel_size = (3,3), stride = 2) #4 is not the 3 rgb channels, but 4 greyscale frames
    self.conv2 = torch.nn.Conv2d(in_channels=32, out_channels=32, kernel_size = (3,3), stride = 2)
    self.conv3 = torch.nn.Conv2d(in_channels=32, out_channels=32, kernel_size = (3,3), stride = 2) #the params remain same, dont decrease
    #Dont batch normalize since doesnt help
    #Create flattening layer and 1 fcl
    self.flatten = torch.nn.Flatten()
    self.fc1 = torch.nn.Linear(in_features=512, out_features=128)
    #Directly 2 output layers- action and critic
    self.fc2a= torch.nn.Linear(in_features=128, out_features=action_size)
    self.fc2s= torch.nn.Linear(in_features=128, out_features=1)

  def forward(self, state):
    x = self.conv1(state)
    x = F.relu(x)
    x = self.conv2(x)
    x = F.relu(x)
    x = self.conv3(x)
    x = F.relu(x)
    x = self.flatten(x)
    x = self.fc1(x)
    x = F.relu(x)
    action_values= self.fc2a(x)
    state_values = self.fc2s(x)[0]
    return action_values, state_values


## Part 2 - Training the AI

### Setting up the environment

In [3]:
class PreprocessAtari(ObservationWrapper): #Preprocessing the environment

  def __init__(self, env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4): #initialize all properties of env
    super(PreprocessAtari, self).__init__(env)
    self.img_size = (height, width)
    self.crop = crop
    self.dim_order = dim_order
    self.color = color
    self.frame_stack = n_frames
    n_channels = 3 * n_frames if color else n_frames
    obs_shape = {'tensorflow': (height, width, n_channels), 'pytorch': (n_channels, height, width)}[dim_order]
    self.observation_space = Box(0.0, 1.0, obs_shape)
    self.frames = np.zeros(obs_shape, dtype = np.float32)

  def reset(self): #reset environment
    self.frames = np.zeros_like(self.frames)
    obs, info = self.env.reset()
    self.update_buffer(obs)
    return self.frames, info

  def observation(self, img): #preprocess image
    img = self.crop(img)
    img = cv2.resize(img, self.img_size)
    if not self.color:
      if len(img.shape) == 3 and img.shape[2] == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = img.astype('float32') / 255.
    if self.color:
      self.frames = np.roll(self.frames, shift = -3, axis = 0)
    else:
      self.frames = np.roll(self.frames, shift = -1, axis = 0)
    if self.color:
      self.frames[-3:] = img
    else:
      self.frames[-1] = img
    return self.frames

  def update_buffer(self, obs): #to update buffer
    self.frames = self.observation(obs)

def make_env(): #calls the preprocessAtari environment
  env = gym.make("KungFuMasterDeterministic-v0", render_mode = 'rgb_array')
  env = PreprocessAtari(env, height = 42, width = 42, crop = lambda img: img, dim_order = 'pytorch', color = False, n_frames = 4)
  return env

env = make_env()

state_shape = env.observation_space.shape #4 images of dimensions 42 by 42
number_actions = env.action_space.n #14 actions possible
print("State shape:", state_shape)
print("Number actions:", number_actions)
print("Action names:", env.env.env.env.get_action_meanings())

  logger.deprecation(


State shape: (4, 42, 42)
Number actions: 14
Action names: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'DOWNRIGHT', 'DOWNLEFT', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']


### Initializing the hyperparameters

In [4]:
learning_rate = 5e-4
discount_factor = 0.99 #used in computing target state value
number_environments= 10 #each env has agent, and this considerably improves learning

### Implementing the A3C class

In [5]:
#No seprarate LEARN method only a STEP method which also has contents of learn method

class Agent():

  def __init__(self, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.action_size = action_size
    #No local and target Q network, only 1 brain
    self.network = Network(action_size).to(self.device)
    self.optimizer = torch.optim.Adam(self.network.parameters(), lr = learning_rate)




  def act(self, state): #takes a batch of states(cz multiple envs), but we represent as single state for ease of notation
    if state.ndim ==3:
      state=[state] #dimension will be 4, cz extra dimension of batch added in beginning
                    #(basically unsqueeze but dont need to use unsqueeze since not a tensor yet)
    state=torch.tensor(state, dtype=torch.float32, device= self.device)
    action_values, _ = self.network(state) # since we dont need state value yet
    #Action selection using Softmax not epsilon
    policy=F.softmax(action_values, dim=-1) # softmax to be applied on last dimension- dimension representing diff actions
    #Softmax converts action_values into probabilities, so for each state, len(policy) will be no. of actions

    #remember that the state is actually a batch of several states
    #so policy is a batch of policies, one for each state in batch

    return np.array([np.random.choice(len(p),p =p) for p in policy.detach().cpu().numpy()])
    #for each policy, action is chosen by choosing a random element from the policy
    #Return a batch of actions- one for each state(policy) in the batch





  #Method called when agent takes a step, and it will update the parameters(weights) of A3C NN, takes all properties of experience as params
  #Has the learn method also inside
  def step(self, state, action, reward, next_state, done): #takes batch of each param, but for ease of notation, writing them as single
    batch_size = state.shape[0]
    state = torch.tensor(state, dtype = torch.float32, device = self.device)
    next_state = torch.tensor(next_state, dtype = torch.float32, device = self.device)
    reward = torch.tensor(reward, dtype = torch.float32, device = self.device)
    done = torch.tensor(done, dtype = torch.bool, device = self.device).to(dtype = torch.float32)

    action_values, state_value = self.network(state) #predicted action_values and state_value
    _, next_state_value= self.network(next_state) #reqd to calculate target state value
    target_state_value=reward+discount_factor*next_state_value*(1-done) #Bellman equation

    advantage=target_state_value-state_value #advantage is used in actor loss
    #actors loss and critic loss

    #actor loss
    probs =F.softmax(action_values, dim=-1) #Prob dist of action values
    logprobs =F.softmax(action_values, dim=-1) #Log prob dist of action values
    entropy= -torch.sum(probs * logprobs, axis=-1)
    #Need logprobs of actions selected for each of the states from the batch of states

    batch_idx = np.arange(batch_size) #batch indexes
    logp_actions=logprobs[batch_idx, action] #action selected for each of the states from batch
    actor_loss=-(logp_actions*advantage.detach()).mean()+0.0001*entropy.mean()

    #Critic loss
    critic_loss = F.mse_loss(target_state_value.detach(), state_value)

    #total loss
    total_loss=actor_loss + critic_loss
    self.optimizer.zero_grad()
    total_loss.backward()
    self.optimizer.step()




### Initializing the A3C agent

In [6]:
agent = Agent(number_actions)

### Evaluating our A3C agent on a single episode

In [7]:
#Eval A3C agent on a certain no. of episodes, this will be called in training (this code was inside training for prev project)
def evaluate(agent, env, n_episodes = 1):
  #return list of total rewards in each of these episodes
  episodes_rewards = [] #len will be n_episodes

  for _ in range(n_episodes):
    state, _ = env.reset()
    done = False #break if episode is done
    total_reward = 0
    while not done:
      action = agent.act(state)
      state, reward, done, _, _ = env.step(action[0]) #get the next state and rewards etc if agent were to take the action
      #at each step/state, get reward and add it to total reward of episode, until episode is done
      total_reward += reward
    episodes_rewards.append(total_reward)
  return episodes_rewards

### Testing multiple agents on multiple environments at the same time

In [8]:
#Multiple agents and multiple environments
class EnvBatch:
  def __init__(self, n_envs=10):
    self.envs = [make_env() for _ in range(n_envs)] #create 10 envs
    self.n_envs = n_envs

  def reset(self): #to reset all environments
    _states = []

    for env in self.envs: #reset state of each env and append to _states
      _states.append(env.reset()[0]) #returns the reset state
    return np.array(_states)

  # step in multiple environments
  def step(self, actions): #action for each environment
    next_states, rewards, dones, infos, _ = map(np.array, zip(*[env.step(a) for env, a in zip(self.envs, actions)])) #env.step for each env
    for i in range(len(self.envs)):
      if dones[i]: #if env i is done, reset env i -> reset next_state of env i
        next_states[i] = self.envs[i].reset()[0]
    return next_states, rewards, dones, infos

### Training the A3C agent

In [9]:
import tqdm
env_batch = EnvBatch(number_environments)
#reset all states in multiple environments
batch_states = env_batch.reset()

#progress bar as we progress in training
with tqdm.trange(0,3001) as progress_bar: #range of iterations to train model
  for i in progress_bar:
    batch_actions= agent.act(batch_states) #all agents act
    batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(batch_actions) #get all the next states and rewards etc if agents were to take the action
    batch_rewards *=0.01 #to stabilize the training
    agent.step(batch_states, batch_actions, batch_rewards, batch_states, batch_dones) #agents actually does step(in all 10 envs), and learns
    batch_states = batch_next_states #update batch states for the next iteration
    if i%1000 ==0: #every 1000 iterations
      print("Average agent reward: ",np.mean(evaluate(agent, env, n_episodes=10))) #evaluate the average reward over 10 episodes


  logger.deprecation(
  critic_loss = F.mse_loss(target_state_value.detach(), state_value)
  state=torch.tensor(state, dtype=torch.float32, device= self.device)
  0%|          | 4/3001 [00:34<5:27:04,  6.55s/it] 

Average agent reward:  450.0


 33%|███▎      | 1005/3001 [01:41<1:14:56,  2.25s/it]

Average agent reward:  540.0


 67%|██████▋   | 2006/3001 [02:48<34:17,  2.07s/it]

Average agent reward:  770.0


100%|██████████| 3001/3001 [03:58<00:00, 12.58it/s]

Average agent reward:  1250.0





## Part 3 - Visualizing the results

In [10]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env):
  state, _ = env.reset()
  done = False
  frames = []
  while not done:
    frame = env.render()
    frames.append(frame)
    action = agent.act(state)
    state, reward, done, _, _ = env.step(action[0])
  env.close()
  imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, env)

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()

