# SO-101 Auto-Calibration via Load Sensing

Automatically calibrates each joint by slowly driving it into its hard stops and detecting the load spike.
No manual joint movement required — just position the arm and run the cells.

### How it works
1. For each joint (distal to proximal): reset calibration, slowly move toward one hard stop, detect load spike, then find the other hard stop
2. Compute homing offset so the center of the detected range maps to encoder position 2047
3. `wrist_roll` is continuous (no hard stops) — uses `set_half_turn_homings()` with full 0–4095 range

### Safety
- **Before starting**: position the arm **straight up (vertical)** with the gripper pointing at the ceiling
- Joints are calibrated distal-to-proximal (gripper first, shoulder_pan last) to avoid collisions
- Each joint returns to center before the next one moves
- Torque is capped at 10–20%, velocity is very slow, with 20s timeout per direction
- `Ctrl+C` or kernel interrupt will safely disable torque and restore settings

In [None]:
import time
from pathlib import Path

from lerobot.motors.feetech.feetech import FeetechMotorsBus
from lerobot.motors.motors_bus import Motor, MotorCalibration, MotorNormMode

# ── Per-joint safety defaults (tuned for STS3215 30kg/cm) ─────────────
JOINT_DEFAULTS = {
    "gripper":       {"max_torque": 200, "load_threshold": 150},
    "wrist_flex":    {"max_torque": 350, "load_threshold": 200},
    "elbow_flex":    {"max_torque": 400, "load_threshold": 250},
    "shoulder_lift": {"max_torque": 400, "load_threshold": 250},
    "shoulder_pan":  {"max_torque": 350, "load_threshold": 200},
}

# Safe calibration order: distal to proximal
CALIBRATION_ORDER = ["gripper", "wrist_flex", "elbow_flex", "shoulder_lift", "shoulder_pan"]


def _save_motor_settings(bus: FeetechMotorsBus, motor: str) -> dict:
    """Read current motor settings that we'll temporarily override."""
    return {
        "Max_Torque_Limit": bus.read("Max_Torque_Limit", motor, normalize=False),
        "Goal_Velocity": bus.read("Goal_Velocity", motor, normalize=False),
        "Acceleration": bus.read("Acceleration", motor, normalize=False),
        "P_Coefficient": bus.read("P_Coefficient", motor, normalize=False),
        "I_Coefficient": bus.read("I_Coefficient", motor, normalize=False),
    }


def _restore_motor_settings(bus: FeetechMotorsBus, motor: str, settings: dict) -> None:
    """Restore previously saved motor settings."""
    bus.disable_torque(motor)
    for reg, val in settings.items():
        bus.write(reg, motor, val, normalize=False)


def _find_hard_stop(
    bus: FeetechMotorsBus,
    motor: str,
    direction: int,
    baseline_load: float,
    load_threshold: int,
    step_size: int = 10,
    stall_count: int = 10,
    timeout_s: float = 30.0,
    verbose: bool = True,
) -> int:
    """Drive motor in `direction` (+1 or -1) until a hard stop is detected.

    Detection criteria (whichever comes first):
      - |load - baseline| exceeds threshold
      - Position stalls for `stall_count` consecutive reads
      - Timeout expires

    Returns the raw encoder position of the hard stop.
    """
    start_pos = bus.read("Present_Position", motor, normalize=False)
    goal = start_pos
    prev_pos = start_pos
    stalls = 0
    t0 = time.monotonic()
    stop_reason = "timeout"

    while time.monotonic() - t0 < timeout_s:
        goal += direction * step_size
        bus.write("Goal_Position", motor, goal, normalize=False)
        time.sleep(0.05)

        cur_pos = bus.read("Present_Position", motor, normalize=False)
        cur_load = bus.read("Present_Load", motor, normalize=False)

        # Check load threshold
        if abs(cur_load - baseline_load) > load_threshold:
            stop_reason = f"load={cur_load} (baseline={baseline_load:.0f}, threshold={load_threshold})"
            break

        # Check for stall (position not changing)
        if abs(cur_pos - prev_pos) < 2:
            stalls += 1
            if stalls >= stall_count:
                stop_reason = f"stall at pos={cur_pos}"
                break
        else:
            stalls = 0

        prev_pos = cur_pos

    final_pos = bus.read("Present_Position", motor, normalize=False)
    if verbose:
        dir_label = "+" if direction > 0 else "-"
        print(f"    [{dir_label}] stopped at {final_pos} ({stop_reason})")
    return final_pos


def auto_calibrate(
    bus: FeetechMotorsBus,
    joints: list[str] | None = None,
    continuous_joints: list[str] | None = None,
    step_size: int = 10,
    goal_velocity: int = 100,
    acceleration: int = 10,
    stall_count: int = 10,
    timeout_per_direction_s: float = 30.0,
    save_path: Path | str | None = None,
    verbose: bool = True,
) -> dict[str, MotorCalibration]:
    """Auto-calibrate SO-100/101 arm joints by finding hard stops via load sensing.

    Args:
        bus: Connected FeetechMotorsBus instance.
        joints: Joints to calibrate (default: CALIBRATION_ORDER).
        continuous_joints: Joints with no hard stops (default: ["wrist_roll"]).
        step_size: Encoder ticks per step (10 ticks ~ 0.88 degrees).
        goal_velocity: Motor velocity during calibration (0-1023, lower=slower).
        acceleration: Acceleration ramp (0-254, lower=gentler).
        stall_count: Consecutive stalled reads before declaring a hard stop.
        timeout_per_direction_s: Max seconds to search in each direction.
        save_path: Optional path to save calibration JSON.
        verbose: Print progress.

    Returns:
        dict mapping motor name to MotorCalibration.
    """
    if joints is None:
        joints = CALIBRATION_ORDER
    if continuous_joints is None:
        continuous_joints = ["wrist_roll"]

    calibration: dict[str, MotorCalibration] = {}

    # ── Handle continuous joints (wrist_roll) ─────────────────────────
    for motor in continuous_joints:
        if motor not in bus.motors:
            continue
        if verbose:
            print(f"\n{'='*50}")
            print(f"  {motor} (continuous — using half-turn homing)")
            print(f"{'='*50}")

        bus.disable_torque(motor)
        bus.reset_calibration([motor])
        homing_offsets = bus.set_half_turn_homings([motor])

        calibration[motor] = MotorCalibration(
            id=bus.motors[motor].id,
            drive_mode=0,
            homing_offset=homing_offsets[motor],
            range_min=0,
            range_max=4095,
        )
        if verbose:
            print(f"    homing_offset={homing_offsets[motor]}, range=0..4095")

    # ── Calibrate joints with hard stops ──────────────────────────────
    for motor in joints:
        if motor not in bus.motors:
            if verbose:
                print(f"\n  Skipping {motor} (not in bus)")
            continue

        defaults = JOINT_DEFAULTS.get(motor, {"max_torque": 350, "load_threshold": 200})
        max_torque = defaults["max_torque"]
        load_threshold = defaults["load_threshold"]

        if verbose:
            print(f"\n{'='*50}")
            print(f"  {motor} (torque={max_torque}, threshold={load_threshold})")
            print(f"{'='*50}")

        # Save original settings for this motor
        saved = _save_motor_settings(bus, motor)

        try:
            # 1. Reset calibration — set homing_offset=0 so we work in raw encoder space,
            #    and remove position limits so the motor can traverse the full range.
            bus.disable_torque(motor)
            bus.reset_calibration([motor])

            # 2. Read starting position and baseline load
            start_pos = bus.read("Present_Position", motor, normalize=False)
            # Average a few load readings for baseline (gravity compensation)
            loads = []
            for _ in range(10):
                loads.append(bus.read("Present_Load", motor, normalize=False))
                time.sleep(0.02)
            baseline_load = sum(loads) / len(loads)
            if verbose:
                print(f"    start_pos={start_pos}, baseline_load={baseline_load:.1f}")

            # 3. Configure motor parameters for calibration
            bus.write("Max_Torque_Limit", motor, max_torque, normalize=False)
            bus.write("Acceleration", motor, acceleration, normalize=False)
            bus.write("Goal_Velocity", motor, goal_velocity, normalize=False)

            # 4. Enable torque and set goal to current position
            bus.write("Goal_Position", motor, start_pos, normalize=False)
            bus.enable_torque(motor)
            time.sleep(0.3)  # Let motor settle at starting position

            # 5. Find hard stop in negative direction
            if verbose:
                print(f"    Searching negative direction...")
            limit_neg = _find_hard_stop(
                bus, motor, -1, baseline_load, load_threshold,
                step_size=step_size, stall_count=stall_count,
                timeout_s=timeout_per_direction_s, verbose=verbose,
            )

            # 6. Find hard stop in positive direction
            if verbose:
                print(f"    Searching positive direction...")
            limit_pos = _find_hard_stop(
                bus, motor, +1, baseline_load, load_threshold,
                step_size=step_size, stall_count=stall_count,
                timeout_s=timeout_per_direction_s, verbose=verbose,
            )

            # 7. Disable torque
            bus.disable_torque(motor)

            # 8. Compute calibration values
            range_min = min(limit_neg, limit_pos)
            range_max = max(limit_neg, limit_pos)
            center = (range_min + range_max) // 2
            homing_offset = center - 2047

            if verbose:
                total_range = range_max - range_min
                print(f"    range=[{range_min}, {range_max}] ({total_range} ticks), center={center}, homing_offset={homing_offset}")

            calibration[motor] = MotorCalibration(
                id=bus.motors[motor].id,
                drive_mode=0,
                homing_offset=homing_offset,
                range_min=range_min - homing_offset,
                range_max=range_max - homing_offset,
            )

            # 9. Move joint to center before proceeding (conservative settings still active)
            if verbose:
                print(f"    Moving to center ({center})...")
            bus.write("Goal_Position", motor, center, normalize=False)
            bus.enable_torque(motor)
            time.sleep(1.5)
            bus.disable_torque(motor)

            # 10. Restore original motor settings
            _restore_motor_settings(bus, motor, saved)

        except KeyboardInterrupt:
            print(f"\n    Interrupted! Disabling torque on {motor}...")
            bus.disable_torque(motor)
            _restore_motor_settings(bus, motor, saved)
            raise
        except Exception:
            bus.disable_torque(motor)
            _restore_motor_settings(bus, motor, saved)
            raise

    # ── Write calibration to motors ───────────────────────────────────
    if calibration:
        bus.disable_torque()
        bus.write_calibration(calibration)
        if verbose:
            print(f"\n{'='*50}")
            print("  Calibration written to motors")
            print(f"{'='*50}")
            for name, cal in calibration.items():
                print(f"    {name}: offset={cal.homing_offset}, range=[{cal.range_min}, {cal.range_max}]")

    # ── Optionally save to file ───────────────────────────────────────
    if save_path is not None:
        import json
        save_path = Path(save_path)
        save_path.parent.mkdir(parents=True, exist_ok=True)
        cal_dict = {
            name: {
                "id": c.id, "drive_mode": c.drive_mode,
                "homing_offset": c.homing_offset,
                "range_min": c.range_min, "range_max": c.range_max,
            }
            for name, c in calibration.items()
        }
        save_path.write_text(json.dumps(cal_dict, indent=2))
        if verbose:
            print(f"  Saved to {save_path}")

    return calibration

In [2]:
# ── Port detection (reused from teleop.ipynb) ─────────────────────────
import glob
import scservo_sdk as scs

VOLTAGE_ADDR = 62  # Present_Voltage, 1 byte, unit = 0.1V

ports = sorted(glob.glob("/dev/ttyACM*") + glob.glob("/dev/ttyUSB*"))
if not ports:
    print("No serial ports found! Are the arms plugged in?")

leader_port = None
follower_port = None

for port_path in ports:
    port = scs.PortHandler(port_path)
    if not port.openPort():
        continue
    port.setBaudRate(1000000)
    ph = scs.PacketHandler(0)
    data, comm, _ = ph.readTxRx(port, 1, VOLTAGE_ADDR, 1)
    port.closePort()

    if comm != 0:
        continue

    volts = data[0] / 10.0
    if volts > 8:
        label = "FOLLOWER (~12V)"
        follower_port = port_path
    else:
        label = "LEADER  (~5V)"
        leader_port = port_path
    print(f"  {port_path}: {volts:.1f}V  ->  {label}")

if leader_port and follower_port:
    print("Ports detected OK")
elif leader_port:
    print("Only leader detected")
elif follower_port:
    print("Only follower detected")
else:
    print("ERROR: No arms detected")

  /dev/ttyACM0: 12.4V  ->  FOLLOWER (~12V)
  /dev/ttyACM1: 5.2V  ->  LEADER  (~5V)
Ports detected OK


## Follower Calibration

**Before running**: position the follower arm **straight up (vertical)** with the gripper pointing at the ceiling.

In [4]:
assert follower_port, "No follower port detected — run port detection cell first"

norm_mode_body = MotorNormMode.RANGE_M100_100

follower_bus = FeetechMotorsBus(
    port=follower_port,
    motors={
        "shoulder_pan":  Motor(1, "sts3215", norm_mode_body),
        "shoulder_lift": Motor(2, "sts3215", norm_mode_body),
        "elbow_flex":    Motor(3, "sts3215", norm_mode_body),
        "wrist_flex":    Motor(4, "sts3215", norm_mode_body),
        "wrist_roll":    Motor(5, "sts3215", norm_mode_body),
        "gripper":       Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
    },
)
follower_bus.connect()
print(f"Connected to follower on {follower_port}")

follower_cal = auto_calibrate(follower_bus)

follower_bus.disconnect()
print("\nFollower disconnected.")

Connected to follower on /dev/ttyACM0

  wrist_roll (continuous — using half-turn homing)
    homing_offset=1838, range=0..4095

  gripper (torque=100, threshold=70)
    start_pos=3597, baseline_load=0.0
    Searching negative direction...
    [-] stopped at 3597 (stall at pos=3597)
    Searching positive direction...
    [+] stopped at 3597 (stall at pos=3597)
    range=[3597, 3597], center=3597, homing_offset=1550
    Moving to center (3597)...

  wrist_flex (torque=200, threshold=100)
    start_pos=8, baseline_load=0.0
    Searching negative direction...
    [-] stopped at 6 (encoder limit at pos=6)
    Searching positive direction...
    [+] stopped at 26 (load=-112 (baseline=0, threshold=100))
    range=[6, 26], center=16, homing_offset=-2031
    Moving to center (16)...

  elbow_flex (torque=200, threshold=120)
    start_pos=2403, baseline_load=0.0
    Searching negative direction...
    [-] stopped at 2403 (stall at pos=2403)
    Searching positive direction...
    [+] stopped a

## Leader Calibration

**Before running**: position the leader arm **straight up (vertical)** with the gripper pointing at the ceiling.

In [None]:
assert leader_port, "No leader port detected — run port detection cell first"

norm_mode_body = MotorNormMode.RANGE_M100_100

leader_bus = FeetechMotorsBus(
    port=leader_port,
    motors={
        "shoulder_pan":  Motor(1, "sts3215", norm_mode_body),
        "shoulder_lift": Motor(2, "sts3215", norm_mode_body),
        "elbow_flex":    Motor(3, "sts3215", norm_mode_body),
        "wrist_flex":    Motor(4, "sts3215", norm_mode_body),
        "wrist_roll":    Motor(5, "sts3215", norm_mode_body),
        "gripper":       Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
    },
)
leader_bus.connect()
print(f"Connected to leader on {leader_port}")

leader_cal = auto_calibrate(leader_bus)

leader_bus.disconnect()
print("\nLeader disconnected.")

In [None]:
# ── Verification: read back calibration and print summary ─────────────
print("=" * 60)
print("  Calibration Summary")
print("=" * 60)

for label, cal in [("Follower", follower_cal), ("Leader", leader_cal)]:
    print(f"\n  {label}:")
    print(f"    {'Joint':<15} {'ID':>3} {'Offset':>8} {'Min':>6} {'Max':>6} {'Range':>6}")
    print(f"    {'-'*15} {'-'*3} {'-'*8} {'-'*6} {'-'*6} {'-'*6}")
    for name, c in cal.items():
        rng = c.range_max - c.range_min
        print(f"    {name:<15} {c.id:>3} {c.homing_offset:>8} {c.range_min:>6} {c.range_max:>6} {rng:>6}")