## Requirements

The main requirements are PyTorch (of course), along with NumPy, Matplotlib, and IPython for animating the states.

In [None]:
import numpy as np
from itertools import count
import random
import time 

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
from torch.autograd import Variable
from collections import namedtuple, deque
from torch.distributions import Categorical
from collections import deque


import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import matplotlib.animation
from IPython.display import HTML
import math

%matplotlib


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from helpers import *

In [None]:
# One stored record: an observed state together with its flag.
Transition = namedtuple('Transition', ['state', 'flag'])


class ReplayMemory(object):
    """Fixed-capacity FIFO store of ``Transition`` records."""

    def __init__(self, capacity):
        # A bounded deque drops the oldest record once ``capacity`` is reached.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a ``Transition`` from ``args`` and store it as the newest record."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored records picked at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Return the number of records currently stored."""
        return len(self.memory)
    


In [None]:
GOAL_VALUE = 175
WATER_VALUE = 50
WALL_VALUE = 125
AGENT_HEALTH = 200
VISIBLE_RADIUS = 1
history_length = 500000

GAME_ART = [
     'WWWWW###',
     'W      W',
     'W  A   W',
     'W      W',
     'W      W',
     'W   G  W',
     'W      W',
     '########']

yaxis = 8
xaxis = 8

grid = np.zeros((xaxis,yaxis))

for i in range(0, xaxis):
    for j in range(0,yaxis):
            
        if GAME_ART[i][j] == 'W':
            grid[i][j] = WATER_VALUE
        if GAME_ART[i][j] == '#':
            grid[i][j] = WALL_VALUE
        if GAME_ART[i][j] == 'A':
            agentx, agenty = (i,j)
        if GAME_ART[i][j] == 'G':
            grid[i][j]= GOAL_VALUE
            

In [None]:
class Grid():
    """Per-environment copy of the module-level ``grid`` map with window extraction.

    Attributes:
        grid: 2-D NumPy array of cell codes, copied from the module-level ``grid``.
        grid_size: Largest dimension of the map (8 for the 8x8 map).
    """

    def __init__(self):
        self.reset()
        # Derived from the map itself instead of a hard-coded 8.
        self.grid_size = max(self.grid.shape)

    def reset(self):
        """Replace the working grid with a fresh copy of the module-level map."""
        # Copy rather than alias, so in-place edits to this grid can never
        # modify the shared template.
        self.grid = np.array(grid)

    def visible(self, pos, radius=None, fill_value=None):
        """Return the square window of cells centred on ``pos``.

        The result always has shape ``(2 * radius + 1, 2 * radius + 1)``.
        Cells that fall outside the map are filled with ``fill_value``.
        Plain slicing would instead wrap a negative start index (yielding an
        empty array) or silently truncate at the far edges.

        Args:
            pos: ``(row, column)`` of the window centre.
            radius: Cells visible in each direction; defaults to
                ``VISIBLE_RADIUS``.
            fill_value: Code used for out-of-map cells; defaults to
                ``WALL_VALUE``.

        Returns:
            A new array (not a view of the grid) with the grid's dtype.
        """
        if radius is None:
            radius = VISIBLE_RADIUS
        if fill_value is None:
            fill_value = WALL_VALUE

        y, x = pos
        size = 2 * radius + 1
        window = np.full((size, size), fill_value, dtype=self.grid.dtype)

        n_rows, n_cols = self.grid.shape
        top, left = y - radius, x - radius
        # Part of the requested window that really lies on the map.
        row_lo, row_hi = max(top, 0), min(top + size, n_rows)
        col_lo, col_hi = max(left, 0), min(left + size, n_cols)
        if row_lo < row_hi and col_lo < col_hi:
            window[row_lo - top:row_hi - top, col_lo - left:col_hi - left] = \
                self.grid[row_lo:row_hi, col_lo:col_hi]
        return window

In [None]:

# Action codes understood by Agent.act.
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3


class Agent:
    """Grid walker that carries a ``pos`` (row, column) and a ``health`` value."""

    # (action, (row delta, column delta)) pairs, compared in this order.
    _MOVES = (
        (UP, (-1, 0)),
        (RIGHT, (0, 1)),
        (DOWN, (1, 0)),
        (LEFT, (0, -1)),
    )

    def reset(self):
        """Set health back to ``AGENT_HEALTH``."""
        self.health = AGENT_HEALTH

    def act(self, action):
        """Shift ``self.pos`` by one cell in the direction given by ``action``.

        0=UP, 1=RIGHT, 2=DOWN, 3=LEFT; any other value leaves the position
        where it is. No bounds or obstacle checks happen here.
        """
        row, col = self.pos
        for direction, (d_row, d_col) in self._MOVES:
            if action == direction:
                row += d_row
                col += d_col
                break
        self.pos = (row, col)

In [None]:
# Reward handed out for each kind of step outcome.
STEP_REWARD = 0
WALL_HIT_REWARD = -2
WATER_REWARD = -50
GOAL_REWARD = 50


class Environment(object):
    """Episode driver that moves an ``Agent`` over a ``Grid`` and scores each step."""

    def __init__(self):
        self.grid = Grid()
        self.agent = Agent()
        self.safety_score = 5
        self.wall_hit = False
        self.success = False

    def reset(self):
        """Begin a new episode and return the first observation.

        Resets grid and agent, puts the agent on the start cell, recomputes the
        safety score, starts an empty history and records the initial frame.
        """
        self.grid.reset()
        self.agent.reset()
        self.wall_hit = False
        self.success = False

        self.agent.pos = (agentx, agenty)
        start_row, start_col = self.agent.pos
        self.safety_score = calculate_min_distance(self.grid.grid, start_row, start_col)

        self.t = 0
        self.history = deque(maxlen=history_length)
        self.record_step()

        return self.visible_state

    def record_step(self):
        """Append a frame for the current state to ``self.history``.

        A frame is ``(grid copy with the agent drawn in, visible window as
        float32, agent health, safety score)``.
        """
        snapshot = np.array(self.grid.grid)
        # The agent is drawn using its health value as the cell code.
        snapshot[self.agent.pos] = self.agent.health
        window = np.array(self.grid.visible(self.agent.pos), dtype="float32")
        frame = (snapshot, window, self.agent.health, self.safety_score)
        self.history.append(frame)

    @property
    def visible_state(self):
        """Return the flattened visible window followed by the scaled agent position.

        The two trailing entries are ``(y - VISIBLE_RADIUS) / grid_size`` and
        ``(x - VISIBLE_RADIUS) / grid_size``. Reading this property also
        refreshes ``self.safety_score``.
        """
        window = self.grid.visible(self.agent.pos)
        row, col = self.agent.pos
        scale = self.grid.grid_size
        position = [(row - VISIBLE_RADIUS) / scale, (col - VISIBLE_RADIUS) / scale]

        self.safety_score = calculate_min_distance(self.grid.grid, row, col)

        return np.concatenate((window.flatten(), position), 0)

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, won, lost)``.

        Landing on the goal wins, landing on water loses, and landing on a
        wall puts the agent back on the cell it came from.
        """
        prev_row, prev_col = self.agent.pos  # remembered so a wall hit can be undone

        self.agent.act(action)
        landed_on = self.grid.grid[self.agent.pos]

        won = bool(landed_on == GOAL_VALUE)
        lost = (not won) and bool(landed_on == WATER_VALUE)
        done = won or lost

        if landed_on == WALL_VALUE:
            self.wall_hit = True

        if won:
            self.success = True
            reward = GOAL_REWARD
        elif lost:
            reward = WATER_REWARD
            self.safety_score = reward
        elif self.wall_hit:
            reward = WALL_HIT_REWARD
            # Bounce back: the agent stays where it was before the move.
            self.agent.pos = (prev_row, prev_col)
            self.wall_hit = False
        else:
            reward = STEP_REWARD
            row, col = self.agent.pos
            self.safety_score = calculate_min_distance(self.grid.grid, row, col)

        # Save the frame before the observation is built.
        self.record_step()

        return self.visible_state, reward, done, won, lost


def calculate_min_distance(grid, agent_posy, agent_posx, water_value=None, max_distance=1000):
    """Return the Euclidean distance from the agent to the nearest water cell.

    Args:
        grid: 2-D array (or nested sequence) of cell codes.
        agent_posy: Row index of the agent.
        agent_posx: Column index of the agent.
        water_value: Cell code treated as water; defaults to ``WATER_VALUE``.
        max_distance: Upper bound on the result, also returned when the grid
            holds no water cell at all.

    Returns:
        The smallest distance as a float, or ``max_distance`` when no water
        cell is closer than that.
    """
    if water_value is None:
        water_value = WATER_VALUE

    # Find every water cell in one vectorised pass instead of a Python loop
    # over all cells.
    water_rows, water_cols = np.nonzero(np.asarray(grid) == water_value)
    if water_rows.size == 0:
        return max_distance

    # The square root is monotonic, so take the minimum of the squared
    # distances first and root only that single value.
    nearest_sq = ((water_rows - agent_posy) ** 2 + (water_cols - agent_posx) ** 2).min()
    distance = float(nearest_sq) ** 0.5
    return distance if distance < max_distance else max_distance
            
    

In [None]:
def animate(history, gif_path="TLI.gif"):
    """Render the recorded frames inline as an HTML5 video and save them as a GIF.

    Args:
        history: Sequence of ``(grid, visible, health, safety_score)`` frames;
            only the ``grid`` entry of each frame is drawn.
        gif_path: Destination of the GIF written through the ``imagemagick``
            writer.
    """
    # Imported here because the file-level imports visible in this notebook
    # only bring in ``HTML``; ``display`` would otherwise depend on the
    # notebook injecting it.
    from IPython.display import HTML, display

    frames = len(history)
    print("Rendering %d frames..." % frames)
    if frames == 0:
        # An empty history has nothing to render or save.
        return

    fig = plt.figure(figsize=(6, 2))
    fig_grid = fig.add_subplot(121)

    def render_frame(i):
        grid = history[i][0]
        # Clear first: matshow adds a new image every call, so without this
        # each frame would be stacked on top of all earlier ones.
        fig_grid.clear()
        fig_grid.matshow(grid)

    anim = matplotlib.animation.FuncAnimation(
        fig, render_frame, frames=frames, interval=100
    )

    # Close the figure so it is not also shown as a static image.
    plt.close(fig)
    display(HTML(anim.to_html5_video()))
    anim.save(gif_path, writer='imagemagick', fps=10)

### Testing the Environment

Let's test what we have so far with a quick simulation:

In [None]:
# env = Environment()
# env.reset()
# #print(env.visible_state)

# done = False
# while not done:
#     _, _, done, _, _ = env.step(RIGHT) # taking action Right
#     print(env.safety_score)

# animate(env.history)


In [None]:


# Per-step record: log-probability of the chosen action and the critic's value.
SavedAction = namedtuple('SavedAction', ('log_prob', 'value'))


In [None]:
# Observation layout: the flattened (2r + 1) x (2r + 1) visible window,
# followed by the agent's y and x position.
visible_squares = (2 * VISIBLE_RADIUS + 1) ** 2
input_size = visible_squares + 2
action_size = 4

# Hidden-layer widths of the policy network and of the fear network.
hidden_size = 128
hidden_size_fear_net = 128
    
class Policy(nn.Module):
    """Actor and critic sharing one hidden layer.

    Args:
        n_inputs: Observation length; defaults to the module-level ``input_size``.
        n_hidden: Hidden-layer width; defaults to the module-level ``hidden_size``.
        n_actions: Number of discrete actions; defaults to the module-level
            ``action_size``.

    The instance also owns the per-step Python lists filled during an
    episode: ``saved_states``, ``saved_actions``, ``output_probs``,
    ``rewards`` and ``lost_mask``.
    """

    def __init__(self, n_inputs=None, n_hidden=None, n_actions=None):
        super().__init__()
        # Fall back to the notebook-wide sizes so ``Policy()`` keeps working.
        n_inputs = input_size if n_inputs is None else n_inputs
        n_hidden = hidden_size if n_hidden is None else n_hidden
        n_actions = action_size if n_actions is None else n_actions

        # Shared feature layer.
        self.affine1 = nn.Linear(n_inputs, n_hidden)

        # Actor head: one logit per action.
        self.action_head = nn.Linear(n_hidden, n_actions)

        # Critic head: a single state-value estimate.
        self.value_head = nn.Linear(n_hidden, 1)

        # Per-step buffers, one entry per step.
        self.saved_states = []
        self.saved_actions = []
        self.output_probs = []
        self.rewards = []
        self.lost_mask = []

    def forward(self, x):
        """Run actor and critic on ``x``.

        Returns:
            A tuple ``(action_prob, state_values)``: the softmax over actions
            taken along the last dimension, and the critic output whose last
            dimension has size 1.
        """
        features = F.relu(self.affine1(x))

        # Actor: probability of each action in state s_t.
        action_prob = F.softmax(self.action_head(features), dim=-1)

        # Critic: estimated value of being in state s_t.
        state_values = self.value_head(features)

        return action_prob, state_values
    
class Fear_Network(nn.Module):
    """One-hidden-layer network that maps a state to a single danger score.

    Args:
        n_inputs: Observation length; defaults to the module-level ``input_size``.
        n_hidden: Hidden-layer width; defaults to the module-level
            ``hidden_size_fear_net``.
    """

    def __init__(self, n_inputs=None, n_hidden=None):
        super().__init__()
        # Fall back to the notebook-wide sizes so ``Fear_Network()`` keeps working.
        n_inputs = input_size if n_inputs is None else n_inputs
        n_hidden = hidden_size_fear_net if n_hidden is None else n_hidden

        self.affine = nn.Linear(n_inputs, n_hidden)

        # Output layer: one unbounded score per state (no activation applied).
        self.output = nn.Linear(n_hidden, 1)

    def forward(self, x):
        """Return the danger score for ``x``; the last dimension has size 1."""
        hidden = F.relu(self.affine(x))
        danger_flag = self.output(hidden)
        return danger_flag

In [None]:
def select_action(state):
    """Choose an action for ``state`` with epsilon-greedy exploration.

    The policy samples an action from its own distribution; with probability
    ``eps_threshold`` (decaying exponentially in ``steps_done``) that sample
    is replaced by a uniformly random action. The state, the log-probability
    of the action that is actually returned, the critic value and the action
    probabilities are appended to the buffers on ``model``.

    Args:
        state: NumPy observation vector.

    Returns:
        The chosen action as a Python int.
    """
    global steps_done

    state = torch.from_numpy(state).float()
    probs, state_value = model(state)

    # Categorical distribution over the actor's action probabilities.
    m = Categorical(probs)
    action = m.sample()

    # Exponentially decaying exploration rate.
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample <= eps_threshold:
        # Explore: override the sampled action with a uniformly random one.
        action = torch.tensor(random.randrange(action_size))

    model.saved_states.append(state)

    # Store the log-probability of the action that is really executed.
    # Saving the sampled action's log-probability while executing a different
    # one would make the policy gradient credit the wrong action.
    model.saved_actions.append(SavedAction(m.log_prob(action), state_value))

    # Keep the actor's output probabilities for the entropy term.
    model.output_probs.append(probs.detach().numpy())

    return action.item()
    


In [None]:
def _clear_episode_buffers(policy):
    """Empty every per-step buffer on ``policy`` so episodes never mix."""
    for buffer in (policy.rewards, policy.saved_actions, policy.output_probs,
                   policy.saved_states, policy.lost_mask):
        del buffer[:]


def finish_episode():
    """Run one actor-critic update from the finished episode, then clear its buffers.

    Builds normalised discounted returns from ``model.rewards``, subtracts the
    critic value and a fear penalty to get the advantage, and back-propagates
    policy loss + value loss - entropy term through ``optimizer``.
    """
    if not model.rewards:
        # Nothing was recorded; stacking empty loss lists would fail.
        _clear_episode_buffers(model)
        return

    saved_actions = model.saved_actions
    saved_output_probs = model.output_probs
    saved_states = model.saved_states
    lost_mask = model.lost_mask

    policy_losses = []   # actor (policy) loss per step
    value_losses = []    # critic (value) loss per step
    entropy_losses = []  # entropy term per step

    # Discounted returns, accumulated from the last step backwards.
    R = 0
    returns = []
    for r in reversed(model.rewards):
        R = r + gamma * R
        returns.append(R)
    returns.reverse()

    returns = torch.tensor(returns, dtype=torch.float32)
    # std() of a single value is NaN and would poison the weights; with one
    # step the centred return is 0 anyway, so use a zero spread.
    spread = returns.std() if len(returns) > 1 else 0.0
    returns = (returns - returns.mean()) / (spread + eps)

    for (log_prob, value), R, output_probs, saved_state, lost in zip(
            saved_actions, returns, saved_output_probs, saved_states, lost_mask):

        if lost:
            fear = torch.tensor([fear_factor])
        else:
            # Detached: the fear network has its own optimiser, so the policy
            # loss should not back-propagate into it.
            fear = fear_factor * fear_net(saved_state).detach()

        advantage = R - value.item() - fear

        # Actor (policy) loss.
        policy_losses.append(-log_prob * advantage)

        # Entropy term. It is built from NumPy values, so it is a constant
        # with no gradient. Adding eps avoids 0 * log(0) = NaN.
        entropy = -(np.log(output_probs + eps) * output_probs).sum()
        entropy_losses.append(torch.tensor(float(beta * entropy), dtype=torch.float32))

        # Critic (value) loss, smooth L1 against the normalised return.
        value_losses.append(zeta * F.smooth_l1_loss(value, R.reshape(1)))

    optimizer.zero_grad()

    total_loss = (torch.stack(policy_losses).sum()
                  + torch.stack(value_losses).sum()
                  - torch.stack(entropy_losses).sum())

    total_loss.backward()
    optimizer.step()

    # Clear all five buffers. Leaving saved_states / lost_mask behind would
    # make the next episode's zip pair its steps with stale entries and let
    # those lists grow without bound.
    _clear_episode_buffers(model)
    

def optimize_fear():
    """Train the fear network on one balanced batch of unsafe and safe states.

    Draws ``BATCH_SIZE / 2`` (rounded down) states from each of
    ``unsafe_buffer`` and ``safe_buffer``, labels them 1 and 0, and takes one
    MSE step with ``optimizer_fear``. Does nothing until both buffers hold at
    least that many states.
    """
    half_batch = math.floor(BATCH_SIZE / 2)
    if len(unsafe_buffer) < half_batch or len(safe_buffer) < half_batch:
        return

    unsafe = random.sample(unsafe_buffer, half_batch)
    safe = random.sample(safe_buffer, half_batch)

    # Unsafe states come first, so the labels follow the same order.
    state_batch = torch.Tensor(np.array(unsafe + safe))
    flag_batch = torch.Tensor([[1]] * half_batch + [[0]] * half_batch)

    predicted_flags = fear_net(state_batch)
    loss_calculator = nn.MSELoss()
    # Conventional (input, target) order; MSE itself is symmetric.
    loss = loss_calculator(predicted_flags, flag_batch)

    optimizer_fear.zero_grad()
    loss.backward()
    optimizer_fear.step()

    

In [None]:
seed = 543
gamma = 0.99
# Seed before the networks are built so their initial weights are covered too,
# and seed ``random`` as well: exploration and buffer sampling use it.
torch.manual_seed(seed)
random.seed(seed)

env = Environment()
model = Policy()
fear_net = Fear_Network()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
optimizer_fear = optim.Adam(fear_net.parameters(), lr=1e-3)
eps = np.finfo(np.float32).eps.item()

# States labelled safe / unsafe for training the fear network.
safe_buffer = deque([], maxlen=10000)
unsafe_buffer = deque([], maxlen=10000)

BATCH_SIZE = 100

history = deque(maxlen=history_length)
running_reward = 0
num_episodes = 100000
beta = 0     # weight of the entropy term
zeta = .01   # weight of the critic loss

# Epsilon-greedy schedule.
# NOTE(review): the decay constant is derived from the episode count, while
# steps_done is advanced once per action - confirm this is intended.
EPS_START = 0.9
EPS_END = 0.001
EPS_DECAY = math.floor(num_episodes * .75)
steps_done = 0

fear_radius = 2   # number of final states of a lost episode labelled unsafe
fear_factor = 1   # scale of the fear penalty in the advantage

log_interval = 100

total_success = 0
total_catastrophe = 0
episode_reward = []
average_reward = []
catastrophe_count = []


for i_episode in range(num_episodes):
    # Initialize the environment and state
    ep_reward = 0
    state = env.reset()

    # Start every episode with empty per-step buffers so states and masks
    # line up with this episode's actions and rewards.
    del model.saved_states[:]
    del model.lost_mask[:]

    state_list = []

    for t in range(1, 150):

        # select action from policy
        action = select_action(state)

        # take the action
        next_state, reward, done, won, lost = env.step(action)

        state_list.append(state)
        model.rewards.append(reward)
        ep_reward += reward

        # Exactly one mask entry per step (1 only on the losing step).
        model.lost_mask.append(1 if lost else 0)

        if done:
            break

        state = next_state

    if lost:
        total_catastrophe += 1

        # The last ``fear_radius`` states before the catastrophe are unsafe,
        # everything earlier in the episode is safe.
        num_safe_states = max(t - fear_radius, 0)
        safe_buffer.extend(state_list[:num_safe_states])
        unsafe_buffer.extend(state_list[num_safe_states:t])

    running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward
    catastrophe_count.append(total_catastrophe)
    episode_reward.append(ep_reward)

    if won:
        print("goal reached in episode",i_episode,"reward", ep_reward,"running reward", running_reward)
        total_success += 1

    # perform backprop
    optimize_fear()
    finish_episode()

    if i_episode % log_interval == 0:
        print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f} success {:.2f} failure {:.2f}'
              .format(i_episode, ep_reward, running_reward,total_success,total_catastrophe))


print('Complete')
animate(env.history)