<a href="https://colab.research.google.com/github/suprabhat25/FrozenLake/blob/main/Cart_and_Pole_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install Dependencies and Import Libraries
Reference tutorial: https://deeplizard.com/learn/video/PyQNfsGUnQA


In [60]:
# Install the xvfb and python-opengl system packages; stdout and stderr are discarded.
!apt-get install -y xvfb python-opengl > /dev/null 2>&1

In [61]:
# Install gym and pyvirtualdisplay; stdout and stderr are discarded.
!pip install gym pyvirtualdisplay > /dev/null 2>&1

In [62]:
# System packages used for off-screen rendering.
!apt-get install python-opengl -y

!apt install xvfb -y

!pip install pyvirtualdisplay

# gym's classic-control renderer uses pyglet; "piglet" is an unrelated package
# and was almost certainly a typo.
!pip install pyglet

Reading package lists... Done
Building dependency tree       
Reading state information... Done
python-opengl is already the newest version (3.1.0+dfsg-1).
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.
Reading package lists... Done
Building dependency tree       
Reading state information... Done
xvfb is already the newest version (2:1.19.6-1ubuntu4.7).
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.


In [63]:
%matplotlib inline
import gym
import math 
import random 
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn # To build a neural network in PyTorch, we use the torch.nn package  
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

Set up display

In [64]:
# True when matplotlib reports an "inline" backend, i.e. figures render inside the notebook.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    # IPython's display module is used later to refresh the plotted output.
    from IPython import display

Deep Q-Network

In [65]:
# nn.Module is the base class for all PyTorch neural networks.
# The DQN receives screenshots of the current environment as input,
# so the image's height and width are passed as constructor arguments.
class DQN(nn.Module):
    """Small fully connected Q-network operating on flattened RGB screen images.

    The network has two hidden Linear layers (24 and 32 units, ReLU activations)
    followed by a Linear output layer that emits one Q-value per action.

    Args:
        img_height: Height of the input image in pixels.
        img_width: Width of the input image in pixels.
        num_actions: Number of Q-values to output. Defaults to 2, which matches
            the original hard-coded output size.
    """

    def __init__(self, img_height, img_width, num_actions=2):
        super().__init__()

        # The input is flattened in forward(); the factor 3 is the number of colour channels.
        self.fc1 = nn.Linear(in_features=img_height*img_width*3, out_features=24)
        self.fc2 = nn.Linear(in_features=24, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=num_actions)

    def forward(self, t):
        """Run a forward pass.

        Args:
            t: Batched input tensor; every dimension after the first is flattened.

        Returns:
            Tensor of shape (batch, num_actions) holding the raw Q-values.
        """
        # Keep the batch dimension and collapse everything else into one feature axis.
        t = t.flatten(start_dim=1)
        t = F.relu(self.fc1(t))
        t = F.relu(self.fc2(t))
        # No activation on the output layer: Q-values are unbounded.
        t = self.out(t)
        return t

    # In our cart and pole example, the network outputs the Q-values
    # that correspond to each possible action the agent can take from a given state.
    # Our only available actions are to move right or to move left, so the network has two outputs.


  # To start out with a very simple network, our network will consist only of two fully connected hidden layers, 
  # and an output layer. PyTorch refers to fully connected layers as Linear layers.

  # The last thing we have to do for our DQN class is to define a function called forward(). This function will implement a forward pass to the network.
  # Note that all PyTorch neural networks require an implementation of forward().

The Experience class represents a single transition; experiences sampled from replay memory are used to train the neural network.

In [66]:
# One agent transition; batches of these are drawn from replay memory to train the network.
Experience = namedtuple(
    'Experience',
    ['state', 'action', 'next_state', 'reward'],
)

In [67]:
# Example instance; keyword arguments make the field order explicit.
e = Experience(state=2, action=3, next_state=1, reward=4)

In [68]:
e

Experience(state=2, action=3, next_state=1, reward=4)

In [69]:
class ReplayMemory():
    """Fixed-capacity store of experiences used to train the DQN.

    Attributes are plain Python values:
      capacity   -- maximum number of experiences kept
      memory     -- list holding the stored experiences
      push_count -- total number of experiences pushed so far
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.push_count = 0

    def push(self, experience):
        """Store an experience, overwriting a previously stored one once the memory is full."""
        is_full = len(self.memory) >= self.capacity
        if is_full:
            # Cycle through the slots so the write position wraps around the list.
            slot = self.push_count % self.capacity
            self.memory[slot] = experience
        else:
            self.memory.append(experience)
        self.push_count += 1

    def sample(self, batch_size):
        """Return batch_size experiences chosen at random without replacement."""
        return random.sample(self.memory, batch_size)

    def can_provide_sample(self, batch_size):
        """Return True when at least batch_size experiences are stored."""
        return not (len(self.memory) < batch_size)


Epsilon Greedy Strategy


In [70]:
class EpsilonGreedyStrategy():
    """Exploration-rate schedule that decays exponentially from `start` towards `end`."""

    def __init__(self, start, end, decay):
        self.start, self.end, self.decay = start, end, decay

    def get_exploration_rate(self, current_step):
        """Return the exploration rate for the given step.

        Computed as end + (start - end) * exp(-current_step * decay).
        """
        span = self.start - self.end
        decay_factor = math.exp(-1 * current_step * self.decay)
        return self.end + span * decay_factor

Reinforcement Learning Agent

In [71]:
class Agent():
    """Epsilon-greedy action selector.

    Args:
        strategy: Object exposing get_exploration_rate(current_step), e.g. an
            EpsilonGreedyStrategy.
        num_actions: Number of actions available from any state.
        device: torch device on which returned action tensors are placed.
    """

    def __init__(self, strategy, num_actions, device):
        self.current_step = 0  # number of actions selected so far
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

    def select_action(self, state, policy_net):
        """Choose an action for `state`, exploring or exploiting.

        Args:
            state: Input passed to policy_net when exploiting.
            policy_net: Callable returning Q-values with the actions along dim 1.

        Returns:
            A tensor on self.device holding the chosen action index. Both branches
            return a tensor so callers can always use `.item()`.
        """
        # Use this agent's own strategy and device rather than notebook globals.
        rate = self.strategy.get_exploration_rate(self.current_step)
        self.current_step += 1

        if rate > random.random():
            # Explore: pick a uniformly random action.
            action = random.randrange(self.num_actions)
            return torch.tensor([action]).to(self.device)

        # Exploit: pick the action with the highest predicted Q-value.
        # Gradient tracking is disabled because the network is only queried, not trained, here.
        with torch.no_grad():
            return policy_net(state).argmax(dim=1).to(self.device)



  # During training PyTorch keeps track of all the forward pass calculations that happen within the network. It needs to do this so that it can know how to apply backpropagation later. 
  # Since we’re only using the model for inference at the moment, we’re telling PyTorch not to keep track of any forward pass calculations.

  # INFERENCE here means running a trained model forward to obtain its outputs (predictions), without updating its weights.

Environment Manager

In [72]:
 # this class will manage our cart and pole environment
class CartPoleEnvManager():
    """Wraps the gym CartPole-v0 environment and exposes image-based states.

    A state is the difference between two consecutive processed screens; the
    first state of an episode and the state after the episode ends are all-zero
    ("black") screens.

    Args:
        device: torch device on which returned tensors are placed.
    """

    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()
        self.current_screen = None  # last processed screen; None until the first get_state()
        self.done = False  # set by take_action() from the environment's done flag

    def reset(self):
        """Reset the environment and forget the last processed screen."""
        self.env.reset()
        self.current_screen = None

    def close(self):
        """Close the underlying environment."""
        self.env.close()

    def render(self, mode='human'):
        """Render the environment in the given mode and return the renderer's result."""
        return self.env.render(mode)

    def num_actions_available(self):
        """Return the size of the environment's action space."""
        return self.env.action_space.n

    def take_action(self, action):
        """Step the environment with `action` (a one-element tensor) and return the reward tensor."""
        _, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        """Return True when no screen has been processed since the last reset."""
        return self.current_screen is None

    def get_state(self):
        """Return the current state tensor.

        At the start of an episode or once it is done, the screen is captured and
        a zero tensor of the same shape is returned. Otherwise the difference
        between the new screen and the previous one is returned.
        """
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            black_screen = torch.zeros_like(self.current_screen)
            return black_screen
        else:
            s1 = self.current_screen
            s2 = self.get_processed_screen()
            self.current_screen = s2
            return s2 - s1

    # The four methods below were previously indented inside get_state(), after
    # its return statements, so they were never defined on the class.

    def get_screen_height(self):
        """Return the height (dimension 2 of BCHW) of the processed screen."""
        screen = self.get_processed_screen()
        return screen.shape[2]

    def get_screen_width(self):
        """Return the width (dimension 3 of BCHW) of the processed screen."""
        screen = self.get_processed_screen()
        return screen.shape[3]

    def get_processed_screen(self):
        """Render the screen as an RGB array, crop it, and convert it to a BCHW tensor."""
        screen = self.render('rgb_array').transpose((2, 0, 1))  # PyTorch expects CHW
        screen = self.crop_screen(screen)
        return self.transform_screen_data(screen)

    def crop_screen(self, screen):
        """Keep only the rows between 40% and 80% of the screen height of a CHW array."""
        screen_height = screen.shape[1]

        # Strip off top and bottom
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        screen = screen[:, top:bottom, :]
        return screen

    def transform_screen_data(self, screen):
        """Rescale a CHW array to [0, 1], resize it to 40x90, and return a BCHW tensor on self.device."""
        # Convert to float, rescale, convert to tensor
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)

        # Use torchvision package to compose image transforms
        resize = T.Compose([
            T.ToPILImage()
            ,T.Resize((40,90))
            ,T.ToTensor()
        ])

        return resize(screen).unsqueeze(0).to(self.device)  # add a batch dimension (BCHW)



Non-processed Screen

In [73]:
# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)
em.reset()
# Grab the raw frame as an RGB array, before any cropping or resizing.
# NOTE(review): the recorded output of this cell is a NameError; presumably rendering
# needs a virtual display to be started first -- confirm.
screen = em.render(mode='rgb_array')

plt.figure()
plt.imshow(screen)
plt.title('Non-processed screen example')
plt.show()

NameError: ignored

Processed Screen

In [None]:
# Cropped, rescaled and resized screen as a BCHW tensor.
screen = em.get_processed_screen()

plt.figure()
# squeeze(0) drops the batch dimension; permute(1, 2, 0) reorders CHW to HWC for imshow.
plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Processed screen example')
plt.show()

Starting state

In [None]:
# get_state() returns an all-zero screen when no screen has been processed since the last reset.
screen = em.get_state()

plt.figure()
# Drop the batch dimension and reorder CHW to HWC for imshow.
plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Starting state example')
plt.show()

Non-Starting State

In [None]:
# Apply action 1 five times so the screen changes before the state is read.
for i in range(5):
    em.take_action(torch.tensor([1]))
# Mid-episode, the state is the difference between the new screen and the previous one.
screen = em.get_state()

plt.figure()
# Drop the batch dimension and reorder CHW to HWC for imshow.
plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Non starting state example')
plt.show()

Ending State

In [None]:
# Force the done flag so get_state() takes its end-of-episode branch and returns an all-zero screen.
em.done = True
screen = em.get_state()

plt.figure()
# Drop the batch dimension and reorder CHW to HWC for imshow.
plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Ending state example')
plt.show()
# Close the environment now that the demo cells are finished.
em.close()

In [None]:
def plot(values, moving_avg_period):
    """Redraw the training chart: raw per-episode values plus their moving average.

    Args:
        values: Sequence of per-episode values (plotted as "Duration").
        moving_avg_period: Window length passed to get_moving_average.
    """
    # Reuse figure number 2 and wipe it so repeated calls redraw in place.
    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')

    plt.plot(values)
    trend = get_moving_average(moving_avg_period, values)
    plt.plot(trend)

    # Short pause to let the figure update.
    plt.pause(0.001)
    if is_ipython:
        # In a notebook, clear the previous output once the new one is ready.
        display.clear_output(wait=True)

In [None]:
def get_moving_average(period, values):
    """Return the trailing moving average of `values` as a NumPy array.

    The result has the same length as `values`. The first period-1 entries are
    zero; if there are fewer than `period` values, every entry is zero.
    """
    series = torch.tensor(values, dtype=torch.float)

    # Not enough data for a single full window.
    if len(series) < period:
        return torch.zeros(len(series)).numpy()

    # Sliding windows of length `period`, one per step, averaged along the window axis.
    windows = series.unfold(dimension=0, size=period, step=1)
    window_means = windows.mean(dim=1).flatten(start_dim=0)

    # Left-pad with zeros so the output lines up with the input indices.
    padding = torch.zeros(period - 1)
    return torch.cat((padding, window_means)).numpy()

In [None]:
# Smoke-test the plot helper with 300 random values and a moving-average window of 100.
plot(np.random.rand(300), 100)