In [1]:
from lego_robot import *
from math import sin, cos, pi, atan2, sqrt
from numpy import *
from slam_d_library import get_observations, write_cylinders

In [2]:
class ExtendedKalmanFilter:
    def __init__(self, state, covariance,
                 robot_width, scanner_displacement,
                 control_motion_factor, control_turn_factor,
                 measurement_distance_stddev, measurement_angle_stddev):

        self.state = state
        self.covariance = covariance


        self.robot_width = robot_width
        self.scanner_displacement = scanner_displacement
        self.control_motion_factor = control_motion_factor
        self.control_turn_factor = control_turn_factor
        self.measurement_distance_stddev = measurement_distance_stddev
        self.measurement_angle_stddev = measurement_angle_stddev

    @staticmethod
    def g(state, control, w):

        x, y, theta = state
        l, r = control
        if r != l:
            alpha = (r - l) / w
            rad = l / alpha
            g1 = x + (rad + w / 2.) * (sin(theta + alpha) - sin(theta))
            g2 = y + (rad + w / 2.) * (-cos(theta + alpha) + cos(theta))
            g3 = (theta + alpha + pi) % (2 * pi) - pi
        else:
            g1 = x + l * cos(theta)
            g2 = y + l * sin(theta)
            g3 = theta

        return array([g1, g2, g3])

    @staticmethod
    def dg_dstate(state, control, w):

        theta = state[2]
        l, r = control
        m = None
        if r != l:

            alpha_t = (r - l)/w
            R_t = l/alpha_t
            J13 = (R_t + w/2.0)*(cos(state[2] + alpha_t) - cos(state[2]))
            J23 = (R_t + w/2.0)*(sin(state[2] + alpha_t) - sin(state[2]))
            m = array([[1, 0, J13], [0, 1, J23], [0, 0, 1]])  # Replace this.

        else:

            J13 = -l * sin(state[2])
            J23 = l * cos(state[2])
            m = array([[1, 0, J13], [0, 1, J23], [0, 0, 1]])

        return m

    @staticmethod
    def dg_dcontrol(state, control, w):

        theta = state[2]
        l, r = tuple(control)

        if r != l:


            a = w/(r - l)**2
            b = 0.5 * ((r + l)/(r - l))
            c = 1/w
            alpha_t = (r - l)/w
            theta_t = theta + alpha_t

            J11 = ((a * r)*(sin(theta_t) - sin(theta))) - (b*cos(theta_t))
            J21 = ((a * r)*(-cos(theta_t) + cos(theta))) - (b*sin(theta_t))

            J12 = -((a * l)*(sin(theta_t) - sin(theta))) + (b*cos(theta_t))
            J22 = -((a * l)*(-cos(theta_t) + cos(theta))) + (b*sin(theta_t))

            m = array([[J11, J12], [J21, J22], [-c, c]])

        else:

            c = l/w
            J11 = 0.5*(cos(theta) + c*sin(theta))
            J21 = 0.5*(sin(theta) - c*cos(theta))
            J12 = 0.5*(-c*sin(theta) + cos(theta))
            J22 = 0.5*(c*cos(theta) + sin(theta))

        m = array([[J11, J12], [J21, J22], [-c, c]])
        return m

    @staticmethod
    def get_error_ellipse(covariance):

        eigenvals, eigenvects = linalg.eig(covariance[0:2, 0:2])
        angle = atan2(eigenvects[1, 0], eigenvects[0, 0])
        return (angle, sqrt(eigenvals[0]), sqrt(eigenvals[1]))

    def predict(self, control):



       
        left, right = control

     
        var_l = (self.control_motion_factor*left)**2 + (self.control_turn_factor*(left-right))**2
        var_r = (self.control_motion_factor*right)**2 + (self.control_turn_factor*(left-right))**2
        covariance_control = diag([var_l, var_r])

       
        V_t = ExtendedKalmanFilter.dg_dcontrol(self.state, control, self.robot_width)
        G_t = ExtendedKalmanFilter.dg_dstate(self.state, control, self.robot_width)
       
        self.covariance = dot(dot(G_t, self.covariance), G_t.T) + dot(dot(V_t, covariance_control), V_t.T)
       
        self.state = ExtendedKalmanFilter.g(self.state, control, self.robot_width)

        

    @staticmethod
    def h(state, landmark, scanner_displacement):
       
        dx = landmark[0] - (state[0] + scanner_displacement * cos(state[2]))
        dy = landmark[1] - (state[1] + scanner_displacement * sin(state[2]))
        r = sqrt(dx * dx + dy * dy)
        alpha = (atan2(dy, dx) - state[2] + pi) % (2 * pi) - pi

        return array([r, alpha])

    @staticmethod
    def dh_dstate(state, landmark, scanner_displacement):

       
        xl = state[0] + scanner_displacement*cos(state[2])
        yl = state[1] + scanner_displacement*sin(state[2])

        deltax = landmark[0] - xl
        deltay = landmark[1] - yl
        q = deltax**2 + deltay**2

        dr_dtheta = (scanner_displacement/sqrt(q)) * (deltax*sin(state[2]) - deltay*cos(state[2]))
        dalpha_dtheta = (-(scanner_displacement/q)*(deltax*cos(state[2]) + deltay*sin(state[2]))) - 1

        return array([[-deltax/sqrt(q), -deltay/sqrt(q), dr_dtheta], [deltay/q, -deltax/q, dalpha_dtheta]])

    def correct(self, measurement, landmark):
        

       
        H = ExtendedKalmanFilter.dh_dstate(self.state, landmark, self.scanner_displacement)
        
        Q = diag([self.measurement_distance_stddev**2, self.measurement_angle_stddev**2])
       
        K = dot(dot(self.covariance, H.T), linalg.inv(dot(dot(H, self.covariance), H.T) + Q))
        
        innovation = array(measurement) - ExtendedKalmanFilter.h(self.state, landmark, self.scanner_displacement)
        innovation[1] = (innovation[1] + pi) % (2*pi) - pi 
        self.state = self.state + dot(K, innovation)
        
        self.covariance = dot((eye(3) - dot(K, H)), self.covariance)

In [3]:
if __name__ == '__main__':
    # Robot constants.
    scanner_displacement = 30.0
    ticks_to_mm = 0.349
    robot_width = 155.0

    # Cylinder extraction and matching constants.
    minimum_valid_distance = 20.0
    depth_jump = 100.0
    cylinder_offset = 90.0
    max_cylinder_distance = 300.0

    # Filter constants.
    control_motion_factor = 0.35  # Error in motor control.
    control_turn_factor = 0.6  # Additional error due to slip when turning.
    # Distance measurement error of cylinders.
    measurement_distance_stddev = 200.0
    measurement_angle_stddev = 15.0 / 180.0 * pi  # Angle measurement error.

    # Measured start position.
    
    initial_state = array([1850.0, 1897.0, 213.0 / 180.0 * pi])
    # We have to obtain the origin point for the center of the robot.
    initial_state = array([initial_state[0] - scanner_displacement*cos(initial_state[2]), initial_state[1] - scanner_displacement*sin(initial_state[2]), initial_state[2]])

    # Covariance at start position.
    initial_covariance = diag([100.0**2, 100.0**2, (10.0 / 180.0 * pi) ** 2])
    # Setup filter.
    kf = ExtendedKalmanFilter(initial_state, initial_covariance,
                              robot_width, scanner_displacement,
                              control_motion_factor, control_turn_factor,
                              measurement_distance_stddev,
                              measurement_angle_stddev)

    # Read data.
    logfile = LegoLogfile()
    logfile.read("robot4_motors.txt")
    logfile.read("robot4_scan.txt")
    logfile.read("robot_arena_landmarks.txt")
    reference_cylinders = [l[1:3] for l in logfile.landmarks]

    # Loop over all motor tick records and all measurements and generate
    # filtered positions and covariances.
    # This is the Kalman filter loop, with prediction and correction.
    states = []
    covariances = []
    matched_ref_cylinders = []
    for i in xrange(len(logfile.motor_ticks)):
        # Prediction.
        control = array(logfile.motor_ticks[i]) * ticks_to_mm
        kf.predict(control)

        # Correction.
        observations = get_observations(
            logfile.scan_data[i],
            depth_jump, minimum_valid_distance, cylinder_offset,
            kf.state, scanner_displacement,
            reference_cylinders, max_cylinder_distance)
        for j in xrange(len(observations)):
            kf.correct(*observations[j])

        # Log state, covariance, and matched cylinders for later output.
        states.append(kf.state)
        covariances.append(kf.covariance)
        matched_ref_cylinders.append([m[1] for m in observations])

    # Write all states, all state covariances, and matched cylinders to file.
    f = open("kalman_prediction_and_correction.txt", "w")
    for i in xrange(len(states)):
        # Output the center of the scanner, not the center of the robot.
        print >> f, "F %f %f %f" % \
            tuple(states[i] + [scanner_displacement * cos(states[i][2]),
                               scanner_displacement * sin(states[i][2]),
                               0.0])
        # Convert covariance matrix to angle stddev1 stddev2 stddev-heading form
        e = ExtendedKalmanFilter.get_error_ellipse(covariances[i])
        print >> f, "E %f %f %f %f" % (e + (sqrt(covariances[i][2, 2]),))
        # Also, write matched cylinders.
        write_cylinders(f, "W C", matched_ref_cylinders[i])

    f.close()