In [469]:
import numpy as np
from scipy.stats import multivariate_normal
import matplotlib.pyplot as plt
np.random.seed(42)

In [470]:
class MapBelief():
    def __init__(self, init_map, sensor_noise):
        self.map = init_map
        self.landmark_beliefs = [] # array of (mu, cov) pairs
        self.sensor_noise = sensor_noise

    def likelihood(self, landmark):
        l = 0
        n = len(self.landmark_beliefs)
        id = 0
        max_like = -1
        for i, (mu, cov) in enumerate(self.landmark_beliefs):
            # print('LANDMARK', landmark, '\n', mu, '\n', cov)
            tmp = multivariate_normal.pdf(landmark, mu, cov) / multivariate_normal.pdf(mu, mu, cov)
            # l += (1/n) * tmp
            # pick landmark with highest likelihood
            # print(f'tmp {tmp}')
            if tmp > max_like:
                id = i
                max_like = tmp
        l = max_like
        # print(f'likelihood {l}')
        return l, id
    
    def get_landmark_params(self, id):
        return self.landmark_beliefs[id]
    
    def set_landmark_params(self, id, params):
        self.landmark_beliefs[id] = params
    
    def add_landmark(self, params):
        self.landmark_beliefs.append(params)

    def draw_map(self, iter):
        def gaussian_2d(x, y, mean, cov):
            pos = np.stack([x, y], axis=-1)
            inv_cov = np.linalg.inv(cov)
            diff = pos - mean
            exponent = np.einsum('...i,ij,...j->...', diff, inv_cov, diff)
            return np.exp(-0.5 * exponent)

        # Create grid
        grid_size = 100
        x_vals = np.linspace(-2, 6, grid_size)
        y_vals = np.linspace(-2, 7, grid_size)
        x, y = np.meshgrid(x_vals, y_vals)

        # Example: list of (mean, covariance) pairs
        gaussians = self.landmark_beliefs

        # Initialize heatmap
        heatmap = np.zeros((grid_size, grid_size))

        # Add each Gaussian to the heatmap
        for mean, cov in gaussians:
            heatmap += gaussian_2d(x, y, mean, cov)

        # Plot
        plt.imshow(heatmap, extent=[-2, 6, -2, 7], origin='lower', cmap='hot')
        # plt.colorbar(label='Intensity')
        plt.title('Map Belief')
        plt.xlabel('c')
        plt.ylabel('r')
        plt.savefig(f'results/map{iter}.jpg')
        plt.close()

    # Kalman Filter
    def update_belief(self, mu, cov, z_t):
        # Update belief; estimate new state.
        C_t = np.eye(2)
        Q_t = self.sensor_noise
        A_t = np.eye(2)
        R_t = np.eye(2) * 1e-1

        # prediction step. NOTE: assuming landmarks do not move
        mu_bar_next = (A_t @ mu)
        cov_bar_next = (A_t @ (cov @ A_t.T)) + R_t
        # kalman gain
        K_t = cov_bar_next @ (C_t.T @ np.linalg.inv((C_t @ (cov_bar_next @ C_t.T)) + Q_t))
        # correction step
        mu_next = mu_bar_next + (K_t @ (z_t - (C_t @ mu_bar_next)))
        cov_next = (np.eye(2) - (K_t @ C_t)) @ cov_bar_next

        return mu_next, cov_next
            
    def map_update(self, z_t):
        # update and add new observations
        # loop over all observed landmarks
        thresh = 0.5
        for obv in z_t:
            l, id = self.likelihood(obv)
            # print(l)
            if l > thresh:
                # print('UPDATED')
                mu, cov = self.get_landmark_params(id)
                # kalman update
                mu_new, cov_new = self.update_belief(mu, cov, obv)
                self.set_landmark_params(id, (mu_new, cov_new))
            else:
                # add to map_belief.landmark_beliefs
                params = (obv, np.eye(2)*1e-1)
                self.add_landmark(params)


# map_update is called everytime a robot gets observation
# how do we synchronize updates to global belief map
# first call first serve?


In [471]:
g_map = np.array([
    [1, 1, 1, 1, 0, 1],
    [1, 0, 1, 0, 0, 1],
    [0, 0, 1, 0, 0, 1],
    [1, 0, 0, 0, 0, 1],
    [0, 1, 1, 1, 1, 1],
])

r, c = np.where(g_map == 1)
# for i, j in zip(r, c):
#     print(i, j)
coor = np.array([[i.item(), j.item()] for i, j in zip(r, c)])
# print(coor)

def generate_z(landmarks, sensor_noise):
    sensor_noise = sensor_noise
    n = landmarks.shape[0]
    land_idx = np.random.choice(n, size=3, replace=False)
    # print(land_idx)
    observations = []
    # print(landmarks[land_idx])
    for land in landmarks[land_idx]:
        z = np.random.multivariate_normal(land, sensor_noise)
        observations.append(z)

    return np.array(observations)

sensor_n = np.eye(2) * 1e-3

map_belief = MapBelief(init_map=g_map, sensor_noise=sensor_n)

for i in range(100):
    z_t = generate_z(coor, sensor_n)
    # z_t = np.array([[1., 1.]])
    # print(z_t)
    # print(map_belief.landmark_beliefs)
    map_belief.map_update(z_t)
    map_belief.draw_map(i)
    # print(map_belief.landmark_beliefs)
    # print(f'\niter {i}\n')
    # for id, (m,c) in enumerate(map_belief.landmark_beliefs):
    #     print(f'landmark {id}')
    #     print(m)
    #     print(c)