### Q-Planning

In [None]:
import numpy as np
import matplotlib.pyplot as plt

### Tabular Dyna-Q

In [None]:
# Initialize Q(s, a) and Model(s, a) for all s ∈ S and a ∈ A(s)
def DynaQ(Q, ModelReward, ModelState1, Sequence, n, alpha, gamma, UNOBSERVED):
    """Run tabular Dyna-Q over a stream of real transitions, updating in place.

    For every transition drawn from ``Sequence`` the function performs one
    direct Q-learning update, records the transition in the (deterministic)
    model, and then performs up to ``n`` planning updates on (state, action)
    pairs sampled uniformly, with replacement, from the pairs recorded so far.

    Args:
        Q: Action-value array indexed as ``Q[state][action]``; modified in
            place.
        ModelReward: Array indexed like ``Q`` that stores the most recent
            reward seen for each (state, action); modified in place.
        ModelState1: Array indexed like ``Q`` with one extra trailing axis
            that stores the most recent successor state for each
            (state, action). Entries equal to ``UNOBSERVED`` mark pairs that
            have not been recorded. Modified in place.
        Sequence: Iterable yielding
            ``(state, is_terminal, next_state, action, reward)`` tuples. It is
            consumed until exhausted, one transition per loop iteration.
        n: Number of planning updates after each real step.
        alpha: Step size of the Q update.
        gamma: Discount factor.
        UNOBSERVED: Sentinel successor state marking unrecorded model entries.

    Returns:
        None. All results are written into ``Q``, ``ModelReward`` and
        ``ModelState1``.
    """
    # (a)-(c) The policy and the environment live inside ``Sequence``; the
    # ``for`` statement is its only consumer, so no transition is skipped.
    # NOTE(review): ``is_terminal`` is not used, so the update bootstraps from
    # ``next_state`` even on terminal transitions -- confirm this is intended.
    for state, is_terminal, next_state, action, reward in Sequence:
        # (d) Q(S, A) <- Q(S, A) + alpha*[R + gamma*max_a Q(S', a) - Q(S, A)]
        # The bootstrap value comes from the successor state S', not from S.
        td_target = reward + gamma * np.max(Q[next_state])
        Q[state][action] = Q[state][action] + alpha * (td_target - Q[state][action])

        # (e) Model(S, A) <- R, S' (assuming a deterministic environment)
        ModelReward[state][action] = reward
        ModelState1[state][action] = next_state

        # (f) Planning: replay n transitions drawn from the model.
        # A stored successor is "observed" when it differs from the sentinel
        # in at least one coordinate, hence ``any`` rather than ``all``.
        observed = np.argwhere((ModelState1 != UNOBSERVED).any(axis=-1))
        if len(observed) == 0:
            # Nothing to replay (randint would reject an empty range).
            continue
        for sample in np.random.randint(len(observed), size=n):
            # SA = (*S, A): a previously observed state and an action taken in it
            SA = tuple(observed[sample])
            # R, S' <- Model(S, A)
            R = ModelReward[SA]
            S1 = tuple(ModelState1[SA])
            # Q(S, A) <- Q(S, A) + alpha*[R + gamma*max_a Q(S', a) - Q(S, A)]
            Q[SA] = Q[SA] + alpha * (R + gamma * np.max(Q[S1]) - Q[SA])


### Generic sequence generator

In [None]:
class SequenceGenerator:
    """Iterator that rolls a policy through an environment, one step per item.

    Each item is ``(state, is_terminal, next_state, action, reward)``.
    Iteration stops once the episode counter exceeds ``episode_imax`` or the
    step counter exceeds ``steps_max``; a limit of 0 (or less) is disabled.
    """

    def __init__(self, getAction, getStartState, getTransition, episode_imax=1, steps_max=0,
                 callBack=None):
        """Store the collaborators and limits.

        Args:
            getAction: Called as ``getAction(state, episode_i)``; returns the
                action to take.
            getStartState: Called as ``getStartState(episode_i)`` when
                iteration starts; returns the initial state.
            getTransition: Called as ``getTransition(state, action)``; returns
                ``(is_terminal, next_state, reward)``.
            episode_imax: Highest episode number to run; values <= 0 disable
                the limit.
            steps_max: Highest step number to run; values <= 0 disable the
                limit.
            callBack: Optional hook called as
                ``callBack(episode_i, step_i, reward)`` after every step.
        """
        self.get_action = getAction
        self.get_start_state = getStartState
        self.get_transition = getTransition
        self.episode_imax = episode_imax
        self.steps_max = steps_max
        self.callback = callBack

    def __iter__(self):
        """Reset the episode and step counters to 1 and fetch the start state."""
        self.episode_i = 1
        self.state = self.get_start_state(self.episode_i)
        self.step_i = 1
        return self

    def __next__(self):
        """Advance one step and return the resulting transition tuple."""
        episode_cap_hit = 0 < self.episode_imax < self.episode_i
        step_cap_hit = 0 < self.steps_max < self.step_i
        if episode_cap_hit or step_cap_hit:
            raise StopIteration

        current = self.state
        chosen = self.get_action(current, self.episode_i)
        done, successor, payoff = self.get_transition(current, chosen)
        # The state handed back by the transition is carried into the next
        # step, including after a terminal transition.
        self.state = successor
        if self.callback:
            self.callback(self.episode_i, self.step_i, payoff)
        self.step_i += 1
        # A terminal transition opens the next episode.
        self.episode_i += int(done)
        return current, done, successor, chosen, payoff

# These values match the positions of `reward` (4) and `state` (0) in the
# 5-tuple (state, is_terminal, next_state, action, reward) returned by
# SequenceGenerator.__next__.
# NOTE(review): neither constant is referenced in the visible code --
# presumably meant for callers that index the tuple; confirm before relying on it.
REWARD_I = 4
STATE_I = 0

class EpsilonGreedyPolicy:
    """Callable epsilon-greedy policy over a table of action values."""

    def __init__(self, Q, Epsilon=0.1):
        """Keep a reference to ``Q`` (not a copy) and the exploration rate.

        Args:
            Q: Table such that ``Q[state]`` is the sequence of action values
                for ``state``.
            Epsilon: Probability of picking a uniformly random action.
        """
        self.Q = Q
        self.epsilon = Epsilon

    def __call__(self, state, episode_i=1):
        """Return an action index for ``state``.

        ``episode_i`` is accepted for interface compatibility and ignored.
        """
        action_values = self.Q[state]
        explore = np.random.rand(1)[0] < self.epsilon
        if not explore:
            # Exploit: index of the largest action value.
            return np.argmax(action_values)
        # Explore: any action index, uniformly.
        return np.random.randint(0, len(action_values))

### Shortcut Maze Problem

In [None]:
# Grid dimensions as (rows, columns).
maze_shape = (6,9,)
# Each obstacle is [begin_row, begin_column, end_row, end_column]; buildMaze
# fills the half-open ranges, so the end indices are exclusive.
maze_obstacles_1 = [[2,1,3,9]]
maze_obstacles_2 = [[2,1,3,8]]
# Value written into blocked cells (free cells hold 1).
OBSTACLE_TAG = 0
# NOTE(review): START_TAG and TARGET_TAG are not used in the visible code.
START_TAG = 2
# Cell the agent starts from, and is sent back to after reaching the target.
start_cell = (0,3)
TARGET_TAG = 3
# Cell whose entry yields the reward and ends the episode.
target_cell = (5,8)

def buildMaze(shape, obstacles):
    """Create a maze grid of the given shape with rectangular obstacles.

    Args:
        shape: Grid shape as (rows, columns).
        obstacles: Iterable of ``[begin_row, begin_column, end_row,
            end_column]`` rectangles; the end indices are exclusive.

    Returns:
        A ``uint8`` array holding 1 in free cells and ``OBSTACLE_TAG`` in
        every cell covered by an obstacle.
    """
    grid = np.full(shape, 1, dtype=np.uint8)
    blocked_cells = [
        (row, col)
        for row_from, col_from, row_to, col_to in obstacles
        for row in range(row_from, row_to)
        for col in range(col_from, col_to)
    ]
    for cell in blocked_cells:
        grid[cell] = OBSTACLE_TAG
    return grid

# maze1: row 2 is blocked in columns 1-8, leaving only column 0 open.
# maze2: row 2 is blocked in columns 1-7, so column 8 is open as well.
maze1 = buildMaze(maze_shape, maze_obstacles_1)
maze2 = buildMaze(maze_shape, maze_obstacles_2)

In [None]:
# Actions legend: Up, Right, Down, Left
# Each row is the (row, column) offset added to the current cell; an action is
# the integer index of its row.
# NOTE(review): "Up" is [1,0], i.e. row index + 1 -- confirm the intended orientation.
ACTIONS = np.array([[1,0], [0,1], [-1,0], [0,-1]])
ACTIONS_NUM = len(ACTIONS)
# Rewards returned by maze_getTransition: REWARD when the target cell is
# reached, NO_REWARD otherwise.
NO_REWARD = 0
REWARD = 1
# Sentinel successor state for (state, action) pairs not yet stored in the model.
UNOBSERVED = (-1,-1,)
# Q and the model tables are indexed by (row, column, action).
shape = maze_shape + (ACTIONS_NUM,)
#Q = np.zeros(shape)
# Small random initial values in [-0.005, 0.005) instead of zeros
# (presumably to break argmax ties -- TODO confirm).
Q = (np.random.random(shape)-0.5)*0.01
#dt = np.dtype([('reward', float, 1),('state', np.int8, 2)])
#Model = np.broadcast_to(np.array((0,UNOBSERVED,), dtype = dt), shape)
# The model is kept as two parallel arrays: last reward and last successor
# state per (row, column, action). Successors start at -1, matching UNOBSERVED.
ModelReward = np.zeros(shape)
ModelState1 = np.full(shape+(2,),-1)
#def mazeGetStartState():
#    return start_cell

def maze_getTransition(maze, state, action):
    """Apply ``action`` to ``state`` in ``maze`` and report the outcome.

    Args:
        maze: 2-D grid where cells with a value > 0 are free.
        state: Current cell as a (row, column) tuple.
        action: Index into ``ACTIONS`` selecting the move offset.

    Returns:
        ``(is_terminal, next_state, reward)``:
        * ``(False, state, NO_REWARD)`` when the move leaves the grid or hits
          a non-free cell (the agent stays in place);
        * ``(True, start_cell, REWARD)`` when the move enters ``target_cell``
          (the agent is sent back to the start);
        * ``(False, next_state, NO_REWARD)`` for any other move.
    """
    next_state = tuple(np.asarray(state) + ACTIONS[action])
    # Per-axis range check: same verdict as testing membership in
    # np.ndindex(maze.shape), without enumerating every cell on each step.
    inside = len(next_state) == maze.ndim and all(
        0 <= coord < size for coord, size in zip(next_state, maze.shape)
    )
    if not (inside and maze[next_state] > 0):
        return (False, state, NO_REWARD)
    if next_state == target_cell:
        return (True, start_cell, REWARD)

    return (False, next_state, NO_REWARD)

### Problem Setup

In [None]:
# Hyper-parameters passed to DynaQ.
alpha = 0.2   # step size
gamma = 0.95  # discount factor
n = 5         # planning updates per real step
# Running sum of rewards, accumulated by `callback`.
TotalReward = 0.0
# Snapshots of TotalReward; `callback` appends one every 100 steps.
Reward = [0.0]

def callback(e,s,r):
    """Accumulate reward ``r`` and record the running total every 100 steps.

    Args:
        e: Episode number (unused).
        s: Step number; a snapshot is stored when it is a multiple of 100.
        r: Reward received on this step.
    """
    global TotalReward
    TotalReward = TotalReward + r
    if s % 100 != 0:
        return
    Reward.append(TotalReward)

# Experience stream for training on maze1: epsilon-greedy (epsilon = 0.4) over
# the shared Q table, no episode limit, 3000 steps, rewards reported to `callback`.
gen = SequenceGenerator(
    getAction=EpsilonGreedyPolicy(Q, 0.4),
    getStartState=lambda e: start_cell,
    getTransition=lambda s, a: maze_getTransition(maze1, s, a),
    episode_imax=0,
    steps_max=3000,
    callBack=callback,
)
sequence = iter(gen)

In [None]:
# Train until `sequence` is exhausted; Q, ModelReward and ModelState1 are updated in place.
DynaQ(Q, ModelReward, ModelState1, sequence, n, alpha, gamma, UNOBSERVED)

In [None]:
# Display the cumulative-reward snapshots that `callback` appended every 100 steps.
Reward

### Testing

In [None]:
# Index triples (row, column, action) of model entries that hold a recorded
# successor. An entry is recorded when it differs from UNOBSERVED in at least
# one coordinate, hence `any` over the last axis.
observed = np.argwhere((ModelState1 != UNOBSERVED).any(axis=-1))
len(observed)

In [None]:
# Manual replay of one Dyna-Q real step, for inspection.
state, is_terminal, next_state, action, reward = next(sequence)
# (d) Q(S, A) <- Q(S, A) + alpha*[R + gamma*max_a Q(S', a) - Q(S, A)]
# The bootstrap value is taken from the successor state S'.
Q[state][action] = Q[state][action] + alpha*( reward + gamma*max(Q[next_state]) - Q[state][action])
# (e) Model(S, A) <- R, S' (assuming deterministic environment)
ModelReward[state][action] = reward
ModelState1[state][action] = next_state
# (f) Candidates for planning: entries that differ from UNOBSERVED in at
# least one coordinate.
observed = np.argwhere((ModelState1 != UNOBSERVED).any(axis=-1))
len(observed), state, next_state

In [None]:
# Manual replay of the Dyna-Q planning phase: n model-based updates on
# (state, action) pairs drawn uniformly, with replacement, from `observed`.
for sample in np.random.randint(len(observed), size=n):
    #   S <- random previously observed state
    #   A <- random action previously taken in S
    # SA is the full index (*S, A) into Q, ModelReward and ModelState1.
    SA = tuple(observed[sample])
    ### S = SA[:-1]
    ### A = SA[-1]
    #   R, S' <- Model(S, A)
    R = ModelReward[SA]
    S1 = tuple(ModelState1[SA])
    #   Q(S, A) <- Q(S, A) + alpha*[R + gamma*max_a Q(S', a) - Q(S, A)]
    Q[SA] = Q[SA] + alpha*( R + gamma*max(Q[S1]) - Q[SA])
    # Show which pair was replayed and the successor the model returned.
    print(SA,S1)

In [None]:
# Fresh experience stream limited to one episode. The transition lambda reads
# the global `maze` at call time, so rebinding `maze` switches the layout.
maze = maze1
gen = SequenceGenerator(
    getAction=EpsilonGreedyPolicy(Q, 0.4),
    getStartState=lambda e: start_cell,
    getTransition=lambda s, a: maze_getTransition(maze, s, a),
    episode_imax=1,
)
sequence = iter(gen)

In [None]:
# Scratch check: locate the positions in a grid of pairs whose pair equals
# a given value, by comparing elementwise and reducing over the last axis.
x = np.array([
    [(1, 66), (77, -33)],
    [(-3, 2), (77, 9)],
    [(0, 31), (1, 45)],
])
# Other expressions tried here:
#np.transpose(np.nonzero(x[...,0] == 1))
#(x == [-3,2]).all(axis = -1)
#x[...,1]
np.argwhere((x == [-3, 2]).all(axis=-1))

In [None]:
import itertools

# Scratch check: build ten (-1, -1) pairs, view them as a 2x5 grid of pairs,
# and compare elementwise against the sentinel pair.
l = [(-1, -1)] * 10
a = np.array(l)
b = a.reshape(2, 5, 2)
b == (-1, -1)
