# TX00DQ05-3001 Exercises 4

In [3]:
import numpy as np
import numpy.linalg as LA
from numpy import random

## Functions and Common Variables

In [4]:
# YOUR CODE
# Grid geometry and terminal cells; the algorithm functions below overwrite
# these three globals for the world they solve.
ROWS_COUNT, COLUMNS_COUNT = 4, 4
TERMINATING = [(0, 0), (ROWS_COUNT - 1, COLUMNS_COUNT - 1)]
ACTIONS = ['←', '↑', '↓', '→']

# Step size, trace-decay rate and discount factor shared by the algorithms.
alpha, lamb, gamma = 0.5, 0.5, 1
# Reward received for each action, in ACTIONS order: every move costs -1.
rewards = [-1] * len(ACTIONS)

def random_state():
    """Sample a random non-terminal cell of the current grid.

    Cells are drawn with ``random.randint`` over ``ROWS_COUNT`` x
    ``COLUMNS_COUNT`` and re-drawn while they fall in ``TERMINATING``.

    Returns:
        A ``(row, column)`` tuple that is not in ``TERMINATING``.
    """
    # Always draw at least once. Seeding the loop with (0, 0) only works when
    # (0, 0) is a terminal cell; for a grid where it is not, (0, 0) would be
    # returned every time without any sampling.
    while True:
        row = random.randint(0, ROWS_COUNT)        # upper bound is exclusive
        column = random.randint(0, COLUMNS_COUNT)
        if (row, column) not in TERMINATING:
            return (row, column)

def random_action(action_prob):
    """Draw one action from ``ACTIONS`` using the given probabilities.

    Args:
        action_prob: Either a dict mapping each action in ``ACTIONS`` to its
            probability, or a sequence of probabilities in ``ACTIONS`` order.

    Returns:
        The action sampled by ``random.choice``.
    """
    # A dict is flattened into a list ordered like ACTIONS; any other input is
    # passed to random.choice untouched.
    weights = (
        [action_prob[act] for act in ACTIONS]
        if type(action_prob) is dict
        else action_prob
    )
    return random.choice(ACTIONS, p=weights)

def find_action_prob(Q_state, epsilon):
    """Build epsilon-greedy selection probabilities for one state.

    Args:
        Q_state: Dict mapping every action in ``ACTIONS`` to its value.
        epsilon: Exploration rate.

    Returns:
        A list of probabilities in ``ACTIONS`` order. The greedy action (the
        first one when several share the maximum value) gets
        ``1 - epsilon + epsilon/n``; every other action gets ``epsilon/n``,
        where ``n`` is ``len(ACTIONS)``.
    """
    action_count = len(ACTIONS)
    values = [Q_state[a] for a in ACTIONS]
    best_action_index = values.index(max(values))

    # The result is sized from ACTIONS instead of a fixed four-element list,
    # so it stays valid if the action set changes.
    explore = epsilon / action_count
    return [
        1 - epsilon + explore if i == best_action_index else explore
        for i in range(action_count)
    ]

def init_state_value():
    """Return a ROWS_COUNT x COLUMNS_COUNT numpy array filled with zeros."""
    grid_shape = (ROWS_COUNT, COLUMNS_COUNT)
    return np.zeros(grid_shape)

def init_action_value(rows, columns):
    """Create a zero-initialised table with one entry per (cell, action).

    Args:
        rows: Number of grid rows.
        columns: Number of grid columns.

    Returns:
        A nested list indexed ``[row][column]``; each element is its own dict
        mapping every action in ``ACTIONS`` to 0.
    """
    return [
        [{act: 0 for act in ACTIONS} for _ in range(columns)]
        for _ in range(rows)
    ]

def init_policy():
    """Build the equiprobable policy for the current grid.

    Returns:
        A nested list indexed ``[row][column]`` of dicts mapping each action
        to its probability: 0.25 in ordinary cells, 0 in terminal cells.
    """
    uniform = [0.25, 0.25, 0.25, 0.25]

    def cell_policy(r, c):
        # Terminal cells get probability 0 for every action.
        terminal = (r, c) in TERMINATING
        return {
            act: (0 if terminal else uniform[idx])
            for idx, act in enumerate(ACTIONS)
        }

    return [
        [cell_policy(r, c) for c in range(COLUMNS_COUNT)]
        for r in range(ROWS_COUNT)
    ]

def next_state(action, state):
    """Apply one move and keep the result inside the current grid.

    Args:
        action: One of the entries of ``ACTIONS``; anything else leaves the
            position unchanged.
        state: ``(row, column)`` of the current cell.

    Returns:
        The ``(row, column)`` reached, clamped to the grid bounds.
    """
    # (row delta, column delta) for ACTIONS[0..3]: left, up, down, right.
    steps = ((0, -1), (-1, 0), (1, 0), (0, 1))
    d_row, d_col = next(
        (step for act, step in zip(ACTIONS, steps) if action == act),
        (0, 0),
    )

    def clamp(value, size):
        # Moves that would leave the grid end on the nearest edge cell.
        if value >= size:
            return size - 1
        return 0 if value <= 0 else value

    return (
        clamp(state[0] + d_row, ROWS_COUNT),
        clamp(state[1] + d_col, COLUMNS_COUNT),
    )

def extract_policy(Q):
    """Derive the greedy policy from an action-value table.

    Args:
        Q: Nested list indexed ``[row][column]`` of dicts mapping each action
            in ``ACTIONS`` to its value.

    Returns:
        A nested list of the same layout holding the highest-valued action of
        each cell (the first one on ties), or ``'-'`` for terminal cells.
    """
    def greedy(r, c):
        if (r, c) in TERMINATING:
            return '-'
        scores = [Q[r][c][act] for act in ACTIONS]
        return ACTIONS[scores.index(max(scores))]

    return [
        [greedy(r, c) for c in range(len(Q[r]))]
        for r in range(len(Q))
    ]

def print_Q(value_function):
    """Print an action-value table as one grid per action.

    For every action in ``ACTIONS`` the action symbol is printed, followed by
    ROWS_COUNT bracketed lines holding the values rounded to two decimals.

    Args:
        value_function: Nested list indexed ``[row][column]`` of dicts mapping
            each action to its value.
    """
    for act in ACTIONS:
        print(act)
        for r in range(ROWS_COUNT):
            # Every value is followed by a single space, including the last.
            cells = ''.join(
                "{0:.2f}".format(round(value_function[r][c][act], 2)) + ' '
                for c in range(COLUMNS_COUNT)
            )
            print('[' + cells + ']')

## Exercise 1: TD(0) value function estimation

Implement value function estimation for Sutton & Barto example 4.1 with the TD(0) algorithm.

In [5]:
def value_function_estimation_TD0(policy, episodes=100):
    """Estimate the state values of the 4x4 gridworld under ``policy`` with TD(0).

    Side effect: resets the module-level ``ROWS_COUNT``, ``COLUMNS_COUNT`` and
    ``TERMINATING`` to the 4x4 gridworld, because the helper functions read
    those globals.

    Args:
        policy: ``policy[row][column]`` holds the action probabilities of that
            cell in any form accepted by ``random_action``.
        episodes: Number of episodes to run. Defaults to 100, the count that
            was previously hard-coded.

    Returns:
        The ROWS_COUNT x COLUMNS_COUNT numpy array of estimated state values.
    """
    global ROWS_COUNT, COLUMNS_COUNT, TERMINATING
    ROWS_COUNT = 4
    COLUMNS_COUNT = 4
    TERMINATING = [(0, 0), (ROWS_COUNT - 1, COLUMNS_COUNT - 1)]

    V = init_state_value()

    for _ in range(episodes):
        S = random_state()
        while S not in TERMINATING:
            (row, column) = S

            # Choose A from the given policy
            A = random_action(policy[row][column])

            # Observe R and S'
            R = rewards[ACTIONS.index(A)]
            S_next = next_state(A, S)
            (row_next, column_next) = S_next

            # TD(0) update: move V(S) towards R + gamma * V(S')
            V[row][column] += alpha * (R + gamma*V[row_next][column_next] - V[row][column])
            S = S_next
    return V

In [6]:
# Evaluate the policy returned by init_policy() with TD(0) and display the
# estimated state-value table.
policy = init_policy()

V = value_function_estimation_TD0(policy)
V

array([[  0.        , -20.53439918, -23.52482805, -23.19499997],
       [-17.82451184, -22.79161997, -22.30013459, -16.1889571 ],
       [-24.86362554, -22.74832521, -16.94256773,  -7.65735609],
       [-15.38941534,  -6.18471302,  -1.50513969,   0.        ]])

## Exercise 2: Implement TD(0) control 

Solve Sutton & Barto example 4.1 with the TD(0) control (Sarsa) algorithm. Then apply the algorithm to the windy gridworld of Sutton & Barto example 6.5.

### Grid World

In [7]:
def sarsa(episodes=100):
    """Learn action values for the 4x4 gridworld with Sarsa (TD(0) control).

    Side effect: resets the module-level ``ROWS_COUNT``, ``COLUMNS_COUNT`` and
    ``TERMINATING`` to the 4x4 gridworld, because the helper functions read
    those globals.

    Args:
        episodes: Number of episodes to run. Defaults to 100, the count that
            was previously hard-coded.

    Returns:
        The learned table: a nested list indexed ``[row][column]`` of dicts
        mapping each action to its estimated value.
    """
    global ROWS_COUNT, COLUMNS_COUNT, TERMINATING
    ROWS_COUNT = 4
    COLUMNS_COUNT = 4
    TERMINATING = [(0, 0), (ROWS_COUNT - 1, COLUMNS_COUNT - 1)]

    Q = init_action_value(ROWS_COUNT, COLUMNS_COUNT)

    # Episodes loop
    for i in range(episodes):
        # Exploration rate shrinks as 1, 1/2, 1/3, ... over the episodes.
        epsilon = 1 / (i + 1)
        S = random_state()
        (row, column) = S

        # Choose A from S by epsilon-greedy
        A = random_action(find_action_prob(Q[row][column], epsilon))

        # Steps loop
        while S not in TERMINATING:
            (row, column) = S

            # Observe R and S'
            R = rewards[ACTIONS.index(A)]
            S_next = next_state(A, S)
            (row_next, column_next) = S_next

            # Choose A' from S' by epsilon-greedy
            A_next = random_action(
                find_action_prob(Q[row_next][column_next], epsilon)
            )

            # Sarsa bootstraps from the action that will actually be taken
            # next, Q(S', A'), not from the current action evaluated in S'.
            td_target = R + gamma * Q[row_next][column_next][A_next]
            Q[row][column][A] += alpha * (td_target - Q[row][column][A])
            S = S_next
            A = A_next

    return Q

In [8]:
# Run Sarsa on the 4x4 gridworld and print the learned values, one grid per action.
Q = sarsa()
print_Q(Q)

←
[0.00 -1.00 -1.99 -2.63 ]
[-1.00 -1.99 -2.76 -2.16 ]
[-2.50 -2.78 -2.77 -1.84 ]
[-3.50 -2.00 -1.62 0.00 ]
↑
[0.00 -1.50 -2.50 -3.00 ]
[-1.00 -2.12 -2.75 -2.25 ]
[-2.00 -2.78 -2.25 -1.00 ]
[-2.91 -2.44 -1.50 0.00 ]
↓
[0.00 -1.00 -2.68 -2.53 ]
[-2.25 -2.59 -2.71 -2.00 ]
[-2.25 -3.03 -2.00 -1.00 ]
[-3.00 -2.50 -1.00 0.00 ]
→
[0.00 -1.25 -2.25 -3.00 ]
[-1.56 -2.12 -3.09 -2.50 ]
[-2.23 -2.87 -1.99 -1.00 ]
[-2.87 -2.00 -1.00 0.00 ]


### Windy World

In [9]:
def windy_world(episodes=10000):
    """Learn action values for the 7x10 windy gridworld with Sarsa.

    Every episode starts in cell (3, 0) and ends in cell (3, 7). After each
    move the agent is additionally pushed towards row 0 by the wind strength
    of the column it moved from.

    Side effect: sets the module-level ``ROWS_COUNT``, ``COLUMNS_COUNT`` and
    ``TERMINATING`` to the windy world, because the helper functions read
    those globals.

    Args:
        episodes: Number of episodes to run. Defaults to 10000, the count
            that was previously hard-coded.

    Returns:
        The learned table: a nested list indexed ``[row][column]`` of dicts
        mapping each action to its estimated value.
    """
    global ROWS_COUNT, COLUMNS_COUNT, TERMINATING
    ROWS_COUNT = 7
    COLUMNS_COUNT = 10
    TERMINATING = [(3, 7)]

    # Size the table from the grid constants instead of repeating 7 and 10.
    Q = init_action_value(ROWS_COUNT, COLUMNS_COUNT)
    # Wind strength per column.
    wind = [0, 0, 0, 1, 1, 1, 2, 2, 1, 0]

    # Episodes loop
    for i in range(episodes):
        # Exploration rate shrinks as 1, 1/2, 1/3, ... over the episodes.
        epsilon = 1 / (i + 1)
        S = (3, 0)
        (row, column) = S

        # Choose A from S by epsilon-greedy
        A = random_action(find_action_prob(Q[row][column], epsilon))

        # Steps loop
        while S not in TERMINATING:
            (row, column) = S

            # Observe R and S'
            R = rewards[ACTIONS.index(A)]
            (row_next, column_next) = next_state(A, S)
            # Apply the wind of the departure column, stopping at row 0.
            row_next = max(row_next - wind[column], 0)
            S_next = (row_next, column_next)

            # Choose A' from S' by epsilon-greedy
            A_next = random_action(
                find_action_prob(Q[row_next][column_next], epsilon)
            )

            # Sarsa bootstraps from the action that will actually be taken
            # next, Q(S', A'), not from the current action evaluated in S'.
            td_target = R + gamma * Q[row_next][column_next][A_next]
            Q[row][column][A] += alpha * (td_target - Q[row][column][A])
            S = S_next
            A = A_next

    return Q

In [10]:
# Run Sarsa on the windy gridworld and print the learned values, one grid per action.
Q = windy_world()
print_Q(Q)

←
[-25.00 -25.07 -24.07 -23.25 -23.18 -22.09 -22.60 -21.16 -19.88 -18.80 ]
[-26.00 -25.14 -24.19 -24.24 -23.16 -22.93 -20.61 -19.67 -13.37 -10.84 ]
[-26.50 -25.54 -24.67 -23.25 -24.29 -21.80 -20.43 -8.45 -12.00 -12.40 ]
[-28.00 -27.26 -24.99 -23.85 -23.04 -22.02 -20.31 0.00 -7.31 -8.04 ]
[-26.00 -25.00 -24.02 -23.83 -23.30 -21.00 0.00 -5.92 -1.00 -2.00 ]
[-25.50 -24.53 -24.42 -23.03 -22.21 0.00 0.00 -5.85 -3.71 -2.85 ]
[-24.00 -23.94 -23.85 -22.63 0.00 0.00 0.00 0.00 -3.68 -2.96 ]
↑
[-25.50 -25.00 -25.00 -32.00 -34.00 -37.50 -31.00 -22.00 -22.00 -11.00 ]
[-25.21 -24.64 -24.97 -25.95 -32.97 -37.77 -25.56 -21.55 -17.10 -10.95 ]
[-25.96 -25.03 -24.26 -23.82 -34.26 -36.03 -21.00 -13.62 -19.55 -11.16 ]
[-26.64 -25.73 -24.76 -25.97 -31.48 -36.56 -30.50 0.00 -6.85 -10.27 ]
[-26.37 -25.02 -24.42 -24.39 -23.05 -32.86 0.00 -9.69 -19.36 -10.33 ]
[-24.97 -24.27 -23.82 -22.78 -24.68 0.00 0.00 -7.31 -3.93 -4.85 ]
[-24.78 -24.21 -23.85 -23.70 0.00 0.00 0.00 0.00 -1.19 -1.63 ]
↓
[-25.49 -24.48 -23.79 

In [11]:
# Show the greedy action per cell for the most recently computed Q table.
extract_policy(Q)

[['→', '→', '↓', '←', '→', '→', '→', '→', '→', '↓'],
 ['↑', '↑', '→', '→', '→', '→', '→', '→', '→', '↓'],
 ['→', '↓', '→', '→', '→', '→', '→', '←', '→', '↓'],
 ['→', '→', '→', '→', '→', '→', '→', '-', '→', '↓'],
 ['↓', '→', '←', '→', '→', '→', '←', '↓', '←', '←'],
 ['↓', '↑', '→', '↑', '↓', '←', '←', '↓', '↓', '→'],
 ['←', '→', '→', '→', '←', '←', '←', '←', '→', '↓']]

## Exercise 3*: Eligibility trace

Do a random walk in the gridworld of example 4.1 and create an eligibility trace from the walk.

*) - not mandatory

In [12]:
# YOUR CODE
def eligibility_trace():
    """Accumulate an eligibility trace over 100 random walks in the 4x4 gridworld.

    On every step the visited cell's trace is incremented by 1, a uniformly
    random action is taken, and the whole trace is then scaled by
    ``gamma * lamb``. The trace is carried over from one episode to the next.

    Side effect: resets the module-level ``ROWS_COUNT``, ``COLUMNS_COUNT`` and
    ``TERMINATING`` to the 4x4 gridworld.

    Returns:
        The ROWS_COUNT x COLUMNS_COUNT numpy array holding the final trace.
    """
    global ROWS_COUNT, COLUMNS_COUNT, TERMINATING
    ROWS_COUNT = COLUMNS_COUNT = 4
    TERMINATING = [(0, 0), (ROWS_COUNT - 1, COLUMNS_COUNT - 1)]

    trace = init_state_value()
    uniform = [0.25] * 4
    decay = gamma * lamb

    for _ in range(100):
        state = random_state()
        while state not in TERMINATING:
            # Frequency: bump the cell being visited.
            trace[state] += 1
            move = random_action(uniform)
            state = next_state(move, state)
            # Recency: fade every cell, including the one just bumped.
            trace *= decay
    return trace

In [13]:
# Set the trace-decay rate read by eligibility_trace(), then build and display the trace.
lamb = 0.5
E = eligibility_trace()
E

array([[0.00000000e+00, 3.56083790e-66, 2.35098870e-38, 1.41059322e-37],
       [4.34243352e-19, 7.81250000e-02, 1.32812500e-01, 1.88170932e-37],
       [4.06801701e-06, 3.32107563e-02, 2.54959110e-01, 5.00000000e-01],
       [2.59361710e-06, 2.45095334e-04, 6.40876600e-04, 0.00000000e+00]])

## Exercise 4**: TD(lambda)

Implement the TD(lambda) algorithm and use it to solve example 6.5. Create a table/plot showing the effect of lambda on the performance of the algorithm.

**) - not mandatory

In [14]:
# YOUR CODE
def TD_lambda():
    """Learn action values for the 4x4 gridworld with eligibility traces.

    Runs 100 episodes. Each episode starts from a random non-terminal cell
    with a uniformly random first action; later actions are epsilon-greedy
    with ``epsilon = 1 / (episode + 1)``. After every step the TD error is
    applied to all non-terminal (cell, action) pairs in proportion to their
    trace, and each trace is then scaled by ``gamma * lamb``.

    Side effect: resets the module-level ``ROWS_COUNT``, ``COLUMNS_COUNT`` and
    ``TERMINATING`` to the 4x4 gridworld.

    Returns:
        The learned table: a nested list indexed ``[row][column]`` of dicts
        mapping each action to its estimated value.
    """
    global ROWS_COUNT, COLUMNS_COUNT, TERMINATING
    ROWS_COUNT = COLUMNS_COUNT = 4
    TERMINATING = [(0, 0), (ROWS_COUNT - 1, COLUMNS_COUNT - 1)]

    Q = init_action_value(ROWS_COUNT, COLUMNS_COUNT)
    # Cells whose values and traces are touched by the sweep after each step.
    live_cells = [
        (r, c)
        for r in range(ROWS_COUNT)
        for c in range(COLUMNS_COUNT)
        if (r, c) not in TERMINATING
    ]

    for episode in range(100):
        epsilon = 1 / (episode + 1)
        # Traces start from zero in every episode.
        E = init_action_value(ROWS_COUNT, COLUMNS_COUNT)
        S = random_state()
        A = random_action([0.25, 0.25, 0.25, 0.25])

        while S not in TERMINATING:
            row, column = S

            # Observe R and S'
            R = rewards[ACTIONS.index(A)]
            S_next = next_state(A, S)
            row_next, column_next = S_next

            # Choose A' from S' by epsilon-greedy
            A_next = random_action(
                find_action_prob(Q[row_next][column_next], epsilon)
            )

            delta = R + gamma*Q[row_next][column_next][A_next] - Q[row][column][A]
            E[row][column][A] += 1

            # Spread the TD error over the traced pairs, then decay the traces.
            for r, c in live_cells:
                q_cell, e_cell = Q[r][c], E[r][c]
                for a in ACTIONS:
                    q_cell[a] += alpha*delta*e_cell[a]
                    e_cell[a] *= gamma*lamb

            S, A = S_next, A_next

    return Q

In [15]:
# Run TD_lambda() on the 4x4 gridworld and print the learned values, one grid per action.
Q = TD_lambda()
print_Q(Q)

←
[0.00 -1.00 -2.00 -2.98 ]
[-1.90 -2.19 -2.96 -2.83 ]
[-3.08 -2.99 -2.87 -2.74 ]
[-3.73 -2.71 -2.87 0.00 ]
↑
[0.00 -1.80 -3.10 -3.77 ]
[-1.00 -2.00 -2.95 -3.59 ]
[-2.00 -2.88 -2.97 -2.87 ]
[-2.98 -3.55 -2.49 0.00 ]
↓
[0.00 -1.99 -3.77 -2.99 ]
[-2.41 -3.02 -2.91 -2.00 ]
[-2.76 -2.86 -2.00 -1.00 ]
[-3.82 -2.56 -1.92 0.00 ]
→
[0.00 -2.72 -2.96 -3.57 ]
[-2.36 -2.74 -3.00 -2.76 ]
[-3.48 -2.91 -2.00 -1.83 ]
[-3.03 -2.00 -1.00 0.00 ]


In [16]:
# Show the greedy action per cell for the most recently computed Q table.
extract_policy(Q)

[['-', '←', '←', '←'],
 ['↑', '↑', '↓', '↓'],
 ['↑', '↓', '→', '↓'],
 ['↑', '→', '→', '-']]

In [17]:
# Display the raw Q table (unrounded values for every cell and action).
Q

[[{'←': 0, '↑': 0, '↓': 0, '→': 0},
  {'←': -0.9999999850988388,
   '↑': -1.8047136068344116,
   '↓': -1.9895126972904507,
   '→': -2.720859457052739},
  {'←': -2.0003060957845293,
   '↑': -3.096608159116225,
   '↓': -3.772574842654091,
   '→': -2.964210197857028},
  {'←': -2.984789169489391,
   '↑': -3.774473531821924,
   '↓': -2.9902209897325065,
   '→': -3.5689397230885334}],
 [{'←': -1.8972083737404368,
   '↑': -0.9999961853027344,
   '↓': -2.411229545156739,
   '→': -2.364628859926597},
  {'←': -2.1946728399816493,
   '↑': -1.998682456144575,
   '↓': -3.0180280804634094,
   '→': -2.7448827198531944},
  {'←': -2.9632507238058867,
   '↑': -2.9505984027989314,
   '↓': -2.9145498169236816,
   '→': -3.0038076817436377},
  {'←': -2.827548989211209,
   '↑': -3.589256725491921,
   '↓': -1.999977383762598,
   '→': -2.755203685577726}],
 [{'←': -3.083884943274159,
   '↑': -2.0001823487703563,
   '↓': -2.763261599057305,
   '→': -3.481098135250543},
  {'←': -2.9910327894779387,
   '↑': -2.88