<a href="https://colab.research.google.com/github/kailing231/treasurecube/blob/main/TreasureHunt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# environment.py

import numpy as np
import time
import random

from abc import ABC, abstractmethod


class AbstractEnvironment(ABC):
    def __init__(self):
        self.agent_sign = '+'
        self.goal_sign = 'G'
        self.corridor_sign = '-'

    def render(self):
        raise NotImplemented

    def reset(self):
        raise NotImplemented

    def step(self, action):
        raise NotImplemented

class TreasureCube(AbstractEnvironment):
    def __init__(self, max_step=20):
        super(TreasureCube, self).__init__()
        self.dim = 4
        self.max_step = max_step
        self.curr_pos = [0, 0, 0]  # (z, x, y)
        self.time_step = 0
        self.end_pos = [self.dim - 1, self.dim - 1, self.dim - 1]
        self.visual_state = []
        self.seed = None
        self.set_seed()
        self.all_actions = ['right', 'left', 'up', 'down', 'forward', 'backward']
        self.slip_actions = dict()
        self.set_action_rules()

    def reset(self):
        self.curr_pos = [0, 0, 0]
        self.time_step = 0
        self.end_pos = [self.dim - 1, self.dim - 1, self.dim - 1]
        self._reset_visual_states(self.curr_pos, self.end_pos)
        return ''.join(str(pos) for pos in self.curr_pos)

    def randomState(self):
        x = random.randrange(0,3)
        y = random.randrange(0,3)
        z = random.randrange(0,3)
        return [x,y,z]

    def step(self, action, stochastic=True):
        in_action = action  # action from agent
        assert action in self.all_actions
        reward = -0.1
        is_terminate = False
        pre_pos = self.curr_pos
        r = random.random()
        if action == 'right':
            if r < 0.1:
                action = 'up'
            elif r < 0.2:
                action = 'down'
            elif r < 0.3:
                action = 'forward'
            elif r < 0.4:
                action = 'backward'
            else:
                action = 'right'
        elif action == 'left':
            if r < 0.1:
                action = 'up'
            elif r < 0.2:
                action = 'down'
            elif r < 0.3:
                action = 'forward'
            elif r < 0.4:
                action = 'backward'
            else:
                action = 'left'
        elif action == 'up':
            if r < 0.1:
                action = 'left'
            elif r < 0.2:
                action = 'right'
            elif r < 0.3:
                action = 'forward'
            elif r < 0.4:
                action = 'backward'
            else:
                action = 'up'
        elif action == 'down':
            if r < 0.1:
                action = 'left'
            elif r < 0.2:
                action = 'right'
            elif r < 0.3:
                action = 'forward'
            elif r < 0.4:
                action = 'backward'
            else:
                action = 'down'
        elif action == 'forward':
            if r < 0.1:
                action = 'left'
            elif r < 0.2:
                action = 'right'
            elif r < 0.3:
                action = 'up'
            elif r < 0.4:
                action = 'down'
            else:
                action = 'forward'
        else:
            if r < 0.1:
                action = 'left'
            elif r < 0.2:
                action = 'right'
            elif r < 0.3:
                action = 'up'
            elif r < 0.4:
                action = 'down'
            else:
                action = 'backward'

        if not stochastic:
            action = in_action

        assert action in self.all_actions
        if action == 'left':
            if self.curr_pos[1] == 0:  # wall
                pass
            else:
                self.curr_pos[1] -= 1
        elif action == 'right':
            if self.curr_pos[1] == self.dim - 1:  # wall
                pass
            elif self.curr_pos[1] == self.dim - 2 and self.curr_pos[0] == self.dim - 1 and self.curr_pos[
                2] == self.dim - 1:
                self.curr_pos[1] += 1
                is_terminate = True
                reward = 1
            else:
                self.curr_pos[1] += 1

        elif action == 'forward':
            if self.curr_pos[0] == self.dim - 1:  # wall
                pass
            elif self.curr_pos[0] == self.dim - 2 and self.curr_pos[1] == self.dim - 1 and self.curr_pos[
                2] == self.dim - 1:
                self.curr_pos[0] += 1
                is_terminate = True
                reward = 1
            else:
                self.curr_pos[0] += 1
        elif action == 'backward':
            if self.curr_pos[0] == 0:  # wall
                pass
            else:
                self.curr_pos[0] -= 1

        elif action == 'up':
            if self.curr_pos[2] == self.dim - 1:  # wall
                pass
            elif self.curr_pos[2] == self.dim - 2 and self.curr_pos[0] == self.dim - 1 and self.curr_pos[
                1] == self.dim - 1:
                self.curr_pos[2] += 1
                is_terminate = True
                reward = 1
            else:
                self.curr_pos[2] += 1
        elif action == 'down':
            if self.curr_pos[2] == 0:
                pass
            else:
                self.curr_pos[2] -= 1

        assert action in self.all_actions
        self.time_step += 1
        if self.time_step == self.max_step - 1:
            is_terminate = True

        self._reset_visual_states(self.curr_pos, self.end_pos)
        return reward, is_terminate, ''.join(str(pos) for pos in self.curr_pos)

    def render(self):
        print(' '.join(['*'] * self.dim))
        for i in range(self.dim):
            for line in self.visual_state[i]:
                print(' '.join(line))
            print(' '.join(['#'] * self.dim))
        print(' '.join(['*'] * self.dim))

    def set_seed(self, seed=10086):
        self.seed = seed
        random.seed(seed)

    def _reset_visual_states(self, agent_pos, goal_pos):
        self.visual_state = [[[self.corridor_sign] * self.dim for _ in range(self.dim)] for _ in range(self.dim)]
        self.visual_state[agent_pos[0]][agent_pos[1]][agent_pos[2]] = self.agent_sign
        self.visual_state[goal_pos[0]][goal_pos[1]][goal_pos[2]] = self.goal_sign

    def set_action_rules(self):
        self.slip_actions['right'] = ['up', 'down', 'forward', 'backward', 'right']
        self.slip_actions['left'] = ['up', 'down', 'forward', 'backward', 'left']
        self.slip_actions['up'] = ['left', 'right', 'forward', 'backward', 'up']
        self.slip_actions['down'] = ['left', 'right', 'forward', 'backward', 'down']
        self.slip_actions['forward'] = ['left', 'right', 'up', 'down', 'forward']
        self.slip_actions['backward'] = ['left', 'right', 'up', 'down', 'backward']



test.py

In [54]:
import argparse
import random

import pandas as pd

In [58]:
# initial all 0's
# each coordinate value = [0,3]
# row = state, no of states = 4*4*4 = 64
# col = action, no of actions = 6

env = TreasureCube();
length = env.dim

indexnames = []
for x in range(length):
    for y in range(length):
        for z in range(length):
          indexnames.append(str(x) + str(y) + str(z))

Qtable = np.zeros([64, 6])
Qtable = pd.DataFrame(Qtable, index=indexnames, columns=env.all_actions)

In [66]:
Qtable["000"]

KeyError: ignored

In [63]:
class RandomAgent(object):
    def __init__(self):
        self.action_space = ['left','right','forward','backward','up','down'] # in TreasureCube
        self.Q = []

    def take_action(self, state):
        action = random.choice(self.action_space)
        return action

    # implement your train/update function to update self.V or self.Q
    # you should pass arguments to the train function
    def train(self, state, action, next_state, reward):
#        pass
      alpha = 0.5 # learning rate
      gamma = 0.99 # discount factor
      epsilon = 0.01 # exploration rate
      
      # For plotting metrics
      all_epochs = [] # todo remove
      all_penalties = [] # todo remove
      
      for i in range(1, 100001):        
        state = env.reset() # reset environment to a new, random state
        print("*** Current state = ", state)
#            state = env.randomState()

        epochs = 0
        penalties = 0
        reward = 0
        done = False

        while not done:
          if random.uniform(0, 1) < epsilon:
            action = env.all_actions.sample() # Explore action space
          else:   
            action = np.argmax(Qtable[state]) # Exploit learned values
#            action = np.argmax(Qtable["".join([str(c) for c in state])])

          # returns: reward, is_terminate, ''.join(str(pos) for pos in self.curr_pos)
#          next_state, reward, done, info = env.step(action) 
          reward, done, next_state = env.step(action) 

          old_value = Qtable[state, action]
          next_max = np.max(Qtable[next_state])

          new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
          Qtable[state, action] = new_value

          if reward == -10:
            penalties += 1

          state = next_state
          epochs += 1

        if i % 100 == 0:
          clear_output(wait=True)
          print(f"Episode: {i}")

      print("Training finished.\n")

In [64]:
# TODO
max_episode = 500
max_step = 500

env = TreasureCube()
agent = RandomAgent()

for epsisode_num in range(0, max_episode):
        state = env.reset()
        terminate = False
        t = 0
        episode_reward = 0
        while not terminate:
            action = agent.take_action(state)
            reward, terminate, next_state = env.step(action)
            episode_reward += reward
            # you can comment the following two lines, if the output is too much
            env.render() # comment
            print(f'step: {t}, action: {action}, reward: {reward}') # comment
            t += 1
            agent.train(state, action, next_state, reward)
            state = next_state
        print(f'epsisode: {epsisode_num}, total_steps: {t} episode reward: {episode_reward}')

* * * *
+ - - -
- - - -
- - - -
- - - -
# # # #
- - - -
- - - -
- - - -
- - - -
# # # #
- - - -
- - - -
- - - -
- - - -
# # # #
- - - -
- - - -
- - - -
- - - G
# # # #
* * * *
step: 0, action: left, reward: -0.1
*** Current state =  000


KeyError: ignored

In [61]:
# def test_cube(max_episode, max_step):
#     env = TreasureCube(max_step=max_step)
#     agent = RandomAgent() # TODO replace ??

#     for epsisode_num in range(0, max_episode):
#         state = env.reset()
#         terminate = False
#         t = 0
#         episode_reward = 0
#         while not terminate:
#             action = agent.take_action(state)
#             reward, terminate, next_state = env.step(action)
#             episode_reward += reward
#             # you can comment the following two lines, if the output is too much
#             env.render() # comment
#             print(f'step: {t}, action: {action}, reward: {reward}') # comment
#             t += 1
#             agent.train(state, action, next_state, reward)
#             state = next_state
#         print(f'epsisode: {epsisode_num}, total_steps: {t} episode reward: {episode_reward}')


# if __name__ == '__main__':
#     parser = argparse.ArgumentParser(description='Test')
#     parser.add_argument('--max_episode', type=int, default=500)
#     parser.add_argument('--max_step', type=int, default=500)
#     args = parser.parse_args()

#     test_cube(args.max_episode, args.max_step)


usage: ipykernel_launcher.py [-h] [--max_episode MAX_EPISODE]
                             [--max_step MAX_STEP]
ipykernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-8caa3149-f1cc-4e6c-977c-18c4ab236a99.json


SystemExit: ignored

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
