In [None]:
from pycolab import ascii_art
from pycolab.prefab_parts import sprites as prefab_sprites
from pycolab.rendering import ObservationToFeatureArray

import numpy as np
import pycolab
import matplotlib.pyplot as plt
%matplotlib inline

# Introduction

# Implementations

### Environments

#### 1. Blocking Maze

In [None]:
# (row, column) of the rewarding cell, in the upper corridor next to the right wall.
GOAL_LOCATION = (1, 9)

# ASCII layout of the maze: '#' cells are walls, 'P' is the player's start.
# The wall on row 4 has a single opening at column 9.
GAME_ART = [
    '###########',
    '#         #',
    '#         #',
    '#         #',
    '######### #',
    '#         #',
    '#   P     #',
    '###########',
]


def make_game():
    """Create the game from `GAME_ART`, with `PlayerSprite` controlling 'P'."""
    sprite_classes = {'P': PlayerSprite}
    return ascii_art.ascii_art_to_game(
        GAME_ART, what_lies_beneath=' ', sprites=sprite_classes)


class PlayerSprite(prefab_sprites.MazeWalker):
    """The agent's `Sprite`.

    Integer actions 0, 1, 2 and 3 move the sprite north, south, west and east
    respectively. If the sprite stands on `GOAL_LOCATION` after the move, the
    agent receives a reward of 1 and the episode terminates; otherwise the
    step is rewarded with 0.
    """

    def __init__(self, corner, position, character):
        """Tell the superclass that '#' cells are impassable walls."""
        super(PlayerSprite, self).__init__(corner, position, character, impassable='#')

    def update(self, actions, board, layers, backdrop, things, the_plot):
        """Apply the requested move, then add this step's reward to the plot."""
        del layers, backdrop, things   # Unused in this application.

        # Action code -> motion. A value matching none of the codes leaves the
        # sprite where it is.
        moves = (
            (0, self._north),   # walk upward
            (1, self._south),   # walk downward
            (2, self._west),    # walk leftward
            (3, self._east),    # walk rightward
        )
        for code, move in moves:
            if actions == code:
                move(board, the_plot)
                break

        # Reward 1 and end the episode on the goal cell, reward 0 elsewhere.
        at_goal = self.position == GOAL_LOCATION
        the_plot.add_reward(1.0 if at_goal else 0.0)
        if at_goal:
            the_plot.terminate_episode()

In [None]:
def show_board(obs):
    """Draw the current board as a small colour-coded image.

    Cells are encoded as intensities before plotting: the player layer 'P'
    contributes 10, the wall layer '#' contributes 2 and the goal cell at
    `GOAL_LOCATION` contributes 7 (so the player standing on the goal shows
    as 17).

    Args:
        obs: observation whose `layers` mapping holds the 'P' and '#' masks.
    """
    # Builtin `float` replaces the `np.float` alias, which was deprecated in
    # NumPy 1.20 and removed in 1.24 (AttributeError on current versions).
    board = 10 * np.array(obs.layers['P'], dtype=float)
    board += 2 * np.array(obs.layers['#'], dtype=float)

    goal_mask = np.zeros(shape=board.shape)
    goal_mask[GOAL_LOCATION] = 1
    board += 7 * goal_mask

    fig = plt.figure(figsize=(2, 1))
    plt.imshow(board)
    plt.axis('off')
    plt.show()
    # This is called once per environment step; release the figure so that
    # long episodes do not accumulate open figures.
    plt.close(fig)

In [None]:
# Build a fresh game object.
game = make_game()

# Finalise the engine: this sets the game up and computes its first observation.
obs, reward, gamma = game.its_showtime()
print(reward, gamma)

# Random walk: draw one of the four actions uniformly until the episode ends.
while not game.game_over:
    a = np.random.randint(4)
    obs, reward, gamma = game.play(a)
    print(reward, gamma)
    show_board(obs)
print('GAME OVER')

#### 2. Shortcut Maze

### Dyna-Q Algorithm

In [None]:
def dynaQ():
    """Unimplemented placeholder: takes no arguments and returns None."""
    return None

In [None]:
class SimpleModel(object):
    """Table of observed transitions, keyed by state and then by action.

    For every (state, action) pair only the most recently fed
    (reward, next_state) outcome is remembered.
    """

    def __init__(self):
        # {state tuple: {action: (reward, next_state list)}}
        self._mapping = dict()

    def feed(self, state, action, next_state, reward):
        """Record one transition, overwriting any earlier one for the same pair."""
        key = tuple(state)
        transitions = self._mapping.setdefault(key, dict())
        transitions[action] = (reward, list(next_state))

    def sample(self):
        """Return a recorded transition as (state, action, reward, next_state).

        A stored state is drawn uniformly first, then one of the actions
        recorded for that state, also uniformly.
        """
        state_pick = np.random.choice(range(len(self._mapping)))
        state = list(self._mapping)[state_pick]

        outcomes = self._mapping[state]
        action_pick = np.random.choice(range(len(outcomes)))
        action = list(outcomes)[action_pick]

        reward, next_state = outcomes[action]
        return list(state), action, reward, list(next_state)

In [None]:
class DynaQ(object):
    """Tabular Dyna-Q: one-step Q-learning on real transitions, followed by
    planning updates on transitions replayed from a model.

    Args:
        gamma: discount factor applied to the bootstrapped next-state value.
        epsilon: probability of taking a uniformly random (exploratory) action.
        alpha: step size of every Q-value update.
        n_planning_steps: number of model-sampled updates performed after
            each real step.
        runs: number of independent runs to average over; only stored here,
            nothing in this class reads it.
    """

    def __init__(self, gamma=0.95, epsilon=0.1, alpha=0.1, n_planning_steps=5, runs=10):
        # discount factor
        self.gamma = gamma

        # probability for exploration
        self.epsilon = epsilon

        # step size
        self.alpha = alpha

        # n-step planning
        self.n_planning_steps = n_planning_steps

        # average over several independent runs
        self.runs = runs

    def _select_action(self, state, state_action_values):
        """Pick an action for `state` with an epsilon-greedy scheme.

        Args:
            state: (row, col) index into the first two axes of the Q-table.
            state_action_values: array indexed as [row, col, action].

        Returns:
            An action index; ties between greedy actions are broken uniformly
            at random.
        """
        # The number of actions comes from the table itself rather than being
        # fixed at 4, so differently sized action sets work too.
        n_actions = state_action_values.shape[-1]
        if np.random.binomial(1, self.epsilon) == 1:
            return np.random.randint(n_actions)

        values = state_action_values[state[0], state[1], :]
        best_value = np.max(values)   # computed once, not once per action
        return np.random.choice(np.flatnonzero(values == best_value))

    def _q_update(self, state_action_values, state, action, reward, next_state):
        """Apply one in-place Q-learning backup for (state, action)."""
        row, col = state[0], state[1]
        best_next = np.max(state_action_values[next_state[0], next_state[1], :])
        state_action_values[row, col, action] += \
            self.alpha * (reward + self.gamma * best_next -
                          state_action_values[row, col, action])

    def apply(self, state_action_values, model, game, render=True):
        """Play one episode of `game`, learning with tabular Dyna-Q.

        Args:
            state_action_values: Q-table indexed as [row, col, action];
                updated in place.
            model: object with `feed(state, action, next_state, reward)` and
                `sample()` returning (state, action, reward, next_state).
            game: engine that has not been started yet; `its_showtime()` is
                called here, then `play(action)` until `game_over` is true.
            render: when true (the default), draw the board with
                `show_board` after every step.

        Returns:
            The number of real environment steps taken in the episode.
        """
        # The state is the (row, col) of the single set cell in the 'P' layer.
        obs, _, _ = game.its_showtime()
        position = np.array(obs.layers['P'])
        current_state = np.unravel_index(position.argmax(), position.shape)

        steps = 0
        while not game.game_over:
            steps += 1

            # choose an action to execute
            action = self._select_action(current_state, state_action_values)

            # take action
            obs, reward, _ = game.play(action)
            position = np.array(obs.layers['P'])
            next_state = np.unravel_index(position.argmax(), position.shape)

            # Q-Learning update on the real transition
            self._q_update(state_action_values, current_state, action, reward, next_state)

            # feed the model with experience
            model.feed(current_state, action, next_state, reward)

            # planning: the same update on transitions sampled from the model
            for _ in range(self.n_planning_steps):
                sample_state, sample_action, sample_reward, sample_next_state = model.sample()
                self._q_update(state_action_values, sample_state, sample_action,
                               sample_reward, sample_next_state)

            current_state = next_state
            if render:
                show_board(obs)
        print('GAME OVER')

        return steps

In [None]:
# fresh game for the learning episode
game = make_game()

# one Q-value per (row, column, action), all starting at zero
state_action_values = np.zeros(shape=(game.rows, game.cols, 4))

# empty transition model
model = SimpleModel()

# Dyna-Q agent with its default hyper-parameters
algorithm = DynaQ()

# run a single learning episode
algorithm.apply(state_action_values, model, game)

# Experiments

# How does Dyna-Q relate to Experience Replay?

# Conclusion