In [99]:
import numpy as np
import torch.nn

# Action index -> compass name, in index order 0..3.
action2name = dict(enumerate(("N", "W", "S", "E")))

# Compass name -> action index (inverse of action2name).
name2action = {name: index for index, name in action2name.items()}

# Action index -> unit step, written as (dx, dy), i.e. x,y format.
action2delta = dict(
    enumerate(
        (
            (0, 1),   # N
            (-1, 0),  # W
            (0, -1),  # S
            (1, 0),   # E
        )
    )
)

# Unit step (dx, dy) -> action index (inverse of action2delta).
delta2action = {step: index for index, step in action2delta.items()}


class Agent:
    """
    Grid agent that moves one cell per step in one of four compass directions.

    The agent keeps its (x, y) position, the latest observation (`_view`),
    and a small fully connected network (`_policy`) that maps the flattened
    observation to one score per action. Actions are indexed as in the
    module-level `action2name` / `action2delta` tables.
    """

    def __init__(
        self,
        init_position=(0, 0),
        env_shape=(100, 100),
        x_bounds=range(5, 95),
        y_bounds=range(5, 95),
    ):
        """
        init_position: starting (x, y); copied into an integer array.
        env_shape: stored as-is; not read by any method of this class.
        x_bounds, y_bounds: sequences of the x / y coordinates the agent is
            allowed to step onto (membership is tested with `in`).
        """
        self._env_shape = env_shape
        self.position = np.array(init_position)
        self._x_bounds = x_bounds
        self._y_bounds = y_bounds
        # Scripted sweep over the bounds; kept for debugging purposes.
        self._actions, self._traj = self.generate_trajectory()
        self._found_fire = False
        self._view = np.zeros((5, 5))

        # Bookkeeping of the most recent forward pass, consumed by backprop().
        self._tensor_in = None
        self._tensor_out = None
        self._last_action = None

        # 25 inputs == flattened 5x5 view, 4 outputs == one score per action.
        self._policy = torch.nn.Sequential(
            torch.nn.Linear(25, 2048),
            torch.nn.ReLU(inplace=True),
            torch.nn.Linear(2048, 2048),
            torch.nn.ReLU(inplace=True),
            torch.nn.Linear(2048, 4),
        )

        self._optimizer = torch.optim.Adam(self._policy.parameters())

    def random_policy(self):
        """Return four uniform random scores in [0, 1), one per action."""
        return np.random.rand(4)

    def action(self, epsilon):
        """
        Sample an action, apply it to `self.position`, and return its index.

        epsilon is the fraction of random actions: with probability epsilon
        the scores come from `random_policy`, otherwise from the network.
        Actions that would leave the bounds are masked out before the argmax.
        If no action is viable all scores are -inf and argmax returns 0.
        """
        # Scores of the random policy.
        Q_rand = self.random_policy()

        # Scores of the learned policy. The forward pass always runs so that
        # backprop() has a graph to differentiate, even for random moves.
        view = self._view.flatten()
        self._tensor_in = torch.tensor(view, dtype=torch.float32)
        self._tensor_out = self._policy(self._tensor_in)
        # detach().numpy() shares memory with the tensor; copy so the masking
        # below does not overwrite the network output with -inf.
        Q_policy = self._tensor_out.detach().numpy().copy()

        Q = Q_rand if np.random.rand() < epsilon else Q_policy

        viable = self.viable_actions()
        Q[~viable] = -np.inf
        action = np.argmax(Q)

        # Remember the choice so backprop() does not depend on a global.
        self._last_action = action

        delta = action2delta[action]
        self.position += delta

        return action

    @property
    def observation(self):
        """The agent's current view of the environment."""
        return self._view

    @observation.setter
    def observation(self, value):
        self._view = value

    def backprop(self, reward):
        """
        Take one optimizer step for the action chosen by the last `action()` call.

        `reward` is placed at the chosen action's position of an otherwise
        zero vector, which is passed to `backward` as the upstream gradient
        of the network output.

        Raises RuntimeError if `action()` has not been called yet.
        """
        if self._tensor_out is None or self._last_action is None:
            raise RuntimeError("backprop() requires a preceding call to action()")

        self._optimizer.zero_grad()

        tensor_reward = torch.zeros_like(self._tensor_out)
        tensor_reward[self._last_action] = reward
        # NOTE(review): the optimizer performs a descent step on this gradient,
        # so a positive `reward` lowers the chosen action's score. Confirm the
        # sign convention of the reward returned by the environment.
        self._tensor_out.backward(tensor_reward)

        self._optimizer.step()

    def viable_actions(self):
        """
        Return a boolean mask of length 4, indexed by action, that is True
        where the corresponding single step stays inside the bounds.
        """
        x, y = self.position

        stays_inside = {
            "N": y + 1 in self._y_bounds,
            "S": y - 1 in self._y_bounds,
            "W": x - 1 in self._x_bounds,
            "E": x + 1 in self._x_bounds,
        }

        # Plain `bool`: the `np.bool` alias is not available in every NumPy release.
        is_viable = np.zeros(4, dtype=bool)
        for name, ok in stays_inside.items():
            is_viable[name2action[name]] = ok

        return is_viable

    def reset(self, position=(0, 0)):
        """Place the agent at `position` (x, y)."""
        self.position = np.array(position)

    def extinguish(self, env, range_xy=(3, 3)):
        """
        Zero the window of `env._state_map` around the agent.

        The window spans rows [y - dy, y + dy) and columns [x - dx, x + dx),
        with (dx, dy) = range_xy. The lower edges are clamped at 0 so that a
        negative start index does not wrap around to the far side of the map.
        """
        x, y = self.position
        dx, dy = range_xy
        env._state_map[max(y - dy, 0) : y + dy, max(x - dx, 0) : x + dx] = 0

    def observe(self, env, fov=(3, 3)):
        """
        Store the window of `env._state_map` around the agent as the view.

        The window spans rows [y - dy, y + dy) and columns [x - dx, x + dx),
        with (dx, dy) = fov; lower edges are clamped at 0.
        """
        # NOTE(review): the default fov gives a window of up to 6x6 cells,
        # whereas the policy's input layer takes 25 values (5x5). Confirm the
        # intended window size before feeding this view to action().
        x, y = self.position
        dx, dy = fov
        self._view = env._state_map[max(y - dy, 0) : y + dy, max(x - dx, 0) : x + dx]

    def generate_trajectory(self, step_width=10):
        """
        Build a scripted sweep over the bounds.

        For every `step_width`-th x in `_x_bounds` the sweep goes north along
        that column, east to the midway column, south along it, and east
        again to the start of the next stripe.

        Returns (actions, traj): parallel lists of action indices and the
        (x, y) cell recorded for each of them.
        """
        actions = []
        traj = []
        half_width = step_width // 2

        for x_step in self._x_bounds[::step_width]:

            midway = x_step + half_width

            # go north
            for y in self._y_bounds:
                traj.append((x_step, y))
                actions.append(name2action["N"])

            # go east; `y` still holds the last row of the northward leg
            for x in range(x_step, midway):
                traj.append((x, y))
                actions.append(name2action["E"])

            # go south
            for y in reversed(self._y_bounds):
                traj.append((midway, y))
                actions.append(name2action["S"])

            # go east to the next stripe; `y` holds the first row again
            for x in range(midway, x_step + step_width):
                traj.append((x, y))
                actions.append(name2action["E"])

        return actions, traj

    @staticmethod
    def save_video(filename, frame_buffer):
        """Write every frame of `frame_buffer` to a video file at `filename`."""
        # NOTE(review): VideoWriter is not imported in the visible cells;
        # make sure it is in scope before calling this.
        writer = VideoWriter(filename, resolution=(500, 500), fps=60)
        try:
            for frame in frame_buffer:
                writer.write(np.array(frame))
        finally:
            # Close the writer even when a frame fails to encode.
            writer.close()


In [93]:
from environment import Environment

In [100]:
# Agent to be trained, starting at (x, y) = (10, 10).
drone = Agent(init_position=[10, 10])
# Simulation environment from the project's `environment` module.
# NOTE(review): presumably `random_seed` fixes the initial layout — confirm in environment.py.
env = Environment(random_seed=260)

In [104]:
# range() only accepts integers, so a float-valued decay needs linspace:
# 200 evenly spaced values from 0.9 down to 0.05 (one per training episode).
epsilon = np.linspace(0.9, 0.05, num=200)

In [None]:
EPISODES = 200
MAX_ITER = 1000
# Exploration rate per episode, annealed linearly from 0.9 to 0.05.
EPSILON_SCHEDULE = np.linspace(0.9, 0.05, num=EPISODES)
SAVE_VIDEO_EVERY = 1  # write a video after every N-th episode


for episode, epsilon in enumerate(EPSILON_SCHEDULE):

    print(episode)

    env.reset()
    drone.position = np.array([50, 50])

    frame_buffer = []

    for it in range(MAX_ITER):

        # Explore with this episode's rate from the schedule.
        action = drone.action(epsilon=epsilon)
        position = drone.position
        drone.observation, reward = env.step(position, action, sim_step_every=30)

        drone.backprop(reward)

        frame_buffer.append(env.snapshot(drone.position))

        if env.done:
            break

    # Keyed on the loop variable `episode`, so it works on a fresh kernel.
    if episode % SAVE_VIDEO_EVERY == 0:
        drone.save_video(f"episode-{episode}.mp4", frame_buffer)

0
1
2
3
4
5
