In [20]:
from pathlib import Path
from infrastructure.robot.fleet import get_fleet_data
from wayve.core.calibration.load_calibration import load_calibration_textproto, load_v2_textproto
from wayve.core.calibration.backcompat_support import convert_to_v2, convert_to_v1
from google.protobuf.text_format import MessageToString
from google.protobuf.json_format import MessageToDict, ParseDict
from wayve.core.interfaces.protobuf.sensor_calibration_pb2 import CalibrationState
from wayve.interfaces.protobuf.vehicle_calibration_pb2 import VehicleCalibration, SensorCalibration
from google.protobuf.text_format import Parse, ParseError
from google._upb._message import MessageMapContainer as MMC

REPO_ROOT = "/home/benjin/Development/wcs/WayveCode"
CALIBRATION_DIR = Path(REPO_ROOT) / "infrastructure" / "robot" / "config"

VEHICLES = [entry.name for entry in CALIBRATION_DIR.iterdir() if entry.is_dir()]
MAXUS = ["avon", "beni", "congo"]
VEHICLES = ["shire"]

In [24]:
def get_calibration_gen1(vehicle_dir: Path, prior: bool):
    if prior:
        textproto_file = vehicle_dir / "bb" / "vehicle_calibration_default.textproto"

    else:
        textproto_file = vehicle_dir / "bb" / "vehicle_calibration.textproto"

    try:
        calib = load_v2_textproto(textproto_file)
    except Exception:
        calib = CalibrationState()
        Parse(Path(textproto_file).read_text(), calib) 
        calib = convert_to_v2(calib)
    
    return calib
    

def get_calibration_gen2(vehicle_dir: Path, prior: bool):
    if prior:
        textproto_file = vehicle_dir / "wayve-interfaces-protobuf-VehicleCalibration_default.textproto"
    
    else:
        textproto_file = vehicle_dir / "wayve-interfaces-protobuf-VehicleCalibration_.textproto"

    calib = load_v2_textproto(textproto_file)
    return calib

def is_gen1(vehicle_dir: Path) -> bool:
    calib_exists = (vehicle_dir / "bb" / "vehicle_calibration.textproto").exists()
    prior_calib_exists = (vehicle_dir / "bb" / "vehicle_calibration_default.textproto").exists()
    return calib_exists and prior_calib_exists

def is_gen2(vehicle_dir: Path) -> bool:
    calib_exists = (vehicle_dir / "wayve-interfaces-protobuf-VehicleCalibration_.textproto").exists()
    prior_calib_exists = (vehicle_dir / "wayve-interfaces-protobuf-VehicleCalibration_default.textproto").exists()
    return calib_exists and prior_calib_exists


def get_calibration(vehicle, prior=False):
    vehicle_dir = CALIBRATION_DIR / vehicle

    if is_gen1(vehicle_dir):
        calib = get_calibration_gen1(vehicle_dir, prior)
    elif is_gen2(vehicle_dir):
        calib = get_calibration_gen2(vehicle_dir, prior)
    else:
        print(f"Vehicle directory does not seem to be either gen1 or gen2 for {vehicle}")
        return None

    return calib

def align_serial_numbers(calib, calib_prior):
    # Check that the same sensor calibration entries exist
    calib_sensors = set(calib.sensor_calibrations.keys())
    calib_prior_sensors = set(calib_prior.sensor_calibrations.keys())
    if calib_sensors != calib_prior_sensors:
        print(f"Sensor calibration entries do not match calib {calib_sensors} vs calib_prior {calib_prior_sensors}")

    for sensor in calib_sensors:
        calib_sensor_serial = calib.sensor_calibrations[sensor].metadata.serial_number
        calib_prior_sensor_serial = calib_prior.sensor_calibrations[sensor].metadata.serial_number

        if calib_sensor_serial != calib_prior_sensor_serial:
            calib_prior.sensor_calibrations[sensor].metadata.serial_number = calib_sensor_serial
    
    return calib_prior

def align_camera_intrinsics(calib, calib_prior):
    calib_sensors = set(calib.sensor_calibrations.keys())

    for sensor in calib_sensors:
        if not calib.sensor_calibrations[sensor].HasField("camera_intrinsics"):
            continue

        if "radar" in sensor or "lidar" in sensor or "imu" in sensor or "rear" in sensor:
            continue
        
        calib_sensor_intrinsics = calib.sensor_calibrations[sensor].camera_intrinsics
        calib_prior_sensor_intrinsics = calib_prior.sensor_calibrations[sensor].camera_intrinsics

        if calib_sensor_intrinsics != calib_prior_sensor_intrinsics:
            calib_prior.sensor_calibrations[sensor].camera_intrinsics.CopyFrom(calib_sensor_intrinsics)
    
    return calib_prior

def _find_sensor_state_entry(calib, sensors_id):
    for sensor_state in calib.state:
        if set(sensor_state.sensors) == set(sensors_id):
            return sensor_state

    breakpoint()
    # raise ValueError(f"Could not find sensor state {sensors_id} in calibration")

def format_calib(calib, old_prior_path):
    try:
        old_prior = CalibrationState()
        Parse(Path(old_prior_path).read_text(), old_prior)
    except Exception:
        old_prior = load_v2_textproto(old_prior_path)

    if isinstance(old_prior, CalibrationState):
        state_order = [state.sensors for state in old_prior.state]
        calib = convert_to_v1(calib)
        cs = CalibrationState()

        for sensors_id in state_order:
            
            cs.state.append(_find_sensor_state_entry(calib, sensors_id))

        calib = cs

    elif isinstance(old_prior, VehicleCalibration):
        order = list(old_prior.sensor_calibrations.keys())
        breakpoint()
        vc = VehicleCalibration()
        vc.vehicle_intrinsics.CopyFrom(calib.vehicle_intrinsics)

        for sensor in order:
            vc.sensor_calibrations[sensor].CopyFrom(calib.sensor_calibrations[sensor])

        calib = vc

    else:
        raise ValueError("Unknown calibration type")
    
    return calib

    

def write_calib_prior(vehicle, calib_prior):
    vehicle_dir = CALIBRATION_DIR / vehicle

    if is_gen1(vehicle_dir):
        textproto_file = vehicle_dir / "bb" / "vehicle_calibration_default.textproto"
        
    elif is_gen2(vehicle_dir):
        textproto_file = vehicle_dir / "wayve-interfaces-protobuf-VehicleCalibration_default.textproto"

    else:
        print(f"Vehicle directory does not seem to be either gen1 or gen2 for {vehicle}")
        return None
    
    out_calib = format_calib(calib_prior, textproto_file)

    out_calib_str = MessageToString(out_calib, indent=1, use_short_repeated_primitives=True)

    breakpoint()

    with open(textproto_file, "w") as f:
        f.write(out_calib_str)

    print(f"Calibration prior written to {textproto_file}")

problematic_vehicles = []

for vehicle in VEHICLES:
    if vehicle in MAXUS:
        continue

    print(f"////////////////////////////////{vehicle}////////////////////////////////")

    calib = get_calibration(vehicle, prior=False)
    calib_prior = get_calibration(vehicle, prior=True)

    if calib is None or calib_prior is None:
        continue
    breakpoint()
    calib_prior = align_serial_numbers(calib, calib_prior)

    calib_prior = align_camera_intrinsics(calib, calib_prior)
    
    write_calib_prior(vehicle, calib_prior)

////////////////////////////////shire////////////////////////////////


Calibration prior written to /home/benjin/Development/wcs/WayveCode/infrastructure/robot/config/shire/wayve-interfaces-protobuf-VehicleCalibration_default.textproto


In [22]:
problematic_vehicles

[]