## Question 5

### Step B
Step B: The next objective is to scale up the size of filtering problems like the above one that you can solve. For this question, you will use large maps (e.g., 100 by 50), where you randomly assign terrain types to each cell out of the four possible types (50% normal cells, 20% highway cells, 20% hard to traverse and 10% blocked cells).

First, generate “ground truth” data, i.e., generate sequences of actions and sensor readings to test your algorithm. For this purpose, first randomly select a non-blocked cell as the initial location of your agent in the world $x_{0}$. Then, randomly generate a sequence of 100 actions, i.e., randomly select actions from the set α = {Up, Left, Down, Right} and generate a string of length 100. For each action starting from the initial location $x_{0}$ apply:
- the **transition model** (i.e., 90% follow the action - when you collide stay in place, 10% stay in place), in order to get the next ground truth state $x_{i+1}$
- the **observation model** (i.e., 90% the correct terrain type and 5% one of the other two types), in order to get the “ground truth” sensor reading $e_{i+1}$

Once you have generated the 100 actions, the 100 ground truth states and the 100 observations, store them in a file. Generate 10 such files per map and for 10 different maps of the world (i.e., 100 total ground truth files), as different experiments inside the large maps. The format for the file can be as follows:
- $x_{0}y_{0}$: coordinates of initial point
- $x_{i}y_{i}$: 100 coordinates of the consecutive points that the agent actually goes through separated by a new line
- $α_{i}$: 100 characters indicating the type of action executed {U, L, D, R} separated by a new line
- $e_{i}$: 100 characters indicating the sensor reading {N, H, T} collected by the agent as it moves

In your report, provide examples of ground truth paths and the corresponding action and
sensor readings generated.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import math
import random

#### (1) Produce ten 100 x 50 grids

In [2]:
def create_grid(rows, cols):
    grid = [['']*cols for i in range(rows)]
    
    for r in range(rows):
        for c in range(cols):
            
            prob = random.random()
            if (prob <= 0.5):
                grid[r][c] = 'N' # 50% normal
            elif (prob <= 0.7):
                grid[r][c] = 'H' # 20% highway
            elif (prob <= 0.9):
                grid[r][c] = 'T' # 20% hard-to-traverse
            else:
                grid[r][c] = 'B' # 10% blocked
                
    return grid


def save_grid_as_csv(grid, fname):
    np.savetxt(fname, grid, delimiter = ',', fmt='%s')
    

# produce 10 grids
rows = 50
cols = 100
grids = []
for i in range(10):
    grids.append(create_grid(rows, cols))
    save_grid_as_csv(grids[i], f'grid{i+1}.csv')

#### (2) Produce ten 'ground truth' data per grid

In [3]:
def create_ground_truth(grid, fname, n):
    f = open(fname, 'w')
    rows, cols = len(grid), len(grid[0])
    
    # ensure valid, non-blocked starting point
    x = [random.randrange(0, rows), random.randrange(0, cols)]
    while (grid[x[0]][x[1]] == 'B'):
        x = [random.randrange(0, rows), random.randrange(0, cols)]
    
    f.write(f'{x[0]},{x[1]}\n') # write true path
    
    actions, sensor = ['']*n, ['']*n
    for i in range(n):
        actions[i] = random.choice(['U', 'L', 'D', 'R'])
        
        prob = random.random()
        if (prob <= 0.9): # 90% action succeeds, unless colliding into blocked cell or edge
            if (actions[i] == 'U'):
                if (x[0]-1 >= 0 and grid[x[0]-1][x[1]] != 'B'):
                    x[0] -= 1
            
            if (actions[i] == 'L'):
                if (x[1]-1 >= 0 and grid[x[0]][x[1]-1] != 'B'):
                    x[1] -= 1
            
            if (actions[i] == 'D'):
                if (x[0]+1 < rows and grid[x[0]+1][x[1]] != 'B'):
                    x[0] += 1
            
            if (actions[i] == 'R'):
                if (x[1]+1 < cols and grid[x[0]][x[1]+1] != 'B'):
                    x[1] += 1
            
        f.write(f'{x[0]},{x[1]}\n') # or, 10% action fails, stays in place
            
        
        prob = random.random()
        if (prob <= 0.9): # 90% sensor reads terrain correctly
            sensor[i] = grid[x[0]][x[1]]
        
        else: # or, 5% sensor incorrectly reads either of the other two types
            terrains = ['N', 'H', 'T']
            terrains.remove(grid[x[0]][x[1]])
            sensor[i] = random.choice(terrains)
            
    
    # write actions and observations to 'ground truth' data
    for i in range(n):
        f.write(f'{actions[i]}\n')
    for i in range(n):
        f.write(f'{sensor[i]}\n')
        
    f.close
    return


# produce 10 'ground truth' data per grid
n = 100
for i in range(10):
    for j in range(10):
        create_ground_truth(grids[i], f'grid{i+1}_groundtruth{j+1}.txt', n)

### Step C
Step C: Demonstrate the capability of estimating the position of the agent inside the world given only the actions and observations indicated in these files. In particular, your program should be able to load a “ground truth” file and a map. Then, after each action/sensor reading pair, it should be able to compute the probability that the agent is in each cell of the map by applying the filtering algorithm. Visualize the different probabilities on your map (e.g., think of a heatmap) or provide a capability to indicate the probability on any cell of the map. Visualize the ground truth location of the agent inside the world at the corresponding step. Your visualization should be updated with each new action/sensor reading pair until you consume all 100 readings.

Attach example heat maps in your report after 10, 50 and 100 iterations. Indicate the ground truth path up to this point in each case.

For each of the 100 experiments, compute the error (as distance inside the grid world) between the true location of the agent and the maximum likelihood estimation (i.e., the cell with the highest probability according to the filtering algorithm) as the number of readings increases. For the computation of the maximum likelihood estimation, ties can be broken randomly. Generate a plot of the average error over all 100 experiments as the number of readings increases. For this plot, you can ignore the first 5 iterations as many cells will have the same probability in the beginning.

For each of the 100 experiments, keep track of the probability that the cell where the agent is actually located (which changes over time) is assigned by the filtering algorithm. Generate a plot of the average probability of the ground truth cell over 100 experiments as the number of readings increases. Here you can start with the uniform probability assigned to the cell in the beginning of this process.

#### (3) Read information from 'ground truth' files

In [4]:
def read_file(fname, n):
    f = open(fname)
    
    true_path = [] # true path traversed
    actions = []   # attempted actions
    sensor = []    # sensor readings
    
    for i in range(n+1):
        l = f.readline()
        x = l.split(',')
        true_path.append([int(x[0]), int(x[1])])
        
    for i in range(n):
        l = f.readline()
        actions.append(l[0])
        
    for i in range(n):
        l = f.readline()
        sensor.append(l[0])
    
    f.close()
    return [true_path, actions, sensor]

#### (4) Apply filtering

In [25]:
def filtering(grid, itr, prob_matrix, true_path, actions, sensor):
    rows, cols = len(grid), len(grid[0])
    err, true_prob = [0]*itr, [0]*itr

    for i in range(itr):
        alpha, sigma = 1, 0 # for normalization
        max_prob, max_x = 0, [] # for calculating error
        
        prior = np.copy(prob_matrix)
        
        for r in range(rows):
            for c in range(cols):
                
                # transition model
                if (actions[i] == 'U'):
                    if (r == rows-1):
                        sigma = prior[r][c]*0.1 
                    else:
                        sigma = prior[r][c]*0.1 + prior[r+1][c]*0.9                        
                
                elif (actions[i] == 'L'):
                    if (c == cols-1):
                        sigma = prior[r][c]*0.1
                    else:
                        sigma = prior[r][c]*0.1 + prior[r][c+1]*0.9  
                
                elif (actions[i] == 'D'):
                    if (r == 0):
                        sigma = prior[r][c]*0.1
                    else:
                        sigma = prior[r][c]*0.1 + prior[r-1][c]*0.9                      
                
                elif (actions[i] == 'R'):
                    if (c == 0):
                        sigma = prior[r][c]*0.1
                    else:
                        sigma = prior[r][c]*0.1 + prior[r][c-1]*0.9
                
                
                # observational model
                if (sensor[i] == grid[r][c]):
                    prob_matrix[r][c] = sigma * 0.9
                else:
                    prob_matrix[r][c] = sigma * 0.05
                
                if (grid[r][c] == 'B'):
                    prob_matrix[r][c] = 0
                
                
                # for calculating error
                if (max_prob < prob_matrix[r][c]):
                    max_prob = prob_matrix[r][c]
                    max_x = [r,c]
                    
                # for normalization
                sigma += prob_matrix[r][c]
            
        # normalize
        if (sigma != 0):
            alpha = 1/sigma
        prob_matrix = np.multiply(prob_matrix, alpha)
        
        # calculate error
        err[i] = math.sqrt((max_x[0] - true_path[i][0])**2 + (max_x[1] - true_path[i][1])**2)
        true_prob[i] = prob_matrix[true_path[i][0]][true_path[i][1]]
        
    return [prob_matrix, true_prob, err]

#### (5) Produce example heatmaps

In [26]:
import seaborn as sns # for pandas headmaps

In [27]:
%matplotlib tk

def plot_heatmap(grid, fname, itr):
    rows, cols = len(grid), len(grid[0])
    
    info = read_file(fname, 100)
    true_path, actions, sensor = info
    
    # set up inital belief
    prior = (rows*cols)*0.9/(rows*cols)
    prob_matrix = np.array([[prior]*cols]*rows)
    
    for r in range(rows):
        for c in range(cols):
            if (grid[r][c] == 'B'):
                prob_matrix[r][c] = 0
    
    
    # apply filtering
    ans = filtering(grid, itr, prob_matrix, true_path, actions, sensor)
    prob_matrix = np.copy(ans[0])
    
    
    # set up log w/ accurate minimum values for blocked cells
    log = np.copy(prob_matrix)
    log = np.log10(log, where=(log!=0))
    m = np.min(log)
    
    for r in range(rows):
        for c in range(cols):
            if (grid[r][c] == 'B'):
                log[r][c] = m-1
    
    
    # plot heatmap
    plt.title(f'Heatmap for {fname} - {itr} iterations')
    ax = sns.heatmap(log, linewidth = 0.5, cmap = "coolwarm", cbar = False)


    # draw true path of agent
    for i in range(itr-1):
        y = [true_path[i][0], true_path[i+1][0]]
        x = [true_path[i][1], true_path[i+1][1]]
        plt.plot(x, y, color="BLACK")

    plt.show()

In [29]:
plot_heatmap(grids[6], 'grid7_groundtruth1.txt', 100)

#### (6) Calculate average error and probability

In [32]:
itr = 100
avg_error, avg_prob = [0]*itr, [0]*itr


for i in range(10):
    grid = grids[i]
    rows, cols = len(grid), len(grid[0])
    
    for j in range(10):
        info = read_file(f'grid{i+1}_groundtruth{j+1}.txt', 100)
        true_path, actions, sensor = info

        # set up inital belief
        prior = (rows*cols)*0.9/(rows*cols)
        prob_matrix = np.array([[prior]*cols]*rows)
        for r in range(rows):
            for c in range(cols):
                if (grid[r][c] == 'B'):
                    prob_matrix[r][c] = 0
        
        # apply filtering
        ans = filtering(grid, itr, prob_matrix, true_path, actions, sensor)
        avg_error = np.add(avg_error, ans[2])
        avg_prob = np.add(avg_prob, ans[1])
        

# calculate average errors and probabilities
avg_error = np.multiply(avg_error, 1/itr)
avg_prob = np.multiply(avg_prob, 1/itr)

print(avg_error, avg_prob)

[56.4243463  51.3656025  44.35878765 40.00712197 39.03155913 36.6238886
 37.02321025 36.02233186 31.5627002  31.56098397 35.03879409 34.00330228
 33.33011175 35.22933838 32.98786467 33.46383734 31.92305923 31.63751059
 31.74835211 32.66389315 32.52383548 29.90093482 30.62203618 30.57454347
 31.02760567 28.15110391 27.82870627 28.33439654 28.20457006 25.70322218
 25.31309066 27.68485009 29.12145373 26.73902366 26.13613242 27.73211528
 25.74673182 27.68138201 27.69376122 24.77761535 26.19302822 26.58878977
 24.16488895 25.3594705  24.07540251 21.34574717 21.00692528 22.90434381
 23.96650897 23.28406427 24.09386315 22.44918261 22.41706794 22.69844613
 21.75160394 23.39438612 22.12503823 19.28600707 20.33298576 20.52899804
 17.93622501 17.22706531 19.15217902 19.7988612  19.10030117 17.60198757
 18.82386836 18.88608024 18.5471338  18.17909559 18.14887541 17.83392823
 16.66833693 17.35911413 18.37065036 17.21618434 18.12141616 17.90232026
 17.42277315 17.37865283 16.87136366 16.39645005 16.

In [37]:
plt.figure(2)
plt.plot(avg_error[5:])
plt.title("The Average Distance between the True Path and Prediction")
plt.ylabel("Distance")
plt.xlabel("Iterations")
plt.show()
plt.figure(3)
plt.plot(avg_prob)
plt.title("The Average Probability of the True Path")
plt.ylabel("Probability")
plt.xlabel("Iterations")
plt.show()