[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lorenzobasile/DeepLearning2022/blob/main/6_dql.ipynb)

# Lab 6

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
np.random.seed(0)

# An introduction to Deep Reinforcement Learning

Today we will see a simple example of how a Deep RL algorithm can help us understand how to navigate an initially unknown environment.

Our task is usually referred to as *gridworld*, as we have an agent that moves in a rectangular grid with the final aim of collecting a **reward**.

The agent knows its position, which is the **state** of our decision process. At each move it receives a reward signal, which is 0 if nothing happens, 1 if the reward is reached, and -2 if a penalty cell is reached.

The agent is also penalized for the time it takes to reach the objective, because future rewards are discounted by a factor $\gamma<1$. For this reason, the final goal is to learn a way to reach the reward as quickly as possible, while avoiding the penalty cell.

In [None]:
class Gridworld():
    """Rectangular grid environment with a target cell and an optional penalty cell.

    Positions are ``(x, y)`` tuples, where ``x`` is the column index
    (``0 <= x <= ncols-1``) and ``y`` is the row index (``0 <= y <= nrows-1``).

    Actions:
        0 -- increase ``y`` by one
        1 -- increase ``x`` by one
        2 -- decrease ``y`` by one
        3 -- decrease ``x`` by one
    Moves that would leave the grid keep the agent on the border cell.
    Any other action value leaves the position unchanged.
    """

    def __init__(self, nrows, ncols, target_pos, init_pos, penalty_pos=None):
        """Create the grid.

        Args:
            nrows: number of rows (extent along ``y``).
            ncols: number of columns (extent along ``x``).
            target_pos: ``(x, y)`` of the rewarded, terminal cell.
            init_pos: ``(x, y)`` of the agent's starting cell.
            penalty_pos: optional ``(x, y)`` of the penalised cell.
        """
        self.dimensions = (nrows, ncols)
        self.done = (target_pos == init_pos)
        self.target = target_pos
        self.current_pos = init_pos
        self.penalty = penalty_pos

    def observe(self):
        """Return the agent's current ``(x, y)`` position."""
        return self.current_pos

    def reset(self, init_pos):
        """Move the agent to ``init_pos`` and recompute the ``done`` flag."""
        self.current_pos = init_pos
        self.done = (self.target == init_pos)

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, terminated)``.

        The reward is 1.0 on the target cell (which also terminates the
        episode), -2.0 on the penalty cell and 0.0 everywhere else.
        """
        nrows, ncols = self.dimensions
        x, y = self.current_pos
        # The last valid index along each axis is size-1; clamping to the
        # size itself would let the agent step outside the grid.
        max_x = float(ncols - 1)
        max_y = float(nrows - 1)
        if action == 0:
            y = min(y + 1, max_y)
        elif action == 1:
            x = min(x + 1, max_x)
        elif action == 2:
            y = max(y - 1, 0.0)
        elif action == 3:
            x = max(x - 1, 0.0)
        self.current_pos = (x, y)

        observation = self.observe()
        terminated = False
        if self.current_pos == self.target:
            reward = 1.0
            terminated = True
        elif self.current_pos == self.penalty:
            reward = -2.0
        else:
            reward = 0.0
        # Keep the public flag in sync with the value returned to the caller.
        self.done = terminated
        return observation, reward, terminated

    def show(self):
        """Draw the grid with the target, the penalty (if any) and the agent."""
        # Use this instance's own size rather than module-level globals.
        nrows, ncols = self.dimensions
        plt.figure(figsize=(15, 8))
        plt.title("Gridworld")
        plt.imshow(np.zeros((nrows, ncols)), cmap="Greys")
        ax = plt.gca()
        # imshow puts columns on the x axis and rows on the y axis.
        ax.set_xticks(np.arange(-.5, ncols, 1), minor=True)
        ax.set_yticks(np.arange(-.5, nrows, 1), minor=True)
        ax.grid(which='minor', color='k', linestyle='-', linewidth=1)
        ax.tick_params(which='minor', bottom=False, left=False)
        plt.scatter(self.target[0], self.target[1], label='target', color='y', s=100)
        if self.penalty is not None:
            plt.scatter(self.penalty[0], self.penalty[1], label='penalty', color='k', s=100)
        plt.scatter(self.current_pos[0], self.current_pos[1], label='current', color='red', s=100)
        plt.legend()
        plt.show()


We can create and display our environment, in which the reward is fixed, while our initial position is randomized:

In [None]:
# Grid size, discount factor and the fixed target cell (x, y).
nrows, ncols = 6, 7
gamma = 0.9
target_pos = (3.0, 1.0)

# Draw a random starting cell: x among the columns first, then y among the rows.
start_x = float(np.random.randint(ncols))
start_y = float(np.random.randint(nrows))
env = Gridworld(nrows, ncols, target_pos=target_pos, init_pos=(start_x, start_y))

# Display the grid and print the agent's starting position.
env.show()
print(env.observe())

This task should be very easy to solve (we wouldn't even really need DL), so we can use a very simple MLP architecture as a Q-Network:

In [None]:
class QNet(torch.nn.Module):
    """Three-layer MLP mapping a 2-feature state to 4 outputs (one per action)."""

    def __init__(self):
        super().__init__()
        linear = torch.nn.Linear
        # Layer sizes: 2 -> 128 -> 64 -> 4.
        self.layer1 = linear(2, 128)
        self.layer2 = linear(128, 64)
        self.layer3 = linear(64, 4)

    def forward(self, x):
        """Return the 4 outputs for ``x``; ReLU follows the first two layers only."""
        hidden = torch.nn.functional.relu(self.layer1(x))
        hidden = torch.nn.functional.relu(self.layer2(hidden))
        out = self.layer3(hidden)
        return out

The following two functions will be used to visualize the policy we learn: since we have a small and fixed number of states ($nrows\times ncols$) and actions (4), we can query our Q-Network in each of these state-action pairs to obtain the Q value $Q(s,a)$.

Once we have computed values for $Q(s,a)$, we can derive a greedy policy by taking the $\arg\max$ over actions in each state:
$$
\pi(s)=\arg\max_a Q(s,a)
$$

In [None]:
def compute_q(net, n_rows=None, n_cols=None):
    """Tabulate the network's Q-values over every cell of the grid.

    Args:
        net: callable mapping a float tensor ``[x, y]`` to a tensor with at
            least 4 entries, one per action.
        n_rows: number of grid rows; defaults to the module-level ``nrows``.
        n_cols: number of grid columns; defaults to the module-level ``ncols``.

    Returns:
        A NumPy array ``Q`` of shape ``(4, n_rows, n_cols)`` where
        ``Q[a, y, x]`` is the network output for action ``a`` in state ``(x, y)``.
    """
    n_rows = nrows if n_rows is None else n_rows
    n_cols = ncols if n_cols is None else n_cols
    n_actions = 4
    Q = np.zeros((n_actions, n_rows, n_cols))
    # Only values are needed here, so skip building autograd graphs.
    with torch.no_grad():
        for x in range(n_cols):
            for y in range(n_rows):
                # One forward pass yields all action values for this state.
                values = net(torch.tensor([x, y], dtype=torch.float))
                Q[:, y, x] = values[:n_actions].tolist()
    return Q

def plot_policy(model, target, penalty=None):
    """Plot the greedy action chosen by ``model`` in every grid cell.

    Each cell is coloured by the index of the action with the largest
    Q-value there. The target is drawn as a yellow dot and, when given,
    the penalty as a black dot.
    """
    action_labels = ['Down', 'Right', 'Up', 'Left']
    bounds = [0, 1, 2, 3, 4]
    # One colour per action index, with bin edges at the integers 0..4.
    cmap = colors.ListedColormap(['white', 'red', 'green', 'blue'])
    norm = colors.BoundaryNorm(bounds, cmap.N)

    # Greedy policy: best action index per cell.
    q_values = compute_q(model)
    policy = q_values.argmax(axis=0)

    plt.figure(figsize=(15, 8))
    plt.title("Learned Policy")
    img = plt.imshow(policy, cmap=cmap, norm=norm)

    target_x, target_y = target[0], target[1]
    plt.scatter(target_x, target_y, label='target', color='y', s=200)
    if penalty is not None:
        penalty_x, penalty_y = penalty[0], penalty[1]
        plt.scatter(penalty_x, penalty_y, label='penalty', color='k', s=200)

    # Put each tick in the middle of its colour bin and name it after the action.
    cbar = plt.colorbar(img, boundaries=bounds)
    cbar.set_ticks([0.5, 1.5, 2.5, 3.5])
    cbar.ax.set_yticklabels(action_labels)
    plt.legend(prop={'size': 14})
    plt.show()

Now we can train our network using Q-Learning.

Pay attention to two things: both our exploration rate $\epsilon$ and our learning rate decrease with training time, and we need to make the target constant w.r.t. the parameters. We obtain the latter by calling the `.detach()` method of the `torch` tensor.

In [None]:
# Train a fresh Q-network on the environment without a penalty cell.
model=QNet()

eps=0.9
optimizer=torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler=torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=.995)
loss=torch.nn.MSELoss()
n_episodes=2000

# Per-episode logs: steps taken, learning rate used, exploration rate used.
durations=[]
learning_rates=[]
epsilons=[]

for episode in range(n_episodes):
    # Exploration rate decays once per episode.
    eps=eps*0.995
    env.reset((float(np.random.randint(ncols)), float(np.random.randint(nrows))))
    t=0
    done=env.done
    # An episode ends on reaching the target or after 100 steps.
    while (t<100 and not done):
        t+=1
        q=model(torch.tensor(env.observe()))
        # Epsilon-greedy: random action with probability eps, else the greedy one.
        if np.random.rand()<eps:
            A_t=int(np.random.randint(4))
        else:
            A_t=int(torch.argmax(q).item())
        observation,reward,done=env.step(A_t)
        if done:
            target=torch.tensor(reward)
        else:
            # Detach the network OUTPUT so the bootstrap target is a constant
            # w.r.t. the parameters (detaching the input tensor has no effect).
            target=reward+gamma*torch.max(model(torch.tensor(observation))).detach()
        l=loss(q[A_t], target)
        optimizer.zero_grad()
        l.backward()
        optimizer.step()
    # Log the values used during this episode before decaying the learning rate.
    learning_rates.append(optimizer.param_groups[0]['lr'])
    epsilons.append(eps)
    scheduler.step()
    durations.append(t)

The network quickly learns a suitable policy:

In [None]:
# Per-episode durations as dots, with a 10-episode moving average overlaid.
window = 10
plt.figure(figsize=(15, 8))
plt.title("Episode duration")
plt.plot(durations, 'o', markersize=2)
plt.plot(np.convolve(durations, np.ones(window), mode='valid') / window)

In [None]:
# Visualise the greedy policy learned without a penalty cell.
plot_policy(model, target=target_pos)

Now let's try to see what happens if we add a penalty cell: the agent should learn a policy that avoids passing through it:

In [None]:
# Train a fresh Q-network on an environment that also contains a penalty cell.
model=QNet()
penalty_pos=(1.0, 4.0)
env=Gridworld(nrows, ncols, target_pos=target_pos, init_pos=(float(np.random.randint(ncols)), float(np.random.randint(nrows))), penalty_pos=penalty_pos)

eps=0.9
optimizer=torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler=torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=.995)
loss=torch.nn.MSELoss()
n_episodes=2000

# Per-episode logs: steps taken, learning rate used, exploration rate used.
durations=[]
learning_rates=[]
epsilons=[]

for episode in range(n_episodes):
    # Exploration rate decays once per episode.
    eps=eps*0.995
    env.reset((float(np.random.randint(ncols)), float(np.random.randint(nrows))))
    t=0
    done=env.done
    # An episode ends on reaching the target or after 100 steps.
    while (t<100 and not done):
        t+=1
        q=model(torch.tensor(env.observe()))
        # Epsilon-greedy: random action with probability eps, else the greedy one.
        if np.random.rand()<eps:
            A_t=int(np.random.randint(4))
        else:
            A_t=int(torch.argmax(q).item())
        observation,reward,done=env.step(A_t)
        if done:
            target=torch.tensor(reward)
        else:
            # Detach the network OUTPUT so the bootstrap target is a constant
            # w.r.t. the parameters (detaching the input tensor has no effect).
            target=reward+gamma*torch.max(model(torch.tensor(observation))).detach()
        l=loss(q[A_t], target)
        optimizer.zero_grad()
        l.backward()
        optimizer.step()
    # Log the values used during this episode before decaying the learning rate.
    learning_rates.append(optimizer.param_groups[0]['lr'])
    scheduler.step()
    durations.append(t)
    epsilons.append(eps)

In [None]:
# Per-episode durations as dots, with a 10-episode moving average overlaid.
window = 10
plt.figure(figsize=(15, 8))
plt.title("Episode duration")
plt.plot(durations, 'o', markersize=2)
plt.plot(np.convolve(durations, np.ones(window), mode='valid') / window)

In [None]:
# Visualise the greedy policy learned with the penalty cell marked on the plot.
plot_policy(model, target=target_pos, penalty=penalty_pos)