In [None]:
from zod import ZodDrives
import pandas as pd
import numpy as np
import json
import os

## 1. Load Zenseact Open Dataset

In [None]:
# Which dataset release to open, and the folder (relative to this notebook) that holds it.
version = "mini"
dataset_root = "../data/zod"

In [None]:
# Open the drives of the dataset using the root folder and version configured above.
zod_drives = ZodDrives(dataset_root=dataset_root, version=version)

In [None]:
# Show whatever get_all_ids() returns for this dataset version
# (the cell's last expression is rendered as its output).
zod_drives.get_all_ids()

In [None]:
# Pick one drive by its ID and keep a handle on its vehicle data, from which the
# following cells read the controls, ego-vehicle and satellite records.
seq = zod_drives["000011"]
vd = seq.vehicle_data

In [None]:
def _timestamped_frame(columns, timestamps_ns):
    """Build a DataFrame from `columns`, indexed by `timestamps_ns` parsed as UTC epoch nanoseconds."""
    return pd.DataFrame(columns, index=pd.to_datetime(timestamps_ns, unit="ns", utc=True))


# --- Driver controls (pedals, steering wheel, turn indicator) ---
ctrl = vd.ego_vehicle_controls
df_ctrl = _timestamped_frame(
    {
        "ctrl_acc_pedal": ctrl.acc_pedal,                            # 0–100
        "ctrl_brake_pedal_pressed": ctrl.brake_pedal_pressed,        # bool
        "ctrl_steering_angle": ctrl.steering_angle,                  # radians (counter-clockwise positive)
        "ctrl_steering_angle_rate": ctrl.steering_angle_rate,        # radians/s
        "ctrl_steering_wheel_torque": ctrl.steering_wheel_torque,    # Nm
        "ctrl_turn_indicator": ctrl.turn_indicator,                  # 0 off, 1 left, 2 right
    },
    ctrl.timestamp,
)

# --- Ego-vehicle kinematics (velocities, accelerations, angular rates, body pose) ---
evd = vd.ego_vehicle_data
df_ego = _timestamped_frame(
    {
        "evd_lon_vel": evd.lon_vel,               # m/s
        "evd_lat_vel": evd.lat_vel,               # m/s
        "evd_lon_acc": evd.lon_acc,               # m/s^2
        "evd_lat_acc": evd.lat_acc,               # m/s^2
        "evd_roll_rate": evd.roll_rate,           # rad/s
        "evd_pitch_rate": evd.picth_rate,         # rad/s (spelled 'picth_rate' in package)
        "evd_body_height": evd.body_height,       # meters
        "evd_body_pitch": evd.body_pitch,         # radians
    },
    evd.timestamp,
)

# --- Satellite (GNSS-like) data: position, speed, heading ---
sat = vd.satellite
df_sat = _timestamped_frame(
    {
        "sat_latpos": sat.latpos,                     # nanodegrees
        "sat_lonpos": sat.lonpos,                     # nanodegrees
        "sat_speed": sat.speed,                       # m/s
        "sat_heading": sat.heading,                   # degrees
        "sat_altitude": sat.altitude,                 # meters
        "sat_nrof_satellites": sat.nrof_satellites,   # count
    },
    # NOTE(review): the attribute is read as 'timstamp' — presumably the package's
    # own spelling, like 'picth_rate' above; confirm against the installed zod version.
    sat.timstamp,
)

In [None]:
def _first_per_second(frame):
    """Reduce `frame` to one row per whole second, taking the first value of each column.

    Rows are grouped by their timestamp floored to the second; pandas'
    GroupBy.first() picks the first non-missing entry per column in each group.
    """
    return frame.groupby(frame.index.floor('s')).first()


# Bring every source table down to 1 Hz
df_ctrl_1hz = _first_per_second(df_ctrl)
df_ego_1hz = _first_per_second(df_ego)
df_sat_1hz = _first_per_second(df_sat)

# Outer-join on the per-second index so a second present in any table is kept,
# then order the result chronologically.
df_sensors = df_ctrl_1hz.join([df_ego_1hz, df_sat_1hz], how='outer').sort_index()

In [None]:
# Preview the first rows of the joined 1 Hz sensor table.
df_sensors.head()


In [None]:
# Report the drive's time span: earliest and latest timestamp of the joined
# sensor index, and the difference between the two.
sensor_index = df_sensors.index
min_time, max_time = sensor_index.min(), sensor_index.max()

print(f"Drive start time: {min_time}")
print(f"Drive end time:   {max_time}")
print(f"Drive duration:   {max_time - min_time}")

## 2. Map to VSS Signals

In [None]:
# Build the VSS-named dataframe on the same 1 Hz index as df_sensors.
# Each column is named after the VSS signal path it represents; values are
# converted from the dataset's units to the unit noted next to each mapping.
# Column insertion order is kept stable because it determines the order of
# the signals in the exported JSON.
df_vss = pd.DataFrame(index=df_sensors.index)

# ----------------------------------------------------------------------------
# Control signals - Vehicle.Chassis.*
# ----------------------------------------------------------------------------

# Accelerator pedal: direct mapping (already 0-100 percent)
df_vss['Vehicle.Chassis.Accelerator.PedalPosition'] = df_sensors['ctrl_acc_pedal']

# Brake pedal: boolean to percent (False=0, True=100); missing samples stay NaN
df_vss['Vehicle.Chassis.Brake.PedalPosition'] = df_sensors['ctrl_brake_pedal_pressed'].astype(float) * 100

# Steering wheel angle: radians to degrees
df_vss['Vehicle.Chassis.SteeringWheel.Angle'] = np.degrees(df_sensors['ctrl_steering_angle'])

# Steering wheel angle rate: radians/s to degrees/s (mapping currently disabled)
# df_vss['Vehicle.Chassis.SteeringWheel.AngleRate'] = np.degrees(df_sensors['ctrl_steering_angle_rate'])

# ----------------------------------------------------------------------------
# Motion management - Vehicle.MotionManagement.Steering.SteeringWheel.*
# ----------------------------------------------------------------------------

# Steering wheel torque: direct mapping (Nm)
df_vss['Vehicle.MotionManagement.Steering.SteeringWheel.Torque'] = df_sensors['ctrl_steering_wheel_torque']

# ----------------------------------------------------------------------------
# Turn indicators - Vehicle.Body.Lights.DirectionIndicator.*
# ----------------------------------------------------------------------------

# Turn indicator: split the single code into two boolean signals
# 0 = off (both false), 1 = left (left true, right false), 2 = right (left false, right true)
#
# A plain `== 1` / `== 2` comparison evaluates to False where the indicator
# value is missing (NaN rows can appear after the outer join), which would
# export "not signaling" for seconds with no control sample at all. Masking
# those rows keeps them missing so they are skipped like every other signal.
# When no value is missing the columns remain plain booleans.
turn_indicator = df_sensors['ctrl_turn_indicator']
turn_indicator_known = turn_indicator.notna()
df_vss['Vehicle.Body.Lights.DirectionIndicator.Left.IsSignaling'] = (turn_indicator == 1).where(turn_indicator_known)
df_vss['Vehicle.Body.Lights.DirectionIndicator.Right.IsSignaling'] = (turn_indicator == 2).where(turn_indicator_known)

# ----------------------------------------------------------------------------
# Vehicle speed - Vehicle.Speed
# ----------------------------------------------------------------------------

# Magnitude of the (longitudinal, lateral) velocity vector, converted m/s -> km/h
df_vss['Vehicle.Speed'] = np.sqrt(
    df_sensors['evd_lon_vel']**2 + df_sensors['evd_lat_vel']**2
) * 3.6  # m/s to km/h

# ----------------------------------------------------------------------------
# Acceleration - Vehicle.Acceleration.*
# ----------------------------------------------------------------------------

# Longitudinal acceleration: direct mapping (m/s²)
df_vss['Vehicle.Acceleration.Longitudinal'] = df_sensors['evd_lon_acc']

# Lateral acceleration: direct mapping (m/s²)
df_vss['Vehicle.Acceleration.Lateral'] = df_sensors['evd_lat_acc']

# ----------------------------------------------------------------------------
# Angular velocity - Vehicle.AngularVelocity.*
# ----------------------------------------------------------------------------

# Roll rate: radians/s to degrees/s
df_vss['Vehicle.AngularVelocity.Roll'] = np.degrees(df_sensors['evd_roll_rate'])

# Pitch rate: radians/s to degrees/s
df_vss['Vehicle.AngularVelocity.Pitch'] = np.degrees(df_sensors['evd_pitch_rate'])

# ----------------------------------------------------------------------------
# GNSS location - Vehicle.CurrentLocation.*
# ----------------------------------------------------------------------------

# Latitude: nanodegrees to degrees
df_vss['Vehicle.CurrentLocation.Latitude'] = df_sensors['sat_latpos'] * 1e-9

# Longitude: nanodegrees to degrees
df_vss['Vehicle.CurrentLocation.Longitude'] = df_sensors['sat_lonpos'] * 1e-9

# Altitude: direct mapping (meters)
df_vss['Vehicle.CurrentLocation.Altitude'] = df_sensors['sat_altitude']

# Heading: direct mapping (degrees)
df_vss['Vehicle.CurrentLocation.Heading'] = df_sensors['sat_heading']

# ----------------------------------------------------------------------------
# GNSS receiver - Vehicle.CurrentLocation.GNSSReceiver.* (mappings currently disabled)
# ----------------------------------------------------------------------------

# GNSS ground speed: convert from m/s to km/h
# df_vss['Vehicle.CurrentLocation.GNSSReceiver.GroundSpeed'] = df_sensors['sat_speed'] * 3.6

# Satellite count: direct mapping (count)
# df_vss['Vehicle.CurrentLocation.GNSSReceiver.SatelliteCount'] = df_sensors['sat_nrof_satellites']

In [None]:
# Preview the first rows of the VSS-mapped table.
df_vss.head()

### Generate tripdata.json

In [None]:
# Convert df_vss into the VSS JSON structure: a list with one object per signal,
#   {"path": <column name>, "dp": [{"ts": <ISO 8601 UTC>, "value": <string>}, ...]}
EXPORT_OFFSET = 200  # How many initial rows to skip (to avoid exporting early data)

# Only rows from EXPORT_OFFSET onward are exported
df_vss_export = df_vss.iloc[EXPORT_OFFSET:]

vss_signals = []

# Walk each column once. Dropping missing values up front replaces the earlier
# pattern of re-running iterrows() over the whole frame for every column, which
# built a full row Series per (row, column) pair just to read a single cell.
for column, series in df_vss_export.items():
    data_points = [
        {
            # ISO 8601 timestamp with 'Z' suffix (UTC)
            "ts": timestamp.strftime('%Y-%m-%dT%H:%M:%SZ'),
            # Values are exported as strings; booleans become "true" / "false"
            "value": str(value).lower() if isinstance(value, (bool, np.bool_)) else str(value),
        }
        for timestamp, value in series.dropna().items()
    ]

    # Signals without any data point are left out of the export
    if data_points:
        vss_signals.append({
            "path": column,
            "dp": data_points
        })

In [None]:
# Destination of the exported VSS signal list
output_file = 'results/zod_drive_000011_vss.json'

# Create the parent folder if it is not there yet
output_dir = os.path.dirname(output_file)
os.makedirs(output_dir, exist_ok=True)

# Serialise the signals with two-space indentation
with open(output_file, 'w') as json_out:
    json.dump(vss_signals, json_out, indent=2)

## 3. Use Data

In [None]:
# If needed:
# %pip install ipyleaflet ipywidgets

from ipyleaflet import Map, Marker, basemaps
import ipywidgets as widgets
from IPython.display import display

# Positions come from the 1 Hz df_sensors table; 'sat_latpos' / 'sat_lonpos' are
# in nanodegrees. Rows missing either coordinate are dropped, the rows are put in
# time order, and only the first row per timestamp is kept.
df = (
    df_sensors[["sat_latpos", "sat_lonpos"]]
    .dropna()
    .sort_index()
    .loc[lambda frame: ~frame.index.duplicated(keep='first')]
)

# Nanodegrees -> degrees
lat_deg = df["sat_latpos"] * 1e-9
lon_deg = df["sat_lonpos"] * 1e-9

# One (lat, lon) pair per second of the drive
coords = list(zip(lat_deg.to_numpy(), lon_deg.to_numpy()))
if len(coords) == 0:
    raise ValueError("No satellite positions available.")

# Map centred on the first position, with a marker that the replay moves around
start_position = coords[0]
m = Map(center=start_position, zoom=16, basemap=basemaps.OpenStreetMap.Mapnik)
marker = Marker(location=start_position)
m.add_layer(marker)
display(m)

# Replay controls: the Play widget ticks once per 1000 ms by default, and both it
# and the slider range over the indices of `coords`
last_index = len(coords) - 1
play = widgets.Play(interval=1000, value=0, min=0, max=last_index, step=1, description="")
slider = widgets.IntSlider(value=0, min=0, max=last_index, step=1, description="t")

# Playback speed multiplier
speed_selector = widgets.Dropdown(
    options=[('1x', 1), ('2x', 2), ('3x', 3), ('5x', 5), ('10x', 10)],
    value=1,
    description='Speed:'
)

# Button that sends the replay back to the first position
stop_button = widgets.Button(description="Stop", button_style='danger')

# Keep the Play widget and the slider on the same value
widgets.jslink((play, "value"), (slider, "value"))

def on_value_change(change):
    """Move the marker and recentre the map on the position selected by the new value.

    `change["new"]` is used as an index into `coords`; an index at or beyond
    the end of `coords` is ignored.
    """
    idx = change["new"]
    if idx >= len(coords):
        return
    marker.location = m.center = coords[idx]

def on_speed_change(change):
    """Set the Play widget's interval to 1000 ms divided by the newly selected speed factor."""
    factor = change["new"]
    interval_ms = int(1000 / factor)
    play.interval = interval_ms

def on_stop_click(b):
    """Send the replay back to the start: zero both controls and put marker and map on the first position.

    `b` is the argument supplied by the button callback and is not used.
    """
    for control in (play, slider):
        control.value = 0
    first_position = coords[0]
    marker.location = first_position
    m.center = first_position

# Hook the callbacks up to the "value" trait of their widgets
for _control, _callback in ((slider, on_value_change), (speed_selector, on_speed_change)):
    _control.observe(_callback, names="value")
stop_button.on_click(on_stop_click)

# Show all controls side by side
display(widgets.HBox([play, slider, speed_selector, stop_button]))

# Start from the beginning: the same reset the stop button performs
# (both controls to 0, marker and map on the first position)
on_stop_click(None)

print("Use the play button to start/stop animation, or the stop button to reset to beginning.")