In [1]:
import numpy as np
import matplotlib.pyplot as plt

レイヤの実装

In [2]:
class layer:
  # W,b,f,u,x_output,delta,eta # node  input: n output: m,  batch l
  def __init__(self, W, b, f, df_y, eta):
    self.W = W # (m,n)
    self.b = b # (m)
    self.f = f 
    self.df_y = df_y
    self.eta = eta
    self.m = self.W.shape[0]
    self.n = self.W.shape[1]

  def forward_old(self, x_input): # x_input: (n,l), x_output,u: (m,l)
    self.x_input = x_input
    self.l = self.x_input.shape[1]
    self.u = (np.dot(self.W, x_input).T + self.b).T
    self.x_output = self.f(self.u)
    return self.x_output

  def forward(self, x_input, dropout_p = 0.0): # x_input: (n,l), x_output,u: (m,l)
    self.x_input = x_input
    self.l = self.x_input.shape[1]
    self.u = (np.dot(self.W, x_input).T + self.b).T
    self.x_output = self.f(self.u)
    if dropout_p != 0.0:
        dropout_mask = np.random.random_sample(self.x_output.shape)
        dropout_mask[dropout_mask < dropout_p] = 0.0
        dropout_mask[dropout_mask > 0.0] = 1.0
        self.x_output = self.x_output * dropout_mask
    return self.x_output

  def bp(self, c): # c: (m,l)
    self.delta = c * self.df_y(self.x_output); # (m,l)
    return np.dot(self.W.T, self.delta) # (n,l)

  def update(self):
    self.W = self.W - self.eta * np.dot(self.delta, self.x_input.T) / self.l
    self.b = self.b - self.eta * np.mean(self.delta, axis=1)

NNの実装

In [3]:
class dqn:
    def __init__(self, node_list, out_func=lambda x: x):
        self.out_func = out_func
        self.layerlist = []
        self.node_list = node_list
        self.state_n = node_list[0]
        self.action_n = node_list[-1]
        self.dropout_list = np.zeros(len(node_list)-1)
        for i in range(len(node_list)-1):
            self.layerlist.append(layer(np.random.randn(node_list[i+1], node_list[i])*0.03,
                                        np.random.randn(node_list[i+1])*0.03, 
                                        lambda x : 1/(1 + np.exp(-x)),
                                        lambda y : y*(1-y),
                                        0.5))
        self.set_W_n()

    def set_dropout(self, drop_list):
        self.dropout_list = drop_list
  
    def set_W(self, dev_list):
        for i in range(len(dev_list)):
            self.layerlist[i].W = np.random.standard_normal(self.layerlist[i].W.shape)*dev_list[i]
            self.layerlist[i].b = np.random.standard_normal(self.layerlist[i].b.shape)*dev_list[i]
  
    def set_W_n(self):
        dev_list = []
        for i in range(len(self.layerlist)):
            dev_list.append(1/np.sqrt(self.layerlist[i].W.shape[1]))
            self.set_W(dev_list)
            # print(dev_list)

    def set_func(self, func_list):
        for i in range(len(func_list)):
            self.layerlist[i].f = func_list[i][0]
            self.layerlist[i].df_y = func_list[i][1]
  
    def set_func_relu(self):
        for i in range(len(self.layerlist)-1):
            self.layerlist[i].f = relu()[0]
            self.layerlist[i].df_y = relu()[1]
        self.layerlist[-1].f = identify()[0]
        self.layerlist[-1].df_y = identify()[1]

    def set_func_sigmoid(self):
        for i in range(len(self.layerlist)):
            self.layerlist[i].f = sigmoid()[0]
            self.layerlist[i].df_y = sigmoid()[1]

    def set_l_rate(self, l_list):
        for i in range(len(l_list)):
            self.layerlist[i].eta = l_list[i] 
    
    def set_rate_all(self, l_rate):
        for i in range(len(self.layerlist)):
            self.layerlist[i].eta = l_rate
  
    def forward(self, state): # state:(n,1), n:state size = NN input size
        x = state
        x_record = [] # record x for learning
        for l in self.layerlist:
            x_record.append(x)
            x = l.forward(x)
        x = self.out_func(x) # use out_func
        x_record.append(x)
        action = np.argmax(x, axis=0)
        return x_record, action
    
    def forward_old(self, state):
        x = state
        for l in self.layerlist:
            x = l.forward(x)
        x = self.out_func(x)
        maxQ = np.max(x, axis=0)
        return maxQ

    def collect_init(self):
        self.x_collect = [[] for _ in range(len(self.layerlist)+1)]

    def play(self, state, epsilon):
        self.x_record, self.action_best = self.forward(state)
        if epsilon > np.random.uniform(0,1):
            next_action = np.random.choice(self.action_n, self.action_best.size)
        else:
            next_action = self.action_best
        return next_action
    
    def collect(self):
        for i in range(len(self.x_collect)):
            self.x_collect[i].append(self.x_record[i][:,0])
        # print(self.x_collect)
        return self.x_collect
    
    def collect_end(self):
        self.x_collect_arr = []
        for i in range(len(self.x_collect)):
            self.x_collect_arr.append(np.array(self.x_collect[i]).T)
        # print(self.x_collect_arr)
        return self.x_collect_arr
    
    def update(self, label):
        # layerのxをx_recで書き換える
        for i in range(len(self.layerlist)):
            self.layerlist[i].x_input = self.x_collect_arr[i]
            self.layerlist[i].x_output = self.x_collect_arr[i+1]
        # bp
        c = self.x_collect_arr[-1] - label
        for l in reversed(self.layerlist):
            c = l.bp(c)
        for l in self.layerlist:
            l.update()

def relu(k=1.0):
    def relu_func(x):
        mask = (x<0)
        x[mask] = 0
        return k*x
    return (relu_func, lambda y:k*(y>0))

def identify():
    return (lambda x:x, lambda y:1)

def sigmoid(alpha=1.0):
    return (lambda x:1/(1+np.exp(-alpha*x)), lambda y:alpha*y*(1-y))

def tanh():
    return (lambda x:np.tanh(x), lambda y:(1-y*y))

def softmax(x):
    x_max = np.max(x, axis=0)
    exp_x = np.exp(x - x_max)
    sum_e = np.sum(exp_x, axis=0)
    return exp_x / sum_e

In [170]:
class MyEnvSim():
    def __init__(self):
        self.reset()
        self.state_table = np.array([[1,0],[0,1]])
        self.reward_table = np.array([[0,0],[0,1]])

    def reset(self):
        # self._state = np.array([[0],[0]]).T # do not use this
        self._state = np.array([[0]])
        return self._state

    def step(self, action):
        self._state = np.array(self.state_table[self._state, action])
        reward = self.reward_table[self._state, action]
        done = False
        return self._state, reward, done
    
    def get_rewards(self, states): # states:(n,T), return (m,T) n:state,m:action
        return self.reward_table[states][0].T
        
    def get_nextS(self, states): # states:(n,T), return nextS:(m,T)
        return self.state_table[states][0].T
        

1ループについて試す。

In [27]:
net = dqn([1,4,4,2])
#net.set_func([identify(),identify(),identify()])
net.set_func([tanh(), tanh(), identify()])
env = MyEnvSim()
#state = net.state_reset(env.reset()) # いらない
#loop
env.reset()
net.collect_init()

#loop
action = net.play(state, 0.3)
#print(action)
next_state, reward, done = env.step(action)
net.collect()
state = next_state
action = net.play(state, 0.3)
#print(action)
next_state, reward, done = env.step(action)
a = net.collect()
state = next_state
#np.array(a)
#np.array(a[1])

#loop end
x_all = net.collect_end()
# get reward
r_all = env.get_rewards(x_all[0])
print(r_all)
sn_all = env.get_nextS(x_all[0])
#print(sn_all)
data_oldQ_input = sn_all.ravel(order='F').reshape(1,sn_all.size)
#print(data_oldQ_input)
maxQold = net.forward_old(data_oldQ_input).reshape(sn_all.shape, order='F')
print(maxQold)

gamma = 0.2
d = r_all + gamma * maxQold
print(d) # ラベル

# bp
net.update(d)



[[0 0]
 [1 1]]
[[0 0]
 [0 0]]
[[0. 0.]
 [1. 1.]]


ループでやってみる

成功例

(1,10,10,2), identify, lrate0.3, gamma0.5, epsilon=0.5 * (1 / (i+1))

上と同じ、(tanh,tanh,identify)

In [177]:
net = dqn([1,10,10,2])
# net.set_func([identify(),identify(),identify()])
net.set_func([tanh(), tanh(), identify()])
net.set_rate_all(0.3)
env = MyEnvSim()
gamma = 0.5

for i in range(50):
    state = env.reset()
    net.collect_init()
    epsilon = 0.5 * (1 / (i+1))
    reward_sum = 0
    action_list = []
    for j in range(7):
        action = net.play(state, epsilon) # calc NN and select action using e-greedy
        action_list.append(action[0])
        next_state, reward, done = env.step(action)
        net.collect() # record x
        state = next_state
        reward_sum += reward
    x_all = net.collect_end() # all x
    r_all = env.get_rewards(x_all[0]) # all r
    sn_all = env.get_nextS(x_all[0]) # all nextS
    data_oldQ_input = sn_all.ravel(order='F').reshape(1,sn_all.size)
    maxQold = net.forward_old(data_oldQ_input).reshape(sn_all.shape, order='F')
    #print(x_all)
    label = r_all + gamma * maxQold
    #print(label)
    net.update(label)
    print(reward_sum, action_list)


[[0]] [0, 0, 0, 0, 0, 0, 0]
[[3]] [1, 0, 0, 0, 1, 1, 1]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[0]] [0, 0, 0, 0, 0, 0, 0]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[5]] [1, 0, 1, 1, 1, 1, 1]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[0]] [1, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[5]] [0, 1, 1, 1, 1, 1, 0]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[4]] [0, 1, 1, 1, 0, 0, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1, 1, 1]
[[6]] [0, 1, 1, 1, 1

### プログラムの説明

外側のループ

1. 環境のリセット
1. NNのニューロンの値保存の初期化
1. epsilonの設定
1. ゲームをプレイする（ループ）
1. プレイ中のすべてのニューロンの値Xを得る
1. プレイ中の各状態sについて、全てのaを選択したときの報酬、次のs'をそれぞれ計算する
1. 上で求めた次のs'から、maxQ(s')を求める
1. 上で求めた報酬とmaxQ(s')でラベルLを作成
1. XとLを使って誤差逆伝播

内側のループ（ゲームプレイ）

1. 状態とNNから、epsilon-greedyで行動を選択する
1. 環境にactionを入力し、次の状態と報酬を得る
1. NNのニューロンの値xを記録する

#### envに必要な機能

1. reset: 状態をリセットする
1. step: 行動を一つ指定したとき、状態を遷移させ、（次の状態、報酬、終了判定）を返す
1. get_rewards: 状態を指定したとき、全ての行動に対する報酬を返す
1. get_nextS: 状態を指定したとき、全ての行動に対する次の状態を返す

get_rewardsは、状態がn個の変数で表されるとき、T個の状態を表した(n,T)の行列に対して、(m,T)の行列を返す必要がある(mは行動の変数の個数）。

get_nextSも同様。

### 迷路をDQNで解く

通路上での報酬も負にすると、うまくいきやすい。

In [57]:
class MyMaze():
    def __init__(self, maze, wall_reward, start, goal):
        self.maze = maze
        self.wall_reward = wall_reward
        self.maze_ravel = self.maze.ravel()
        self.h = self.maze.shape[0]
        self.w = self.maze.shape[1]
        self.start = start[0]*self.w + start[1]
        self.goal = goal[0]*self.w + goal[1]
        # print(self.start)
        # print(self.goal)
        self.move = np.array([[-1,0],[0,1],[1,0],[0,-1]])
        self.maze_size = maze.size
        # print(self.maze_size)
        self.reset()

    def reset(self):
        self.state = np.zeros(self.maze.size)
        self.state[self.start] = 1
        return self.state
     
    def step_all(self, state): # state:(h*w), nxt_num:(m), nxt_rw:(m)
        num = np.where(state==1)[0][0] # 状態の番号0~8
        idx = np.array([num//self.w, num%self.w]) #状態の場所 (2)
        nxt = self.move + idx # 移動先の場所 (4,2)
        is_wall_h = (nxt[:,0] >= self.h) | (nxt[:,0] < 0)
        is_wall_w = (nxt[:,1] >= self.w) | (nxt[:,1] < 0)
        is_wall = is_wall_h | is_wall_w
        nxt[is_wall] = idx
        nxt_num = nxt[:,0] * self.w + nxt[:,1] # 移動先の番号 (4)
        nxt_rw = self.maze_ravel[nxt_num] # 移動先の報酬 (4)
        nxt_num[is_wall] = num # 壁なら番号をもとのまま
        nxt_rw[is_wall] = self.wall_reward # 壁なら報酬0
        return nxt_num, nxt_rw
    
    def step_all_multi(self, states): # states,(h*w,T), return nextS_num:(mT), rewards:(mT)
        n = states.shape[0]
        T = states.shape[1]
        msk = states.T == 1
        Z = np.zeros((T,n))
        B = np.arange(n)
        Z += B
        snum = Z[msk]
        idx = np.array([snum//self.w,snum%self.w]).T 
        nxt = np.hstack([idx+move[0], idx+move[1], idx+move[2], idx+move[3]]).reshape(4*T, 2)
        bf =  np.hstack([idx,idx,idx,idx]).reshape(4*T, 2)
        is_wall_h = (nxt[:,0] >= self.h) | (nxt[:,0] < 0)
        is_wall_w = (nxt[:,1] >= self.w) | (nxt[:,1] < 0)
        is_wall = is_wall_h | is_wall_w
        nxt[is_wall] = bf[is_wall]
        nxt_num = nxt[:,0] * self.w + nxt[:,1] # 移動後の番号 (mT)
        # print(nxt_num)
        nxt_rw = self.maze_ravel[nxt_num.astype('int32')] # 移動先の報酬 (mT)
        nxt_rw[is_wall] = self.wall_reward # 壁なら報酬0
        return nxt_num, nxt_rw.reshape((4,T), order='F')
    
    def step(self, action): # action:0~3
        nxt_num, nxt_rw = self.step_all(self.state)
        nxtn = nxt_num[action]
        nxtr = nxt_rw[action]
        self.state = np.zeros(self.maze.size)
        self.state[nxtn] = 1
        done = False
        if nxtn == self.goal:
            done = True
        return self.state, nxtr, done
    
    def get_R_and_NS(self, states): # states:(n,T)
        nextS_list = []
        reward_list = []
        for i in range(states.shape[1]):
            nxtn, nxtr = self.step_all(states[:,i])
            nextS_list.append(nxtn)
            reward_list.append(nxtr)
        nextS = np.array(nextS_list).T
        reward = np.array(reward_list).T
        return nextS, reward

In [64]:
maze = np.array([[-0.5,-1,-1],
                 [-0.5,-1,-1],
                 [-0.5,-0.5,2]])
wall_reward = -2
start = [0,0]
goal = [2,2]
env = MyMaze(maze, wall_reward, start, goal)
maze_size = env.maze_size
net = dqn([maze_size,20,20,4])
net.set_func([tanh(), tanh(), identify()])
net.set_rate_all(0.3)
gamma = 0.5

for i in range(101):
    state = env.reset()
    net.collect_init()
    epsilon = 0.5 * (1 / (i+1))
    reward_sum = 0
    action_list = []
    for j in range(70):
        action = net.play(state.reshape(state.size, 1), epsilon) # calc NN and select action using e-greedy
        # action_list.append(action[0])
        next_state, reward, done = env.step(action[0])
        # print(next_state, reward, done)
        net.collect() # record x
        state = next_state
        reward_sum += reward
        if done:
            break
    if i % 10 == 0:
        print(reward_sum)
    x_all = net.collect_end() # all x
    nextS, reward = env.step_all_multi(x_all[0])
    nextS_n_mT = np.zeros((maze_size, nextS.size))
    nextS_n_mT[nextS.astype('int32'), np.arange(nextS.size)] = 1
    maxQold = net.forward_old(nextS_n_mT).reshape(reward.shape, order='F')
    label = reward + gamma * maxQold
    net.update(label)

-19.0
0.5
-137.0
0.5
0.5
0.5
0.5
0.5
0.5
0.5
0.5


In [65]:
maze = np.array([[-0.5,-1,-1],
                 [-0.5,-1,-1],
                 [-0.5,-0.5,2]])
wall_reward = -2
start = [0,0]
goal = [2,2]
env = MyMaze(maze, wall_reward, start, goal)
maze_size = env.maze_size
net = dqn([maze_size,20,20,4])
net.set_func([tanh(), tanh(), identify()])
net.set_rate_all(0.3)
gamma = 0.5

for i in range(100):
    state = env.reset()
    net.collect_init()
    epsilon = 0.5 * (1 / (i+1))
    reward_sum = 0
    action_list = []
    for j in range(70):
        action = net.play(state.reshape(state.size, 1), epsilon) # calc NN and select action using e-greedy
        # action_list.append(action[0])
        next_state, reward, done = env.step(action[0])
        # print(next_state, reward, done)
        net.collect() # record x
        state = next_state
        reward_sum += reward
        if done:
            break
    if i % 10 == 0:
        print(reward_sum)
    x_all = net.collect_end() # all x
    nextS, reward = env.get_R_and_NS(x_all[0])
    nextS_ravel = nextS.ravel(order='F')
    nextS_n_mT = np.zeros((maze_size, nextS_ravel.size))
    nextS_n_mT[nextS_ravel, np.arange(nextS_ravel.size)] = 1
    maxQold = net.forward_old(nextS_n_mT).reshape(reward.shape, order='F')
    label = reward + gamma * maxQold
    net.update(label)

-105.0
-137.0
-138.0
-1.0
-52.5
0.5
0.5
0.5
0.5
0.5


In [71]:
maze = np.array([[-0.5,-0.5,-0.5,-1],
                 [-1,-0.5,-1,-1],
                 [-1,-0.5,-1,-1],
                 [-0.5,-0.5,-0.5,10]])
wall_reward = -2
start = [0,0]
goal = [3,3]
env = MyMaze(maze, wall_reward, start, goal)
maze_size = env.maze_size
net = dqn([maze_size,40,40,4])
net.set_func([tanh(), tanh(), identify()])
net.set_rate_all(0.2)
gamma = 0.5

for i in range(401):
    state = env.reset()
    net.collect_init()
    epsilon = 0.5 * (1 / (i+1))
    reward_sum = 0
    action_list = []
    for j in range(100):
        action = net.play(state.reshape(state.size, 1), epsilon) # calc NN and select action using e-greedy
        # action_list.append(action[0])
        next_state, reward, done = env.step(action[0])
        # print(next_state, reward, done)
        net.collect() # record x
        state = next_state
        reward_sum += reward
        if done:
            # print(done)
            break
    if i%100 == 0:
        print(reward_sum)
    x_all = net.collect_end() # all x
    nextS, reward = env.get_R_and_NS(x_all[0])
    nextS_ravel = nextS.ravel(order='F')
    nextS_n_mT = np.zeros((maze_size, nextS_ravel.size))
    nextS_n_mT[nextS_ravel, np.arange(nextS_ravel.size)] = 1
    maxQold = net.forward_old(nextS_n_mT).reshape(reward.shape, order='F')
    label = reward + gamma * maxQold
    net.update(label)

-31.5
-50.0
-50.0
7.5
7.5


In [114]:
maze = np.array([[-0.5,-0.5,-0.5,-1],
                 [-1,-0.5,-1,-1],
                 [-1,-0.5,-1,-1],
                 [-0.5,-0.5,-0.5,10]])
wall_reward = -2
start = [0,0]
goal = [3,3]
env = MyMaze(maze, wall_reward, start, goal)
maze_size = env.maze_size
net = dqn([maze_size,40,40,4])
net.set_func([tanh(), tanh(), identify()])
net.set_rate_all(0.2)
gamma = 0.3

for i in range(401):
    state = env.reset()
    net.collect_init()
    epsilon = 0.5 * (1 / (i+1))
    reward_sum = 0
    action_list = []
    for j in range(50):
        action = net.play(state.reshape(state.size, 1), epsilon) # calc NN and select action using e-greedy
        # action_list.append(action[0])
        next_state, reward, done = env.step(action[0])
        # print(next_state, reward, done)
        net.collect() # record x
        state = next_state
        reward_sum += reward
        if done:
            # print(done)
            break
    if i%100 == 0:
        print(reward_sum)
    x_all = net.collect_end() # all x
    nextS, reward = env.step_all_multi(x_all[0])
    nextS_n_mT = np.zeros((maze_size, nextS.size))
    nextS_n_mT[nextS.astype('int32'), np.arange(nextS.size)] = 1
    maxQold = net.forward_old(nextS_n_mT).reshape(reward.shape, order='F')
    label = reward + gamma * maxQold
    net.update(label)

-65.5
6.0
7.0
7.5
7.5
