In [1]:
import numpy as np
import gym

gym environment

In [2]:
# Gym environment id; passed to gym.make() to create the environment.
env_name = 'FrozenLake-v0'

In [3]:
# Environment instance shared by the rest of the notebook; its observation
# and action space sizes are used to size the network.
env = gym.make(env_name)

Behavior definition

In [28]:
# One-hot ("binary expand") encoding used to preprocess categorical states
def binary_expand(n, idx=None):
    """Return a one-hot float vector marking where `n` occurs among the categories.

    Parameters
    ----------
    n : int
        The category value to encode (e.g. a discrete state number).
    idx : int, numpy integer, array-like or None
        Either the number of categories (categories are then 0..idx-1) or an
        explicit array of category values. When None, the size of the
        environment's observation space is looked up at call time, so the
        encoder follows `env` if it is re-created.

    Returns
    -------
    numpy.ndarray
        Array with the same shape as the category array, 1.0 where the
        category equals `n` and 0.0 elsewhere (all zeros if `n` is absent).
    """
    if idx is None:
        # Resolved lazily instead of at definition time to avoid a stale size.
        idx = env.observation_space.n
    if isinstance(idx, (int, np.integer)):
        # NumPy integer sizes must be expanded too; otherwise the comparison
        # below would run against a 0-d scalar and yield a 0-d result.
        idx = np.arange(idx)
    else:
        idx = np.asarray(idx)
    res = np.zeros(idx.shape)
    res[n == idx] = 1
    return res

preprocess_func = binary_expand

In [5]:
# activation function: logistic sigmoid
def sigmoid(v):
    """Apply the logistic function 1 / (1 + e^-v) element-wise to `v`."""
    denominator = 1.0 + np.exp(-v)
    return 1.0 / denominator

def sigmoid_derivative(v):
    """Return v * (1 - v), the sigmoid slope expressed via the sigmoid's output `v`."""
    complement = 1 - v
    return v * complement

In [6]:
# Network architecture, one tuple per layer:
# (node_count, activation_function, activation_derivative)
# A None activation means the layer output is the plain matrix product.
model_define = [
    (100, None, None),
    (env.action_space.n, sigmoid, sigmoid_derivative),
]

hyperparameters

In [7]:
# number of training episodes run by the training loop
episodes = 10000
# step size applied to the gradients in update_model
learning_rate = 0.01
# weight of the next state's best Q-value in the Bellman target
discount = 0.99

# exploration rate: a random action is taken unless a uniform draw exceeds it
epsilon = 1
# epsilon is multiplied by this factor after every episode ...
epsilon_decay = 0.999
# ... but never drops below this floor
epsilon_min = 0.01

deep Q engine

In [10]:
# Buffer holding the input fed to each layer during the most recent
# model_forward call; back_propagation reads it to build the gradients.
# It is cleared and refilled on every forward pass.
hidden_layer_input_buf = []

In [32]:
# Turns a raw environment state into the network's input vector
# (an alias for the preprocessing function selected above).
preprocess_state = preprocess_func

In [12]:
# forward pass: evaluate the current model on a preprocessed state
def model_forward(state):
    """Propagate `state` through every layer of `model` and return the output.

    Side effect: the shared `hidden_layer_input_buf` is emptied and then
    filled with the input each layer received, in layer order, so that
    back_propagation can reuse them.
    """
    del hidden_layer_input_buf[:]
    signal = state
    for weights, activate, _derivative in model:
        # remember what this layer was fed before transforming it
        hidden_layer_input_buf.append(signal)
        signal = np.dot(signal, weights)
        if activate:
            signal = activate(signal)
    return signal

In [83]:
# back propagation: turn an output-layer error into per-layer weight gradients
def back_propagation(td_err):
    """Compute one gradient per layer for the most recent forward pass.

    Parameters
    ----------
    td_err : numpy.ndarray
        Error at the network output (target minus prediction), one entry per
        output node.

    Returns
    -------
    list of numpy.ndarray
        Gradients in layer order, each shaped like the matching weight
        matrix, ready to be added to the weights by update_model.

    Notes
    -----
    Reads `hidden_layer_input_buf`, so it must be called right after the
    model_forward call for the state being trained, and before the weights
    change. The activation derivative is expressed in terms of the layer's
    output, which is not buffered, so the output is recomputed here from the
    buffered input and the (unchanged) weights.
    """
    grads = []
    delta = td_err
    for layer_in, (weights, activation_func, activation_derivative) in zip(
            reversed(hidden_layer_input_buf), reversed(model)):
        if activation_func and activation_derivative:
            # Chain rule through this layer's own activation comes first:
            # the error w.r.t. the pre-activation is error * f'(output).
            layer_output = activation_func(np.dot(layer_in, weights))
            delta = delta * activation_derivative(layer_output)
        # gradient of this layer's weights: input (rows) x pre-activation error (cols)
        grads.append(np.outer(layer_in, delta))
        # push the error back through the weights to the previous layer's output
        delta = np.dot(weights, delta)
    grads.reverse()
    return grads

In [97]:
# apply one gradient step to every layer's weights
def update_model(grads):
    """Replace each layer's weight matrix with weights + learning_rate * gradient.

    `grads` must provide one gradient per layer, in layer order. The weight
    entry of each layer is rebound to a new array rather than modified in place.
    """
    for position, layer_spec in enumerate(model):
        step = learning_rate * grads[position]
        layer_spec[0] = layer_spec[0] + step

In [99]:
# Build the network: one [weights, activation, derivative] entry per layer.
# Entries are lists (not tuples) so update_model can replace the weights.
model = []
# the first layer's input width is the number of discrete states
prev_node_count = env.observation_space.n
for node_count, activation_func, activation_derivative in model_define:
    # weights: standard-normal draws scaled by 1/sqrt(number of inputs)
    model.append([np.random.randn(prev_node_count, node_count)/np.sqrt(prev_node_count),
                  activation_func,
                  activation_derivative])
    # this layer's width is the next layer's input width
    prev_node_count = node_count

In [100]:
# for test
# NOTE(review): this overrides the episodes = 10000 set in the hyperparameters
# cell; remove or skip this cell for a real training run.
episodes = 1

In [106]:
# Q-learning training loop: one environment episode per iteration
step_count_list = []
for ep in range(episodes):
    end = False
    step_count = 0

    # startup state
    new_state = preprocess_state(env.reset())

    # run until game end
    while not end:
        # predict with the latest model
        state = new_state
        q_values = model_forward(state)

        # epsilon-greedy action selection (scalar draw, not a 1-element array)
        if np.random.rand() > epsilon:
            action = np.argmax(q_values)
        else:
            action = env.action_space.sample()

        # step forward
        new_state_no, reward, end, _ = env.step(action)
        step_count += 1

        # save new state for the next step
        new_state = preprocess_state(new_state_no)

        # Bellman target: a finished episode has no future return, so do not
        # bootstrap from the network's output for the final state.
        if end:
            next_q_max = 0.0
        else:
            next_q_max = np.max(model_forward(new_state))
            # model_forward overwrites the shared activation buffer; replay the
            # current state so back_propagation differentiates the right pass
            # (weights are unchanged, so q_values stays valid).
            model_forward(state)

        # calculate error for back propagation; only the taken action is updated
        td_err = np.zeros_like(q_values)
        td_err[action] = reward + discount * next_q_max - q_values[action]

        # back propagation with td error
        grads = back_propagation(td_err)

        # update model with gradients
        update_model(grads)

    # update epsilon
    epsilon = max(epsilon*epsilon_decay, epsilon_min)

    # record step counts
    step_count_list.append(step_count)
    # report progress every 50 episodes
    if ep%50 == 0:
        print('In episode {}, {} steps are used.'.format(ep, step_count))
    

In episode 0, 3 steps are used.


Run with the trained model

In [None]:
# Play one episode greedily with the current model, rendering every step
end = False
state = preprocess_state(env.reset())
env.render()
# loop until the environment reports the episode is over
# (the condition must be negated: `end` starts out False)
while not end:
    q_values = model_forward(state)
    # always take the action with the highest predicted Q-value
    action = np.argmax(q_values)
    state_no, _, end, _ = env.step(action)
    env.render()
    state = preprocess_state(state_no)