In [58]:
import random
import numpy as np
from enum import Enum 
import copy
from tqdm import tqdm_notebook, tqdm

In [59]:
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
from matplotlib import colors

In [60]:
GRID_SIZE=25
MAX_TIME = 10
random.seed(28)

## State

In [61]:
class State(object):
    def __init__(self, row, col, grid_size=GRID_SIZE):
        self.grid_size=grid_size
        self.row=row
        self.col=col
        self.pos = (row,col)
    
    def is_valid(self):
        if 0<=self.row<self.grid_size and 0<=self.col<self.grid_size:
            return True
        else:
            return False
    
    def is_corner(self):
        if self.row==0:
            if self.col==0 or self.col==self.grid_size-1:
                return True
        elif self.row==self.grid_size-1:
            if self.col==0 or self.col==self.grid_size-1:
                return True
        else:
            return False
    
    def is_edge(self):
        if self.row==0:
            if 0<self.col<self.grid_size-1:
                return True
        elif self.col==0:
            if 0<self.row<self.grid_size-1:
                return True
        elif self.row==self.grid_size-1:
            if 0<self.col<self.grid_size-1:
                return True
        elif self.col==self.grid_size-1:
            if 0<self.row<self.grid_size-1:
                return True
        else:
            return False
        
    def print(self):
        return("("+str(self.row)+" , "+str(self.col)+")")

## Action, Observation(Sound)

In [62]:
class Action(Enum):
    UP = 0
    DOWN = 1
    LEFT = 2
    RIGHT = 3
    
class Sound(Enum):
    ROTOR = 0
    BUMP = 1
    
class Color(Enum):
    DARK = 0.1
    LIGHT = 0.9

## Observation Model

In [63]:
class Observation():
    def __init__(self):
        if GRID_SIZE==5:
            self.rotor_prob = [[Color.DARK, Color.DARK, Color.DARK, Color.DARK, Color.DARK],
                        [Color.LIGHT, Color.DARK, Color.LIGHT, Color.LIGHT, Color.DARK],
                        [Color.LIGHT, Color.DARK, Color.LIGHT, Color.LIGHT, Color.LIGHT],
                        [Color.DARK, Color.LIGHT, Color.LIGHT, Color.DARK, Color.DARK],
                        [Color.DARK, Color.DARK, Color.DARK, Color.LIGHT, Color.DARK]]
            self.bump_prob = np.array([[Color.LIGHT, Color.LIGHT, Color.LIGHT, Color.DARK, Color.DARK],
                        [Color.LIGHT, Color.LIGHT, Color.DARK, Color.DARK, Color.DARK],
                        [Color.DARK, Color.DARK, Color.LIGHT, Color.LIGHT, Color.DARK],
                        [Color.DARK, Color.DARK, Color.DARK, Color.DARK, Color.DARK],
                        [Color.DARK, Color.LIGHT, Color.LIGHT, Color.LIGHT, Color.DARK]])
            self.rotor_prob = np.array([[color.value for color in dist] for dist in self.rotor_prob])
            self.bump_prob = np.array([[color.value for color in dist] for dist in self.bump_prob])
        else:
            self.rotor_prob = np.random.random((GRID_SIZE, GRID_SIZE))
            self.bump_prob = np.random.random((GRID_SIZE, GRID_SIZE))
                
    
    def sample(self, state, obs):
#         state.print()
        if obs==Sound.ROTOR:
            sound_prob = self.rotor_prob[state.row][state.col]
        else:
            sound_prob = self.bump_prob[state.row][state.col]
        sound = np.random.choice([True,False], 1, p = [sound_prob, 1.0-sound_prob])[0]
        return sound   
    
    def get_rotor_prob(self, state):
        sound_prob = self.rotor_prob[state.row][state.col]
        return sound_prob
    
    def get_bump_prob(self, state):
        sound_prob = self.rotor_prob[state.row][state.col]
        return sound_prob    
    
    def get_rotor_emission_matrix(self):
        return self.rotor_prob.flatten()
    
    def get_bump_emission_matrix(self):
        return self.bump_prob.flatten()
    
    def get_emission_matrix(self):
        rotor_emission = self.get_rotor_emission_matrix()
        bump_emission = self.get_bump_emission_matrix()
        obs_zero = np.multiply(rotor_emission, bump_emission)
        obs_one = np.multiply(rotor_emission, 1 - bump_emission)
        obs_two = np.multiply(1-rotor_emission, bump_emission)
        obs_three = np.multiply(1-rotor_emission, 1-bump_emission)
        return np.vstack((obs_zero, obs_one, obs_two, obs_three)).T

## Transition Model

In [64]:
def get_index(row, col, grid_size=GRID_SIZE):
    return ((grid_size*row) + col)

def get_state(index, grid_size=GRID_SIZE):
    return State(index//grid_size, index%grid_size)

## transition_matrix[A][B] --> from state A to state B
transition_matrix = np.zeros((GRID_SIZE**2, GRID_SIZE**2))

for i in range(GRID_SIZE):
    for j in range(GRID_SIZE):
        index = get_index(i, j, GRID_SIZE)
        state = State(i,j,GRID_SIZE)
        if state.is_corner():
            transition_matrix[index][index] = 0.5
            if i==0 and j==0:
                transition_matrix[index][get_index(i,j+1)] = 0.25
                transition_matrix[index][get_index(i+1,j)] = 0.25
            elif i==0 and j==GRID_SIZE-1:
                transition_matrix[index][get_index(i,j-1)] = 0.25
                transition_matrix[index][get_index(i+1,j)] = 0.25
            elif i==GRID_SIZE-1 and j==0:
                transition_matrix[index][get_index(i,j+1)] = 0.25
                transition_matrix[index][get_index(i-1,j)] = 0.25
            else:
                transition_matrix[index][get_index(i,j-1)] = 0.25
                transition_matrix[index][get_index(i-1,j)] = 0.25
        elif state.is_edge():
            transition_matrix[index][index] = 0.25
            if i==0:
                transition_matrix[index][get_index(i,j-1)] = 0.25
                transition_matrix[index][get_index(i,j+1)] = 0.25
                transition_matrix[index][get_index(i+1,j)] = 0.25
            elif i==GRID_SIZE-1:
                transition_matrix[index][get_index(i,j-1)] = 0.25
                transition_matrix[index][get_index(i,j+1)] = 0.25
                transition_matrix[index][get_index(i-1,j)] = 0.25
            elif j==0:
                transition_matrix[index][get_index(i-1,j)] = 0.25
                transition_matrix[index][get_index(i+1,j)] = 0.25
                transition_matrix[index][get_index(i,j+1)] = 0.25
            else:
                transition_matrix[index][get_index(i-1,j)] = 0.25
                transition_matrix[index][get_index(i+1,j)] = 0.25
                transition_matrix[index][get_index(i,j-1)] = 0.25
        else:
            transition_matrix[index][get_index(i+1,j)] = 0.25
            transition_matrix[index][get_index(i-1,j)] = 0.25
            transition_matrix[index][get_index(i,j+1)] = 0.25
            transition_matrix[index][get_index(i,j-1)] = 0.25

## Initial Probability Distribution

In [65]:
init_dist_matrix = float(1/(GRID_SIZE*GRID_SIZE))*np.ones((GRID_SIZE, GRID_SIZE))
init_dist_matrix = init_dist_matrix.flatten()

## Environment

In [66]:
class Environment():
    def __init__(self, grid_size=GRID_SIZE):
        self.grid_size = grid_size
        self.obs = Observation()
        self.init_state = self.reset()
        
    def reset(self):
        return State(random.randrange(self.grid_size-1), random.randrange(self.grid_size-1))

    
    def get_next_state(self, state, action):
        next_state_dict = {Action.UP: State(state.row-1, state.col),
                           Action.DOWN: State(state.row+1, state.col),
                           Action.LEFT: State(state.row, state.col-1),
                           Action.RIGHT: State(state.row, state.col+1)}
        next_state = next_state_dict[action]
        if next_state.is_valid():
            return next_state
        else:
            return state
        
    def get_obs(self, state):
        rotor = self.obs.sample(state, Sound.ROTOR)
        bump = self.obs.sample(state, Sound.BUMP)
        if rotor and bump:
            return 0
        elif rotor and (not bump):
            return 1
        elif (not rotor) and bump:
            return 2
        else:
            return 3
    
    def step(self, state):
        action = Action(random.randrange(4))
        next_state = self.get_next_state(state, action)
        observation = self.get_obs(next_state)
        return next_state, action, observation

In [67]:
def simulate(env, time_steps):
    states = []
    actions = []
    observations = []
    state = env.init_state
    states.append(state)
    observations.append(env.get_obs(state))
    
    for time in range(time_steps-1):
        state, action, obs = env.step(state)
        actions.append(action)
        states.append(state)
        observations.append(obs)
    return states, actions, observations
#     return states, observations

In [68]:
env = Environment()
states, actions, observations = simulate(env,MAX_TIME)

In [69]:
def print_obs(num):
    if num==0:
        return "R,B"
    elif num==1:
        return "R,~B"
    elif num==2:
        return "~R,B"
    else:
        return "~R,~B"

In [70]:
for i in range(len(observations)-1):
    print("[[ "+str(i)+" ]]   : ",states[i].print(), "   ", actions[i], "    ", print_obs(observations[i]))

[[ 0 ]]   :  (3 , 23)     Action.DOWN      R,B
[[ 1 ]]   :  (4 , 23)     Action.DOWN      R,~B
[[ 2 ]]   :  (5 , 23)     Action.DOWN      ~R,~B
[[ 3 ]]   :  (6 , 23)     Action.DOWN      R,B
[[ 4 ]]   :  (7 , 23)     Action.RIGHT      R,B
[[ 5 ]]   :  (7 , 24)     Action.RIGHT      ~R,B
[[ 6 ]]   :  (7 , 24)     Action.DOWN      R,~B
[[ 7 ]]   :  (8 , 24)     Action.DOWN      R,B
[[ 8 ]]   :  (9 , 24)     Action.DOWN      R,B


## Filtering

In [71]:
def normalise(vector):
    norm_factor = vector.sum(axis=1)
#     print(norm_factor)
    vector /= norm_factor[:, np.newaxis] 
    return vector

In [72]:
Pi = init_dist_matrix
E = env.obs.get_emission_matrix() # emission matrix
T = transition_matrix # transition matrix
Pi.shape, E.shape, T.shape, len(observations)

((625,), (625, 4), (625, 625), 10)

### Forward Algorithm

In [73]:
def forward(observations, Pi, E, T):
    alpha = np.zeros((len(observations), Pi.shape[0]))
    alpha[0,:] = Pi * E[:, observations[0]] 
    for t in range(1,len(observations)):
        for j in range(Pi.shape[0]):
            alpha[t, j] = np.dot(T[:,j], alpha[t-1, :]) * E[j, observations[t]]
    alpha = normalise(alpha)
    return alpha

In [74]:
alpha = forward(observations, Pi, E, T)

In [75]:
# for t in range(MAX_TIME):
#     est_state = get_state(np.argmax(alpha[t]))
#     est_state.print()
#     states[t].print()
#     print()

## Plots

### Colormap

In [76]:
def plot(data, title):
    plt.imshow(data, cmap='bone', origin='upper')
    plt.title(title)
    plt.show()

### Log-likelihood over states

In [77]:
# def plot_likelihood(alpha):
#     %matplotlib
#     im = None
#     for t in range(MAX_TIME):
#         data = np.log(alpha[t].reshape(GRID_SIZE,GRID_SIZE))
#         if not im:
#             im = plt.imshow(data, cmap='bone', origin='upper')
#             plt.colorbar(im)
#         else:
#             im.set_data(data)
#         plt.title("Log-likelood at time "+ str(t))
#         plt.draw()
#         plt.pause(0.2)
#     plt.close()

In [78]:
%matplotlib

Using matplotlib backend: MacOSX


In [79]:
def plot_pred_ground(alpha, title):
    fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(8,8))
    fig.suptitle(title)
    t = 0
    for ax in axes.flat:
        ax.set_title('t='+str(t))
        im = ax.imshow(np.log(alpha[t].reshape(GRID_SIZE,GRID_SIZE)), cmap='bone', origin='upper')
        t += 1
    fig.colorbar(im, ax=axes.ravel().tolist())
    plt.show()

In [80]:
plot_pred_ground(alpha, "Log Likelihood - Filtering")

In [81]:
for t in range(MAX_TIME):
    data = np.log(alpha[t].reshape(GRID_SIZE,GRID_SIZE))
    plot(data,"Log-likelood at time "+ str(t))

In [82]:
# plot_likelihood(alpha)

### Most probable state

In [83]:
def plot_ground_pred(data, title):
    fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(8,8))
    fig.suptitle(title)
    t = 0
    for ax in axes.flat:
        ax.set_title('t='+str(t))
        index = np.argmax(alpha[t])
        pred_state = get_state(index)
        true_state = states[t]
        data = np.zeros((GRID_SIZE, GRID_SIZE))
        if pred_state.row==true_state.row and pred_state.col==true_state.col:
            data[true_state.row, true_state.col] = -0.25
        else:
            data[pred_state.row, pred_state.col] = -0.75
            data[true_state.row, true_state.col] = -1.0
        im = ax.imshow(data, cmap='pink', origin='upper')
        t+=1
    fig.colorbar(im, ax=axes.ravel().tolist())
    plt.show()

In [84]:
# def plot_ground_pred(data):
#     %matplotlib inline
#     for t in range(MAX_TIME):
#         index = np.argmax(alpha[t])
#         pred_state = get_state(index)
#         true_state = states[t]
#         data = np.zeros((GRID_SIZE, GRID_SIZE))
#         if pred_state.row==true_state.row and pred_state.col==true_state.col:
#             data[true_state.row, true_state.col] = -0.25
#         else:
#             data[pred_state.row, pred_state.col] = -0.75
#             data[true_state.row, true_state.col] = -1.0
#         plt.imshow(data, cmap='pink', origin='upper')
#         plt.title("Predicted v/s Ground Truth at time "+str(t))
#         plt.show()

In [85]:
plot_ground_pred(alpha, "Predicted v/s Ground Truth")

## Smoothing

### Backward Algorithm

In [86]:
def backward(observations, Pi, E, T):
    beta = np.zeros((len(observations), Pi.shape[0]))
    beta[len(observations)-1] = np.ones((Pi.shape[0]))
    for t in range(len(observations)-2,-1,-1):
        for j in range(Pi.shape[0]):
            beta[t, j] = (beta[t + 1, :] * E[:, observations[t + 1]]).dot(T[j, :])
    beta = normalise(beta)
    return beta

In [87]:
def forward_backward(observations, Pi, E, T):
    alpha = forward(observations, Pi, E, T)
    beta = backward(observations, Pi, E, T)
    prob_dist = np.multiply(alpha, beta)
    prob_dist = normalise(prob_dist)
    return prob_dist

In [88]:
prob_dist = forward_backward(observations, Pi, E, T)

In [89]:
plot_ground_pred(prob_dist, "Predicted v/s Ground Truth - Smoothing")

## Decoding - Viterbi

In [90]:
def viterbi(observations, Pi, E, T):
    omega = np.zeros((len(observations), Pi.shape[0]))
    omega[0, :] = np.log(Pi * E[:, observations[0]])
    previous = np.zeros((len(observations)-1, Pi.shape[0]))
    for t in range(1,len(observations)):
        for j in range(Pi.shape[0]):
            prob = omega[t-1] + np.log(T[:, j]) + np.log(E[j, observations[t]])
            previous[t-1, j] = np.argmax(prob)
            omega[t, j] = np.max(prob)
    path = []
    last_state = np.argmax(omega[len(observations)-1, :])
    path.append(last_state)
    for i in range(len(observations)-2,-1,-1):
        path.append(previous[i,int(last_state)])
        last_state = previous[i,int(last_state)]
    path = list(map(int, list(path)[::-1]))
    print(path)
    return path

In [91]:
path = viterbi(observations, Pi, E, T)

[489, 514, 539, 514, 489, 490, 465, 464, 489, 490]


  import sys


In [92]:
path, observations

([489, 514, 539, 514, 489, 490, 465, 464, 489, 490],
 [0, 1, 3, 0, 0, 2, 1, 0, 0, 2])

In [93]:
# for p in path:
#     get_state(p).print()
# print()
# for o in observations:
#     get_state(o).print()

In [94]:
# def plot_path(path, observations):
#     for i in range(len(path)):
#         data = np.zeros((GRID_SIZE, GRID_SIZE))
#         p = (path[i]//GRID_SIZE, path[i]%GRID_SIZE)
#         o = (observations[i]//GRID_SIZE, observations[i]%GRID_SIZE)
# #         print(path[i], observations[i], p, o)
#         if p==o:
#             data[p[0], p[1]] = -0.5
#         else:
#             data[p[0], p[1]] = -0.75
#             data[o[0], o[1]] = -1.0
#         plt.imshow(data, cmap='pink', origin='upper')
#         plt.title("Most Likely v/s Ground Location at time "+str(t))
#         plt.show()

In [95]:
def plot_path(path, observations, title):
    fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(8,8))
    fig.suptitle(title)
    t = 0
    for ax in axes.flat:
        ax.set_title('t='+str(t))
        data = np.zeros((GRID_SIZE, GRID_SIZE))
        p = (path[t]//GRID_SIZE, path[t]%GRID_SIZE)
        o = (observations[t]//GRID_SIZE, observations[t]%GRID_SIZE)
#         print(path[i], observations[i], p, o)
        if p==o:
            data[p[0], p[1]] = -0.5
        else:
            data[p[0], p[1]] = -0.75
            data[o[0], o[1]] = -1.0
        im = ax.imshow(data, cmap='pink', origin='upper')
        t+=1
    fig.colorbar(im, ax=axes.ravel().tolist())
    plt.show()

In [96]:
plot_path(path, observations, "Most Likely v/s Ground Location")

### Manhattan Distance

In [97]:
def compute_error(path, observations):
    dist = []
    for i in range(len(path)):
        p = (path[i]//GRID_SIZE, path[i]%GRID_SIZE)
        o = (observations[i]//GRID_SIZE, observations[i]%GRID_SIZE)
        dist.append(abs(p[0]-o[0]) + abs(p[1]-o[1]))
    return dist

In [56]:
man_dist_5 = compute_error(path, observations)

In [57]:
sum(man_dist_5)

21

In [98]:
man_dist_25 = compute_error(path, observations)

In [99]:
sum(man_dist_25)

326

In [102]:
plt.figure(figsize=(8, 8))
plt.plot(np.arange(MAX_TIME), man_dist_5, marker='o', label="5x5")
plt.plot(np.arange(MAX_TIME), man_dist_25, marker='o', label="25x25")
plt.title("Error b/w most likely and actual path")
plt.xlabel("Time")
plt.ylabel("Error")
plt.legend()
plt.show()