# Bellman Equation

![](bellman01.png)
![](bellman02.png)
![](bellman03.png)

------------------------

## Reinforcement Learning: Q-table to solve Maze 

[reference](https://github.com/simoninithomas/Deep_reinforcement_learning_Course/blob/master/Q%20learning/FrozenLake/Q%20Learning%20with%20FrozenLake.ipynb)

In [None]:
# Reinforcement learning: maze environment.
# Cell values: 0 is a cell that costs -0.1 to enter, 1 is a cell that costs -1.

# 6x6 layout. It is immediately replaced by the 2x2 layout below, so it has no
# effect unless the second assignment is removed.
MAZE = [
    [0, 0, 1, 0, 0, 0],
    [1, 0, 0, 0, 1, 0],
    [1, 1, 1, 0, 1, 0],
    [0, 1, 0, 0, 0, 0],
    [0, 0, 1, 1, 0, 1],
    [0, 0, 0, 0, 0, 0],
]

# 2x2 layout: this is the maze every later cell works with.
MAZE = [
    [0, 0],
    [1, 0],
]


import numpy as np
import random

# Grid dimensions: R rows, C columns.
R = len(MAZE)
C = len(MAZE[0])

# Each action is a [row delta, column delta] move; ACTIONS_STR holds the arrow
# used to print the action stored at the same index.
ACTIONS = [[1, 0], [-1, 0], [0, -1], [0, 1]]
ACTIONS_STR = ['v', '^', '<', '>']

# Reward received for entering each cell, derived from the layout in one pass.
REWARDS = np.where(np.asarray(MAZE) == 0, -0.1, -1.0)
# The bottom-right cell is the goal and is worth +1.
REWARDS[R-1][C-1] = 1

def go_one_step(state:int, action:int):
    """Apply one action to a flattened state index.

    Args:
        state: Cell index in row-major order (``row * C + col``).
        action: Index into ``ACTIONS``.

    Returns:
        A ``(new_state, reward, done)`` tuple. A move that would leave the
        grid keeps the agent in place with reward -1; otherwise the reward is
        read from ``REWARDS`` for the cell that was entered. ``done`` is True
        only when the bottom-right cell is entered.
    """
    row, col = divmod(state, C)
    d_row, d_col = ACTIONS[action]
    next_row, next_col = row + d_row, col + d_col

    # Off-grid moves are rejected: the state does not change and the episode
    # cannot end on such a step.
    if not (0 <= next_row < R and 0 <= next_col < C):
        return state, -1, False

    reached_goal = next_row == R - 1 and next_col == C - 1
    return next_row * C + next_col, REWARDS[next_row][next_col], reached_goal

def avail_actions(state:int):
    """Return the indices of the actions that keep the agent inside the grid.

    Args:
        state: Cell index in row-major order (``row * C + col``).

    Returns:
        A list of indices into ``ACTIONS``, in ascending order.
    """
    row, col = divmod(state, C)
    return [
        idx
        for idx, (d_row, d_col) in enumerate(ACTIONS)
        if 0 <= row + d_row < R and 0 <= col + d_col < C
    ]
            
        
        
action_size = 4  # one column per entry of ACTIONS, i.e. v ^ < >
state_size = R * C

# One row per cell, one column per action, all estimates starting at zero.
qtable = np.zeros((state_size, action_size))

# Moves that would leave the grid start at -1 instead of 0, so the greedy
# policy does not favour them before anything has been learned.
for state in range(state_size):
    allowed = set(avail_actions(state))
    for a in range(4):
        if a in allowed:
            continue
        qtable[state, a] = -1


In [None]:
# Hyperparameters for tabular Q-learning
total_episodes = 1500 # Number of training episodes
learning_rate = 0.8 # Step size applied to the TD error in the Q-table update
max_step = 99 # Upper bound on the number of steps in one episode
gamma = 0.95 # Discount factor applied to the next state's best Q-value

# Exploration parameters (epsilon-greedy)
epsilon = 1.0 # Current exploration threshold; recomputed after every episode
max_epsilon = 1.0 # Value the decay schedule starts from
min_epsilon = 0.01 # Value the decay schedule approaches
decay_rate = 0.005 # Exponential decay rate per episode


In [None]:
# Train the Q-table with epsilon-greedy Q-learning.
rewards = []

for episode in range(total_episodes):
    done = False
    total_rewards = 0
    state = 0  # every episode starts in the top-left cell

    for step in range(max_step):
        # Explore with a random in-bounds action when the draw does not
        # exceed epsilon; otherwise take the action with the highest Q-value.
        explore = random.uniform(0,1) <= epsilon
        if explore:
            action = random.sample(avail_actions(state),1)[0]
        else:
            action = np.argmax(qtable[state, :])

        new_state, reward, done = go_one_step(state, action)

        print(state, new_state)

        ## Update Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]
        td_target = reward + gamma * np.max(qtable[new_state, :])
        qtable[state, action] += learning_rate * (td_target - qtable[state, action])

        total_rewards += reward
        state = new_state

        if done:
            break

    # Exponentially decay epsilon from max_epsilon towards min_epsilon.
    epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode)
    rewards.append(total_rewards)

print("Score over time: " + str(sum(rewards)/total_episodes))
print(qtable)
        
            
        

In [None]:
# Solve the maze by following the greedy policy stored in the Q-table.

for episode in range(1):
    state = 0
    done = False
    actions = []

    for step in range(max_step):
        # Always take the action with the highest learned Q-value.
        action = np.argmax(qtable[state, :])
        actions.append(action)

        state, reward, done = go_one_step(state, action)

        if done:
            break

    # Show the chosen path as a list of arrows.
    print([ACTIONS_STR[a] for a in actions])
        

## Reinforcement Learning: Deep Q Network

[REF 1](https://github.com/mswang12/minDQN/blob/main/minDQN.ipynb)
[REF 2](https://zhuanlan.zhihu.com/p/110769361)

## 简单的DQN

只需要用DNN代替前面的Q table即可

In [None]:
# Net: QTable
import torch
import torch.nn.functional as F

class Net(torch.nn.Module):
    """Fully connected network with a single ReLU hidden layer.

    Args:
        n_feature: Size of the input vector.
        n_hidden: Number of units in the hidden layer.
        n_output: Size of the output vector.
    """

    def __init__(self, n_feature, n_hidden, n_output):
        super().__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)
        self.predict = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        """Return the raw (un-activated) outputs of the network for ``x``."""
        activated = F.relu(self.hidden(x))
        return self.predict(activated)

# Both networks take the state index as their single input feature and output
# one value per action (4 outputs).
# dqn_target only ever receives weights copied from dqn via load_state_dict.
dqn_target = Net(n_feature=1, n_hidden=10, n_output=4)
# dqn is the network whose parameters trainDQN optimizes.
dqn = Net(n_feature=1, n_hidden=10, n_output=4)

def trainDQN(x, y, n_steps=100, lr=0.01, model=None):
    """Fit a network to the targets ``y`` with Adam on a mean-squared-error loss.

    Args:
        x: Input batch passed to the network.
        y: Targets compared against the network output.
        n_steps: Number of full-batch gradient steps to run.
        lr: Learning rate given to the Adam optimizer.
        model: Network to train. When omitted, the module-level ``dqn`` is
            trained, which is the original behaviour.

    Returns:
        None. The network parameters are updated in place.
    """
    net = dqn if model is None else model

    # A new optimizer is built on every call, so Adam's running moment
    # estimates do not carry over from one call to the next.
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss_func = torch.nn.MSELoss()

    for _ in range(n_steps):
        loss = loss_func(net(x), y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


In [None]:
# Pre-train: fit the online network to output 1 for every action on randomly
# drawn state indices, then copy its weights into the target network.
n = 100000
# The targets are the same for every batch, so they are built once.
y = torch.ones((n, 4))
for i in range(10):
    # randrange excludes its upper bound, so only the valid state indices
    # 0 .. R*C-1 are sampled; randint(0, R*C) is inclusive and would also
    # produce the non-existent state R*C.
    states = [[random.randrange(R*C)] for _ in range(n)]
    x = torch.from_numpy(np.array(states)).type(torch.FloatTensor)
    trainDQN(x, y)

dqn_target.load_state_dict(dqn.state_dict())

# Print the Q-value estimates of the pre-trained network for every state.
for state in range(R*C):
    x = torch.from_numpy(np.array([[state]])).type(torch.FloatTensor)
    y = dqn(x).data.numpy()[0]
    print(y)

In [None]:
# Hyperparameters for DQN training
total_episodes = 500 # Number of training episodes
learning_rate = 0.8 # Weight of the TD target when it is blended with the current Q estimate
max_step = 99 # Upper bound on the number of steps in one episode
gamma = 0.95 # Discount factor applied to the target network's best Q-value

# Exploration parameters (epsilon-greedy)
epsilon = 1.0 # Current exploration threshold; recomputed after every episode
max_epsilon = 1.0 # Value the decay schedule starts from
min_epsilon = 0.01 # Value the decay schedule approaches
decay_rate = 0.005 # Exponential decay rate per episode

In [None]:
# Train the DQN with epsilon-greedy exploration and a separate target network.
rewards = []

# The target network is refreshed from the online network whenever the episode
# index is a multiple of this value; 1 means after every episode.
target_sync_every = 1

for episode in range(total_episodes):
    done = False
    total_rewards = 0
    state = 0  # every episode starts in the top-left cell

    for step in range(max_step):
        rnd = random.uniform(0,1)
        # The state index is the network's single input feature.
        x = torch.from_numpy(np.array([[state]])).type(torch.FloatTensor)

        # Current Q-value estimates of the online network for this state.
        qs = dqn(x).data.numpy()[0]

        if rnd > epsilon:
            # Exploit: pick the best in-bounds action. Because of `>=`, ties
            # go to the last action that reaches the maximum.
            action, mx = 0, -100000
            for a in avail_actions(state):
                if qs[a] >= mx:
                    action = a
                    mx = qs[a]
        else:
            # Explore: uniformly random in-bounds action.
            action = random.sample(avail_actions(state),1)[0]

        new_state, reward, done = go_one_step(state, action)
        new_x = torch.from_numpy(np.array([[new_state]])).type(torch.FloatTensor)
        # Bootstrap from the target network, not the network being trained.
        new_qs = dqn_target(new_x).data.numpy()[0]

        print(state, new_state)

        # TD target: a terminal step has no future value to bootstrap from.
        if done:
            td_target = reward
        else:
            td_target = reward + gamma*np.max(new_qs)

        ## Update Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]
        # Written as a blend of the old estimate and the TD target; only the
        # entry of the action that was taken changes.
        qs[action] = (1-learning_rate)*qs[action] + learning_rate*td_target

        if step % 4 == 0:
            # Fit the online network to the updated Q-values of this state.
            # On the other steps the updated values are discarded.
            y = torch.from_numpy(np.array([qs])).type(torch.FloatTensor)
            for i in range(100):
                trainDQN(x, y)

        total_rewards += reward
        state = new_state

        if done:
            break

    # Exponentially decay epsilon from max_epsilon towards min_epsilon.
    epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode)
    rewards.append(total_rewards)

    print(total_rewards)

    if episode % target_sync_every == 0:
        dqn_target.load_state_dict(dqn.state_dict())

print("Score over time: " + str(sum(rewards)/total_episodes))


In [None]:
# Solve the maze by following the greedy policy of the target network.

for episode in range(1):
    state = 0
    done = False
    actions = []

    for step in range(max_step):
        x = torch.from_numpy(np.array([[state]])).type(torch.FloatTensor)
        qs = dqn_target(x).data.numpy()[0]

        # Best in-bounds action; on ties the last maximal one is kept.
        best_action, best_q = 0, -100000
        for candidate in avail_actions(state):
            if qs[candidate] >= best_q:
                best_action, best_q = candidate, qs[candidate]
        action = best_action

        new_state, reward, done = go_one_step(state, action)

        print(state, action, reward, qs)

        actions.append(action)
        state = new_state

        if done:
            break

    # Show the chosen path as a list of arrows.
    print([ACTIONS_STR[a] for a in actions])

In [None]:
# Print the online network's Q-value estimates for every state index.
for state in range(R*C):
    x = torch.tensor([[state]], dtype=torch.float32)
    y = dqn(x).data.numpy()[0]
    print(y)
    

## DQN 三个优化

![](dqn01.png)
![](dqn02.png)
![](dqn03.png)