In [None]:
from matplotlib import pyplot, animation
import numpy as np
import torch
import utils
from utils import plotting_utils
from utils import lidar_utils
from importlib import reload
reload(plotting_utils)
reload(lidar_utils)
from tqdm import tqdm
from utils.simulation import BotSimulation, plot_predicted_map
from IPython.display import Image, display
import time


In [None]:
# Generate a simple rectangular map with two circles in the top left and bottom right corners and a wall in the middle that goes halfway up the height of the room
map_size = 40
robot_size = map_size/10
room = plotting_utils.get_map_objects(map_size)

fig, ax = plotting_utils.generate_map(room,map_size)

# set the aspect ratio to be equal
pyplot.axis('equal')
pyplot.show()

In [None]:
room = plotting_utils.get_map_objects(map_size)
# Print the room objects to verify
print("Walls:")
for wall in room['walls']:
    print(f"  {wall}")
print("Circles:")
for circle in room['circles']:
    print(f"  Center: {circle['center']}, Radius: {circle['radius']}")


In [None]:
# This function takes in an x and y coordinate, a radius,
# and a size for the robot and lidar, and returns the coordinates
# of the individual lines that make up the robot and lidar.
robot,lidar = plotting_utils.create_robot(8, 8, 25, size=robot_size)
print("Robot Octagon Coordinates:", robot)
print("Lidar Line Coordinates:", lidar)
# We need a matplotlib axis to plot the room, robot, and lidar
# We use the cordinate version of the room for this (wall and circle objects)
room = plotting_utils.get_map_objects(map_size)
fig, ax = pyplot.subplots()
plotting_utils.show_room_and_robot(ax, room, robot,lidar)

pyplot.show()

In [None]:
# Example path for the robot to follow
path_orientations = [
    (10, 10, 45), (20, 20, 25), (30, 30, 15), (40, 25, 45),
    (50, 10, 25), (60, 20, 57), (70, 30, 180), (70, 35, 170)
]
# Here we animate the robot movement along the path
room = plotting_utils.get_map_objects(map_size)  # Get the room objects again
fig, ax = plotting_utils.generate_map(room,size=map_size)  # Use the previously defined map generation function
ax.set_xlim(0, map_size * 2)
ax.set_ylim(0, map_size)
ax.set_aspect('equal')
anim = plotting_utils.animate_robot_movement(ax, path_orientations, size=robot_size)
# Save the animation as a GIF
anim.save('basic_robot_animation.gif', fps=2)
# Show the animation

## Task 1
Now that we have visualised the basic maps and we have an idea of what we are looking at, we need to start simulating some lidar data. 

**Complete the missing code for the 4 blocks in lidar data following the instructions in the comments.**

Feel free to ask questions, this is quite math heavy.

In [None]:
def cast_lidar_rays(lidar, room, num_rays=10, ray_length=100):
    lidar_x, lidar_y = lidar[0]  # Starting point of the lidar
    front_x, front_y = lidar[1]  # Front point of the lidar

    # CODING: Calculate the angle between the lidar front and the x-axis
    # This angle is the "direction" the lidar is facing
    # Or rather, the center of the scanning cone
    # Recall that an easy means of calculating angles is arctan2 - (check numpy docs)
    angle = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

    # Generate angles for the rays using the angle and the number of rays
    angles = np.linspace(angle - np.pi / 4, angle + np.pi / 4, num_rays)

    intersections = []

    # We will test each ray individually
    for a in angles:
        # CODING: First generate the ray endpoint
        ray_end_x = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
        ray_end_y = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

        # We are going to find the closest intersection point with walls and circles,
        # You will use these for that logic later on
        closest_intersection = None
        closest_distance = float('inf')

        # We will write functions for checking line and circle intersections in a Check walls for intersection
        for wall in room['walls']:
            # CODING: Using the given lidar.line_intersection function, check for intersections
            # and check if that intersection is closer than the current closest intersection
            # if it is, update the closest intersection and distance
            # The line_intersection function takes two points of the ray and two points of the wall
            # The points are tuples of (x, y) coordinates, of which we have individual points for the lidar and ray end
            # The wall is from the room objects we made earlier
            intersection = None# <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

        # CODING: Now do the same for circles
        # Bear in mind that it is unlikely for a ray to intersect with a circle just once
        for circle in room['circles']:
            intersection = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE


        if closest_intersection:
            intersections.append(closest_intersection)

    return intersections
# Cast lidar rays from the robot's front and check for intersections with walls and circles
robot, lidar = plotting_utils.create_robot(62,20,140, size=4)
lidar_intersections = cast_lidar_rays(lidar, room, num_rays=25)
# Print the lidar intersections
print("Lidar Intersections:")
for i, intersection in enumerate(lidar_intersections):
    print(f"  {intersection}")
    if i >= 10:
        break


If all went well, you should see this: (Double click this box to see it printed right ^.^)

Lidar Intersections:
  (58.472909877623195, 40.0)
  (57.25130500680461, 40.0)
  (56.00480194880258, 40.0)
  (54.721540157960916, 40.0)
  (53.388292490705574, 40.0)
  (51.989879091978516, 40.0)
  (50.508410909484226, 40.0)
  (48.92227739903094, 40.0)
  (48.0, 38.86425855653695)
  (48.0, 36.663253524727864)
  (46.89545935646254, 36.0)

In [None]:
fig, ax = plotting_utils.generate_map(room,size=40)

# Generate some interesting lidar views by changing the robot's position and orientation
# This, of course, if you managed to calculate the lidar intersections correctly
robot, lidar = plotting_utils.create_robot(62,20,140, size=4) # These are the ones you calculated before
lidar_intersections = cast_lidar_rays(lidar, room, num_rays=25)

# visualize them here
plotting_utils.plot_lidar_intersections(ax, robot, lidar, lidar_intersections)
# Show the plot with lidar intersections
pyplot.show()

# "Task" 2

(No coding here, just a playground -- **DO** play though, it gives perspective)

So, now that we have seen what a single lidar scan ought to look like, let's get it moving and start messing it up a bit. The ``BotSimulation`` class is how we take a series a wheel velocities, translate them into global coordinates, and simulate lidar scans for each such global postion along the robot's path. 

Next, this class will allow us to simulate the [odometry](https://en.wikipedia.org/wiki/Odometry) noise that will cause the robot to believe it somewhere different than it.

Finally, we can simulate lidar noise (kindof) which is normal in such scans, as lasers do not bounce very well off all surfaces. 

Note that when the robot moves, the lidar scans are coming from the **real** location, not the *perceived* one, but the scans are in the robot's **local** coordinates, so when simulating this, the scans show that the robot did not see the walls in the correct location. 

Play with the noise and scans here to see how much the different types of noise affect the quality of the percieved map.

We will freeze the values for the main assignment, but this will help get an idea of why we need SLAM (**S**imultaneous **L**ocalisation **a**nd **M**apping). 

*(hint: the robot is the wrong place and map is wrong ^.- get it?)*

In [None]:
simulation = BotSimulation(cast_lidar_rays, robot_size=4, odometry_noise=0.05, lidar_noise=1, velocities=lidar_utils.wheel_velocities, num_scans = 25)
# Run the simulation to get all data
anim = simulation.animate_simulation_and_noise(simulation.all_data)
# Save the animation as a GIF
anim.save('noisy_robot_simulation.gif', fps=12)
# Show the maps and animation
plot_predicted_map(simulation)
display(Image(url=f'noisy_robot_simulation.gif?{int(time.time())}', format='png', width=600))

# Tasks 3

Now comes the **rough** stuff. 

We have provided the majority of the logic for ICP and Pose Graph Estimation. This process attempts to find correlations that are far away in the total robot path that can draw the noisy map recreation closer together (ICP), and thus bring the back to somewhere stable (PGE). 

This stabilization is a bit of a pytorch hack wherein we allow autograd to compute the gradients between the edges and nodes of the pose graph (rather than hessians in the usual process.....ew). 

The pose graph is a series of graph nodes wherein the location and orientation (global x, y, and theta) are stored in nodes, and the indicies and relative transformations (local dx, dy, and dtheta) between two nodes are stored as edges which serve as constraints in an optimization process. 

In effect, the odometry data is doodoo, but still a pretty good guess that stops the path from spiralling out of control. On the other hand, the lidar alignments are *"accurate"* (as long as you did it well), but far from where the noisy path currently is. 

The goal of the optimizer is the drag the path to somewhere that minimizes the the total "tension" on the edges by pulling the nodes to places that balance the odometry data and the lidar data. This will undo the noise form both odometry and lidar as long as the numerous hyperparameters are adjusted well.

This code is pretty complex and tough to learn quickly (take it from me ftlog), so the task is to find the missing code and fill in the gaps. If I did my job right, the rest is still interesting to read and learn from. 

Take a look, finish the code, and proceed to task to 3.5 if you think you did it. Those blocks can help you test things. 



In [None]:
class PoseGraph:
    def __init__(self, simulation, lidar_tolerance, training_steps):
        self.pi = torch.tensor(np.pi, dtype=torch.float32)
        self.simulation = simulation
        # Initialize nodes based on the simulation path
        # These nodes are the poses of the robot at each step in the simulation
        # They also happen to be the very thing we will be optimizing
        self.nodes = self.set_nodes()
        self.all_data = self.simulation.run_simulation()  # Get all data from the simulation
        self.edges = []
        self.weights = []
        self.lidar_tolerance = lidar_tolerance
        self.training_steps = training_steps
        self.lidar_information_factor = 5.0
        self.odometry_information = 20.0

    def compute_relative_transform(self, pose1, pose2):
    # This function is used to find the local transformation from pose1 to pose2
    # These are needed for Pose Graph Estimation
        xi, yi, ti = pose1
        xj, yj, tj = pose2

        # Global difference
        dx = xj - xi
        dy = yj - yi
        # Relative orientation
        dtheta = torch.remainder(tj - ti + self.pi, 2 * self.pi) - self.pi # Rotation is normalized to the range [-pi, pi]

        # To get the translation in pose1's local frame, we must
        # rotate the global vector (dx, dy) by -ti.
        # The rotation matrix R for -ti is:
        #  [[cos(-ti), -sin(-ti)],
        #   [sin(-ti),  cos(-ti)]]
        # which simplifies to:
        #  [[cos(ti),  sin(ti)],
        #   [-sin(ti), cos(ti)]]

        cos_ti = torch.cos(ti)
        sin_ti = torch.sin(ti)

        # CODING: Calculate the local coordinates of the translation
        # Remember that the new location will come from R * [[dx], [dy]] (think matrix multiplicaiton)
        local_dx = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
        local_dy = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

        return torch.stack([local_dx, local_dy, dtheta])

    def set_nodes(self):
        # This function sets the nodes based on the simulation path
        nodes = []
        for step in self.simulation.noise_path:
            x, y, orientation = step
            orientation = np.deg2rad(orientation)  # Convert orientation to radians
            nodes.append((x, y, orientation))
        return torch.tensor(nodes, dtype=torch.float32)  # Convert to tensor for consistency

    def batch_trimmed_icp(self, source_points, target_points, initial_guess = None, max_iterations=1000, tolerance=1e-6):
        source_points_raw = source_points.clone().to(dtype=torch.float32)
        target_points = target_points.clone().to(dtype=torch.float32)
        B = source_points.shape[0] # Batch size

        if initial_guess is None:
            transformations = torch.eye(3, device=source_points.device).unsqueeze(0).repeat(B, 1, 1)
        else:
            transformations = initial_guess.clone().to(dtype=torch.float32)

        current_source_points = torch.einsum('bij,bnj->bni', transformations[:, :2, :2], source_points_raw) + transformations[:, :2, 2].unsqueeze(1)

        converged = torch.zeros(B, dtype=torch.bool, device=source_points.device)

        for _ in range(max_iterations): # ITERATIVE
            if converged.all():
                break

            # Step 1: Find closest points using the current transformed points
            # This means, for each source point cloud,
            # we find the closest point in it's matched target point cloud
            # (which happen to all be the same point cloud).
            distances = torch.cdist(current_source_points, target_points) # Compute pairwise distances
            # CODING: How do we get the indices of the closest points?
            # Recall that the distances tensor is of shape (B, N_rays_source, N_rays_target)
            closest_indices = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # Gather is a nasty function to wrap your head around, so we provide it. Check out how it works
            closest_points = torch.gather(target_points, 1, closest_indices.unsqueeze(-1).repeat(1, 1, 2))

            # Step 2: Trim points based on distance
            trim_ratio = 0.6
            num_keep = int(trim_ratio * current_source_points.shape[1])
            dist_to_closest, _ = distances.min(dim=2)
            sorted_indices = torch.argsort(dist_to_closest, dim=1)
            keep_indices = sorted_indices[:, :num_keep]

            inlier_source = torch.gather(current_source_points, 1, keep_indices.unsqueeze(-1).repeat(1, 1, 2))
            inlier_target = torch.gather(closest_points, 1, keep_indices.unsqueeze(-1).repeat(1, 1, 2))

            # Step 3: Compute centroids of the INLIERS
            # CODING: How do we compute the centroids of the inliers?
            # Hint: We want the average location over every ray in each batch
            source_centroids = None# <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            target_centroids = None# <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

            # Step 4: Compute SVD on centered INLIERS
            source_centered = inlier_source - source_centroids
            target_centered = inlier_target - target_centroids
            H = torch.einsum('bni,bnj->bij', source_centered, target_centered)

            # Add a small epsilon for stability
            H += torch.eye(2, device=H.device).unsqueeze(0) * 1e-9
            # CODING how do we get U and Vt
            U, _, Vt = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

            # Step 5: Compute incremental rotation and translation
            # We want these to be incremental because we are moving the point cloud iteratively
            # We apply the iterative rotation and translation to the current source points
            # At the end, we will calcualte the final transformation from the original
            # source points to those we iteratively align

            # CODING: To get the rotation matrix R, we need to use the formula
            # R = V * U^T
            # Note: We have batches of matrices, does torch have a way to handle this?
            # Note also: we have Vt and U, not V and U
            R_iter = None# <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # Handle reflections
            R_iter = batch_handle_reflections(R_iter, Vt, U)

            t_iter = target_centroids.squeeze(1) - torch.einsum('bij,bj->bi', R_iter, source_centroids.squeeze(1))

            # Step 6.-1: We only want to update the points that are not converged yet
            not_converged_mask = ~converged
            points_to_update = current_source_points[not_converged_mask]
            R_inc = R_iter[not_converged_mask]
            t_inc = t_iter[not_converged_mask]

            # Step 6: WE are going to use the learned translation and rotation to update where the points "are"
            # This is in applied to the original source points, not the inliers
            centroid_of_points_to_update = torch.mean(points_to_update, dim=1, keepdim=True)

            # Step 6.1: Center the points
            centered_points = points_to_update - centroid_of_points_to_update

            # Step 6.2: Apply incremental rotation
            rotated_points = torch.einsum('bij,bnj->bni', R_inc, centered_points)

            # Step 6.3: Add the centroid back and then apply the incremental translation
            new_points = rotated_points + centroid_of_points_to_update + t_inc.unsqueeze(1)

            # Step 6.4: Update the points for the next iteration
            current_source_points[not_converged_mask] = new_points

            # Step 7: Check for convergence
            alignment_errors = torch.mean(torch.norm(current_source_points - closest_points, dim=2), dim=1)
            newly_converged = (alignment_errors < tolerance) & (~converged)
            converged = converged | newly_converged

        # Step 8: Final Transformation Calculation
        # Now that the points are aligned, we find the single transformation that
        # maps the ORIGINAL raw points to the FINAL aligned points.

        # Step 8.1: Centroids of original and final points
        source_raw_centroids = torch.mean(source_points_raw, dim=1, keepdim=True)
        final_points_centroids = torch.mean(current_source_points, dim=1, keepdim=True)

        # Step 8.2: Centered points
        source_raw_centered = source_points_raw - source_raw_centroids
        final_points_centered = current_source_points - final_points_centroids

        # Step 8.3: Final rotation
        H_final = torch.einsum('bni,bnj->bij', source_raw_centered, final_points_centered)
        # CODING how do we get U and Vt
        U_final, _, Vt_final = None, None, None# <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
        # Coding: Another Rotation matrix, remember how to make this?
        R_final = None# <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
        # Handle reflections in the final matrix
        R_final = batch_handle_reflections(R_final, U_final, Vt_final)

        # Step 8.4: Final translation
        t_final = final_points_centroids.squeeze(1) - torch.einsum('bij,bj->bi', R_final, source_raw_centroids.squeeze(1))

        # Step 8.5: Update the final transformation matrix
        transformations[:, :2, :2] = R_final
        transformations[:, :2, 2] = t_final

        # Step 9: We now calculate the error against the closest points in the target set,
        # which gives a true measure of alignment quality.

        # Step 9.1: Find the final set of closest points from the target cloud
        # We use closest points because the scans may have a non correlated ordering
        final_distances = torch.cdist(current_source_points, target_points)
        final_closest_points = torch.gather(target_points, 1, torch.argmin(final_distances, dim=2).unsqueeze(-1).repeat(1, 1, 2))

        # Step 9.2: Calculate the mean distance to these closest points
        final_alignment_errors = torch.mean(torch.norm(current_source_points - final_closest_points, dim=2), dim=1)

        final_converged_mask = final_alignment_errors < self.lidar_tolerance

        return current_source_points, transformations, final_alignment_errors, final_converged_mask

    def extract_relative_pose_from_transformation(self, transformation):
        """
        Extracts the relative pose (dx, dy, dtheta) from a transformation matrix.
        The transformation matrix is expected to be in the form:
        [[cos(theta), -sin(theta), dx],
         [sin(theta),  cos(theta), dy],
         [0,           0,          1]]
        """
        dx = transformation[0, 2]
        dy = transformation[1, 2]
        dtheta = torch.atan2(transformation[1, 0], transformation[0, 0])  # Extract rotation angle
        return torch.tensor([dx, dy, dtheta], dtype=torch.float32)

    def lidar_edges(self):
        # This function creates edges based on the lidar data
        # It is quite complex so we provide it, feel free to read through (I commented good)
        lidar_data = torch.tensor([data[5] for data in self.all_data], dtype=torch.float32)

        loops_count = 0 # the number of loop closures found
        timestep_limit = 25 # the minimum number of timesteps between two scans to find closures
        spatial_distance_threshold = 10.0 # the minimum distance between two poses to consider them for loop closure
        cross_check_threshold = 0.25 # Max allowed difference between forward and reverse transforms

        pbar = tqdm(range(len(self.all_data)), desc="Finding Loop Closures")

        for i in pbar:
            # We first need to select valid candidates so the code can run fast
            # and so that the alignments are not redundant with the odometry edges
            if lidar_data[i].nelement() == 0:
                continue
            start_index = i + timestep_limit
            if start_index >= len(self.all_data):
                continue
            pose_i_xy = self.nodes[i, :2]
            possible_indices = torch.arange(start_index, len(self.all_data))
            if len(possible_indices) == 0:
                continue
            candidate_poses_xy = self.nodes[possible_indices, :2]
            distances = torch.norm(candidate_poses_xy - pose_i_xy, dim=1)
            close_enough_mask = distances < spatial_distance_threshold
            candidate_indices = possible_indices[close_enough_mask]
            if len(candidate_indices) == 0:
                continue

            # --- BATCHED FORWARD ICP (j -> i) ---
            # First we have to prepare the sets of scans
            # WE compare every candidate j with a series of copies of targets i
            source_scans = lidar_data[candidate_indices]
            target_scan_batch = lidar_data[i].unsqueeze(0).repeat(len(candidate_indices), 1, 1)

            # We feed the batch icp an intital guess based on the odometry data
            # as long as the noise is low enough, this is a good initial guess
            pose_i = self.nodes[i]
            poses_j = self.nodes[candidate_indices]
            T_i = torch.eye(3); T_i[0,0]=T_i[1,1]=torch.cos(pose_i[2]); T_i[0,1]=-torch.sin(pose_i[2]); T_i[1,0]=torch.sin(pose_i[2]); T_i[0,2]=pose_i[0]; T_i[1,2]=pose_i[1]
            T_j = torch.eye(3).unsqueeze(0).repeat(len(poses_j),1,1); cos_tj,sin_tj=torch.cos(poses_j[:,2]),torch.sin(poses_j[:,2]); T_j[:,0,0]=T_j[:,1,1]=cos_tj; T_j[:,0,1]=-sin_tj; T_j[:,1,0]=sin_tj; T_j[:,0,2]=poses_j[:,0]; T_j[:,1,2]=poses_j[:,1]
            T_i_world_to_local = torch.linalg.inv(T_i)
            T_j_local_to_world = T_j
            init_transform_forward = torch.bmm(T_i_world_to_local.unsqueeze(0).repeat(len(poses_j), 1, 1), T_j_local_to_world)

            # Pperform ICP on the batch of scans
            aligned_points, forward_transforms, alignment_errors, converged = self.batch_trimmed_icp(
                source_scans, target_scan_batch,
                initial_guess=init_transform_forward.to(dtype=torch.float32), tolerance=self.lidar_tolerance / 10, max_iterations=100
            )
            # We will only select the good alignments
            final_distances = torch.cdist(aligned_points, target_scan_batch)
            inlier_mask = final_distances.min(dim=2).values < self.lidar_tolerance
            inlier_ratios = inlier_mask.float().mean(dim=1)

            # Just to be sure, we will also run the "good" alignments the other direction
            # This is a cross-check to ensure the match is stable and not ambiguous
            for idx, j in enumerate(candidate_indices):
                if converged[idx] and alignment_errors[idx] < self.lidar_tolerance and inlier_ratios[idx] > 0.5 :
                    # This pair looks good. Now, perform the reverse check (i -> j).
                    # We do this one by one as it's conditional.
                    source_scan_reverse = lidar_data[i].unsqueeze(0)
                    target_scan_reverse = lidar_data[j].unsqueeze(0)

                    # Initial guess for the reverse transform is inv(T_i) * T_j
                    initial_guess = torch.bmm(torch.linalg.inv(T_i.unsqueeze(0)), T_j[idx].unsqueeze(0))

                    _, reverse_transform, _, _ = self.batch_trimmed_icp(
                        source_scan_reverse, target_scan_reverse,
                        initial_guess=initial_guess.to(dtype=torch.float32), tolerance=self.lidar_tolerance / 10, max_iterations=100
                    )

                    # The forward transform maps j -> i.
                    # The reverse transform maps i -> j.
                    # So, the inverse of the reverse transform should also map j -> i.
                    inv_reverse_transform = torch.linalg.inv(reverse_transform.squeeze(0))
                    forward_transform = forward_transforms[idx]

                    # Compare the two. A small difference indicates a stable, non-ambiguous match.
                    transform_diff = torch.norm(forward_transform - inv_reverse_transform)
                    if transform_diff < cross_check_threshold:
                        # This is a high-confidence match. Add the edge.
                        relative_transform = self.extract_relative_pose_from_transformation(forward_transform)
                        information = (self.lidar_information_factor * inlier_ratios[idx]) / (alignment_errors[idx] + 1e-6)
                        self.add_edge(i, j, relative_transform, information)
                        loops_count += 1

            pbar.set_description(f"Loops: {loops_count}")
        self.loop_count = loops_count

    def odometry_edges(self):
        # Create edges based on the odometry data
        for i in range(len(self.nodes) - 1):
            pose1 = self.nodes[i]
            pose2 = self.nodes[i + 1]
            relative_transform = self.compute_relative_transform(pose1, pose2)
            self.add_edge(i, i + 1, relative_transform, self.odometry_information)  # Assign a constant weight for odometry edges

    def add_edge(self, i,j, transformation, weight, lidar = False):
        self.edges.append((i, j, transformation, weight))

    def optimize(self):
        # This optimizer is vectorized well, but we left some stuff out for you to implement
        # The optimization will be done using PyTorch's autograd and Adam optimizer
        # The nodes are the poses of the robot at each step in the simulation
        optimized_nodes = self.nodes.clone().requires_grad_(True)
        optimizer = torch.optim.Adam([optimized_nodes], lr=0.00005)

        # Pre-process edges for vectorized computation
        edge_indices_i = torch.tensor([e[0] for e in self.edges], dtype=torch.long)
        edge_indices_j = torch.tensor([e[1] for e in self.edges], dtype=torch.long)

        target_translations = torch.stack([e[2][:2] for e in self.edges]).to(optimized_nodes.device)
        target_dtheta = torch.stack([e[2][2] for e in self.edges]).to(optimized_nodes.device)
        # the weights have been added as elements of the edges
        # We will use these to weight the loss function
        weights = torch.tensor([e[3] for e in self.edges], dtype=torch.float32, device=optimized_nodes.device).unsqueeze(1)

        # Here is where we get the ground truth path from the simulation
        # This is the path we want to optimize towards
        true_path_list = [list(p) for p in self.simulation.path]
        true_path_rad = torch.tensor(true_path_list, dtype=torch.float32, device=optimized_nodes.device)
        true_path_rad[:, 2] = torch.deg2rad(true_path_rad[:, 2])
        # wWeights are the same for each
        # Is this really optimal? Let me know
        translation_weight = 1.0
        rotation_weight = 1.0

        pbar = tqdm(range(self.training_steps), desc="Optimizing Pose Graph")
        for step in pbar:
            optimizer.zero_grad()

            poses_i = optimized_nodes[edge_indices_i]
            poses_j = optimized_nodes[edge_indices_j]

            xi, yi, ti = poses_i.T
            xj, yj, tj = poses_j.T

            # Predicted GLOBAL difference (this is what we want to optimize)
            predicted_global_dx = xj - xi
            predicted_global_dy = yj - yi
            predicted_global_vec = torch.stack([predicted_global_dx, predicted_global_dy], dim=1)

            # Target local difference (from odometry/ICP measurements)
            target_local_dx = target_translations[:, 0]
            target_local_dy = target_translations[:, 1]

            # Rotate the target local difference into the global frame using the CURRENT pose `ti`
            cos_ti = torch.cos(ti)
            sin_ti = torch.sin(ti)

            # This is the forward rotation matrix for `ti`
            # It transforms a vector from the local frame of pose `i` to the global frame.
            target_global_dx = target_local_dx * cos_ti - target_local_dy * sin_ti
            target_global_dy = target_local_dx * sin_ti + target_local_dy * cos_ti
            target_global_vec = torch.stack([target_global_dx, target_global_dy], dim=1)

            # Now, the translation error is a direct comparison of two global vectors.
            # The gradient from this error will only flow to the positions (x,y), not the angle `ti`.
            error_trans = torch.nn.functional.mse_loss(
                predicted_global_vec,
                target_global_vec,
                reduction='none'
            ).mean(dim=1, keepdim=True)

            # Now we optimize the angles
            # The rotation error is computed in the local frame of the target pose `j`
            dtheta = tj - ti
            predicted_rotations_vec = torch.stack([torch.cos(dtheta), torch.sin(dtheta)], dim=1)
            target_rotations_vec = torch.stack([torch.cos(target_dtheta), torch.sin(target_dtheta)], dim=1)

            error_rot = torch.nn.functional.mse_loss(
                predicted_rotations_vec,
                target_rotations_vec,
                reduction='none'
            ).mean(dim=1, keepdim=True)

            # --- Combine errors with weights (Unchanged) ---
            total_loss = torch.mean(
                (translation_weight * error_trans + rotation_weight * error_rot) * weights
            )
            # CODING: Something obvious is missing here.
            # I think it's two lines of code that you need to add
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
            # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

            with torch.no_grad():
                # CODING : We want to ensure the first node is fixed
                # This is because the first node is the origin of the pose graph
                # How can we set it still?
                optimized_nodes[0] = None # <EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE

            if step % 20 == 0:
                # These lines just print your training progress
                # The loss is DIRECTLY related to the optimization
                # The pos and rot error are only INDIRECTLY related
                # If they go down, it's ACTUALLY working
                with torch.no_grad():
                    pos_error = torch.norm(true_path_rad[:, :2] - optimized_nodes[:, :2], dim=1).mean()
                    d_theta = true_path_rad[:, 2] - optimized_nodes[:, 2]
                    rot_error = torch.atan2(torch.sin(d_theta), torch.cos(d_theta)).abs().mean()
                pbar.set_description(f"Loss: {total_loss.item():.4f}")
                pbar.set_postfix({
                    "Pos Err": f'{pos_error.item():.4f}',
                    "Rot Err": f'{rot_error.item():.4f}'
                })

        self.nodes = optimized_nodes.detach()
def batch_handle_reflections(R, Vt, U):
    # R Bx2x2
    det_R = torch.det(R)
    reflection_mask_final = det_R < 0
    if reflection_mask_final.any():
        Vt_clone = Vt.clone()
        Vt_clone[reflection_mask_final, -1, :] *= -1
        R[reflection_mask_final] = torch.bmm(Vt_clone[reflection_mask_final].transpose(-1, -2), U[reflection_mask_final].transpose(-1, -2))
    return R


# Task 3.5
Here are the blocks to actually test the training and visualise if it worked. We provide a stable seed that is garunteed to work with all the hyperparameters locked in AND the correct math in the other blocks. This means you can run them it to check your math. ^.^

The task is to optimize the Pose Graph. If you fixed all the missing code well this should be no issue. 

In [None]:
np.random.seed(69)  # Set a random seed for reproducibility
simulation = BotSimulation(cast_lidar_rays, robot_size=4, odometry_noise=0.05, lidar_noise=0.0,
                           velocities=lidar_utils.wheel_velocities, num_scans = 100)
prev_edges = None # this line is honestly only here to help not recacluate lidar if it helps

In [None]:
from copy import deepcopy
pose_graph = PoseGraph(simulation, 1.5, 400000)
pose_graph.odometry_edges()
if prev_edges:
    pose_graph.edges = prev_edges  # Use the previous edges if they exist
else:
    pose_graph.lidar_edges()
prev_edges = pose_graph.edges.copy()
pose_graph.optimize()
adjusted_simulation = deepcopy(simulation)
adjusted_simulation.noise_path = pose_graph.nodes.detach().numpy().tolist()
optimized_nodes_numpy = pose_graph.nodes.detach().numpy()
optimized_nodes_numpy[:, 2] = np.rad2deg(optimized_nodes_numpy[:, 2])
adjusted_simulation.noise_path = optimized_nodes_numpy.tolist()

# 3. Re-run the simulation logic to generate a NEW `all_data` packet.
#    This new packet will have robot poses (`pred_robot`) that follow the optimized path.
adjusted_all_data = adjusted_simulation.run_simulation()

# 4. Animate using the NEWLY generated data.
adjusted_anim = adjusted_simulation.animate_simulation_and_noise(adjusted_all_data)

# Save the adjusted animation as a GIF. This will now show the corrected path and map.
adjusted_anim.save('adjusted_robot_simulation.gif', writer='pillow', fps=12)

# Task "4"
This isn't really a task, but if you have time, play with some of the hyperparameters of the training:
 - lr
 - odo noise
 - lidar noise
 - loops_count 
 - timestep_limit 
 - spatial_distance_threshold
 - cross_check_threshold 
 - etc...

 If my hypothesis is correct, most of them will totally mess it up, but you may be able to find a better alignment. Let me know! In any case, seeing how each parameter affects this delicate process can sometimes lead to strong insights.

 Furthermore, if you have any feedback, I would love to know.

 Evenfurthermore, if you have some research ideas on how to do this better, **write them down**. Glad to inspire you, this field is neato and worth looking into. 
