# Ice dungeon with partial observability

Agent A is in a dungeon, and can move up, down, left, right to find the exit:
- A agent
- E exit
- X non-traversable obstacle
- L lava.

Read the Dungeon class and try to identify how it works, the reward structure, etc.

This lab will take inspiration from the PyTorch tutorial on DQN, that we will apply to our custom environment Dungeon.

https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

We will adapt the concepts step by step to our environment.


# Loading the dungeon

In [None]:
from dungeon.dungeon import Dungeon, index_to_actions
import numpy as np
import random

import matplotlib.pyplot as plt


In [None]:
SIZE_ENVIR = 15

my_dungeon = Dungeon(SIZE_ENVIR)
my_dungeon.reset()
my_dungeon.display()

In [None]:
my_dungeon.reset()
my_dungeon.display()

In [None]:
obs, _, _ = my_dungeon.step('left')
my_dungeon.display()
for obs_name, o in obs.items():
    print(obs_name)
    print(o)

# Exercise 1 - Replay Buffer for Experience replay

The first step of this lab is to create a Replay Buffer that will allow us to use Experience Replay and mini-batch learning.
- First, we create a class Transition using named tuple, which holds state transition in a dedicated data structure.
- Then, create a Replay Memory class that collects transition in a First In First Out fashion (fixed memory size). This Replay memory should convert the states, action, rewards into tensors.

The models presented in the Pytorch tutorial are quite generic and can be used as is.

In [None]:
from collections import namedtuple

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [None]:
def convert_state(state):
    
    c = state['relative_coordinates'].flatten()/SIZE_ENVIR
    o = state['surroundings'].flatten()/4.
    state_tensor = np.concatenate( [c,o] )
    state_tensor = torch.tensor(state_tensor, device=device).unsqueeze(0)
    
    return state_tensor
    

class ReplayMemory:

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, state, action, next_state, reward):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        
        state_tensor = convert_state(state)
        
        if next_state is None:
            state_tensor_next = None            
        else:
            state_tensor_next = convert_state(next_state)
            
        action_tensor = torch.tensor([action], device=device).unsqueeze(0)

        reward = torch.tensor([reward], device=device).unsqueeze(0)/10. # reward scaling

        self.memory[self.position] = Transition(state_tensor, action_tensor, state_tensor_next, reward)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# Exercise 2: Q-network

A Q-network is a neural network that maps states to Q-values for each actions.

Implement a first version of Q-networks.
Keep it simple (e.g. 3 hidden layers, with Relu activations).


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class DQN(nn.Module):

    def __init__(self, input_size, size_hidden, output_size):
        
        super().__init__()
        
        # Declare the different layers
        
        
    def forward(self, x):
        
        # Declare how the data flows from input x to output.
        
        return output
    

# Exercise 3 - Set up the Q-networks

In DQN, the weights of the target network are copied from the weights of policy network every few iterations.

We set the frequency of update using TARGET_UPDATE = 10.

Instead of RMSprop we will use SGD. 

In [None]:
OBS_SIZE = 5*5 + 2
HIDDEN_SIZE = 64
ACTION_SIZE = 4

Q_network = DQN(OBS_SIZE, HIDDEN_SIZE, ACTION_SIZE).to(device)
Q_target = DQN(OBS_SIZE, HIDDEN_SIZE, ACTION_SIZE).to(device)
Q_target.load_state_dict(Q_network.state_dict())
Q_target.eval()

TARGET_UPDATE = 100

optimizer = optim.SGD(Q_network.parameters(), lr=0.001)
memory = ReplayMemory(10000)


# Exercise 4 - Epsilon-greedy policy

You can take inspiration from pytorch tutorial and implement the select_action function.
Or, alternatively, you can implement a E-greedy policy class that will select epsilon greedy actions..


In [None]:
class E_Greedy_Policy():
    
    def __init__(self, epsilon, decay, min_epsilon):
        
        self.epsilon = epsilon
        self.epsilon_start = epsilon
        self.decay = decay
        self.epsilon_min = min_epsilon
                
    def __call__(self, state):
                
        is_greedy = random.random() > self.epsilon
        
        if is_greedy :
            # we select greedy action
            with torch.no_grad():
                Q_network.eval()
                index_action = # take action corresponding to max Q-value
                Q_network.train()
        else:
            # we sample a random action
            index_action = random.randint(0,3)
        
        return index_action
                
    def update_epsilon(self):
        
        self.epsilon = self.epsilon*self.decay
        if self.epsilon < self.epsilon_min:
            self.epsilon = self.epsilon_min
        
    def reset(self):
        self.epsilon = self.epsilon_start
        
        
policy = E_Greedy_Policy(0.99, decay=0.997, min_epsilon=0.001)

# Exercise 5 - Training loop

In [None]:
BATCH_SIZE = 256
GAMMA = 0.5

def optimize_model():
    
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    
    # Compute Q values using policy net
    Q_values = ...
    
    # Compute next Q values using Q_targets
    next_Q_values = ...
    
    # Compute targets
    target_Q_values = ...
    
    # Compute MSE Loss
    loss = F.mse_loss(Q_values, target_Q_values)
    
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    
    # Trick: gradient clipping
    for param in Q_network.parameters():
        param.grad.data.clamp_(-1, 1)
        
    optimizer.step()
    
    return loss

In [None]:
num_episodes = 1000

policy.reset()

rewards_history = []

# Warmup phase!
memory_filled = False

while not memory_filled:
    
    my_dungeon = Dungeon(SIZE_ENVIR)
    state = my_dungeon.reset()
    done = False
    
    total_reward = 0
    
    while not done:
        
        # Get action and act in the world
        state_tensor = convert_state(state)
        
        action = policy(state_tensor)
        action_name = index_to_actions[action].name 
        next_state, reward, done = my_dungeon.step(action_name)
        
        total_reward += float(reward)
        
        # Observe new state
        if done:
            next_state = None

        # Store the transition in memory
        memory.push(state, action, next_state, float(reward))
        
        state = next_state


    memory_filled = memory.capacity == len(memory)

print('Done with the warmup')
    
    
for i_episode in range(num_episodes):
    
    # New dungeon at every run
    my_dungeon = Dungeon(SIZE_ENVIR)
    state = my_dungeon.reset()
    done = False
    
    total_reward = 0
    
    while not done:
        
        # Get action and act in the world
        state_tensor = convert_state(state)
        
        action = policy(state_tensor)
        action_name = index_to_actions[action].name 
        next_state, reward, done = my_dungeon.step(action_name)
        
        total_reward += float(reward)
        
        # Observe new state
        if done:
            next_state = None

        # Store the transition in memory
        memory.push(state, action, next_state, float(reward))

        # Move to the next state
        state = next_state

        # Perform one step of the optimization
        started_training = True
        l = optimize_model()

    policy.update_epsilon()
    rewards_history.append( float(total_reward) )

    
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:

        Q_target.load_state_dict(Q_network.state_dict())
    
    if (i_episode) % 10 == 0:
        
        print('Episode ', i_episode, ': ', 'reward :',  total_reward, 'eps: ', 
              policy.epsilon, ' loss:', l.detach().cpu())   
        print( sum(rewards_history[-10:])/10)    

print('Complete')


In [None]:
plt.plot(rewards_history, 'b.', alpha=.1)