In [None]:
SE2 Uncertainty Estimation

In [12]:
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim

In [13]:
import os
import sys
import math
base_path = os.path.dirname(os.getcwd())

sys.path.append(base_path)

In [14]:
# Data 
class SE2Group:
  def __init__(self, x, y, theta):
    self.x = x
    self.y = y
    self.theta = theta
  def __add__(self, other):
    x = self.x + other.x * np.cos(self.theta) - other.y * np.sin(self.theta)
    y = self.y + other.y * np.cos(self.theta) + other.x * np.sin(self.theta)
    theta = self.theta + other.theta
    return SE2Group(x, y, theta)
  def parameters(self):
    return np.array([self.x, self.y, self.theta])
  @classmethod
  def from_parameters(cls, x, y, theta):
    return cls(x, y, theta)



In [15]:
class SE2SimpleSimulator:

  def __init__(self, start, step, measurement_noise, motion_noise):
    self.position = start
    self.step = step
    self.motion_noise = motion_noise
    self.measurement_noise = measurement_noise
    self.beacons = np.array(
            [[0, 0.1],
             [0, 0.05],
             [0, 0.0],
             [0, -0.05],
             [0, -0.1]])
    self.beacon_idx = 0

  def motion(self):

    self.position = self.position + self.step
    noisy_prediction = self.step.parameters() + np.random.randn(3) * self.motion_noise
    noisy_prediction[2] = noisy_prediction[2] % (2 * np.pi)
    return self.position.parameters() , noisy_prediction

  def measurement(self):

    self._update_beacon_idx()
    range_beacon = self.beacons[self.beacon_idx, :]
    # Observation z_t
    self.range_measurement = np.linalg.norm(self.position.parameters()[0:2] - range_beacon)
    # Jitter range measurement with noise
    self.range_measurement += np.random.normal(0.0, self.measurement_noise, 1).item()

    return self.range_measurement

  def _update_beacon_idx(self) -> None:
    """
    Update beacon index, and cycle back to 0 if need be.
    """
    self.beacon_idx += 1
    if self.beacon_idx >= self.beacons.shape[0]:
        self.beacon_idx = 0

In [16]:


def random_start_pose():
    """
    Generate a random start pose within the specified bounds.
    :return: A random pose [x, y, theta] in [-0.5, 0.5] for x, y and [0, 2pi] for theta.
    """
    x = np.random.uniform(-0.5, 0.5)
    y = np.random.uniform(-0.5, 0.5)
    theta = np.random.uniform(0, 2 * np.pi)
    return SE2Group(x, y, theta)

def generate_bounded_se2_dataset(
    num_trajectories,
    trajectory_length,
    step_motion,
    motion_noise,
    measurement_noise,
):
    """
    Generates a dataset of SE2 trajectories and corresponding measurements within bounded space.

    :param num_trajectories: Number of trajectories to generate.
    :param trajectory_length: Number of steps per trajectory.
    :param step_motion: Step motion [dx, dy, dtheta].
    :param motion_noise: Noise for motion [x, y, theta].
    :param measurement_noise: Noise for measurements [x, y, theta].
    :param output_file: File to save the generated dataset.
    """
    true_trajectories = np.ndarray((num_trajectories, trajectory_length, 3))
    measurements = np.ndarray((num_trajectories, trajectory_length,1))
    noisy_control = np.ndarray((num_trajectories, trajectory_length, 3))


    for traj_id in range(num_trajectories):
        # Initialize simulator with a random start pose
        start_pose = random_start_pose()
        simulator = SE2SimpleSimulator(
            start=start_pose,
            step=step_motion,
            measurement_noise=measurement_noise,
            motion_noise=motion_noise,
        )

        for step in range(trajectory_length):
            # Simulate motion
            motion, noisy_step = simulator.motion()
            true_trajectories[traj_id, step, :] = motion
            noisy_control[traj_id, step, :] = noisy_step

            # Simulate measurement
            measurement = simulator.measurement()
            measurements[traj_id, step] = measurement

            # Check bounds and reset position if out of bounds
            current_pose = simulator.position.parameters()
            if not (-0.5 <= current_pose[0] <= 0.5 and -0.5 <= current_pose[1] <= 0.5):
                simulator.position = random_start_pose()


    return true_trajectories, noisy_control, measurements

In [17]:
NUM_TRAJECTORIES = 10
TRAJECTORY_LENGTH = 50
STEP_MOTION = SE2Group(0.05, 0.05, np.pi / 20)
MOTION_NOISE = np.array([0.01, 0.01, 0.005])
MEASUREMENT_NOISE = 0.02
batch_size = 32
validation_split = 0.2

# Generate dataset
true_trajectories, noisy_control, measurements = generate_bounded_se2_dataset(
    num_trajectories=NUM_TRAJECTORIES,
    trajectory_length=TRAJECTORY_LENGTH,
    step_motion=STEP_MOTION,
    motion_noise=MOTION_NOISE,
    measurement_noise=MEASUREMENT_NOISE,
)
# true_trajectories_flatten, noisy_control_flatten, measurements_flatten = torch.tensor(true_trajectories).view(-1, 3), torch.tensor(noisy_control).view(-1,1), torch.tensor(measurements).view(-1,1)
measurements_torch = torch.from_numpy(measurements).type(torch.FloatTensor)
ground_truth_torch = torch.from_numpy(true_trajectories).type(torch.FloatTensor)
print(measurements_torch.shape, ground_truth_torch.shape)
ground_truth_torch = torch.flatten(ground_truth_torch, start_dim=0, end_dim=1).type(torch.FloatTensor)
measurements_torch = torch.flatten(measurements_torch, start_dim=0, end_dim=1).type(torch.FloatTensor)
dataset = torch.utils.data.TensorDataset(ground_truth_torch, measurements_torch)
dataset_size = len(dataset)
indices = list(range(dataset_size))
split = int(np.floor(validation_split * dataset_size))
np.random.shuffle(indices)
train_indices, val_indices = indices[split:], indices[:split]

train_sampler = torch.utils.data.SubsetRandomSampler(train_indices)
val_sampler = torch.utils.data.SubsetRandomSampler(val_indices)

train_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=train_sampler, drop_last=True, shuffle=False)
val_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=val_sampler, drop_last=True, shuffle=False)
        

torch.Size([10, 50, 1]) torch.Size([10, 50, 3])


Send the flattened data to learn the distribution of the measurement, send ground truth and train it on the nll of the measurement

In [18]:
# Neural Network

class MLP(nn.Module):
    def __init__(self, input_dim=3, output_dim=(50, 50, 32), hidden_dims=[128, 256, 512]):
        super(MLP, self).__init__()
        self.output_dim = output_dim  # (50, 50, 32)

        # Define the MLP layers
        layers = []
        current_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(current_dim, hidden_dim))
            layers.append(nn.ReLU())
            current_dim = hidden_dim
        
        # Final layer
        final_output_dim = torch.prod(torch.tensor(output_dim)).item()  # Flattened output size
        layers.append(nn.Linear(current_dim, final_output_dim))
        
        self.mlp = nn.Sequential(*layers)
    
    def forward(self, x):
        # Pass through the MLP
        x = self.mlp(x)
        
        # Reshape to (N, 50, 50, 32)
        x = x.view(x.size(0), *self.output_dim)
        return x

In [21]:
from src.distributions.SE2.SE2_FFT import SE2_FFT

num_epochs = 100  # Number of samples
input_dim = 3
grid_size = (10, 10, 20)

model = MLP(input_dim=1, output_dim=grid_size)
model.train()  # Set the model to training mode
optimizer = optim.Adam(model.parameters(), lr=0.001)

fft = SE2_FFT(spatial_grid_size=grid_size,
                  interpolation_method='spline',
                  spline_order=1,
                  oversampling_factor=1)

In [28]:
for epoch in range(num_epochs):
    running_loss = 0.0
    for inputs, targets in train_loader:
        # Forward pass
        outputs = model(targets)
        if torch.isnan(outputs).any():
            print("outputs is nan")
        # Compute loss
        loss = torch.mean(fft.neg_log_likelihood(outputs, inputs))
        if torch.isnan(loss).any():
            print("Loss is nan")
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}")


Loss is nan
Loss is nan
Loss is nan
Loss is nan
Loss is nan
Loss is nan
Loss is nan
Loss is nan
Epoch 1/100, Loss: nan


KeyboardInterrupt: 