# Trabalho Prático I - Problema de Transporte de Objeto

Descrição na [ementa](enunciado_trabalho_I_v2.pdf).

Neste cenário, um agente deve percorrer a grade 7x6, encontrar o objeto e transportá-lo até na base. Essa tarefa deve ser executada na menor quantidade de passos detempo possível. O agente não possui nenhum conhecimento prévio sobre o ambiente, o qual possui paredes, as quais ele não pode transpor. O agente também não possui conhecimento prévio sobre a localização do objeto. A localização inicial do agente, disposição das paredes e objeto são sempre fixas, conforme indicado na ilustração. A cada passo de tempo, o agente pode executar os seguintes movimentos na grade:
- mover para cima, baixo, esquerda ou direita;
- permanecer na mesma célula;

Este cenário apresenta algumas restrições de movimentação:
- O agente pode realizar apenas uma movimentação por passo de tempo;
- Se o agente escolher se mover para uma célula que não está vazia, seja por conta de uma parede ou objeto, ele não se move, i.e., permanece na mesma célula;
- Qualquer tentativa de locomoção para além da grade, resultará na não movimentação do agente;
- O objeto sé pode ser agarrado pela sua esquerda ou direita;
- Quando o agente ́e posicionado à direita ou esquerda do objeto, o objeto ée agarrado automaticamente;
- Uma vez agarrado o objeto, o agente não pode soltá-lo;
- O agente, quando agarrado ao objeto, só consegue se mover para uma nova célula desde que não haja nenhuma restrição de movimentação para o agente e objeto;

O episódio ée concluído automaticamente quando o objeto entra na base ou se atingir um número máximo de passos de tempo sem resolver a tarefa. Em ambos os casos, um novo episódio ́é iniciado, com o agente e objeto situados conforme abaixo.

In [17]:
# Libs necessária
import gym
from gym.envs.toy_text import discrete

import numpy as np

#import random
import sys

# Estratégia

Pensei em algumas estratégias: Criar dois agentes, um que vai até o objeto e outro que vai com o objeto até o objetivo; Ou criar um agente que possui pontuação somente se chegar no objetivo com o objeto, porém, aqui precisamos de um modelo onde as transições não podem ser pré-definidas. Sendo assim o algoritmo foi alterado para identificar as transições possíveis de acordo com a posição do agente e sua interação com o ambiente (se tem ou não o objeto). Outro detalhe é o custo de andar (para objetivar o menor camino), pra isso foi importante colocar que um passo, mesmo que válido, aplica uma "punição" de -1.

Iniciamente usei o DiscreteEnv do Gym para isso. Mas foi necessário ampliar para busca das transições de forma dinâmica. Ainda extendi Env e reimplementei as partes do DiscreteEnv que me interessavam.

Outro detalhe da estratégia foram as paredes. Imaginei que poderia remover a opção de movimento válido nas paredes. Acredito que funciona pois diminui o espaço de transições possíveis. Porém, soa como "trapassa". Mas como está no enunciado, optei por seguir dessa forma. Pensei também em uma medida mais lógica que prática: objeto também custa. Então, segurar o objeto começa a custar tudo conforme o terreno onde está o objeto. Porém, não houve impacto na eficiência do algoritmo.

In [71]:
# Montando o cenário 
# Exemplo 1: https://towardsdatascience.com/creating-a-custom-openai-gym-environment-for-stock-trading-be532be3910e
# Exemplo 2: https://github.com/caburu/gym-cliffwalking/blob/master/gym_cliffwalking/envs/cliffwalking_env.py
# Exemplo 3: https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py

class ObjectTransportEnv(discrete.Env):
    """
    O mapa é descrito com:
    _ : caminho livre
    o : posição inicial do agente
    + : objeto a ser transportado
    $ : objetivo
    # : bloqueio
    O episódio termina chegar no objetivo.
    Sua recompensa é 1 se pegar o objeto e 1 se chegar ao objetivo com o objeto.
    Fora do mapa não é acessível. Mais regras na ementa acima.
    """

    metadata = {"render.modes": ["human", "ansi"]}
    l_free_path = b'_'
    l_agent = b'o'
    l_object = b'+'
    l_goal = b'$'
    l_wall = b'#'
    actions_movements = [
        (0, -1) # left
        ,(-1, 0) # up
        ,(0, 1) # rigth
        ,(1, 0) # down
    ]
    agent_steps = []

    def __init__(self, desc):
        self.desc = np.asarray(desc, dtype="c")
        self.nrow, self.ncol = self.desc.shape
        self.reward_range = (-100.0, 100.0)

        # 4 ações (cima, baixo, esquerda, direita)
        nA = len(self.actions_movements)
        # Espaço é o tamanho do mapa
        nS = self.nrow * self.ncol

        # estado inicial (se tiver mais que um - escolhe um aleatório no reset)
        isd = np.array(self.desc == self.l_agent).astype("float64").ravel()
        isd /= isd.sum()

        # lista com transições (probability, nextstate, reward, done)
        # P = {s: {} for s in range(nS)}        
        # for row in range(self.nrow):
        #     for col in range(self.ncol):
        #         s = self._to_s(row, col)
        #         # movimento protegido para não ir pra fora da lista
        #         #for action, predicted_pos in self._possible_actions(row, col):
        #         for action, predicted_pos in self._possible_actions(row, col):
        #             P[s][action] = [(1.0, *self._update_probability_matrix(predicted_pos))]

        #super(ObjectTransportEnv, self).__init__(nS, nA, P, isd)
        
        #self.P = P
        self.isd = isd
        self.nS = nS
        self.nA = nA

        self.action_space = gym.spaces.Discrete(self.nA)
        # Não entendi pra que é usado no modelo. Acredito que é pra quando as posições iniciais são aleatórias.
        self.observation_space = gym.spaces.Discrete(self.nS)

        self.seed()
        self.s = discrete.categorical_sample(self.isd, self.np_random)
        self.os = None

    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return [seed]

    def move(self, pos, action):
        incrow, inccol = action
        row, col = pos
        newpos = (row + incrow, col + inccol)
        return newpos

    def rowcol_to_state(self, row, col):
        return row * self.ncol + col

    def pos_to_state(self, pos):
        row, col = pos
        return self.rowcol_to_state(row, col)

    def state_to_rowcol(self, s = None):
        _s = s or self.s
        return _s // self.ncol, _s % self.ncol

    def is_inside(self, pos):
        row, col = pos
        inside = row >= 0 and col >= 0 and row < self.nrow and col < self.ncol
        return inside

    def is_valid_pos(self, pos):
        inside = self.is_inside(pos)
        walkable = False
        if inside:
            row, col = pos
            # Se está segurando, o ponto original do objeto não é mais o mesmo
            valids = [self.l_free_path, self.l_goal, (self.l_object if self.os else b'')]
            walkable = self.desc[row, col] in valids
        return inside and walkable

    def is_valid_current_action(self, action):
        transitions = self.possible_actions()
        return action in transitions

    def sample_from_available_current_actions(self):
        transitions = self.possible_actions()
        keys = list(transitions.keys())
        value = self.np_random.choice(keys)
        return value

    def action_possible_transitions(self, pos, action):
        newpos = self.move(pos, action)
        newpos_obj = None
        objpos = self.state_to_rowcol(self.os) if self.os else None
        if not self.is_valid_pos(newpos):
            return None # Se estiver fora do mapa já cai for
        if objpos:
            newpos_obj = self.move(objpos, action) # Objeto move na mesma direção do agente
            if not self.is_valid_pos(newpos_obj):
                return None # Se o objeto estiver fora do mapa ou na parede, também não é um caminho válido
        else:
            # Checa se começou a segurar o objeto à esquerda ou direita
            for action_check in [self.actions_movements[0], self.actions_movements[2]]:
                newpos_obj_temp = self.move(newpos, action_check)
                if self.is_inside(newpos_obj_temp) and self.desc[newpos_obj_temp[0], newpos_obj_temp[1]] == self.l_object:
                    newpos_obj = newpos_obj_temp

        #print(pos, newpos, objpos, newpos_obj)

        newrow, newcol = newpos
        newletter = self.desc[newrow, newcol]
        
        rewards_points = {
            self.l_goal: 100.0,
            self.l_object: -1, # mesmo que freepath. Ele só passa se estiver segurando o objeto.
            self.l_free_path: -1,
        }

        reward = rewards_points.get(self.desc[newrow, newcol])
        if reward == None:
            raise Exception("Achou um caminho válido sem recompensa. Valide a regra: ", pos, newpos, newletter, reward, newpos_obj)

        done = newpos_obj and bytes(newletter) in [self.l_goal]

        prob = 1.0 # Tudo é 100% de probabilidade pq não tem esse tipo de medição no nosso modelo

        newstate = self.pos_to_state(newpos)
        newobjectstate = (self.pos_to_state(newpos_obj) if newpos_obj else None)
        
        return prob, newstate, reward, done, newobjectstate

    def possible_actions(self, pos = None):
        _pos = pos or self.state_to_rowcol(self.s)
        _p = {}
        for index, action in enumerate(self.actions_movements):
            result = self.action_possible_transitions(_pos, action)
            if result:
                _p[index] = result
        #print(_p)
        return _p

    def reset(self):
        self.s = discrete.categorical_sample(self.isd, self.np_random)
        self.os = None
        self.agent_steps = [] # para renderizar o caminho
        return int(self.s)

    def step(self, action_index):
        action = self.actions_movements[action_index]
        pos = self.state_to_rowcol()
        transitions = [self.action_possible_transitions(pos, action)]
        i = discrete.categorical_sample([t[0] for t in transitions], self.np_random)
        p, s, r, d, os = transitions[i]
        self.s = s
        self.os = os
        self.agent_steps.append(self.s)
        return (int(s), r, d, {"prob": p})

    def render(self, mode="human"):
        outfile = StringIO() if mode == "ansi" else sys.stdout

        desc = self.desc.tolist()
        desc2 = desc[:]
        colors = {
            self.l_goal: 'blue',
            self.l_agent: 'red',
            self.l_wall: 'white',
            self.l_object: 'magenta',
        }
        
        for row in range(len(desc)):
            for col in range(len(desc[row])):
                sb = desc[row][col]
                color = colors.get(sb)
                ss = sb.decode('utf-8')
                desc2[row][col] = gym.utils.colorize(ss, color, highlight=True) if color else ss

        if self.agent_steps:
            for _s in self.agent_steps:
                row, col = self.state_to_rowcol(_s)
                color = colors.get(desc[row][col])
                walked = "~"
                desc2[row][col] = gym.utils.colorize(walked, color, highlight=True) if color else walked
            text = self.l_agent.decode('utf-8')
            if self.os:
                text += ' ' + self.l_object.decode('utf-8')
            desc2[row][col] = gym.utils.colorize(text, colors.get(self.l_agent), highlight=True)

        outfile.write("\n".join(" ".join(line) for line in desc2) + "\n")

        if mode != "human":
            with closing(outfile):
                return outfile.getvalue()

t1_map = [
    "__$$$__",
    "___#___",
    "___+___",
    "_______",
    "##_####",
    "o_____#"
]

t1_env = ObjectTransportEnv(t1_map)
t1_env.reset()
t1_env.render()


_ _ [44m$[0m [44m$[0m [44m$[0m _ _
_ _ _ [47m#[0m _ _ _
_ _ _ [45m+[0m _ _ _
_ _ _ _ _ _ _
[47m#[0m [47m#[0m _ [47m#[0m [47m#[0m [47m#[0m [47m#[0m
[41mo[0m _ _ _ _ _ [47m#[0m


In [72]:
def Qlearning(environment, num_episodes=100, alpha=0.3, gamma=0.9, epsilon=1.0, decay_epsilon=0.1, max_epsilon=1.0, min_epsilon=0.01):
  
  # initializing the Q-table
  Q = np.zeros((environment.observation_space.n, environment.action_space.n))
  
  # additional lists to keep track of reward and epsilon values
  rewards = []
  epsilons = []
  last_accumulated_reward = -999999

  # episodes
  for episode in range(num_episodes):
      
      # reset the environment to start a new episode
      state = environment.reset()

      # reward accumulated along episode
      accumulated_reward = 0
      
      # steps within current episode
      for step in range(100):
          action = None

          # epsilon-greedy action selection
          # exploit with probability 1-epsilon
          if np.random.uniform(0, 1) > epsilon:
              action = np.argmax(Q[state,:])
              if not environment.is_valid_current_action(action):
                action = None
              #else:
              #  print('action1', ['left','up','right','down'][action], action, Q[state,:])
          # explore with probability epsilon
          if not action:
              #action = environment.action_space.sample()
              action = environment.sample_from_available_current_actions()
              #print('action2', ['left','up','right','down'][action], action)

          # perform the action and observe the new state and corresponding reward
          new_state, reward, done, info = environment.step(action)
          #print('state', new_state, 'reward', reward, 'done', done)

          # update the Q-table
          Q[state, action] = Q[state, action] + alpha * (reward + gamma * np.max(Q[new_state, :]) - Q[state, action])
          
          # update the accumulated reward
          accumulated_reward += reward
          #print(Q)

          # update the current state
          state = new_state

          # end the episode when it is done
          if done == True:
              break
      
      # decay exploration rate to ensure that the agent exploits more as it becomes experienced
      epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_epsilon*episode)
      
      # update the lists of rewards and epsilons
      rewards.append(accumulated_reward)
      epsilons.append(epsilon)

      # Teste pra mostrar cada caminho melhor
      # if accumulated_reward > last_accumulated_reward:
      #   environment.render()
      #   print('>' * 10, episode, last_accumulated_reward, '->', accumulated_reward)
      #   last_accumulated_reward = accumulated_reward 
      
      #break


  # render the environment
  environment.render()
  print(accumulated_reward)
    
  # return the list of accumulated reward along episodes
  return rewards

In [79]:
%%time

num_episodes=1000
alpha=0.3 # Peso para atualização do novo caminho descoberto
gamma=0.9
epsilon=1.0
decay_epsilon=0.1

# run Q-learning
rewards = Qlearning(t1_env, num_episodes, alpha, gamma, epsilon, decay_epsilon)

# print results
print ("Average reward (all episodes): " + str(sum(rewards)/num_episodes))
print ("Average reward (last 10 episodes): " + str(sum(rewards[-10:])/10))

_ _ [44m$[0m [44m$[0m [41mo +[0m _ _
_ _ _ [47m#[0m ~ _ _
_ _ ~ ~ ~ _ _
_ _ ~ _ _ _ _
[47m#[0m [47m#[0m ~ [47m#[0m [47m#[0m [47m#[0m [47m#[0m
[41mo[0m ~ ~ _ _ _ [47m#[0m
92.0
Average reward (all episodes): 92.497
Average reward (last 10 episodes): 91.8
Wall time: 651 ms


# Solução

A solução me surpreendeu.

Na minha cabeça o melhor caminho seria pela esquerda da parede em frente ao objetivo. Mas pra fazer pela esquerda seria necessário dois passos a mais. O algoritmo conseguiu achar o caminho pela direita da parede pois no meu exercício mental, o objeto deveria chegar ao objetivo também. Porém, isso não está nas regras.

# Observações

A solução implementada permite um mapa flexível com vários objetos e se entenderia dinamicamente.