In [1]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import namedtuple, deque
import torch.optim as optim
import datetime
import gymnasium as gym
import matplotlib.pyplot as plt
from scipy.special import softmax
import numpy as np
from collections import deque, namedtuple
from torch.distributions import Categorical

In [2]:
env = gym.make('CartPole-v1')
env.reset(seed=0)

(array([ 0.01369617, -0.02302133, -0.04590265, -0.04834723], dtype=float32),
 {})

In [3]:
state_shape = env.observation_space.shape[0]
no_of_actions = env.action_space.n
print('state shape',state_shape)
print('no_of_actions',no_of_actions)
print(env.action_space.sample())
print("----")

state shape 4
no_of_actions 2
1
----


In [4]:
class QNetwork1(nn.Module):

    def __init__(self, state_size, action_size, seed,adv_type = 'avg', fc1_units=128, fc2_units=64,fc3_units = 256):
        """Initialize parameters and build model.
        Params
        ======
            state_size (int): Dimension of each state
            action_size (int): Dimension of each action
            seed (int): Random seed
            fc1_units (int): Number of nodes in first hidden layer
            fc2_units (int): Number of nodes in second hidden layer
        """
        super(QNetwork1, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, fc1_units)
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.fc_adv = nn.Linear(fc2_units, fc3_units)
        self.fc_value = nn.Linear(fc2_units, fc3_units)
        self.adv = nn.Linear(fc3_units, action_size)
        self.value = nn.Linear(fc3_units, 1)
        self.adv_type = adv_type

    def forward(self, state):
        """Build a network that maps state -> action values."""
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x_adv = F.relu(self.fc_adv(x))
        x_adv = F.relu(self.adv(x_adv))
        x_value = F.relu(self.fc_value(x))
        x_value = F.relu(self.adv(x_value))
        if self.adv_type == 'avg':
          advAverage = torch.mean(x_adv, dim=1, keepdim=True)
          q =  x_value + x_adv - advAverage
        else:
          advMax,_ = torch.max(x_adv, dim=1, keepdim=True)
          q =  x_value + x_adv - advMax
        return q

Replay Buffer

In [5]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

BUFFER_SIZE = int(1e5)  # replay buffer size
BATCH_SIZE = 64         # minibatch size
GAMMA = 0.99            # discount factor
LR = 5e-4               # learning rate
UPDATE_EVERY = 20       # how often to update the network (When Q target is present)

class ReplayBuffer:
    """Fixed-size buffer to store experience tuples."""

    def __init__(self, action_size, buffer_size, batch_size, seed):
        """Initialize a ReplayBuffer object.

        Params
        ======
            action_size (int): dimension of each action
            buffer_size (int): maximum size of buffer
            batch_size (int): size of each training batch
            seed (int): random seed
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        self.seed = random.seed(seed)

    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        """Randomly sample a batch of experiences from memory."""
        experiences = random.sample(self.memory, k=self.batch_size)

        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)

In [6]:
class TutorialAgent():

    def __init__(self, state_size, action_size, seed,adv_type):

        ''' Agent Environment Interaction '''
        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(seed)

        ''' Q-Network '''
        self.qnetwork_local = QNetwork1(state_size, action_size, seed, adv_type).to(device)
        self.qnetwork_target = QNetwork1(state_size, action_size, seed, adv_type).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)

        ''' Replay memory '''
        self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE, seed)

        ''' Initialize time step (for updating every UPDATE_EVERY steps)           -Needed for Q Targets '''
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):

        ''' Save experience in replay memory '''
        self.memory.add(state, action, reward, next_state, done)

        ''' If enough samples are available in memory, get random subset and learn '''
        if len(self.memory) >= BATCH_SIZE:
            experiences = self.memory.sample()
            self.learn(experiences, GAMMA)

        """ +Q TARGETS PRESENT """
        ''' Updating the Network every 'UPDATE_EVERY' steps taken '''
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0:
            self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())

    def act(self, state, eps=0.):

        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()

        ''' Epsilon-greedy action selection (Already Present) '''
        if random.random() > eps:
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    def act_softmax(self, state, tau=1.0):

        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()

        ''' Softmax action selection '''
        x = [action_value/tau for action_value in action_values.cpu().data.numpy()][0]
        return np.random.choice(np.arange(self.action_size), p=softmax(x))

    def learn(self, experiences, gamma):
        """ +E EXPERIENCE REPLAY PRESENT """
        states, actions, rewards, next_states, dones = experiences

        ''' Get max predicted Q values (for next states) from target model'''
        Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)

        ''' Compute Q targets for current states '''
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))

        ''' Get expected Q values from local model '''
        Q_expected = self.qnetwork_local(states).gather(1, actions)

        ''' Compute loss '''
        loss = F.mse_loss(Q_expected, Q_targets)

        ''' Minimize the loss '''
        self.optimizer.zero_grad()
        loss.backward()

        ''' Gradiant Clipping '''
        """ +T TRUNCATION PRESENT """
        for param in self.qnetwork_local.parameters():
            if param.grad != None:
              param.grad.data.clamp_(-1, 1)

        self.optimizer.step()
        
    def save_model(self,path):
        torch.save(self.qnetwork_local.state_dict(), f'weights/{path}.pth')
        
    def load_model(self,path):
        self.qnetwork_local.load_state_dict(torch.load(f'weights/{path}.pth', map_location=torch.device(device)))
        self.qnetwork_local.eval()

In [9]:
''' Defining DQN Algorithm '''

state_shape = env.observation_space.shape[0]
action_shape = env.action_space.n

def dqn(n_episodes=10000, max_t=1000, eps_start=1.0, eps_end=0.01, eps_decay=0.905):
    rewards = []

    scores_window = deque(maxlen=100)
    ''' last 100 scores for checking if the avg is more than 195 '''

    eps = eps_start
    ''' initialize epsilon '''

    for i_episode in range(1, n_episodes+1):
        state,_ = env.reset()
        score = 0
        for t in range(max_t):
            action = agent.act(state, eps)
            next_state, reward, done, _, info = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break

        scores_window.append(score)
        rewards.append(score)
        eps = max(eps_end, eps_decay*eps)
        ''' decrease epsilon '''

        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")

        if i_episode % 100 == 0:
           print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
        if np.mean(scores_window)>=475.0:
           print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
           agent.save_model('deuling_dqn')
           break
    return rewards

Avg Advantage

In [10]:
begin_time = datetime.datetime.now()
agent = TutorialAgent(state_size=state_shape,action_size = action_shape,seed = 0,adv_type="max")
dqn()
time_taken = datetime.datetime.now() - begin_time
print(time_taken)

Episode 100	Average Score: 221.33
Episode 200	Average Score: 65.788
Episode 300	Average Score: 10.00
Episode 400	Average Score: 69.45
Episode 500	Average Score: 10.24
Episode 600	Average Score: 28.19
Episode 700	Average Score: 53.50
Episode 800	Average Score: 100.60
Episode 900	Average Score: 105.72
Episode 1000	Average Score: 13.53
Episode 1100	Average Score: 143.64
Episode 1200	Average Score: 25.038
Episode 1300	Average Score: 64.85
Episode 1400	Average Score: 132.40
Episode 1500	Average Score: 328.43
Episode 1600	Average Score: 22.288
Episode 1700	Average Score: 35.57
Episode 1800	Average Score: 19.51
Episode 1900	Average Score: 166.44
Episode 2000	Average Score: 9.4752
Episode 2100	Average Score: 9.47
Episode 2200	Average Score: 9.31
Episode 2300	Average Score: 9.32
Episode 2400	Average Score: 9.37
Episode 2500	Average Score: 9.33
Episode 2600	Average Score: 9.50
Episode 2700	Average Score: 9.42
Episode 2800	Average Score: 9.52
Episode 2900	Average Score: 9.32
Episode 3000	Average 

In [None]:
def render_policy():
    agent =  TutorialAgent(state_size=state_shape,action_size = action_shape,seed = 0,adv_type="max")
    agent.load_model('deuling_dqn')
    env = gym.make('CartPole-v1', render_mode='human')
    state,_ = env.reset()
    total_reward = 0
    while True:
        action = agent.act(state)
        next_state, reward, done, _, info = env.step(action)
        total_reward += reward
        env.render()
        if done:
            break
        state = next_state
    print(total_reward)
    env.close()
render_policy()

KeyboardInterrupt: 