### Extended Kalman Filter

In this problem, we will try to estimate the position of a mobile robot in an environment. The controls that can be given to the robot are "up", "down", "left", "right", and "stay" and the robot moves accordingly. The sensors on the robot can measure square of the distance to some known checkpoints.

In [1]:
import numpy as np
import pygame
import os
from copy import deepcopy
import random

pygame 2.0.0.dev6 (SDL 2.0.10, python 3.8.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def to_pygame(coords, height):
    """Convert coordinates into pygame coordinates (lower-left => top left)."""
    return (coords[0], height - coords[1])

def move(direction, speed, positon, sigma):
    if direction == "up" and position[1] < 795:
        position[1] = position[1] + speed + random.normalvariate(0, sigma)
    elif direction == "down" and position[1] > 5:
        position[1] = position[1] - speed + random.normalvariate(0, sigma)
    elif direction == "right" and position[0] < 795:
        position[0] = position[0] + speed + random.normalvariate(0, sigma)
    elif direction == "left" and position[0] > 5:
        position[0] = position[0] - speed + random.normalvariate(0, sigma)
    else:
        position[1] = position[1] + random.normalvariate(0, sigma)
        position[0] = position[0] + random.normalvariate(0, sigma)
        if position[1]<5:
            position[1] = 5
        elif position[1]>795:
            position[1] = 795
        if position[0]<5:
            position[0] = 5
        elif position[0]>795:
            position[0] = 795

    return position

def get_measurement_from_sensors(position, checkpoints, sigma):
    measurements = []
    for checkpoint in checkpoints:
        measurement = (position[0]-checkpoint[0])**2 + (position[1]-checkpoint[1])**2 + random.normalvariate(0, sigma)
        measurements.append(deepcopy(measurement))
    return measurements

def calculate_H(u, checkpoints):
    H = np.array([2*(u[0]-checkpoints[0][0]), 2*(u[1]-checkpoints[0][1])]).reshape(1,2)
    for i in range(1, len(checkpoints)):
        h = np.array([2*(u[0]-checkpoints[i][0]), 2*(u[1]-checkpoints[i][1])]).reshape(1,2)
        H = np.vstack((H, h))
    return H
    
def EKF(u, sigma, R,Q, z, speed, checkpoints):
    u_ = u+speed
    sigma_ = sigma + R
    H = calculate_H(u_, checkpoints)
    
    K = sigma_.dot(H.T).dot(np.linalg.pinv(H.dot(sigma_).dot(H.T) + Q))
    h_of_u_ = []
    for checkpoint in checkpoints:
        measurement = (u_[0]-checkpoint[0])**2 + (u_[1]-checkpoint[1])**2
        h_of_u_.append(deepcopy(measurement))
    h_of_u_ = np.array(h_of_u_).reshape(len(checkpoints),1)
    u = u_ + K.dot(z - h_of_u_)
    sigma = (np.eye(2) - K.dot(H)).dot(sigma_)
    return u, sigma

In [3]:
screen = pygame.display.set_mode([800,800])
pygame.display.set_caption("EKF")

crashed = False

### checkpoints
checkpoints = [
    [100, 100],
    [100, 600],
    [300, 400],
    [700, 300],
    [700, 600]
]

terminal_speed = 8
ticker = 0
position = np.array([300,300]).reshape(2,1)
estimate = deepcopy(position)

sigma_control = 2
sigma_sensor = 10
sigma = 10*np.eye(2)

while not crashed:
    
    ### taking care of keypresses
    keys=pygame.key.get_pressed()
    if keys[pygame.K_w]:
        if ticker == 0:
            position = move("up", terminal_speed, position, sigma_control)
        ticker = (ticker+1)%50
        speed = [0, terminal_speed]
    elif keys[pygame.K_s]:
        if ticker == 0:
            position = move("down", terminal_speed, position, sigma_control)
        ticker = (ticker+1)%50
        speed = [0, -terminal_speed]
    elif keys[pygame.K_a]:
        if ticker == 0:
            position = move("left", terminal_speed, position, sigma_control)
        ticker = (ticker+1)%50
        speed = [-terminal_speed, 0]
    elif keys[pygame.K_d]:
        if ticker == 0:
            position = move("right", terminal_speed, position, sigma_control)
        ticker = (ticker+1)%50
        speed = [terminal_speed, 0]
    else:
        if ticker == 0:
            position = move("stay", terminal_speed, position, sigma_control)
        ticker = (ticker+1)%50
        speed = [0,0]
        
    for event in pygame.event.get():
        if event.type == pygame.KEYDOWN:
            if event.unicode == "q":
                crashed = True
            
    if crashed:
        exit()
       
    if ticker == 1:
        measurements = get_measurement_from_sensors(position, checkpoints, sigma_sensor)
        R = sigma_control*np.eye(2)
        Q = sigma_sensor*np.eye(len(checkpoints))
        measurements = np.array(measurements).reshape(len(checkpoints),1)
        estimate, sigma = EKF(estimate, sigma, R,Q, measurements, np.array(speed).reshape(2,1), checkpoints)
    
    ### drawing elements on the screen
    screen.fill((255,255,255))
    for checkpoint in checkpoints:
        pygame.draw.circle(screen, (255,0,0), to_pygame(checkpoint,800), 10)
    pygame.draw.circle(screen, (255,0,255), to_pygame([estimate[0][0], estimate[1][0]],800), 15)    #predicted position
    pygame.draw.circle(screen, (0,255,255), to_pygame([position[0][0], position[1][0]],800), 10)     #actual position
    pygame.display.update()

  pygame.draw.circle(screen, (255,0,255), to_pygame([estimate[0][0], estimate[1][0]],800), 15)    #predicted position
