# Covariance Noise Tests

In [1]:
import sys
import os
import yaml
import numpy as np
import time

In [2]:
# Get current directory
current_dir = os.getcwd()

# Assuming you're in variance_calculation folder and scripts is at the same level:
# Add the parent directory to system path to access scripts folder
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)

config_path = os.path.join(parent_dir, 'config', 'config.yaml')

# Load the config file
with open(config_path, 'r') as file:
    config = yaml.safe_load(file)
    
from scripts.vision import Vision
from utils import utils

In [3]:
# Extract webcam configuration
webcam_config = config['webcam']
webcam_device_id = webcam_config['device_id']
webcam_resolution = tuple(webcam_config['resolution'])
webcam_padding = webcam_config['padding']

# Convert camera matrix from list to numpy array
webcam_matrix = np.array(webcam_config['matrix'])

# Convert distortion coefficients from list to numpy array
webcam_distortion = np.array(webcam_config['distortion'])

# Get world dimensions from world config
world_config = config['world']
world_width = world_config['width']
world_height = world_config['height']

# Scale factor 
scale_factor = webcam_resolution[1] / world_width

# Scale speed
speed_scale = config['thymio']['speed_scale']

# Thymio width
thymio_width = config['thymio']['size']['width']
wheel_radius = config['thymio']['size']['wheel_radius']

In [2]:
import tdmclient.notebook
await tdmclient.notebook.start()
print("Successfully connected to Thymio.")

Successfully connected to Thymio.


In [None]:
@tdmclient.notebook.sync_var
def motor_move(left,right):
    global motor_left_target, motor_right_target
    motor_left_target = left
    motor_right_target = right
    
@tdmclient.notebook.sync_var
def motor_stop():
    global motor_left_target,motor_right_target
    motor_left_target = 0
    motor_right_target = 0

In [None]:
def measure_static_noise(vision, num_positions=5):
    
    print("\n=== Static Position Test ===")
    print("This test will measure position and orientation noise when stationary")
    
    var_pos = []
    var_orientation = []
    
    for i in range(num_positions):
        positions = []
        orientations = []
        print(f'\nStatic Test {i+1}/{num_positions}')
        print('Move camera to a new position and press enter to record')
        input()
        
        valid_detections = 0
        attempt_count = 0
        max_attempts = 150
        
        while valid_detections < 100 and attempt_count < max_attempts:
            original_frame, _, pos, orientation, _, found_thymio, _ = vision.get_frame()
            
            if found_thymio:
                positions.append(pos)
                orientations.append(orientation)
                valid_detections += 1
                print(f'\rValid detections: {valid_detections}/100', end='')
            
            attempt_count += 1
            utils.display_frames(original_frame, None, None)
            time.sleep(0.01)
        
        print()
        
        if valid_detections < 10:
            print(f"Warning: Only {valid_detections} valid detections. Skipping...")
            continue
            
        positions = np.array(positions)
        orientations = np.array(orientations)
        
        pos_variance = np.var(positions, axis=0)
        orientation_variance = np.var(orientations)
        
        print(f'Position variance at this location: {pos_variance}')
        print(f'Orientation variance at this location: {orientation_variance}')
        
        var_pos.append(pos_variance)
        var_orientation.append(orientation_variance)
        
    # Calculate final results
    if var_pos and var_orientation:
        mean_pos_variance = np.mean(var_pos, axis=0)
        mean_orientation_variance = np.mean(var_orientation)
        print('\nStatic Test Results:')
        print(f'Average position variance (x, y): {mean_pos_variance}')
        print(f'Average orientation variance: {mean_orientation_variance}')
        return mean_pos_variance, mean_orientation_variance
    return None, None

In [None]:
def measure_velocity_noise(vision, target_velocities=[100, 200, 300, 400]):
    
    print("\n=== Linear Velocity Test ===")
    print("This test will measure velocity noise at different speeds")
    velocity_measurements = {v: [] for v in target_velocities}
    
    for target_v in target_velocities:
        print(f"\nTesting velocity: {target_v} m/s")
        print("Please ensure there's enough straight path ahead")
        print("Place Thymio at starting position and press Enter")
        input()
        
        motor_move(int(speed_scale * target_v), int(speed_scale * target_v))
        print("Letting velocity stabilize...")
        time.sleep(2)
        
        positions = []
        timestamps = []
        start_time = time.time()
        
        print("Recording measurements for 5 seconds...")
        while time.time() - start_time < 5:
            original_frame, _, pos, _, _, found_thymio, _ = vision.get_frame()
            
            if found_thymio:
                positions.append(pos)
                timestamps.append(time.time())
                print(f"\rCollected measurements: {len(positions)}", end='')
            
            utils.display_frames(original_frame, None, None)
            time.sleep(0.01)
        
        print("\nStopping Thymio...")
        motor_stop()
        time.sleep(1)
        
        if len(positions) > 1:
            positions = np.array(positions)
            timestamps = np.array(timestamps)
            dx = positions[1:] - positions[:-1]
            dt = timestamps[1:] - timestamps[:-1]
            velocities = np.sqrt(np.sum(dx**2, axis=1)) / dt
            velocity_measurements[target_v] = velocities
            
            print(f"Average measured velocity: {np.mean(velocities):.3f} m/s")
            print(f"Velocity variance: {np.var(velocities):.6f}")
        else:
            print("Warning: Not enough measurements collected!")
    
    velocity_variances = {v: np.var(measurements) 
                         for v, measurements in velocity_measurements.items()
                         if len(measurements) > 0}
    
    if velocity_variances:
        mean_variance = np.mean(list(velocity_variances.values()))
        print("\nVelocity Test Results:")
        for v, var in velocity_variances.items():
            print(f"Target velocity {v} m/s - Variance: {var:.6f}")
        print(f"Average velocity variance: {mean_variance}")
        return mean_variance
    return None

In [None]:
def measure_angular_velocity_noise(vision, target_angular_vels=[0.5, 1.0, 1.5, 2.0]):
    print("\n=== Angular Velocity Test ===")
    print("This test will measure angular velocity noise at different rotation speeds")
    angular_measurements = {w: [] for w in target_angular_vels}
    
    for target_w in target_angular_vels:
        print(f"\nTesting angular velocity: {target_w} rad/s")
        print("Ensure there's enough space for rotation")
        print("Place Thymio in an open space and press Enter")
        input()
        
        speed_diff = int(speed_scale * target_w * thymio_width/2)
        motor_move(speed_diff, -speed_diff)
        print("Letting rotation stabilize...")
        time.sleep(2)
        
        orientations = []
        timestamps = []
        start_time = time.time()
        
        print("Recording measurements for 5 seconds...")
        while time.time() - start_time < 5:
            original_frame, _, _, orientation, _, found_thymio, _ = vision.get_frame()
            
            if found_thymio:
                orientations.append(orientation)
                timestamps.append(time.time())
                print(f"\rCollected measurements: {len(orientations)}", end='')
            
            utils.display_frames(original_frame, None, None)
            time.sleep(0.01)
        
        print("\nStopping Thymio...")
        motor_stop()
        time.sleep(1)
        
        if len(orientations) > 1:
            orientations = np.array(orientations)
            timestamps = np.array(timestamps)
            dtheta = np.diff(orientations)
            dtheta[dtheta > np.pi] -= 2 * np.pi
            dtheta[dtheta < -np.pi] += 2 * np.pi
            
            dt = timestamps[1:] - timestamps[:-1]
            angular_velocities = dtheta / dt
            angular_measurements[target_w] = angular_velocities
            
            print(f"Average measured angular velocity: {np.mean(angular_velocities):.3f} rad/s")
            print(f"Angular velocity variance: {np.var(angular_velocities):.6f}")
        else:
            print("Warning: Not enough measurements collected!")
    
    angular_variances = {w: np.var(measurements) 
                        for w, measurements in angular_measurements.items()
                        if len(measurements) > 0}
    
    if angular_variances:
        mean_variance = np.mean(list(angular_variances.values()))
        print("\nAngular Velocity Test Results:")
        for w, var in angular_variances.items():
            print(f"Target angular velocity {w} rad/s - Variance: {var:.6f}")
        print(f"Average angular velocity variance: {mean_variance}")
        return mean_variance
    return None

In [None]:
def measure_process_noise(vision):

    print("\n=== Process Noise Test ===")
    
    # Test different movements to capture process uncertainty
    movements = [
        {'v': 100, 'w': 0.0, 'name': 'straight'},  # Straight line
        {'v': 100, 'w': 0.5, 'name': 'curve'},     # Curved path
        {'v': 0.0, 'w': 1.0, 'name': 'rotation'}   # Pure rotation
    ]
    
    process_errors = {
        'x': [],
        'y': [],
        'theta': [],
        'v': [],
        'w': []
    }
    
    for movement in movements:
        print(f"\nTesting {movement['name']} movement")
        print("Place Thymio at starting position and press Enter")
        input()
        
        # Set initial state
        start_frame, _, start_pos, start_orientation, _, found_thymio, _ = vision.get_frame()
        if not found_thymio:
            print("Couldn't detect Thymio at start! Skipping this test.")
            continue
            
        # Record initial state
        x0 = start_pos[0]
        y0 = start_pos[1]
        theta0 = start_orientation
        v0 = movement['v']
        w0 = movement['w']
        
        # Command Thymio
        if movement['w'] == 0:
            motor_move(int(speed_scale * movement['v']), int(speed_scale * movement['v']))
        else:
            speed_diff = int(speed_scale * movement['w'] * thymio_width/2)
            motor_move(speed_diff, -speed_diff)
        
        # Predict state after 1 second using motion model
        dt = 1.0
        predicted_x = x0 + v0 * np.cos(theta0) * dt
        predicted_y = y0 + v0 * np.sin(theta0) * dt
        predicted_theta = theta0 + w0 * dt
        predicted_v = v0
        predicted_w = w0
        
        # Wait for actual movement
        time.sleep(dt)
        
        # Measure actual state
        end_frame, _, end_pos, end_orientation, _, found_thymio, _ = vision.get_frame()
        motor_stop()
        
        if not found_thymio:
            print("Couldn't detect Thymio at end! Skipping this measurement.")
            continue
        
        # Calculate actual velocities
        actual_v = np.sqrt((end_pos[0] - x0)**2 + (end_pos[1] - y0)**2) / dt
        actual_w = (end_orientation - theta0) / dt
        
        # Calculate errors
        process_errors['x'].append(end_pos[0] - predicted_x)
        process_errors['y'].append(end_pos[1] - predicted_y)
        # Handle angle wrapping for theta error
        theta_error = end_orientation - predicted_theta
        theta_error = (theta_error + np.pi) % (2*np.pi) - np.pi
        process_errors['theta'].append(theta_error)
        process_errors['v'].append(actual_v - predicted_v)
        process_errors['w'].append(actual_w - predicted_w)
        
        print(f"Position error (x,y): ({process_errors['x'][-1]:.3f}, {process_errors['y'][-1]:.3f})")
        print(f"Orientation error: {process_errors['theta'][-1]:.3f}")
        print(f"Velocity errors (v,w): ({process_errors['v'][-1]:.3f}, {process_errors['w'][-1]:.3f})")
        
        time.sleep(1)  # Wait before next test
    
    # Calculate process noise (variance of errors)
    process_noise = [
        np.var(process_errors['x']) if process_errors['x'] else 0.1,
        np.var(process_errors['y']) if process_errors['y'] else 0.1,
        np.var(process_errors['theta']) if process_errors['theta'] else 0.1,
        np.var(process_errors['v']) if process_errors['v'] else 0.1,
        np.var(process_errors['w']) if process_errors['w'] else 0.1
    ]
    
    print("\n=== Process Noise Results ===")
    print(f"Process noise diagonal terms:")
    print(f"X position: {process_noise[0]:.6f}")
    print(f"Y position: {process_noise[1]:.6f}")
    print(f"Theta: {process_noise[2]:.6f}")
    print(f"Linear velocity: {process_noise[3]:.6f}")
    print(f"Angular velocity: {process_noise[4]:.6f}")
    
    return process_noise

In [None]:
def run_complete_noise_analysis(vision, thymio):
    """
    Run both measurement and process noise analysis
    """
    # First get measurement noise
    print("=== Measuring Measurement Noise ===")
    mean_pos_variance, mean_orientation_variance = measure_static_noise(vision)
    mean_vel_variance = measure_velocity_noise(vision, thymio)
    mean_ang_variance = measure_angular_velocity_noise(vision, thymio)
    
    # Then get process noise
    print("\n=== Measuring Process Noise ===")
    process_noise = measure_process_noise(vision, thymio)
    
    # Print final recommended parameters
    print("\n=== Final EKF Parameters ===")
    measurement_noise = [
        mean_pos_variance[0],
        mean_pos_variance[1],
        mean_orientation_variance,
        mean_vel_variance,
        mean_ang_variance
    ]
    
    print("\nRecommended Process Noise Covariance Matrix diagonal:")
    print(process_noise)
    print("\nRecommended Measurement Noise Covariance Matrix diagonal:")
    print(measurement_noise)
    
    return {
        'process_noise': process_noise,
        'measurement_noise': measurement_noise
    }

# Run the complete analysis
vision = Vision(
    device_id=webcam_device_id,
    camera_matrix=webcam_matrix,
    dist_coeffs=webcam_distortion,
    resolution=webcam_resolution,
    padding=webcam_padding,
    scale_factor=scale_factor,
    world_width=world_width,
    world_height=world_height 
)
vision.connect_webcam()

try:
    noise_parameters = run_complete_noise_analysis(vision)
finally:
    vision.cleanup_webcam()
    motor_stop()

# Speed Conversion Test

In [5]:
def calculate_conversion_ratios(revolutions_data, wheel_radius_mm, time_duration=20, thymio_speed=200):
    # Calculate average revolutions per second
    revs_per_sec = [revs/time_duration for revs in revolutions_data]
    avg_revs_per_sec = np.mean(revs_per_sec)
    
    # Convert to radians per second (2π radians per revolution)
    speed_rad_sec = avg_revs_per_sec * 2 * np.pi
    
    # Convert to linear speed (mm/s)
    speed_mm_sec = speed_rad_sec * wheel_radius_mm
    
    # Calculate both conversion ratios
    conversion_ratio_rad = thymio_speed / speed_rad_sec  # Thymio units to rad/s
    conversion_ratio_mm = thymio_speed / speed_mm_sec    # Thymio units to mm/s
    
    # Calculate standard deviation for error analysis
    std_dev_revs = np.std(revs_per_sec)
    
    return {
        'average_revs_per_sec': avg_revs_per_sec,
        'speed_rad_sec': speed_rad_sec,
        'speed_mm_sec': speed_mm_sec,
        'conversion_ratio_rad': conversion_ratio_rad,
        'conversion_ratio_mm': conversion_ratio_mm,
        'standard_deviation': std_dev_revs
    }

def analyze_motor_data(left_motor_data, right_motor_data, wheel_radius_mm):
    # Calculate ratios for both motors
    left_results = calculate_conversion_ratios(left_motor_data, wheel_radius_mm)
    right_results = calculate_conversion_ratios(right_motor_data, wheel_radius_mm)
    
    print("=== Motor Speed and Conversion Ratios Analysis ===\n")
    
    print("Left Motor Results:")
    print(f"Average revolutions/second: {left_results['average_revs_per_sec']:.3f} ± {left_results['standard_deviation']:.3f}")
    print(f"Angular speed (rad/s): {left_results['speed_rad_sec']:.3f}")
    print(f"Linear speed (mm/s): {left_results['speed_mm_sec']:.3f}")
    print(f"Conversion ratio (Thymio -> rad/s): {left_results['conversion_ratio_rad']:.3f}")
    print(f"Conversion ratio (Thymio -> mm/s): {left_results['conversion_ratio_mm']:.3f}\n")
    
    print("Right Motor Results:")
    print(f"Average revolutions/second: {right_results['average_revs_per_sec']:.3f} ± {right_results['standard_deviation']:.3f}")
    print(f"Angular speed (rad/s): {right_results['speed_rad_sec']:.3f}")
    print(f"Linear speed (mm/s): {right_results['speed_mm_sec']:.3f}")
    print(f"Conversion ratio (Thymio -> rad/s): {right_results['conversion_ratio_rad']:.3f}")
    print(f"Conversion ratio (Thymio -> mm/s): {right_results['conversion_ratio_mm']:.3f}\n")
    
    # Compare motors
    rad_ratio_difference = abs(left_results['conversion_ratio_rad'] - right_results['conversion_ratio_rad'])
    mm_ratio_difference = abs(left_results['conversion_ratio_mm'] - right_results['conversion_ratio_mm'])
    print(f"Difference between rad/s conversion ratios: {rad_ratio_difference:.3f}")
    print(f"Difference between mm/s conversion ratios: {mm_ratio_difference:.3f}")
    print(f"Average of left and right rad/s conversion ratios: {(left_results['conversion_ratio_rad'] + right_results['conversion_ratio_rad']) / 2:.3f}")
    print(f"Average of left and right mm/s conversion ratios: {(left_results['conversion_ratio_mm'] + right_results['conversion_ratio_mm']) / 2:.3f}")
    
    return left_results, right_results

In [None]:
motor_move(200,200)
time.sleep(20)
motor_stop()

In [8]:
# Results from motor trials
left_motor_trials = [10.2, 10.0, 9.9, 10.1, 9.8]  # Number of revolutions in 20s
right_motor_trials = [10.1, 9.7, 10.1, 10.0, 10.2]  # Number of revolutions in 20s

left_results, right_results = analyze_motor_data(left_motor_trials, right_motor_trials, wheel_radius)

=== Motor Speed and Conversion Ratios Analysis ===

Left Motor Results:
Average revolutions/second: 0.500 ± 0.007
Angular speed (rad/s): 3.142
Linear speed (mm/s): 62.832
Conversion ratio (Thymio -> rad/s): 63.662
Conversion ratio (Thymio -> mm/s): 3.183

Right Motor Results:
Average revolutions/second: 0.501 ± 0.009
Angular speed (rad/s): 3.148
Linear speed (mm/s): 62.958
Conversion ratio (Thymio -> rad/s): 63.535
Conversion ratio (Thymio -> mm/s): 3.177

Difference between rad/s conversion ratios: 0.127
Difference between mm/s conversion ratios: 0.006
Average of left and right rad/s conversion ratios: 63.598
Average of left and right mm/s conversion ratios: 3.180


In [5]:
def convert_thymio_speed(thymio_speed, conversion_ratio_rad=None, conversion_ratio_mm=None):
    result = {}
    if conversion_ratio_rad is not None:
        result['speed_rad_sec'] = thymio_speed / conversion_ratio_rad
    if conversion_ratio_mm is not None:
        result['speed_mm_sec'] = thymio_speed / conversion_ratio_mm
    return result

In [6]:
# Example of converting a new Thymio speed value
thymio_speed = 100  # Example speed to convert
converted_speeds = convert_thymio_speed(
    thymio_speed, 
    left_results['conversion_ratio_rad'],
    left_results['conversion_ratio_mm']
)
print(f"\nConverted Thymio speed {thymio_speed} to:")
print(f"Angular speed: {converted_speeds['speed_rad_sec']:.3f} rad/s")
print(f"Linear speed: {converted_speeds['speed_mm_sec']:.3f} mm/s")


Converted Thymio speed 100 to:
Angular speed: 1.571 rad/s
Linear speed: 31.416 mm/s
