Iterative Closest Point
--------------------------------------

This project implements an ICP algorithm and use it alongside GTSAM to perform simultaneous localization & mapping (SLAM) on Lidar scans.

Load in the datset. This cell finds filenames for all of the point cloud files (.ply) in the dataset.

In [None]:
import os
import math
import gtsam
import numpy as np
from tqdm import trange
import matplotlib.pyplot as plt
import gtsam.utils.plot as gtsam_plot
from sklearn.neighbors import NearestNeighbors

from project_assets.helpers import *

LIDAR_FPATH = "project_assets/lidar/"

scans_fnames = []
for file in sorted(os.listdir(LIDAR_FPATH)):
    scans_fnames.append(os.path.join(LIDAR_FPATH, file))

# Visualization

In this section, we'll become aquainted with the dataset.

This dataset is composed of 180 Lidar scans (.ply files) captured by Argo AI, a self-driving car company based in Pittsburgh, PA. These scans were captured over 18 seconds by one of their cars in Miami, which was likely equipped with a Lidar sensor similar to this [one](https://velodynelidar.com/products/hdl-64e/). This 30 second video clip from the car's front camera gives a good intuition of what happens: https://youtu.be/FUDRK_0iEKA. The first 12 seconds, where the car is stationary, are not part of our dataset.

It's night-time and the car starts at an offset T-intersection. It waits for a car, three bicyclists, and another car to pass (12 seconds that are not part of our dataset). Then it makes a left turn onto NW 2nd Ave and traveling down the street (18 seconds that is our dataset).

In this cell, we read in the first Lidar scan and visualizes it at full resolution (~88,000 points). Use the plot menu to zoom, pan, and rotate around the scene. Then, change the index on `scans_fnames` to see some of the later frames.

In [None]:
visualize_clouds(read_ply(scans_fnames[0]), show_grid_lines=True)

This cell reads in the first 20 Lidar scans and visualizes them (as an animation) at a reduced resolution. Due to browser limitations, any additional frames after the first 5 must be rendered at reduced resolutions. Play around with the number of frames loaded by modifying the index on `scans_fnames`.

In [None]:
# the more frames visualized, the lower the resolution of each cloud
clouds_subset = read_ply(*scans_fnames[:20], as_animation=True)

visualize_clouds_animation(clouds_subset, show_grid_lines=True)

In [None]:
# visualizing the entire sequence at a low resolution
clouds_all = read_ply(*scans_fnames, as_animation=True)

visualize_clouds_animation(clouds_all)

# Iterative Closest Point

In this section, we'll load in two point clouds and implement the five steps of ICP to align them.



In [None]:
# we'll read in the first and eleventh clouds
clouda = read_ply(scans_fnames[0])[0]
cloudb = read_ply(scans_fnames[10])[0]
print(clouda.shape, cloudb.shape)

visualize_clouds([clouda, cloudb], show_grid_lines=True, cloud_colors=['#008B8B', '#FFA07A'])

Let's take a moment to talk about the shape of these clouds. Each cloud is a numpy array of shape (3, n), where n is the number of point in the cloud. The first dimension is 3 because each point has an x, y, and z component. Therefore, cloud[0] would be an array of length n that contains the x component of each point in the cloud.

There are five steps to ICP:
1.   Initial transformation
2.   Transform cloud
3.   Assign closest point pairs
4.   Estimate transformation
5.   Repeat steps 2-4 for maximum number of iterations or change is very small

Let's begin with Step 1: the initial tranformation

The initial transform will be an input to the ICP algorithm. Below, we describe some methods through which the initial transform is found.

As you can see in the visualization of the two clouds, the two clouds are nearly identical. This makes sense since the cloudb was captured 1 second after clouda. By hovering over the landmarks we expect to stay stationary on the street (such as parked cars along the street), we can see that cloudb is rotated and translated some x amount from clouda. For someone running icp on two clouds, they could use this guess-timate. Here's how this is done at a self-driving company. Tracking landmarks to compose the initial guess is common. Argo AI can (and does) write CNNs to detect and track landmarks (i.e. street signs, buildings, parked cars), and then computes the distance between stationary landmarks.

Another method could be using kinematic information of the car. The car know's its heading, velocity, and acceleration, allowing us to use the elapsed time to estimate our transform.

Even another method could be centroid of the point clouds. By computing translation between centroida and centroidb, we have half of a guess at the initial transform.

Ultimately, the cloud pairs that are being ICP-ed together are captured less than a second apart. It is possible then to just use the identity transform as the initial guess.

## ICP helper functions

When writing software, 'test driven development' refers to writing your expectation for the software as a unittest before writing the software itself. The pattern of development for this section should be:

1. Review the docstring for the method
2. Write some unit tests to set the expectation for what a correct implementation would look like
3. Write the method itself

Below are some basic unit tests.

In [None]:
import unittest

class TestICPHelpers(unittest.TestCase):

    def setUp(self):
        self.testclouda = np.array([[1], [1], [1]])
        self.testcloudb = np.array([[2, 10], [1, 1], [1, 1]])
        self.testcloudc = np.array([[2], [1], [1]])
        self.testbTa = gtsam.Pose3(gtsam.Rot3(), gtsam.Point3(1, 0, 0))
        self.testcloudd = np.array([[0, 20, 10], [0, 10, 20], [0, 0, 0]])
        self.testcloude = np.array([[10, 30, 20], [10, 20, 30], [0, 0, 0]])

    def test_transform_cloud1(self):
        expected = 2
        actual = transform_cloud(self.testbTa, self.testclouda)[0][0]
        self.assertEqual(expected, actual)

    def test_assign_closest_pairs1(self):
        expected = (3, 1)
        actual = assign_closest_pairs(self.testclouda, self.testcloudb).shape
        self.assertEqual(expected, actual)

    def test_assign_closest_pairs2(self):
        expected = 2
        actual = assign_closest_pairs(self.testclouda, self.testcloudb)[0][0]
        self.assertEqual(expected, actual)

    def test_estimate_transform1(self):
        expected = 1
        actual = estimate_transform(self.testclouda, self.testcloudc).x()
        self.assertEqual(expected, actual)

    def test_estimate_transform2(self):
        expected = 10
        actual = estimate_transform(self.testcloudd, self.testcloude).x()
        self.assertAlmostEqual(expected, actual, places=7)
        actua2 = estimate_transform(self.testcloudd, self.testcloude).y()
        self.assertAlmostEqual(expected, actua2, places=7)

    def test_icp1(self):
        ret = icp(self.testclouda, self.testcloudb)
        expected1 = type(gtsam.Pose3())
        actual1 = type(ret[0])
        self.assertEqual(expected1, actual1)

    def test_icp2(self):
        expected = 1
        actual = icp(self.testclouda, self.testcloudb)[0].x()
        self.assertEqual(expected, actual)

    def test_icp3(self):
        expected = 1
        actual = icp(self.testclouda, self.testcloudc)[0].x()
        self.assertEqual(expected, actual)

Step 2: Transform cloud

Given an input cloud, apply the gtsam.Pose3 transform on each point in the cloud. The recommended approach is convert each point in the point cloud into homogeneous points and apply a homogeneous transformation to each point.

In [None]:
def transform_cloud(transform, clouda):
    """Transforms each point in a cloud
    given a gtsam.Pose3 transform.
    """
    transformed_cloud = clouda.copy() 
    transformed_cloud = np.dot(transform.matrix(), np.vstack((transformed_cloud, np.ones((1, transformed_cloud.shape[-1])))))
    return transformed_cloud[:3,]

In [None]:
suite = unittest.TestSuite()
suite.addTest(TestICPHelpers('test_transform_cloud1'))
unittest.TextTestRunner().run(suite)


Step 3: Assign closest point pairs

For each point in clouda, find the closest (euclidean-wise) point in cloudb and form a pair. Return a cloud shaped like clouda that has been rearranged to form closest pairs index-wise between points in clouda and points in rearranged cloudb.

In [None]:
def assign_closest_pairs(clouda, cloudb):
    """Returns a a point cloud (cloudc) with points from cloudb, such that the
    ith point in cloudc is cloudb's closest point to the ith point in clouda
    by euclidean distance.
    """
    transposeA = clouda.T
    transposeB = cloudb.T
    neigh = NearestNeighbors(n_neighbors=1, algorithm='kd_tree').fit(X=transposeB)
    i = neigh.kneighbors(transposeA)[1]
    cloudc = transposeB[i[:,0]].T #i[0,:]
    return cloudc

In [None]:
suite = unittest.TestSuite()
suite.addTest(TestICPHelpers('test_assign_closest_pairs1'))
suite.addTest(TestICPHelpers('test_assign_closest_pairs2'))
unittest.TextTestRunner().run(suite)

Estimate transform (This step is given by the TAs.)

In normal matrix multiplication, we apply a transformation matrix $A$ to a given input vector $x$ to give us an output vector $b$ (i.e., $Ax=b$). Our output here represents a transformation matrix such that our input vector `clouda` transforms into output vector `cloudb` (i.e, $A=bx^{-1}$).

Thus, given clouda and cloudb, this function will return a `gtsam.pose3` object (consisting of a rotational and translational component) that we can use to estimate a transform that aligns clouda with cloudb (i.e., minimizing the point-to point-distances from clouda to cloudb).


In [None]:
def estimate_transform(clouda, cloudb):
    """Estimate the transform from clouda to cloudb that minimize the sum-of-square
    of the distances from the transformed points in point cloud A to the corresponding
    points in point cloud B.
        e.g.  argmin Σ||T*a_i - b_i||^2
                T    i
    """
    if clouda.shape != cloudb.shape and clouda.shape[1] < 3:
        return None
    
    centroida = np.average(clouda, axis=1)
    centroidb = np.average(cloudb, axis=1)

    clouda_prime = clouda - centroida[:, np.newaxis]
    cloudb_prime = cloudb - centroidb[:, np.newaxis]
    H = np.sum(clouda_prime.T[:,:,None]*cloudb_prime.T[:,None], axis=0)

    aRb = gtsam.Rot3.ClosestTo(H)
    rot_centroidb = aRb.rotate(gtsam.Point3(centroidb))
    atb = gtsam.Point3(centroida - np.array([rot_centroidb[0], rot_centroidb[1], rot_centroidb[2]]))

    return gtsam.Pose3(aRb, atb).inverse()

In [None]:
suite = unittest.TestSuite()
suite.addTest(TestICPHelpers('test_estimate_transform1'))
suite.addTest(TestICPHelpers('test_estimate_transform2'))
unittest.TextTestRunner().run(suite)

## ICP Implementation

Step 5: Repeat steps 2-4 for maximum number of iterations or change is very small.

Per each iteration of ICP, give transformed clouda and cloudb as inputs to `assign_closest_pairs()`.

The first output, `transform`, is a gtsam.Pose3 object, which is a transform describing how to transform clouda to align with cloudb (see Step 4 for explanation of this type of transform). The second output (optional), `icp_series`, lets you visualize each iteration of your ICP algorithm. The format of `icp_series` is an list of lists.

Example: `[[clouda_iter1, cloudb_iter1], [clouda_iter2, cloudb_iter2], [clouda_iter3, cloudb_iter3], ...]`

In [None]:
def icp(clouda, cloudb, initial_transform=gtsam.Pose3(), max_iterations=25):
    """Runs ICP on two clouds by calling
    all five steps implemented above.
    Iterates until close enough or max
    iterations.
    """
    transform = gtsam.Pose3()
    transform = initial_transform
    icp_series = [[clouda, cloudb]]

    for _ in range(max_iterations):
      transformCloud = transform_cloud(transform, clouda)
      closestPointPairs = assign_closest_pairs(transformCloud, cloudb)
      estimated = estimate_transform(clouda, closestPointPairs)
      icp_series.append([transformCloud, closestPointPairs])
      if transform.range(estimated) < 0.1:
        break
      transform = estimated

    return transform, icp_series

In [None]:
suite = unittest.TestSuite()
suite.addTest(TestICPHelpers('test_icp1'))
suite.addTest(TestICPHelpers('test_icp2'))
suite.addTest(TestICPHelpers('test_icp3'))
unittest.TextTestRunner().run(suite)

Below is a simple test for ICP before running it on bigger point clouds. Each point cloud contains 3 points as corners of a triangle.

In [None]:
# create 2 triangles
triangle = np.array([[0.0, 1.0, 1.0],
                 [0.0, 0.0, 0.5],
                 [0.0, 0.0, 0.0]])
triangle_translated = triangle + 1.0

# Use ICP to find the transform between them
transform, icp_series = icp(triangle_translated, triangle)
print(transform_cloud(transform, triangle_translated))
print(triangle)
print(transform)

Now let's run ICP on the LIDAR data.  
The animation shows how clouda has moved after each iteration of ICP. You should see stationary landmarks, like walls and parked cars, converge onto each other.

In [None]:
transform, icp_series = icp(clouda, cloudb)
print(transform)

visualize_clouds([transform_cloud(transform, clouda), cloudb], show_grid_lines=True, cloud_colors=['#008B8B', '#FFA07A'])
# If you have returned a series of intermediate ICP clouds to visualize, you can uncomment the line below
# visualize_clouds_animation(icp_series, speed=400, show_grid_lines=True, cloud_colors=['#008B8B', '#FFA07A'])

# GTSAM Introduction

This section provides a brief introduction to the GTSAM library, which will be used to create a factor graph for pose optimization in the next section.

First, let's get faimiliar with represeting and visualizing 3D pose in GTSAM by running the following code block.

In [None]:
# translation is represented by a Point3 object (same as numpy array with 
# lenghth 3). The 3 elements corresponds to x, y, z values
translation = gtsam.Point3(1, 1, 2)

# rotation is represented by Rot3 class. There are several ways to create a Rot3. 
# We create one by specifying the yaw, pitch, roll angles
rotation = gtsam.Rot3.Ypr(np.pi/2, 0, 0)

# 3D pose is represeted by Pose3 class, which can be constructed with its 
# rotational (Rot3) and translational (Point3) componenets
pose = gtsam.Pose3(rotation, translation)

# you can directly use print to show the pose. The rotation is shown as a 3x3 
# rotation matrix, and its translation is shown as a 3-vector
print('pose:\n', pose)

# The uncertainty of the pose is reprsented its covariance matrix, which is of 
# shape 6x6. The upper-left 3x3 submatrix represents the covariance for rotation, 
# and the lower-right 3x3 submatrix represents the covariance for translation.
marginal = np.identity(6)
marginal[3, 3] = 5
marginal[4, 4] = 1
marginal[5, 5] = 0.5

# You can use the plot_pose3 function to visualize the pose with uncertaintites
# The x, y, z axes are represented with red, green, blue.
fig_num = 0
gtsam_plot.plot_pose3(fig_num, pose, axis_length=40, P=marginal);

The following code block introduces factor graph optimization with a simple pose graph example. The factor graph includes 5 variables representing the poses of the robot at 5 time steps. There's a prior factor on the first variable, "between factor" (represeting odometry measurement) connecting consecutive poses, and a loop closure factor.

In [None]:
# # Factor graph example 

################################################################################
########################## Create noise model ##################################
################################################################################
# noise model represents the uncertainty for the measurements. In this example, 
# we directly specify uncertainty with standard deviation (sigma). The first 3 
# elements represent the sigmas of rotation; the last 3 elements represent the 
# sigmas of translation
prior_sigmas = np.array([0.1, 0.1, 0.1, 0.3, 0.3, 0.3])/20
between_sigmas = np.array([0.1, 0.1, 0.1, 0.2, 0.2, 0.2])/20
prior_noise = gtsam.noiseModel.Diagonal.Sigmas(prior_sigmas)
between_noise = gtsam.noiseModel.Diagonal.Sigmas(between_sigmas)

################################################################################
######################## Create the factor graph ###############################
################################################################################
# - As you've learned that a factor graph consists of variable nodes and factor
# nodes. In GTSAM, a factor graph is a container of factors, while we do not 
# explicitly store variables in a factor graph. 
# - A variable is referred to by a unique "name" (an integer). We use 1, 2, 3,
# 4, 5 for the "names" of the 5 poses.
# - In this example, we will create a factor graph as follows:
#           5---•---4
#           |       |
#           •       •
#           |       |
#  •--1--•--2---•---3

# create an empty factor graph. 
example_graph = gtsam.NonlinearFactorGraph()

# - We first create a prior factor  on the first pose (with name 1), with prior 
# value at the origin, and add it to the factor graph
prior_factor = gtsam.PriorFactorPose3(1, gtsam.Pose3(), prior_noise)
example_graph.add(prior_factor)

# We then add "between factors" that represent odometry measurements to connect 
# consecutive pose variables. A between factor is specified by the name of the 
# first pose variable, the name of the second pose variable, and the relative 
# transform between the two poses.
# Notice that the noise model is required for all factors
T_12 = gtsam.Pose3(gtsam.Rot3.Rz(0), gtsam.Point3(2, 0, 0))
T_23 = gtsam.Pose3(gtsam.Rot3.Rz(math.pi/2), gtsam.Point3(2, 0, 0))
T_34 = gtsam.Pose3(gtsam.Rot3.Rz(math.pi/2), gtsam.Point3(2, 0, 0))
T_45 = gtsam.Pose3(gtsam.Rot3.Rz(math.pi/2), gtsam.Point3(2, 0, 0))
example_graph.add(gtsam.BetweenFactorPose3(1, 2, T_12, between_noise)) 
example_graph.add(gtsam.BetweenFactorPose3(2, 3, T_23, between_noise)) 
example_graph.add(gtsam.BetweenFactorPose3(3, 4, T_34, between_noise)) 
example_graph.add(gtsam.BetweenFactorPose3(4, 5, T_45, between_noise)) 

# The loop closure factor is also represented as "between factor"
T_52 = gtsam.Pose3(gtsam.Rot3.Rz(math.pi/2), gtsam.Point3(2, 0, 0))
example_graph.add(gtsam.BetweenFactorPose3(5, 2, T_52, between_noise)) 


################################################################################
######################## Create initial guess ##################################
################################################################################
# - Initial values are necessary for nonlinear optimization
# - GTSAM uses "Values" type to store the values for variables. "Values" is like
# a dictionary object in python, it maps the unique "name" of the variable to 
# its value
# - The initial values do not need to be very precise.

# create empty Values object
example_initial_estimate = gtsam.Values()

# add initial estiamtes of poses to Values
T_w1 = gtsam.Pose3(gtsam.Rot3.Rz(0.2), gtsam.Point3(0.5, 0.0, 0.0))
T_w2 = gtsam.Pose3(gtsam.Rot3.Rz(-0.2), gtsam.Point3(2.3, 0.0, 0.0))
T_w3 = gtsam.Pose3(gtsam.Rot3.Rz(math.pi/2), gtsam.Point3(4.1, 0.3, 0.0))
T_w4 = gtsam.Pose3(gtsam.Rot3.Rz(math.pi), gtsam.Point3(3.9, 2.2, 0.0))
T_w5 = gtsam.Pose3(gtsam.Rot3.Rz(-math.pi), gtsam.Point3(2.1, 2.1, 0.0))
example_initial_estimate.insert(1, T_w1)
example_initial_estimate.insert(2, T_w2)
example_initial_estimate.insert(3, T_w3)
example_initial_estimate.insert(4, T_w4)
example_initial_estimate.insert(5, T_w5)

################################################################################
################################# optimize #####################################
################################################################################
# - Optimize the initial values using a Gauss-Newton nonlinear optimizer

# set parameters for the optimizer
opt_parameters = gtsam.GaussNewtonParams()
opt_parameters.setRelativeErrorTol(1e-5)
opt_parameters.setMaxIterations(100)

# create the optimization problem with the factor graph and initial values
ex_optimizer = gtsam.GaussNewtonOptimizer(example_graph, example_initial_estimate, opt_parameters)

# solving the optimization problem returns the optimized values for variables
ex_result = ex_optimizer.optimize()

################################################################################
############################### visualization ##################################
################################################################################
# you can use print to show factor graphs and values
print("Factor Graph:\n", example_graph)
print("Final Result:\n", ex_result)

In [None]:
# the uncertainties for each variable can be extracted by the "marginal" 
# probablity density of the variable
marginals = gtsam.Marginals(example_graph, ex_result)

# Visualize the optimzied poses with uncertainties
fig = plt.figure(0, figsize=(8, 8), dpi=120)
ax = fig.gca(projection='3d')
for i in range(1, 6):
    gtsam_plot.plot_pose3(0, ex_result.atPose3(i), 0.5, marginals.marginalCovariance(i))

def axisEqual3D(ax):
    extents = np.array([getattr(ax, 'get_{}lim'.format(dim))() for dim in 'xyz'])
    sz = extents[:,1] - extents[:,0]
    centers = np.mean(extents, axis=1)
    maxsize = max(abs(sz))
    r = maxsize/2
    for ctr, dim in zip(centers, 'xyz'):
        getattr(ax, 'set_{}lim'.format(dim))(ctr - r, ctr + r)

axisEqual3D(ax)
ax.view_init(azim=-90, elev=90)
plt.show()

# Pose Graph Optimization

In this section, we'll build a factor graph to estimate the pose of our vehicle using the transforms our ICP algorithm gives us between frames. These ICP transforms are the factors that tie the pose variables together.

We will be using GTSAM to construct the factor graph as well as perform a optimization for the pose of the car as it travels down the street. Recall from PoseSLAM describe in the LIDAR slides how we could add a factor (aka constraint) between two state variables. When we revisited a state, we could add a loop closure. Since the car in our dataset never revisits a previous pose, there is no loop closure.

In [None]:
import unittest

class TestFactorGraph(unittest.TestCase):

    def setUp(self):
        max_steps = 6
        test_clouds = read_ply(*scans_fnames)[:max_steps]
        PRIOR_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([1e-6, 1e-6, 1e-6, 1e-4, 1e-4, 1e-4]))
        ICP_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([1e-6, 1e-6, 1e-6, 1e-4, 1e-4, 1e-4]))

        initial_pose = gtsam.Pose3(gtsam.Rot3(0.9982740, -0.0572837,  0.0129474, 0.0575611,  0.9980955, -0.0221840, -0.0116519,  0.0228910,  0.9996701),
                          gtsam.Point3(-263.9464864482589, 2467.3015467381383, -19.374652610889633))
        test_graph, test_initial_estimates = construct_graph_and_initial_estimates(initial_pose, test_clouds, PRIOR_NOISE, ICP_NOISE)
        self.max_steps = max_steps
        self.graph = test_graph
        self.initial_estimates = test_initial_estimates
    
    def test_graph_params(self):
        self.assertTrue(type(self.graph) == gtsam.NonlinearFactorGraph)
        # 1 prior factor, 5 between factors connecting consecutive poses, 2 between
        # factors for skip connection
        self.assertTrue(self.graph.size() == self.max_steps + int((self.max_steps-1)/2))
    
    def test_initial_estimates_params(self):
        self.assertTrue(type(self.initial_estimates) == gtsam.Values)
        self.assertTrue(self.initial_estimates.size() == self.max_steps)
        for i in range(self.max_steps):
            self.assertTrue(self.initial_estimates.exists(i))


The ICP implementation is used here to find the transform between two subsequent clouds. These transforms become the factors between pose variables in the graph. So, it needs to go through all the point clouds and run icp pair-wise to find the relative movement of the car. With these transformation, create a factor representing the transform between the pose variables.

We talked about how loop closure helps us consolidate conflicting data into a better global estimate. Unfortunately, our car does not perform a loop closure. So, our graph would just be a long series of poses connected by icp-returned transforms. However, our lidar scans are noisy, which means that our icp-returned transforms are not perfect either. This ultimately results in incorrect vehicle poses and overall map. One way that we can augment our graph is through "skip connections". We simply run ICP between every other cloud, and add these skip connections into the graph. You can basically perform ICP between two non-consecutive point clouds and add that transform as a factor in the factor graph.

In [None]:
def construct_graph_and_initial_estimates(initial_pose, clouds, prior_noise, icp_noise):
    """construct a factor graph and initial value estimates using transforms 
    generated from aligning point clouds with icp.

    Returns a graph (gtsam.NonlinearFactorGraph) with prior factor on the first 
    pose, and the between factors between pairs of clouds, along with our input 
    initial_estimates (gtsam.Values) of initial estimates for our vehicle pose 
    in world coordinates.
    """
    graph = gtsam.NonlinearFactorGraph()
    initial_estimates = gtsam.Values()

    graph.add(gtsam.PriorFactorPose3(0, initial_pose, prior_noise))
    initial_estimates.insert(0, initial_pose)
    factor_pose = initial_pose
    previousTransform = gtsam.Pose3()

    # Add ICP Factors between each pair of clouds
    for i in trange(len(clouds) - 1):
        T, _ = icp(clouds[i+1], clouds[i], initial_transform=previousTransform)
        previousTransform = T
        T = T.inverse()
        graph.add(gtsam.BetweenFactorPose3(i, i+1, T, icp_noise))
        factor_pose = factor_pose.compose(T)
        initial_estimates.insert(i+1, factor_pose)

    previousTransform = gtsam.Pose3()
    # Add skip connections between every other frame
    for i in trange(0, len(clouds) - 2, 2):
        T, _ = icp(clouds[i+2], clouds[i], initial_transform=previousTransform)
        previousTransform = T
        T = T.inverse()
        graph.add(gtsam.BetweenFactorPose3(i, i+2, T, icp_noise))

    return graph, initial_estimates

In [None]:
suite = unittest.TestSuite()
suite.addTest(TestFactorGraph('test_graph_params'))
suite.addTest(TestFactorGraph('test_initial_estimates_params'))
unittest.TextTestRunner().run(suite)

The real power of GTSAM will show here. In five lines, we'll setup a Gauss Newton nonlinear optimizer and optimize for the vehicle's poses in world coordinates.


In [None]:
# load in all clouds in our dataset
clouds = read_ply(*scans_fnames)

# We get the initial pose of the car from Argo AI's dataset, and we add it to the graph as such
PRIOR_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([1e-6, 1e-6, 1e-6, 1e-4, 1e-4, 1e-4]))
ICP_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([1e-6, 1e-6, 1e-6, 1e-4, 1e-4, 1e-4]))
initial_pose = gtsam.Pose3(gtsam.Rot3(0.9982740, -0.0572837,  0.0129474, 0.0575611,  0.9980955, -0.0221840, -0.0116519,  0.0228910,  0.9996701),
                           gtsam.Point3(-263.9464864482589, 2467.3015467381383, -19.374652610889633))

# We'll use the function to create the factor graph and initial estiamtes
graph, initial_estimates = construct_graph_and_initial_estimates(initial_pose, clouds, PRIOR_NOISE, ICP_NOISE)

# Now optimize for the states
parameters = gtsam.GaussNewtonParams()
parameters.setRelativeErrorTol(1e-5)
parameters.setMaxIterations(100)
optimizer = gtsam.GaussNewtonOptimizer(graph, initial_estimates, parameters)
result = optimizer.optimize()

Let's plot these poses to see how our vehicle moves.

In [None]:
poses_cloud = np.array([[], [], []])
for i in range(len(clouds)):
    poses_cloud = np.hstack([poses_cloud, np.array([[result.atPose3(i).x()], [result.atPose3(i).y()], [result.atPose3(i).z()]])])

init_car_pose = gtsam.Pose3(gtsam.Rot3(0.9982740, -0.0572837,  0.0129474, 0.0575611,  0.9980955, -0.0221840, -0.0116519,  0.0228910,  0.9996701),
                            gtsam.Point3(-263.9464864482589, 2467.3015467381383, -19.374652610889633))
visualize_clouds([poses_cloud, transform_cloud(init_car_pose, clouds[0])], show_grid_lines=True, cloud_colors=['#ffa500', '#FFFFFF'])

# Mapping

In this section, we'll tackle the mapping component of SLAM (Simulataneous Localization and Mapping). The previous section used a factor graph to localize our vehicle's poses in world coordinates. We'll now use those poses to form a map of the street from the point clouds.

Given the poses and the clouds, this task is easy. We'll use the `transform_cloud` method from the ICP section to transform every other cloud in our dataset to be centered at the corresponding pose where the cloud was captured. Visualizing all of these clouds yields the complete map. We don't use every cloud in our dataset to reduce the amount of noise in our map while retaining plenty of detail.

Generate the full map using the set of clouds provided and the results of the optimization. Then plot these transformed clouds to visualize the entire map the car saw in 18 seconds.

In [None]:
def transform_map(transforms, clouds):
    """Use the `transform_cloud` method to transform each
    cloud in `clouds` by its pose given in `transforms`.
    """

    clouds_w = []
    for i in range(len(clouds)):
        cloud_i = clouds[i]
        cloud_w = transform_cloud(transforms.atPose3(i).inverse(), cloud_i)
        clouds_w.append(cloud_w)
    return clouds_w

In [None]:
visualize_clouds(transform_map(result, clouds), show_grid_lines=True, cloud_colors="#C6C6C6")