# Chapter 7: Rotation Initialization with Chordal Relaxation

## Learning Objectives
- Understand the rotation synchronization problem
- Learn about chordal relaxation for SO(3) optimization
- Implement rotation initialization algorithms
- Compare different initialization strategies
- Apply rotation initialization to pose graph optimization

## 1. The Rotation Synchronization Problem

Pose graph optimization is a non-convex problem, so a local solver needs a good initial guess to converge to the global minimum. The rotation synchronization problem asks:

Given relative rotation measurements $R_{ij}$ between poses $i$ and $j$, find absolute rotations $R_i$ that best satisfy:

$$R_j \approx R_i R_{ij}$$

This is a non-convex problem due to the SO(3) manifold constraint: $R^T R = I$ and $\det(R) = 1$.
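
The measurement model above can be checked on a single edge. The following is a minimal sketch, assuming SciPy's `Rotation` class (which the chapter itself does not import) and two made-up absolute rotations. It also illustrates the gauge freedom of the problem: left-multiplying every $R_i$ by the same rotation leaves all relative rotations unchanged, which is why one rotation is later fixed as an anchor.

```python
import numpy as np
from scipy.spatial.transform import Rotation

# Two hypothetical absolute rotations (values chosen only for illustration)
R_i = Rotation.from_rotvec([0.1, -0.4, 0.3]).as_matrix()
R_j = Rotation.from_rotvec([-0.7, 0.2, 0.5]).as_matrix()

# Noise-free relative measurement under the convention R_j = R_i @ R_ij
R_ij = R_i.T @ R_j
constraint_error = np.linalg.norm(R_j - R_i @ R_ij)

# Gauge freedom: applying the same global rotation G on the left of
# every absolute rotation does not change the relative rotation
G = Rotation.from_rotvec([0.0, 0.0, 1.2]).as_matrix()
R_ij_shifted = (G @ R_i).T @ (G @ R_j)
gauge_error = np.linalg.norm(R_ij - R_ij_shifted)

print(f"constraint error: {constraint_error:.2e}")
print(f"gauge error:      {gauge_error:.2e}")
```

Both printed values should be at the level of floating-point round-off.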

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix, lil_matrix
from scipy.sparse.linalg import eigsh, eigs
from scipy.linalg import svd, eigh
import networkx as nx
from typing import List, Tuple, Dict, Optional
import time
from mpl_toolkits.mplot3d import Axes3D

## 2. Rotation Representations and Utilities

In [None]:
def rotation_matrix_from_axis_angle(axis: np.ndarray, angle: float) -> np.ndarray:
    """Create rotation matrix from axis-angle representation"""
    axis = axis / np.linalg.norm(axis)
    K = np.array([
        [0, -axis[2], axis[1]],
        [axis[2], 0, -axis[0]],
        [-axis[1], axis[0], 0]
    ])
    R = np.eye(3) + np.sin(angle) * K + (1 - np.cos(angle)) * K @ K
    return R

def rotation_matrix_to_axis_angle(R: np.ndarray) -> Tuple[np.ndarray, float]:
    """Extract axis-angle from rotation matrix"""
    angle = np.arccos(np.clip((np.trace(R) - 1) / 2, -1, 1))
    if abs(angle) < 1e-6:
        return np.array([0, 0, 1]), 0
    axis = np.array([
        R[2, 1] - R[1, 2],
        R[0, 2] - R[2, 0],
        R[1, 0] - R[0, 1]
    ]) / (2 * np.sin(angle))
    return axis, angle

def project_to_SO3(M: np.ndarray) -> np.ndarray:
    """Project a 3x3 matrix to the nearest rotation matrix"""
    U, _, Vt = svd(M)
    R = U @ Vt
    # Ensure det(R) = +1
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = U @ Vt
    return R

def geodesic_distance_SO3(R1: np.ndarray, R2: np.ndarray) -> float:
    """Compute geodesic distance between two rotations"""
    R_diff = R1.T @ R2
    trace = np.clip(np.trace(R_diff), -1, 3)
    return np.arccos((trace - 1) / 2)

def chordal_distance_SO3(R1: np.ndarray, R2: np.ndarray) -> float:
    """Compute chordal distance between two rotations"""
    return np.linalg.norm(R1 - R2, 'fro')
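
The two distances above are related in closed form: since $\|R_1 - R_2\|_F^2 = 6 - 2\,\mathrm{tr}(R_1^T R_2)$ and $\mathrm{tr}(R_1^T R_2) = 1 + 2\cos\theta$, the chordal distance equals $2\sqrt{2}\,\sin(\theta/2)$, where $\theta$ is the geodesic distance. The sketch below checks this numerically. It is self-contained and uses SciPy's `Rotation` class as an assumption of convenience, not the chapter's own utilities.

```python
import numpy as np
from scipy.spatial.transform import Rotation

R1 = Rotation.from_rotvec([0.3, -0.2, 0.5]).as_matrix()
axis = np.array([1.0, 2.0, -1.0]) / np.sqrt(6.0)  # unit rotation axis

results = []
for theta in [0.05, 0.5, 1.5, 3.0]:
    # R2 differs from R1 by a rotation of exactly theta radians
    R2 = R1 @ Rotation.from_rotvec(theta * axis).as_matrix()
    chordal = np.linalg.norm(R1 - R2, 'fro')
    predicted = 2.0 * np.sqrt(2.0) * np.sin(theta / 2.0)
    results.append((theta, chordal, predicted))
    print(f"theta={theta:.2f}  chordal={chordal:.4f}  predicted={predicted:.4f}")
```

For small angles the chordal distance is approximately $\sqrt{2}\,\theta$, so the chordal cost behaves like a scaled geodesic cost when residuals are small, while it saturates at $2\sqrt{2}$ as $\theta$ approaches $\pi$.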

## 3. Chordal Relaxation

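The key insight of chordal relaxation is to drop the SO(3) constraint, treat each $R_i$ as an arbitrary $3 \times 3$ matrix, and solve:

$$\min_{R_i \in \mathbb{R}^{3 \times 3}} \sum_{(i,j) \in \mathcal{E}} w_{ij} \|R_i - R_j R_{ij}^T\|_F^2$$

This is a convex (linear least-squares) problem. One rotation must be fixed as an anchor, because otherwise $R_i = 0$ for all $i$ is a trivial minimizer. After solving, we project each $R_i$ back onto SO(3).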
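
One way to see the linear structure, under the convention $R_j \approx R_i R_{ij}$ used above, is the following short derivation. The symbols $Y_i$, $Y$ and $L$ are notation introduced for this sketch. Define $Y_i = R_i^T$ and stack the blocks into $Y \in \mathbb{R}^{3n \times 3}$. Since $R_{ij}$ is orthogonal,

$$\|R_i - R_j R_{ij}^T\|_F^2 = \|Y_i - R_{ij} Y_j\|_F^2 = \operatorname{tr}(Y_i^T Y_i) + \operatorname{tr}(Y_j^T Y_j) - 2\operatorname{tr}(Y_i^T R_{ij} Y_j)$$

Summing over the edges gives a quadratic form in $Y$:

$$\sum_{(i,j) \in \mathcal{E}} w_{ij} \|R_i - R_j R_{ij}^T\|_F^2 = \operatorname{tr}(Y^T L Y), \qquad L_{ii} = \sum_{j \sim i} w_{ij} I_3, \quad L_{ij} = -w_{ij} R_{ij}, \quad L_{ji} = -w_{ij} R_{ij}^T$$

Fixing an anchor $Y_a = I_3$ and splitting $L$ into anchor ($a$) and free ($u$) blocks, setting the gradient with respect to $Y_u$ to zero yields a linear system:

$$L_{uu} Y_u = -L_{ua}$$

For a connected graph $L_{uu}$ is positive definite, so the relaxed problem has a unique solution.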

In [None]:
class RotationGraph:
    def __init__(self):
        self.vertices = set()
        self.edges = []  # (i, j, R_ij, weight)
        self.n_vertices = 0
    
    def add_vertex(self, vertex_id: int):
        self.vertices.add(vertex_id)
        self.n_vertices = max(self.vertices) + 1
    
    def add_edge(self, i: int, j: int, R_ij: np.ndarray, weight: float = 1.0):
        """Add relative rotation measurement R_ij from i to j"""
        self.add_vertex(i)
        self.add_vertex(j)
        self.edges.append((i, j, R_ij, weight))
    
    def build_connection_laplacian(self) -> np.ndarray:
        """Build the (3n x 3n) connection Laplacian for chordal relaxation.

        With Y_i = R_i^T stacked into a (3n x 3) matrix Y, the chordal cost
        sum_ij w_ij * ||R_i - R_j R_ij^T||_F^2 equals trace(Y^T L Y).
        """
        n = self.n_vertices
        L = np.zeros((3*n, 3*n))

        for i, j, R_ij, weight in self.edges:
            # Diagonal blocks accumulate the edge weights
            L[3*i:3*i+3, 3*i:3*i+3] += weight * np.eye(3)
            L[3*j:3*j+3, 3*j:3*j+3] += weight * np.eye(3)
            # Off-diagonal blocks: L_ij = -w_ij * R_ij and L_ji = L_ij^T
            L[3*i:3*i+3, 3*j:3*j+3] -= weight * R_ij
            L[3*j:3*j+3, 3*i:3*i+3] -= weight * R_ij.T

        return L

    def solve_chordal_relaxation(self, anchor_id: int = 0) -> Dict[int, np.ndarray]:
        """Solve rotation synchronization using chordal relaxation"""
        n = self.n_vertices
        L = self.build_connection_laplacian()

        # Fix the anchor rotation to identity: split L into anchor (a) and
        # free (u) parts and minimize trace(Y^T L Y) subject to Y_a = I
        anchor_indices = list(range(3*anchor_id, 3*anchor_id+3))
        mask = np.ones(3*n, dtype=bool)
        mask[anchor_indices] = False

        L_reduced = L[mask][:, mask]
        # Right-hand side -L_ua @ Y_a with Y_a = I, shape (3n - 3, 3)
        b = -L[mask][:, anchor_indices]

        # Normal equations of the relaxed problem: L_uu Y_u = -L_ua
        try:
            Y_free = np.linalg.solve(L_reduced, b)
        except np.linalg.LinAlgError:
            # Singular when some vertex is not connected to the anchor
            Y_free, _, _, _ = np.linalg.lstsq(L_reduced, b, rcond=None)

        # Reconstruct the full stacked solution (each block is R_i^T)
        Y = np.zeros((3*n, 3))
        Y[anchor_indices] = np.eye(3)
        Y[mask] = Y_free

        # Transpose each block and project it back onto SO(3)
        rotations = {}
        for v in self.vertices:
            rotations[v] = project_to_SO3(Y[3*v:3*v+3].T)

        return rotations
    
    def _solve_spectral_relaxation(self, anchor_id: int = 0) -> Dict[int, np.ndarray]:
        """Solve using spectral relaxation approach"""
        n = self.n_vertices
        
        # Build matrix where each block (i,j) contains w_ij * R_ij
        M = np.zeros((3*n, 3*n))
        
        for i, j, R_ij, weight in self.edges:
            # Accumulate so that repeated measurements of an edge add up
            M[3*i:3*i+3, 3*j:3*j+3] += weight * R_ij
            M[3*j:3*j+3, 3*i:3*i+3] += weight * R_ij.T
        
        # Add small diagonal regularization for numerical stability
        M += 1e-6 * np.eye(3*n)
        
        # Find top 3 eigenvectors
        eigenvalues, eigenvectors = eigh(M)
        top_indices = np.argsort(eigenvalues)[-3:]
        
        # Extract rotations from eigenvectors
        rotations = {}
        V = eigenvectors[:, top_indices[::-1]]  # top 3 eigenvectors
        
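        # First, get the anchor block (transposed eigenvector block)
        R_anchor = V[3*anchor_id:3*anchor_id+3, :].T
        
        for v in self.vertices:
            # Extract 3x3 block for this vertex
            R_v = V[3*v:3*v+3, :].T
            
            # Align to anchor: every block equals Q^T R_v up to a positive
            # scale for one common orthogonal Q, so R_anchor^T R_v cancels Q
            # and maps the anchor itself to the identity
            R_aligned = R_anchor.T @ R_v
            
            # Project to SO(3)
            rotations[v] = project_to_SO3(R_aligned)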
        
        return rotations
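
Why the top three eigenvectors carry the rotations can be checked on a small noise-free case. The sketch below is an illustration under stated assumptions (a complete graph with unit weights, exact measurements, made-up ground-truth rotations, SciPy's `Rotation` for convenience), not the chapter's solver. In that setting the stacked matrix of blocks $R_i^T$ satisfies $M Y = (n-1) Y$, and $n-1$ is the largest eigenvalue with multiplicity three.

```python
import numpy as np
from scipy.spatial.transform import Rotation

rng = np.random.default_rng(0)
n = 5
# Hypothetical ground-truth rotations, shape (n, 3, 3)
R = Rotation.from_rotvec(rng.normal(size=(n, 3))).as_matrix()

# Measurement matrix with blocks M_ij = R_ij = R_i^T R_j (complete graph)
M = np.zeros((3 * n, 3 * n))
for i in range(n):
    for j in range(i + 1, n):
        R_ij = R[i].T @ R[j]
        M[3*i:3*i+3, 3*j:3*j+3] = R_ij
        M[3*j:3*j+3, 3*i:3*i+3] = R_ij.T

# Stack the transposed rotations; each column of Y is an eigenvector of M
Y = np.vstack([R[i].T for i in range(n)])
residual = np.linalg.norm(M @ Y - (n - 1) * Y)

# eigvalsh returns eigenvalues in ascending order
top3 = np.linalg.eigvalsh(M)[-3:]

print(f"||M Y - (n-1) Y|| = {residual:.2e}")
print("top three eigenvalues:", np.round(top3, 6))
```

With noise or an incomplete graph the eigenvector blocks are no longer exact rotations, which is why each block is projected onto SO(3) after alignment.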

## 4. Alternative: Iterative Rotation Averaging

Another approach is to iteratively update rotations using the tangent space of SO(3).

In [None]:
def skew_symmetric(v: np.ndarray) -> np.ndarray:
    """Create skew-symmetric matrix from 3D vector"""
    return np.array([
        [0, -v[2], v[1]],
        [v[2], 0, -v[0]],
        [-v[1], v[0], 0]
    ])

def log_SO3(R: np.ndarray) -> np.ndarray:
    """Logarithm map from SO(3) to so(3)"""
    trace = np.trace(R)
    
    if abs(trace - 3) < 1e-6:
        return np.zeros(3)
    
    if abs(trace + 1) < 1e-6:
        # Angle is pi, so R = 2*v*v^T - I; recover v from a column of R + I
        if R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
            v = np.array([1 + R[0, 0], R[1, 0], R[2, 0]])
        elif R[1, 1] > R[2, 2]:
            v = np.array([R[0, 1], 1 + R[1, 1], R[2, 1]])
        else:
            v = np.array([R[0, 2], R[1, 2], 1 + R[2, 2]])
        v = v / np.linalg.norm(v)
        return np.pi * v
    
    # Clip to guard against round-off pushing the argument outside [-1, 1]
    angle = np.arccos(np.clip((trace - 1) / 2, -1, 1))
    return angle / (2 * np.sin(angle)) * np.array([
        R[2, 1] - R[1, 2],
        R[0, 2] - R[2, 0],
        R[1, 0] - R[0, 1]
    ])

def exp_SO3(v: np.ndarray) -> np.ndarray:
    """Exponential map from so(3) to SO(3)"""
    angle = np.linalg.norm(v)
    if angle < 1e-6:
        return np.eye(3)
    
    axis = v / angle
    K = skew_symmetric(axis)
    return np.eye(3) + np.sin(angle) * K + (1 - np.cos(angle)) * K @ K

class IterativeRotationAveraging:
    def __init__(self, graph: RotationGraph):
        self.graph = graph
    
    def solve(self, max_iterations: int = 100, tolerance: float = 1e-6,
              anchor_id: int = 0) -> Tuple[Dict[int, np.ndarray], List[float]]:
        """Solve rotation synchronization using iterative averaging"""
        # Initialize with random rotations
        rotations = {}
        for v in self.graph.vertices:
            if v == anchor_id:
                rotations[v] = np.eye(3)
            else:
                # Random rotation
                axis = np.random.randn(3)
                axis /= np.linalg.norm(axis)
                angle = np.random.uniform(0, 2*np.pi)
                rotations[v] = rotation_matrix_from_axis_angle(axis, angle)
        
        history = []
        
        for iteration in range(max_iterations):
            # Store old rotations
            old_rotations = {k: v.copy() for k, v in rotations.items()}
            
            # Update each rotation
            for v in self.graph.vertices:
                if v == anchor_id:
                    continue
                
                # Collect measurements
                tangent_sum = np.zeros(3)
                weight_sum = 0
                
                for i, j, R_ij, weight in self.graph.edges:
                    if i == v:
                        # R_j ≈ R_i * R_ij, so R_i ≈ R_j * R_ij^T
                        R_target = old_rotations[j] @ R_ij.T
                        tangent = log_SO3(R_target @ old_rotations[v].T)
                        tangent_sum += weight * tangent
                        weight_sum += weight
                    elif j == v:
                        # R_j ≈ R_i * R_ij
                        R_target = old_rotations[i] @ R_ij
                        tangent = log_SO3(R_target @ old_rotations[v].T)
                        tangent_sum += weight * tangent
                        weight_sum += weight
                
                if weight_sum > 0:
                    # Average tangent vector
                    avg_tangent = tangent_sum / weight_sum
                    
                    # Update rotation
                    rotations[v] = exp_SO3(avg_tangent) @ old_rotations[v]
            
            # Compute change
            max_change = 0
            for v in self.graph.vertices:
                change = geodesic_distance_SO3(old_rotations[v], rotations[v])
                max_change = max(max_change, change)
            
            history.append(max_change)
            
            if max_change < tolerance:
                print(f"Converged after {iteration + 1} iterations")
                break
        
        return rotations, history
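
The per-vertex update used above is the same step that computes the mean of a set of rotations: map the targets into the tangent space at the current estimate, average there, and map back. The following is a minimal sketch of that idea for a single unknown rotation, assuming SciPy's `Rotation` class in place of the chapter's `log_SO3` and `exp_SO3`, with made-up noisy samples.

```python
import numpy as np
from scipy.spatial.transform import Rotation

rng = np.random.default_rng(1)
R_true = Rotation.from_rotvec([0.4, -0.3, 0.8])

# Hypothetical noisy observations: small left perturbations of R_true
noise = Rotation.from_rotvec(0.1 * rng.normal(size=(20, 3)))
samples = noise * R_true

estimate = samples[0]
for _ in range(20):
    # Log map of every sample relative to the current estimate
    tangents = (samples * estimate.inv()).as_rotvec()
    # Average in the tangent space, then map back with the exponential
    step = tangents.mean(axis=0)
    estimate = Rotation.from_rotvec(step) * estimate
    if np.linalg.norm(step) < 1e-12:
        break

error = (estimate * R_true.inv()).magnitude()
print(f"final step norm: {np.linalg.norm(step):.2e}")
print(f"angular error vs. truth: {np.degrees(error):.2f} deg")
```

In the graph setting each vertex plays the role of `estimate` and the samples are the targets predicted by its neighbors, so the iteration couples all vertices instead of converging independently.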

## 5. Testing Rotation Synchronization

In [None]:
def create_synthetic_rotation_graph(n_vertices: int = 10, 
                                  connectivity: float = 0.3,
                                  noise_level: float = 0.1) -> Tuple[RotationGraph, Dict[int, np.ndarray]]:
    """Create a synthetic rotation graph with known ground truth"""
    graph = RotationGraph()
    
    # Generate ground truth rotations
    ground_truth = {}
    for i in range(n_vertices):
        if i == 0:
            ground_truth[i] = np.eye(3)
        else:
            # Random rotation
            axis = np.random.randn(3)
            axis /= np.linalg.norm(axis)
            angle = np.random.uniform(0, np.pi)
            ground_truth[i] = rotation_matrix_from_axis_angle(axis, angle)
        graph.add_vertex(i)
    
    # Add edges
    n_edges = 0
    for i in range(n_vertices):
        for j in range(i + 1, n_vertices):
            if np.random.random() < connectivity:
                # Compute relative rotation
                R_ij_true = ground_truth[i].T @ ground_truth[j]
                
                # Add noise
                noise_axis = np.random.randn(3)
                noise_axis /= np.linalg.norm(noise_axis)
                noise_angle = np.random.normal(0, noise_level)
                R_noise = rotation_matrix_from_axis_angle(noise_axis, noise_angle)
                
                R_ij_measured = R_noise @ R_ij_true
                
                graph.add_edge(i, j, R_ij_measured)
                n_edges += 1
    
    print(f"Created graph with {n_vertices} vertices and {n_edges} edges")
    return graph, ground_truth

# Test the algorithms
np.random.seed(42)
graph, ground_truth = create_synthetic_rotation_graph(n_vertices=15, 
                                                     connectivity=0.4,
                                                     noise_level=0.1)

In [None]:
# Solve using chordal relaxation
print("Solving with chordal relaxation...")
start_time = time.time()
rotations_chordal = graph.solve_chordal_relaxation(anchor_id=0)
time_chordal = time.time() - start_time

# Solve using iterative averaging
print("\nSolving with iterative averaging...")
solver_iterative = IterativeRotationAveraging(graph)
start_time = time.time()
rotations_iterative, history_iterative = solver_iterative.solve(max_iterations=100)
time_iterative = time.time() - start_time

# Compute errors
def compute_rotation_errors(estimated: Dict[int, np.ndarray], 
                          ground_truth: Dict[int, np.ndarray]) -> Dict[str, float]:
    errors = []
    for v in estimated:
        error = geodesic_distance_SO3(estimated[v], ground_truth[v])
        errors.append(error)
    
    return {
        'mean': np.mean(errors),
        'max': np.max(errors),
        'median': np.median(errors),
        'std': np.std(errors)
    }

errors_chordal = compute_rotation_errors(rotations_chordal, ground_truth)
errors_iterative = compute_rotation_errors(rotations_iterative, ground_truth)

print(f"\nChordal Relaxation (time: {time_chordal:.3f}s):")
for metric, value in errors_chordal.items():
    print(f"  {metric}: {value:.4f} rad ({np.degrees(value):.2f} deg)")

print(f"\nIterative Averaging (time: {time_iterative:.3f}s):")
for metric, value in errors_iterative.items():
    print(f"  {metric}: {value:.4f} rad ({np.degrees(value):.2f} deg)")

## 6. Visualizing Rotation Errors

In [None]:
# Visualize convergence and errors
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Convergence of iterative method
ax = axes[0]
ax.semilogy(history_iterative, linewidth=2)
ax.set_xlabel('Iteration')
ax.set_ylabel('Max Rotation Change (rad)')
ax.set_title('Iterative Averaging Convergence')
ax.grid(True, alpha=0.3)

# Error distribution comparison
ax = axes[1]
vertices = sorted(ground_truth.keys())
errors_c = [geodesic_distance_SO3(rotations_chordal[v], ground_truth[v]) for v in vertices]
errors_i = [geodesic_distance_SO3(rotations_iterative[v], ground_truth[v]) for v in vertices]

x = np.arange(len(vertices))
width = 0.35
ax.bar(x - width/2, np.degrees(errors_c), width, label='Chordal', alpha=0.7)
ax.bar(x + width/2, np.degrees(errors_i), width, label='Iterative', alpha=0.7)
ax.set_xlabel('Vertex ID')
ax.set_ylabel('Rotation Error (degrees)')
ax.set_title('Rotation Errors by Vertex')
ax.legend()
ax.grid(True, alpha=0.3)

# Visualize rotation axes
# Replace the 2D placeholder axes with a 3D one in the same grid slot
axes[2].remove()
ax = fig.add_subplot(1, 3, 3, projection='3d')

# Plot rotation axes for a subset of vertices
vertices_to_plot = vertices[:min(10, len(vertices))]
colors = plt.cm.viridis(np.linspace(0, 1, len(vertices_to_plot)))

for idx, v in enumerate(vertices_to_plot):
    # Ground truth axis
    axis_gt, angle_gt = rotation_matrix_to_axis_angle(ground_truth[v])
    if abs(angle_gt) > 0.01:
        ax.quiver(0, 0, 0, axis_gt[0], axis_gt[1], axis_gt[2], 
                 color=colors[idx], alpha=0.8, linewidth=2, 
                 length=angle_gt/np.pi, label=f'V{v} GT' if idx < 3 else '')
    
    # Estimated axis (chordal)
    axis_est, angle_est = rotation_matrix_to_axis_angle(rotations_chordal[v])
    if abs(angle_est) > 0.01:
        ax.quiver(0, 0, 0, axis_est[0], axis_est[1], axis_est[2], 
                 color=colors[idx], alpha=0.4, linewidth=2, linestyle='--',
                 length=angle_est/np.pi)

ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('Rotation Axes (solid=GT, dashed=estimated)')
ax.set_xlim([-1, 1])
ax.set_ylim([-1, 1])
ax.set_zlim([-1, 1])
if len(vertices_to_plot) <= 3:
    ax.legend()

plt.tight_layout()
plt.show()

## 7. Robustness to Outliers

Let's test how these methods handle outlier measurements.

In [None]:
def add_outliers_to_graph(graph: RotationGraph, outlier_ratio: float = 0.1) -> RotationGraph:
    """Add outlier measurements to the graph"""
    n_outliers = int(len(graph.edges) * outlier_ratio)
    outlier_indices = np.random.choice(len(graph.edges), n_outliers, replace=False)
    
    new_graph = RotationGraph()
    for v in graph.vertices:
        new_graph.add_vertex(v)
    
    for idx, (i, j, R_ij, weight) in enumerate(graph.edges):
        if idx in outlier_indices:
            # Create random rotation as outlier
            axis = np.random.randn(3)
            axis /= np.linalg.norm(axis)
            angle = np.random.uniform(0, np.pi)
            R_outlier = rotation_matrix_from_axis_angle(axis, angle)
            new_graph.add_edge(i, j, R_outlier, weight)
        else:
            new_graph.add_edge(i, j, R_ij, weight)
    
    return new_graph

# Test with different outlier ratios
outlier_ratios = [0.0, 0.1, 0.2, 0.3]
results = {'chordal': [], 'iterative': []}

for outlier_ratio in outlier_ratios:
    print(f"\nTesting with {outlier_ratio*100:.0f}% outliers...")
    
    # Create graph with outliers
    graph_clean, ground_truth = create_synthetic_rotation_graph(n_vertices=20, 
                                                               connectivity=0.3,
                                                               noise_level=0.05)
    graph_outliers = add_outliers_to_graph(graph_clean, outlier_ratio)
    
    # Solve with both methods
    rotations_chordal = graph_outliers.solve_chordal_relaxation(anchor_id=0)
    solver_iterative = IterativeRotationAveraging(graph_outliers)
    rotations_iterative, _ = solver_iterative.solve(max_iterations=100)
    
    # Compute errors
    errors_c = compute_rotation_errors(rotations_chordal, ground_truth)
    errors_i = compute_rotation_errors(rotations_iterative, ground_truth)
    
    results['chordal'].append(errors_c['mean'])
    results['iterative'].append(errors_i['mean'])
    
    print(f"  Chordal: mean error = {np.degrees(errors_c['mean']):.2f} deg")
    print(f"  Iterative: mean error = {np.degrees(errors_i['mean']):.2f} deg")

In [None]:
# Plot robustness results
plt.figure(figsize=(8, 6))
plt.plot([r*100 for r in outlier_ratios], np.degrees(results['chordal']), 
         'o-', linewidth=2, markersize=8, label='Chordal Relaxation')
plt.plot([r*100 for r in outlier_ratios], np.degrees(results['iterative']), 
         's-', linewidth=2, markersize=8, label='Iterative Averaging')
plt.xlabel('Outlier Ratio (%)')
plt.ylabel('Mean Rotation Error (degrees)')
plt.title('Robustness to Outlier Measurements')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
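
A simple way to inspect how outliers interact with an estimate is to look at per-edge residual angles, that is, the angle between each measured $R_{ij}$ and the relative rotation $R_i^T R_j$ implied by the estimate. The sketch below is self-contained. The helper name `edge_residual_angles` and the three-vertex example are hypothetical, and the "estimate" is simply the set of true rotations, chosen for illustration.

```python
import numpy as np
from scipy.spatial.transform import Rotation

def edge_residual_angles(rotations, edges):
    """Angle (rad) between each measured R_ij and the estimate R_i^T R_j."""
    residuals = []
    for i, j, R_ij in edges:
        R_err = R_ij.T @ rotations[i].T @ rotations[j]
        cos_angle = np.clip((np.trace(R_err) - 1.0) / 2.0, -1.0, 1.0)
        residuals.append(np.arccos(cos_angle))
    return np.array(residuals)

# Hypothetical absolute rotations for three vertices
rotvecs = {0: [0.0, 0.0, 0.0], 1: [0.0, 0.0, 0.5], 2: [0.3, 0.0, 0.5]}
rots = {k: Rotation.from_rotvec(v).as_matrix() for k, v in rotvecs.items()}

edges = [
    (0, 1, rots[0].T @ rots[1]),                              # consistent
    (1, 2, rots[1].T @ rots[2]),                              # consistent
    (0, 2, Rotation.from_rotvec([2.0, 0.0, 0.0]).as_matrix()),  # outlier
]

residuals = edge_residual_angles(rots, edges)
for (i, j, _), r in zip(edges, residuals):
    print(f"edge ({i}, {j}): residual = {np.degrees(r):.2f} deg")
```

In practice the estimate itself is pulled toward the outliers, so large residuals are a hint rather than a definitive test.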

## 8. Application to Full Pose Graph Optimization

Now let's use rotation initialization as a preprocessing step for full pose graph optimization.

In [None]:
class PoseGraphWithRotationInit:
    def __init__(self):
        self.vertices = {}  # id -> (x, y, theta)
        self.edges = []     # (i, j, dx, dy, dtheta, info_matrix)
        self.rotation_graph = RotationGraph()
    
    def add_vertex(self, vertex_id: int, x: float, y: float, theta: float):
        self.vertices[vertex_id] = np.array([x, y, theta])
        self.rotation_graph.add_vertex(vertex_id)
    
    def add_edge(self, i: int, j: int, dx: float, dy: float, dtheta: float,
                 info_matrix: np.ndarray = None):
        if info_matrix is None:
            info_matrix = np.eye(3)
        self.edges.append((i, j, np.array([dx, dy, dtheta]), info_matrix))
        
        # Add to rotation graph (2D rotation as 3D rotation around z-axis)
        R_ij = np.array([
            [np.cos(dtheta), -np.sin(dtheta), 0],
            [np.sin(dtheta), np.cos(dtheta), 0],
            [0, 0, 1]
        ])
        self.rotation_graph.add_edge(i, j, R_ij, info_matrix[2, 2])
    
    def initialize_with_rotation_sync(self, method: str = 'chordal'):
        """Initialize poses using rotation synchronization"""
        print(f"Initializing rotations using {method} method...")
        
        if method == 'chordal':
            rotations_3d = self.rotation_graph.solve_chordal_relaxation(anchor_id=0)
        else:
            solver = IterativeRotationAveraging(self.rotation_graph)
            rotations_3d, _ = solver.solve()
        
        # Extract 2D rotations
        for v in self.vertices:
            R = rotations_3d[v]
            # Extract rotation around z-axis
            theta = np.arctan2(R[1, 0], R[0, 0])
            self.vertices[v][2] = theta
        
        # Now initialize positions using a simple spanning tree approach
        visited = set([0])
        queue = [0]
        
        while queue:
            current = queue.pop(0)
            
            for i, j, measurement, _ in self.edges:
                next_vertex = None
                if i == current and j not in visited:
                    next_vertex = j
                    dx, dy = measurement[0], measurement[1]
                elif j == current and i not in visited:
                    next_vertex = i
                    # Reverse measurement
                    c = np.cos(measurement[2])
                    s = np.sin(measurement[2])
                    dx = -c * measurement[0] - s * measurement[1]
                    dy = s * measurement[0] - c * measurement[1]
                
                if next_vertex is not None:
                    # Transform to global frame
                    theta_current = self.vertices[current][2]
                    c = np.cos(theta_current)
                    s = np.sin(theta_current)
                    
                    x_next = self.vertices[current][0] + c * dx - s * dy
                    y_next = self.vertices[current][1] + s * dx + c * dy
                    
                    self.vertices[next_vertex][0] = x_next
                    self.vertices[next_vertex][1] = y_next
                    
                    visited.add(next_vertex)
                    queue.append(next_vertex)
        
        print("Initialization complete!")
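
The spanning-tree step relies on two SE(2) operations: composing a pose with a relative motion, and inverting a relative motion when an edge is traversed backwards. The sketch below checks that the two are consistent. The function names `compose` and `invert` are hypothetical helpers written for this illustration and mirror the formulas used in the method above.

```python
import numpy as np

def compose(pose, delta):
    """Apply a relative motion `delta`, expressed in the frame of `pose`."""
    x, y, theta = pose
    dx, dy, dtheta = delta
    c, s = np.cos(theta), np.sin(theta)
    return np.array([x + c * dx - s * dy, y + s * dx + c * dy, theta + dtheta])

def invert(delta):
    """Inverse relative motion: (-R(dtheta)^T t, -dtheta)."""
    dx, dy, dtheta = delta
    c, s = np.cos(dtheta), np.sin(dtheta)
    return np.array([-c * dx - s * dy, s * dx - c * dy, -dtheta])

pose_i = np.array([1.0, 2.0, 0.3])
delta_ij = np.array([0.5, -0.2, 0.7])

# Going forward along the edge and then backward returns to the start
pose_j = compose(pose_i, delta_ij)
pose_i_back = compose(pose_j, invert(delta_ij))

print("pose_j:     ", np.round(pose_j, 4))
print("pose_i_back:", np.round(pose_i_back, 4))
```

Because the headings are already fixed by rotation synchronization, only the translational part of `compose` is needed during the tree traversal.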

In [None]:
# Create a test scenario
def create_circle_trajectory_with_noise(n_poses: int = 30, radius: float = 10.0,
                                       noise_trans: float = 0.1, 
                                       noise_rot: float = 0.05) -> PoseGraphWithRotationInit:
    graph = PoseGraphWithRotationInit()
    
    # Create circular trajectory
    angles = np.linspace(0, 2*np.pi, n_poses, endpoint=False)
    
    for i, angle in enumerate(angles):
        # Ground truth pose
        x_true = radius * np.cos(angle)
        y_true = radius * np.sin(angle)
        theta_true = angle + np.pi/2
        
        # Add with large initial error
        x_init = x_true + np.random.normal(0, 2.0)
        y_init = y_true + np.random.normal(0, 2.0)
        theta_init = theta_true + np.random.normal(0, 0.5)
        
        graph.add_vertex(i, x_init, y_init, theta_init)
    
    # Add sequential edges
    for i in range(n_poses):
        j = (i + 1) % n_poses
        
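        # True relative pose, expressed in the frame of pose i. The heading is
        # tangent to the circle, so the chord to the next pose is rotated by
        # angle_diff / 2 relative to the local x-axis.
        angle_diff = 2 * np.pi / n_poses
        chord = 2 * radius * np.sin(angle_diff / 2)
        dx_true = chord * np.cos(angle_diff / 2)
        dy_true = chord * np.sin(angle_diff / 2)
        dtheta_true = angle_diff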
        
        # Add noise
        dx = dx_true + np.random.normal(0, noise_trans)
        dy = dy_true + np.random.normal(0, noise_trans)
        dtheta = dtheta_true + np.random.normal(0, noise_rot)
        
        graph.add_edge(i, j, dx, dy, dtheta)
    
    # Add some loop closures
    for _ in range(5):
        i = np.random.randint(0, n_poses - 5)
        j = i + np.random.randint(5, min(n_poses - i, 10))
        
        # Compute relative pose with more noise
        angle_i = angles[i] + np.pi/2
        angle_j = angles[j] + np.pi/2
        
        dx_global = radius * (np.cos(angles[j]) - np.cos(angles[i]))
        dy_global = radius * (np.sin(angles[j]) - np.sin(angles[i]))
        
        c = np.cos(angle_i)
        s = np.sin(angle_i)
        dx_local = c * dx_global + s * dy_global
        dy_local = -s * dx_global + c * dy_global
        dtheta = angle_j - angle_i
        
        # Add more noise to loop closures
        dx_local += np.random.normal(0, noise_trans * 2)
        dy_local += np.random.normal(0, noise_trans * 2)
        dtheta += np.random.normal(0, noise_rot * 2)
        
        graph.add_edge(i, j, dx_local, dy_local, dtheta)
    
    return graph

# Test initialization methods
graph = create_circle_trajectory_with_noise(n_poses=25)

# Store initial poses
initial_poses = {k: v.copy() for k, v in graph.vertices.items()}

# Initialize with rotation synchronization
graph.initialize_with_rotation_sync(method='chordal')

# Visualize before and after
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Before initialization
ax = axes[0]
poses = [initial_poses[i] for i in sorted(initial_poses.keys())]
x_coords = [p[0] for p in poses]
y_coords = [p[1] for p in poses]
ax.plot(x_coords + [x_coords[0]], y_coords + [y_coords[0]], 'bo-', 
        markersize=6, label='Initial')
ax.set_aspect('equal')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('Before Rotation Initialization')
ax.grid(True, alpha=0.3)
ax.legend()

# After initialization
ax = axes[1]
poses = [graph.vertices[i] for i in sorted(graph.vertices.keys())]
x_coords = [p[0] for p in poses]
y_coords = [p[1] for p in poses]
ax.plot(x_coords + [x_coords[0]], y_coords + [y_coords[0]], 'ro-', 
        markersize=6, label='After Init')

# Plot ground truth for reference
angles = np.linspace(0, 2*np.pi, len(poses), endpoint=False)
x_true = [10 * np.cos(a) for a in angles]
y_true = [10 * np.sin(a) for a in angles]
ax.plot(x_true + [x_true[0]], y_true + [y_true[0]], 'g--', 
        linewidth=2, alpha=0.7, label='Ground Truth')

ax.set_aspect('equal')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('After Rotation Initialization')
ax.grid(True, alpha=0.3)
ax.legend()

plt.tight_layout()
plt.show()

## 9. Exercise: Distributed Rotation Averaging

Implement a distributed version of rotation averaging where each node only communicates with its neighbors:

In [None]:
class DistributedRotationAveraging:
    def __init__(self, graph: RotationGraph):
        self.graph = graph
        self.build_adjacency()
    
    def build_adjacency(self):
        """Build adjacency list for the graph"""
        self.neighbors = {v: [] for v in self.graph.vertices}
        
        for i, j, R_ij, weight in self.graph.edges:
            self.neighbors[i].append((j, R_ij, weight))
            self.neighbors[j].append((i, R_ij.T, weight))
    
    def local_update(self, v: int, rotations: Dict[int, np.ndarray]) -> np.ndarray:
        """Update rotation for vertex v based only on neighbors"""
        # TODO: Implement local update rule
        # 1. Collect measurements from neighbors
        # 2. Compute weighted average in tangent space
        # 3. Return updated rotation
        pass
    
    def solve(self, max_iterations: int = 100, 
              tolerance: float = 1e-6) -> Dict[int, np.ndarray]:
        """Solve using distributed consensus"""
        # TODO: Implement distributed solving
        # 1. Initialize rotations
        # 2. Iterate: each node updates based on neighbors
        # 3. Check convergence
        pass

# TODO: Test the distributed algorithm and compare with centralized methods

## Summary

In this chapter, we learned:

1. **The rotation synchronization problem** and its importance for initialization
2. **Chordal relaxation** - dropping the SO(3) constraints to obtain a convex problem
3. **Spectral methods** for solving rotation synchronization
4. **Iterative rotation averaging** using tangent space updates
5. How to **handle outliers** in rotation measurements
6. **Integration with pose graph optimization** for better initialization

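Key takeaways:
- A good rotation initialization makes it much more likely that pose graph optimization converges to the global minimum
- Chordal relaxation is fast and globally optimal for the relaxed problem (without SO(3) constraints); the projected result is an initial guess, not a guaranteed optimum of the original problem
- Iterative methods work directly on SO(3) and can refine a relaxed solution
- Rotation synchronization uses all measurements at once, so it is typically more robust than incremental initialization, which accumulates drift along a single path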

Next chapter: We'll explore advanced topics including loop closure detection and global optimization strategies!