# Lab 4 - Reinforcement Learning

## Read details of the environment & model from a text file
Read: 

- Width, Height
- Noise: probability of not going in the intended direction (then divided by two for two unintentional neighbor directions)
- Immediate rewards of non-goal states
- Terminal (goal) states: their locations and rewards
- Internal Walls: locations

Create the transition matrix (T) and the Rewards matrix according to above details.

"grid1.txt" corresponds to the example in AIMA and the lecture slides. You can double-check various results there.

Python style comments in grid1.txt are just comments, can be ignored/deleted.

Task1 - We ask you to create two more grids (files) of moderate size and with at least one internal wall. And perform the analysis described below on these three grids.

In [1]:
import numpy as np

filenames = ["grid1.txt", "grid2.txt", "grid3.txt"]
W_all = []
H_all = []
T_all = []
Rewards_all = []
Terminals_all = []
Walls_all = []


for filename in filenames:

    file1 = open(filename, 'r')
    (W, H) = [t(s) for t,s in zip((int,int),file1.readline().split())]
    noise = [t(s) for t,s in zip((float,str),file1.readline().split())][0]
    immediate_rewards = [t(s) for t,s in zip((float,str),file1.readline().split())][0]
    N = W*H #total number of states
    NA = 4 #number of actions
    T = np.zeros((N,N,NA)) #state transition probabilities
    Directions = np.array(['L','R','U','D'])
    Rewards = np.zeros((N,1)) + immediate_rewards #immediate rewards
    Terminals = np.zeros((N,1)) #terminal states
    Walls = np.zeros((N,1)) #(internal) wall states
    prob1 = 1-noise
    prob2 = noise/2
    while True:
        line = file1.readline()
        if not line:
            break
        if line.find('T') != -1:
            (_,x, y, r) = [t(s) for t,s in zip((str,int,int,float),line.split())]
            Terminals[y*W+x] = 1
            Rewards[y*W+x] = r
        if line.find('W') != -1:
            (_,x, y) = [t(s) for t,s in zip((str,int,int),line.split())]
            Walls[y*W+x] = 1
    file1.close()
    for i in range(W):
        for j in range(H):
            if i+1<W and Walls[j*W+i+1]==0: #not wall on the right
                T[j*W+i,j*W+i+1,1] += prob1
                T[j*W+i,j*W+i+1,2] += prob2
                T[j*W+i,j*W+i+1,3] += prob2
            else:
                T[j*W+i,j*W+i,1] += prob1
                T[j*W+i,j*W+i,2] += prob2
                T[j*W+i,j*W+i,3] += prob2
            if i-1>=0 and Walls[j*W+i-1]==0: #not wall on the left
                T[j*W+i,j*W+i-1,0] += prob1
                T[j*W+i,j*W+i-1,2] += prob2
                T[j*W+i,j*W+i-1,3] += prob2
            else:
                T[j*W+i,j*W+i,0] += prob1
                T[j*W+i,j*W+i,2] += prob2
                T[j*W+i,j*W+i,3] += prob2
            if j+1<H and Walls[(j+1)*W+i]==0: #not wall below
                T[j*W+i,(j+1)*W+i,3] += prob1
                T[j*W+i,(j+1)*W+i,0] += prob2
                T[j*W+i,(j+1)*W+i,1] += prob2
            else:
                T[j*W+i,j*W+i,3] += prob1
                T[j*W+i,j*W+i,0] += prob2
                T[j*W+i,j*W+i,1] += prob2
            if j-1>=0 and Walls[(j-1)*W+i]==0: #not wall above
                T[j*W+i,(j-1)*W+i,2] += prob1
                T[j*W+i,(j-1)*W+i,0] += prob2
                T[j*W+i,(j-1)*W+i,1] += prob2
            else:
                T[j*W+i,j*W+i,2] += prob1
                T[j*W+i,j*W+i,0] += prob2
                T[j*W+i,j*W+i,1] += prob2

    W_all.append(W)
    H_all.append(H)
    T_all.append(T)
    Rewards_all.append(Rewards)
    Terminals_all.append(Terminals)
    Walls_all.append(Walls)

print(T.shape)

(360, 360, 4)


## Task 2 - Implement functions

### Value and Policy iteration
Value iteration is provided, implement policy iteration

### TD-Learning and Q-Learning
Q-Learning is implemented, implement TD-Learning

In [36]:
def value_iteration(gamma, T, Rewards, Terminals, Walls):   
    N, NA = T.shape[0], T.shape[2]
    V = np.zeros((N,1))
    Policy = np.zeros((N,1), dtype=int)
    while True:
        V_old = np.copy(V)
        for s in range(N):
            if Walls[s]==1: continue 
            if Terminals[s]==1:
                V[s] = Rewards[s]
                continue
            Q = np.zeros((NA,1))
            # Bellman equation
            for a in range(NA):
                Q[a] = Rewards[s] + gamma*np.dot(T[s,:,a],V)
            V[s] = np.max(Q)
            Policy[s] = np.argmax(Q)
        if np.sum(np.abs(V-V_old))<1e-10:
            break
    return V, Policy

def policy_iteration(gamma, T, Rewards, Terminals, Walls): 
    # Initialization
    N, NA = T.shape[0], T.shape[2]
    V = np.zeros((N,1))
    Policy = np.zeros((N,1), dtype=int)
    policy_stable = False

    while not policy_stable: # policy iteration

        # Policy Evaluation
        while True: # value iteration
            V_old = np.copy(V)
            for s in range(N):
                if Walls[s]==1: continue
                if Terminals[s]==1:
                    V[s] = Rewards[s]
                    continue

                V[s] = Rewards[s] + gamma*np.dot(T[s,:,Policy[s]],V) # Utility approximation

            delta = np.max(np.abs(V-V_old))
            if delta < 0.05: # smaller delta won't happen: wrong calculations?
                break

        # Policy Improvement
        policy_stable = True
        Q = np.zeros((NA,1))
        for s in range(N):
            b = np.copy(Policy[s])
            for a in range(NA):
                Q[a] = Rewards[s] + gamma*np.dot(T[s,:,a],V) #
                # Q[a] = gamma*np.dot(T[s,:,a],V) # expected utility for action a in state s, according to T and V
            Policy[s] = np.argmax(Q)
            if b != Policy[s]:
                policy_stable = False

    return V, Policy


def TD_learning(N_episodes, Policy, alpha, gamma, T, Rewards, Terminals, Walls):
    N, NA = T.shape[0], T.shape[2]
    V = np.zeros((N,1))

    for e in range(N_episodes):
        # sampling process
        s = int(np.floor(np.random.uniform(0,N-1))) # random starting position
        while Terminals[s]==1 or Walls[s]==1:
            s = int(np.floor(np.random.uniform(0,N-1))) # new random starting position
        while Terminals[s]==0:
            a = Policy[s] # choose action according to policy which is evaluated

            # This logic was provided in Q_earning()
            # seems to incorporate the real transition probabilities to get s'
            u = np.random.uniform(0,1)
            s1 = np.argmax(u<np.cumsum(T[s,:,a]))

            V[s] += alpha*(Rewards[s1] + gamma*V[s1] - V[s]) # Policy evaluation (State Utility Estimation)
            s = s1

    V[Terminals==1] = Rewards[Terminals==1]

    return V, Policy

def Q_learning(N_episodes, epsilon, alpha, gamma, T, Rewards, Terminals, Walls): 
    N, NA = T.shape[0], T.shape[2]
    Q = np.zeros((N,NA))
    for e in range(N_episodes):
        s = int(np.floor(np.random.uniform(0,N-1)))
        while Terminals[s]==1 or Walls[s]==1:
            s = int(np.floor(np.random.uniform(0,N-1)))
        while Terminals[s]==0:
            u = np.random.uniform(0,1)
            if u<epsilon:
                a = int(np.floor(np.random.uniform(0,NA)))
            else:
                a = np.argmax(Q[s,:])
            u = np.random.uniform(0,1)
            s1 = np.argmax(u<np.cumsum(T[s,:,a]))
            Q[s,a] += alpha*(Rewards[s1] + gamma*np.max(Q[s1,:]) - Q[s,a])
            s = s1
        epsilon = epsilon*0.9999 # Modify here for other annealing regimes
        alpha = alpha*0.9999 # Modify here for other annealing regimes
    V = np.max(Q,axis=1)
    V = V[:,None]
    V[Terminals==1] = Rewards[Terminals==1] 
    Policy = np.array(np.argmax(Q,axis=1)).reshape((N,1))
    return V, Policy

## Task 3 - Experiments

For the three grids you have, perform the following analysis and include them in your report,.

- Compare value and policy iterations in terms of convergence time. Relate it to computational complexity, current implementation and number of iterations needed.

- How are optimal policies change with immediate reward values? Show some examples (similar to Figure 17.2b in AIMA)

- Compare TD-Learning and Q-Learning results with each other. Also with value and policy iterations. Remember that value and policy iteration solve Markov Decision Processes where we know the model (T and Rewards). TD-Learning and Q-Learning are (passive and active, respectively) Reinforcement Learning methods that don't have the model but use data from simulations. Data are simulated from the model we know, but the model is not used n TD or Q-Learning.

- What are the effects of epsilon and alpha values and how they are modified?

- What is the effect of number of episodes?

In [None]:
import time

gamma = 1
alpha = 0.9
epsilon = 0.5
N_episodes = 20000

V_VI_all, Policy_VI_all, Time_VI_all = [], [], []
V_PI_all, Policy_PI_all, Time_PI_all = [], [], []
V_TD_all, Policy_TD_all, Time_TD_all = [], [], []
V_Q_all, Policy_Q_all, Time_Q_all = [], [], []

for grid in range(len(W_all)):
    # select which grid to use
    W = W_all[grid]
    H = H_all[grid]
    T = T_all[grid]
    Rewards = Rewards_all[grid]
    Terminals = Terminals_all[grid]
    Walls = Walls_all[grid]


    #Value iteration
    st = time.time()
    V_VI, Policy_VI = value_iteration(gamma, T, Rewards, Terminals, Walls)
    et = time.time()
    V_VI_all.append(V_VI)
    Policy_VI_all.append(Policy_VI)
    Time_VI_all.append(et-st)

    #Policy iteration
    st = time.time()
    V_PI, Policy_PI = policy_iteration(gamma, T, Rewards, Terminals, Walls)
    et = time.time()
    V_PI_all.append(V_PI)
    Policy_PI_all.append(Policy_PI)
    Time_PI_all.append(et-st)

    #TD-Learning
    st = time.time()
    V_TD, Policy_TD = TD_learning(N_episodes, Policy_VI, alpha, gamma, T, Rewards, Terminals, Walls)
    et = time.time()
    V_TD_all.append(V_TD)
    Policy_TD_all.append(Policy_TD)
    Time_TD_all.append(et-st)

    #Q-Learning
    st = time.time()
    V_Q, Policy_Q = Q_learning(N_episodes, epsilon, alpha, gamma, T, Rewards, Terminals, Walls)
    et = time.time()
    V_Q_all.append(V_Q)
    Policy_Q_all.append(Policy_Q)
    Time_Q_all.append(et-st)



    print("RESULTS Grid:", grid, " ###################################################################")
    print("RESULTS: VI")
    print(V_VI.reshape((H,W)))
    maze = Directions[Policy_VI.astype(int)]
    maze[Walls==1] = 'W'
    maze[Terminals==1] = 'G'
    print(maze.reshape((H,W)))
    #
    print("RESULTS: PI")
    print(V_PI.reshape((H,W)))
    maze = Directions[Policy_PI.astype(int)]
    maze[Walls==1] = 'W'
    maze[Terminals==1] = 'G'
    print(maze.reshape((H,W)))
    #
    print("RESULTS: Q-L")
    print(V_Q.reshape((H,W)))
    maze_Q = Directions[Policy_Q.astype(int)]
    maze_Q[Walls==1] = 'W'
    maze_Q[Terminals==1] = 'G'
    print(maze_Q.reshape((H,W)))
    #
    print("RESULTS: TD-L")
    print(V_TD.reshape((H,W)))
    maze = Directions[Policy_TD.astype(int)]
    maze[Walls==1] = 'W'
    maze[Terminals==1] = 'G'
    print(maze.reshape((H,W)))

In [16]:
# Illustrate grids without policy

for grid in range(len(W_all)):
    # select which grid to use
    W = W_all[grid]
    H = H_all[grid]
    T = T_all[grid]
    Rewards = Rewards_all[grid]
    Terminals = Terminals_all[grid]
    Walls = Walls_all[grid]


    print("Grid:", grid, " ###################################################################")
    print("")
    maze = np.full_like(Walls, fill_value=' ', dtype=str)
    maze[Walls==1] = 'W'
    maze[Terminals==1] = 'G'
    print(maze.reshape((H,W)))


Grid: 0  ###################################################################

[[' ' ' ' ' ' 'G']
 [' ' 'W' ' ' 'G']
 [' ' ' ' ' ' ' ']]
Grid: 1  ###################################################################

[[' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' 'W' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' 'W' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' 'W' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' 'W' ' ' ' ' ' ' 'G' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' 'G' ' ']]
Grid: 2  ###################################################################

[[' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' '

## Task 4 (extra credit) - What if diagonal moves were possible?

Repeat the experiments in one of these settings. You don't have to stick to the code provided here. But, you can only use Numpy (and matplotlib).

You should decide on how to distribute the noise associated with actions across (next) states. For example, in the scenario above we have probability of going Up with the Up action is 1-noise. The probability of going Left with the Up action is noise/2. The same for going Right.