<a href="https://colab.research.google.com/github/juliusfrost/Research-Paper-Implementations/blob/master/Dyna_Q.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dyna-Q

Tabular Q-learning combined with model-based planning (Dyna-Q).

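Works for environments with discrete states and actions whose transitions are deterministic (the learned model stores a single outcome per state-action pair).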

## Build Environment

In [0]:
from collections import defaultdict
import random
import copy
import math
import numpy as np

random.seed(0)

### Define the maze environment

In [0]:
# Maze layout: 0 is a free cell, 1 is a wall.
grid = [[0,0,0,0,0],
        [0,1,0,1,0],
        [0,1,0,0,0],
        [0,1,0,1,0],
        [0,0,0,1,0]]

class Env:
    """Deterministic grid-world maze.

    States are ``(row, col)`` tuples. The agent starts in the top-left cell
    and the episode ends when it reaches the bottom-right cell. Every step
    yields a reward of -1, except a step that ends on the goal, which
    yields 10.
    """

    def __init__(self, layout=None):
        """Create the maze.

        Args:
            layout: optional list of rows (0 = free, 1 = wall). When omitted
                the module-level ``grid`` is used.
        """
        self._layout = layout
        self.grid = self._source_grid()
        self.start = (0, 0)
        # Use the column count for the second coordinate so that non-square
        # mazes get a goal that actually lies inside the grid.
        self.end = (len(self.grid) - 1, len(self.grid[0]) - 1)
        self.position = self.start
        self.goal = self.end

    def _source_grid(self):
        """Return the layout this environment is built from."""
        return grid if self._layout is None else self._layout

    def reset(self):
        """Put the agent back on the start cell and return the start state."""
        self.grid = self._source_grid()
        self.position = self.start
        return self.get_state()

    def possible_actions(self):
        """Return every action as a ``(d_row, d_col)`` offset: up, down, left, right."""
        up, down, left, right = (-1, 0), (1, 0), (0, -1), (0, 1)
        return (up, down, left, right)

    def step(self, action):
        """Apply ``action`` and return ``(reward, next_state, done)``.

        An action that would leave the grid or enter a wall leaves the agent
        where it is; the step is still rewarded as usual.
        """
        self.apply_action(action)
        reward = self.reward()
        next_state = self.get_state()
        done = self.is_done()
        return reward, next_state, done

    def is_done(self):
        """Return True when the agent stands on the goal cell."""
        return self.position == self.goal

    def apply_action(self, action):
        """Move the agent if ``action`` is currently legal; otherwise do nothing."""
        if action in self.actions_avail():
            self.position = self.next_position(action)

    def next_position(self, action):
        """Return the cell ``action`` points at from the current position (no validity check)."""
        return (self.position[0] + action[0], self.position[1] + action[1])

    def _is_free(self, cell):
        """Return True if ``cell`` is inside the grid and not a wall."""
        row, col = cell
        inside = 0 <= row < len(self.grid) and 0 <= col < len(self.grid[0])
        return inside and self.grid[row][col] == 0

    def actions_avail(self):
        """Return the list of actions that lead to a free cell from the current position."""
        return [
            action
            for action in self.possible_actions()
            if self._is_free(self.next_position(action))
        ]

    def random_action(self):
        """Return a uniformly random legal action.

        Raises:
            ValueError: if no legal action exists from the current position.
        """
        avail = self.actions_avail()
        if not avail:
            raise ValueError('no action available from position {}'.format(self.position))
        return random.choice(avail)

    def get_state(self):
        """Return the current state, i.e. the agent's ``(row, col)`` position."""
        return self.position

    def reward(self):
        """Return 10 when the agent is on the goal, otherwise -1."""
        return 10 if self.position == self.goal else -1

    def render(self):
        """Print the maze with the agent shown as 2 and the goal as 3."""
        # Draw on a copy so the markers never leak into the real layout.
        grid_show = copy.deepcopy(self.grid)
        grid_show[self.position[0]][self.position[1]] = 2
        # The goal is drawn last, so it hides the agent once the agent is on it.
        grid_show[self.goal[0]][self.goal[1]] = 3
        print('Maze:')
        for row in grid_show:
            print(row)
        

### Try random actions

In [3]:
# Sanity check: draw the initial maze, then take ten random legal moves,
# drawing the maze again after each one.
env = Env()
env.reset()
env.render()
for _move in range(10):
    chosen = env.random_action()
    env.step(chosen)
    env.render()

Maze:
[2, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 2, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 2, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 2, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 2, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 2, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 2]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 2]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 2]
[0, 0, 0, 1, 3]
Maze:
[0, 0, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 1, 0, 0, 0]
[0, 1, 0, 1, 0]
[0, 0, 0, 1, 3]


## Dyna-Q

### Hyperparameters

In [0]:
# Discount factor applied to the best next-state value in the Q-learning update.
gamma = 0.9
# Step size (learning rate) of each Q-value update.
alpha = 0.5
# Probability of picking a random action instead of the greedy one.
# The training loop multiplies this by 0.99 at the start of every episode.
epsilon = 0.1
# Maximum number of environment steps per episode before it is cut off.
max_steps = 100
# Number of planning updates (replayed from the learned model) after each real step.
n = 10
# Number of episodes to train for.
episodes = 50

### Define functions and updates

In [0]:
# States from which the agent has taken at least one real step.
seen_states = set()
# For each such state, the set of actions that were actually tried from it.
seen_actions = defaultdict(set)
# Tabular action values keyed by (state, action); missing pairs read as 0.0.
Q_table = defaultdict(float)
# Learned model keyed by (state, action) -> (reward, next_state).
# NOTE(review): the default (0, 0) gives next_state == 0, which is not a
# (row, col) tuple. Planning only samples pairs that were recorded, so this
# default should not be reached there -- confirm before querying unseen pairs.
Model_table = defaultdict(lambda: (0, 0))

env = Env()
# Full action tuple (up, down, left, right). Its order decides tie-breaking in
# max_Q and the action axis of the grids printed later.
possible_actions = env.possible_actions()

def Q(state, action):
    """Return the current value estimate for taking ``action`` in ``state``.

    Pairs that have never been stored read as 0.0 (and are inserted into
    ``Q_table`` by the lookup, since it is a defaultdict).
    """
    key = (state, action)
    return Q_table[key]


def max_Q(state):
    """Return ``(greedy_action, greedy_value)`` for ``state``.

    Actions are scanned in the order of ``possible_actions``; because the
    comparison is strict, the earliest action wins when values are tied.
    """
    greedy = (None, -math.inf)
    for candidate in possible_actions:
        score = Q(state, candidate)
        if score > greedy[1]:
            greedy = (candidate, score)
    return greedy

def Q_update(state, action, reward, next_state):
    """Apply one Q-learning backup for the transition (state, action, reward, next_state).

    The stored value moves a fraction ``alpha`` of the way towards the
    target ``reward + gamma * max_a Q(next_state, a)``.
    """
    bootstrap = max_Q(next_state)[1]
    td_target = reward + gamma * bootstrap
    key = (state, action)
    old_value = Q_table[key]
    Q_table[key] = old_value + alpha * (td_target - old_value)

def Model(state, action):
    """Return the remembered ``(reward, next_state)`` for taking ``action`` in ``state``."""
    remembered = Model_table[state, action]
    return remembered
    

def Model_update(state, action, reward, next_state):
    """Store the latest observed outcome of ``(state, action)``, replacing any earlier one."""
    outcome = (reward, next_state)
    Model_table[state, action] = outcome
    

def epsilon_greedy(state, Q):
    """Pick an action for ``state``: random with probability ``epsilon``, else greedy.

    The random branch samples from all of ``possible_actions``. The ``Q``
    argument is accepted but not used; the greedy choice comes from ``max_Q``.
    """
    explore = random.random() < epsilon
    if explore:
        return random.choice(possible_actions)
    greedy_action, _ = max_Q(state)
    return greedy_action

### Algorithm

In [6]:
# Dyna-Q: after every real environment step, learn from it directly, store it
# in the model, then replay n remembered transitions as extra Q updates.
# (Call env.render() inside the loop to watch an episode.)
for i in range(episodes):
    state = env.reset()

    # Shrink the exploration rate a little at the start of every episode.
    epsilon *= 0.99

    done, total_reward, step = False, 0, 0
    while not done and step < max_steps:
        # Direct RL: act in the real environment and learn from the outcome.
        action = epsilon_greedy(state, Q)
        reward, next_state, done = env.step(action)
        Q_update(state, action, reward, next_state)
        Model_update(state, action, reward, next_state)

        # Remember what was tried so planning only replays real experience.
        seen_states.add(state)
        seen_actions[state].add(action)

        total_reward += reward
        step += 1
        state = next_state

        # Planning: n one-step updates on transitions drawn from the model.
        for _planning_iter in range(n):
            sim_state = random.choice(list(seen_states))
            sim_action = random.choice(list(seen_actions[sim_state]))
            sim_reward, sim_next_state = Model(sim_state, sim_action)
            Q_update(sim_state, sim_action, sim_reward, sim_next_state)
    print(i, 'reward =', total_reward, 'steps =', step)

0 reward = -74 steps = 85
1 reward = -14 steps = 25
2 reward = 0 steps = 11
3 reward = 3 steps = 8
4 reward = 1 steps = 10
5 reward = 2 steps = 9
6 reward = 1 steps = 10
7 reward = 2 steps = 9
8 reward = 2 steps = 9
9 reward = 1 steps = 10
10 reward = 3 steps = 8
11 reward = 1 steps = 10
12 reward = 3 steps = 8
13 reward = 3 steps = 8
14 reward = 0 steps = 11
15 reward = 3 steps = 8
16 reward = 3 steps = 8
17 reward = 2 steps = 9
18 reward = 2 steps = 9
19 reward = 3 steps = 8
20 reward = 3 steps = 8
21 reward = 3 steps = 8
22 reward = 1 steps = 10
23 reward = 3 steps = 8
24 reward = 3 steps = 8
25 reward = 2 steps = 9
26 reward = 3 steps = 8
27 reward = 1 steps = 10
28 reward = 2 steps = 9
29 reward = 3 steps = 8
30 reward = 2 steps = 9
31 reward = 3 steps = 8
32 reward = 2 steps = 9
33 reward = 3 steps = 8
34 reward = 3 steps = 8
35 reward = 3 steps = 8
36 reward = 3 steps = 8
37 reward = 3 steps = 8
38 reward = 3 steps = 8
39 reward = 1 steps = 10
40 reward = 3 steps = 8
41 reward =

Possible actions, listed in the same order as the action axis of the Q-function and model grids printed below.

In [7]:
# Show which (d_row, d_col) offset each named action stands for.
up, down, left, right = possible_actions
for label, offset in zip(('up', 'down', 'left', 'right'), (up, down, left, right)):
    print(label, offset)

up (-1, 0)
down (1, 0)
left (0, -1)
right (0, 1)


### Learned Q function

Some places may still be 0 because the agent did not explore that part of the state-action space.

In [8]:
# Lay the learned action values out as one maze-shaped grid per action.
# Axis 0 follows the order of possible_actions (sized from it rather than a
# hard-coded 4); cells with no entry in Q_table stay 0.
value_grid = np.zeros((len(possible_actions), *np.array(grid).shape))
for (cell, move), q_value in Q_table.items():
    value_grid[possible_actions.index(move), cell[0], cell[1]] = round(q_value, 1)

print('value grid per action')
print(value_grid)

# Best action value in every cell, i.e. the value of acting greedily there.
max_value_grid = np.max(value_grid, axis=0)

print('\nMax value grid')
print(max_value_grid)

value grid per action
[[[-1.4 -0.4  0.6  1.8  3.1]
  [-1.4  0.   0.6  0.   3.1]
  [-2.3  0.   1.8  4.6  4.6]
  [-3.   0.   3.1  0.   6.2]
  [-2.3 -0.4  1.8  0.   0. ]]

 [[-2.3 -0.4  1.8  1.8  4.6]
  [-3.   0.   3.1  0.   6.2]
  [-2.3  0.   1.8  4.6  8. ]
  [-1.4  0.   0.6  0.  10. ]
  [-1.4 -0.4  0.6  0.   0. ]]

 [[-1.4 -1.4 -0.4  0.6  1.8]
  [-2.3  0.   1.8  0.   4.6]
  [-3.   0.   3.1  3.1  4.6]
  [-2.3  0.   1.8  0.   8. ]
  [-1.4 -1.4 -0.4  0.   0. ]]

 [[-0.4  0.6  1.8  3.1  3.1]
  [-2.3  0.   1.8  0.   4.6]
  [-3.   0.   4.6  6.2  6.2]
  [-2.3  0.   1.8  0.   8. ]
  [-0.4  0.6  0.6  0.   0. ]]]

Max value grid
[[-0.4  0.6  1.8  3.1  4.6]
 [-1.4  0.   3.1  0.   6.2]
 [-2.3  0.   4.6  6.2  8. ]
 [-1.4  0.   3.1  0.  10. ]
 [-0.4  0.6  1.8  0.   0. ]]


### Learned Model

Some places may still be 0 because the agent did not explore that part of the state-action space.

In [9]:
# Show the reward the learned model predicts for every (cell, action) pair,
# as one maze-shaped grid per action. Axis 0 follows the order of
# possible_actions (sized from it rather than a hard-coded 4); pairs the
# model has no entry for stay 0.
value_grid = np.zeros((len(possible_actions), *np.array(grid).shape))
for (cell, move), (model_reward, _next_state) in Model_table.items():
    # Only the reward is displayed; the predicted next state is not used here.
    value_grid[possible_actions.index(move), cell[0], cell[1]] = model_reward

print('value grid per action')
print(value_grid)

value grid per action
[[[-1. -1. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1.  0. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1. -1. -1.  0.  0.]]

 [[-1. -1. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1.  0. -1. -1. -1.]
  [-1.  0. -1.  0. 10.]
  [-1. -1. -1.  0.  0.]]

 [[-1. -1. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1.  0. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1. -1. -1.  0.  0.]]

 [[-1. -1. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1.  0. -1. -1. -1.]
  [-1.  0. -1.  0. -1.]
  [-1. -1. -1.  0.  0.]]]
