# Motor Control Testing

Test notebook for the **DigitalTwinMotorDriver** and **Motor** device class.

Uses the new telescope-mcp architecture - no hardware required!

---

## Architecture Overview

```
Motor (device)          ← High-level async API
    ↓
MotorInstance           ← Runtime instance (position, status)
    ↓
MotorDriver             ← Factory for instances (DigitalTwinMotorDriver)
```

## Motor Configuration

| Axis | Range | Home | Steps/Degree |
|------|-------|------|--------------|
| ALT | 0° (zenith) → ~90° (horizon) | 0 (zenith) | ~19,167 |
| AZ | ±110,000 steps (~±50°) | 0 (center) | ~2,222 |

---
## 1. Setup

Import the new motor architecture from telescope-mcp.

In [18]:
# ============================================
# SETUP - Import motor architecture
# Run this after kernel restart to pick up code changes
# ============================================

# Force reimport of modules (useful during development)
import importlib
import telescope_mcp.drivers.motors.twin
import telescope_mcp.drivers.motors.types
importlib.reload(telescope_mcp.drivers.motors.types)
importlib.reload(telescope_mcp.drivers.motors.twin)

from telescope_mcp.drivers.motors import (
    DigitalTwinMotorDriver,
    DigitalTwinMotorConfig,
    DigitalTwinMotorInstance,
    MotorStatus,
    MotorInfo,
    MotorType,
)
from telescope_mcp.devices import Motor, MotorConfig
import asyncio

print("✓ Motor architecture imported")

✓ Motor architecture imported


In [6]:
# ============================================
# CREATE DIGITAL TWIN DRIVER
# Simulates ALT and AZ motors without hardware
# ============================================

# Configure the digital twin with telescope-specific parameters
config = DigitalTwinMotorConfig(
    # Position limits (matching real hardware)
    altitude_min_steps=0,       # Zenith (straight up)
    altitude_max_steps=140000,  # Horizon
    azimuth_min_steps=-110000,  # CCW limit
    azimuth_max_steps=110000,   # CW limit
    
    # Home positions
    altitude_home_steps=0,      # Home at zenith
    azimuth_home_steps=0,       # Home at center
    
    # Timing simulation
    slew_speed_steps_per_sec=5000.0,
    acceleration_time_sec=0.5,
    simulate_timing=True,
)

driver = DigitalTwinMotorDriver(config)
print(f"✓ DigitalTwinMotorDriver created")
print(f"  Config: {config}")

✓ DigitalTwinMotorDriver created
  Config: DigitalTwinMotorConfig(alt=[0, 140000], az=[-110000, 110000], timing=True)


---
## 2. Low-Level Driver API

Direct interaction with `DigitalTwinMotorInstance` - the protocol-compliant interface.

In [7]:
# ============================================
# GET MOTOR CONTROLLER INSTANCE
# ============================================

# List available controllers (always 1 for digital twin)
controllers = driver.get_available_controllers()
print(f"Available controllers: {controllers}")

# Open the controller
motor = driver.open()
print(f"\n✓ Motor controller opened")
print(f"  Info: {motor.get_info()}")

Available controllers: [{'id': 0, 'type': 'digital_twin', 'name': 'Digital Twin Motor Controller', 'port': 'simulated', 'description': 'Simulated Teensy + AMIS motor controller'}]

✓ Motor controller opened
  Info: {'type': 'digital_twin', 'name': 'Digital Twin Motor Controller', 'port': 'simulated', 'altitude_steps_per_degree': 1555.5555555555557, 'azimuth_steps_per_degree': 814.8148148148148}


In [8]:
# ============================================
# CHECK MOTOR STATUS
# ============================================

from telescope_mcp.drivers.motors import MotorType

def show_status(motor_type: MotorType) -> None:
    """Display motor status."""
    status = motor.get_status(motor_type)
    print(f"{motor_type.value}: pos={status.position_steps:>7}, "
          f"moving={status.is_moving}, "
          f"target={status.target_steps}")

show_status(MotorType.ALTITUDE)
show_status(MotorType.AZIMUTH)

altitude: pos=      0, moving=False, target=None
azimuth: pos=      0, moving=False, target=None


In [9]:
# ============================================
# ABSOLUTE MOVE (Low-Level)
# Move ALT to 45° (~half way to horizon)
# ============================================
import time

target_steps = 70000  # ~45° from zenith
print(f"Moving ALT to {target_steps} steps...")

motor.move(MotorType.ALTITUDE, target_steps)

show_status(MotorType.ALTITUDE)
print("✓ Move complete")

Moving ALT to 70000 steps...
altitude: pos=  70000, moving=False, target=None
✓ Move complete


In [10]:
# ============================================
# RELATIVE MOVE (Low-Level)
# Nudge ALT down by 5000 steps
# ============================================

print("Nudging ALT +5000 steps...")
motor.move_relative(MotorType.ALTITUDE, 5000)

show_status(MotorType.ALTITUDE)
print("✓ Nudge complete")

Nudging ALT +5000 steps...
altitude: pos=  75000, moving=False, target=None
✓ Nudge complete


In [11]:
# ============================================
# HOME MOTOR (Low-Level)
# Return ALT to zenith
# ============================================

print("Homing ALT...")
motor.home(MotorType.ALTITUDE)

show_status(MotorType.ALTITUDE)
print("✓ ALT at home (zenith)")

2026-01-01 22:01:13,844 - telescope_mcp.drivers.motors.twin - INFO - Homing motor | motor=altitude target=0


Homing ALT...


2026-01-01 22:01:29,345 - telescope_mcp.drivers.motors.twin - INFO - Motor homed | motor=altitude


altitude: pos=      0, moving=False, target=None
✓ ALT at home (zenith)


---
## 3. High-Level Motor Device API

The `Motor` class provides async operations with context manager support.

In [12]:
# ============================================
# CREATE MOTOR DEVICE
# High-level async wrapper with context manager
# ============================================

# Create a new driver for the device layer (fresh state)
device_driver = DigitalTwinMotorDriver(config)

# Create Motor device - accepts driver and optional config
motor_config = MotorConfig(
    controller_id=0,
    default_speed=100,
)

motor_device = Motor(driver=device_driver, config=motor_config)
print(f"✓ Motor device created")
print(f"  Available: {motor_device.get_available_controllers()}")

2026-01-01 22:01:33,339 - telescope_mcp.devices.motor - INFO - Motor device initialized


✓ Motor device created
  Available: [{'id': 0, 'type': 'digital_twin', 'name': 'Digital Twin Motor Controller', 'port': 'simulated', 'description': 'Simulated Teensy + AMIS motor controller'}]


In [None]:
# ============================================
# ASYNC MOTOR OPERATIONS
# ============================================

async def demo_async_motor():
    """Demonstrate async motor operations."""
    async with motor_device:  # Connects on enter, disconnects on exit
        print(f"Connected: {motor_device.connected}")
        print(f"Info: {motor_device.info}")
        
        # Move altitude to 30 degrees
        print("\nMoving ALT to 30° (~46666 steps)...")
        await motor_device.move_to(MotorType.ALTITUDE, steps=46666)
        status = motor_device.get_status(MotorType.ALTITUDE)
        print(f"ALT status: {status}")
        
        # Move by relative amount
        print("\nMoving ALT +5000 steps...")
        await motor_device.move_by(MotorType.ALTITUDE, steps=5000)
        status = motor_device.get_status(MotorType.ALTITUDE)
        print(f"ALT status: {status}")
        
        # Home altitude
        print("\nHoming ALT...")
        await motor_device.home(MotorType.ALTITUDE)
        status = motor_device.get_status(MotorType.ALTITUDE)
        print(f"ALT status: {status}")

await demo_async_motor()

---
## 4. Coordinated Movement

Move both axes simultaneously using async.

In [None]:
# ============================================
# COORDINATED ALT/AZ SLEW
# ============================================

async def slew_to_altaz(alt_steps: int, az_steps: int):
    """Slew telescope to ALT/AZ position.
    
    Moves both axes concurrently for faster slewing.
    """
    # Create fresh driver and device
    slew_driver = DigitalTwinMotorDriver(config)
    slew_device = Motor(driver=slew_driver)
    
    async with slew_device:
        print(f"Slewing to ALT={alt_steps}, AZ={az_steps}")
        
        # Start both moves concurrently
        await asyncio.gather(
            slew_device.move_to(MotorType.ALTITUDE, alt_steps),
            slew_device.move_to(MotorType.AZIMUTH, az_steps),
        )
        
        alt_status = slew_device.get_status(MotorType.ALTITUDE)
        az_status = slew_device.get_status(MotorType.AZIMUTH)
        
        print(f"✓ ALT: {alt_status.position_steps} steps")
        print(f"✓ AZ:  {az_status.position_steps} steps")

# Slew to ~45° altitude, ~20° azimuth
await slew_to_altaz(70000, 16000)

In [None]:
# ============================================
# HOME ALL AXES
# ============================================

async def home_all():
    """Return both axes to home positions."""
    home_driver = DigitalTwinMotorDriver(config)
    home_device = Motor(driver=home_driver)
    
    async with home_device:
        # First move to a non-home position
        await asyncio.gather(
            home_device.move_to(MotorType.ALTITUDE, 50000),
            home_device.move_to(MotorType.AZIMUTH, 30000),
        )
        print("Moved away from home")
        
        # Home all
        print("Homing all axes...")
        await home_device.home_all()
        
        alt_status = home_device.get_status(MotorType.ALTITUDE)
        az_status = home_device.get_status(MotorType.AZIMUTH)
        
        print(f"✓ ALT: {alt_status.position_steps} (home)")
        print(f"✓ AZ:  {az_status.position_steps} (home)")

await home_all()

---
## 5. Position Limits & Error Handling

The digital twin enforces position limits just like real hardware.

In [None]:
# ============================================
# TEST POSITION LIMITS
# ============================================

async def test_limits():
    """Test that position limits are enforced."""
    limit_driver = DigitalTwinMotorDriver(config)
    limit_device = Motor(driver=limit_driver)
    
    async with limit_device:
        # Try to exceed altitude max (140000)
        print("Testing ALT max limit (140000)...")
        try:
            await limit_device.move_to(MotorType.ALTITUDE, 200000)
            status = limit_device.get_status(MotorType.ALTITUDE)
            print(f"  Position: {status.position_steps}")
        except ValueError as e:
            print(f"  ✓ Limit enforced: {e}")
        
        # Try to go below altitude min (0)
        print("\nTesting ALT min limit (0)...")
        try:
            await limit_device.move_to(MotorType.ALTITUDE, -5000)
            status = limit_device.get_status(MotorType.ALTITUDE)
            print(f"  Position: {status.position_steps}")
        except ValueError as e:
            print(f"  ✓ Limit enforced: {e}")
        
        # Try to exceed azimuth range
        print("\nTesting AZ limits (±110000)...")
        try:
            await limit_device.move_to(MotorType.AZIMUTH, 150000)
            status = limit_device.get_status(MotorType.AZIMUTH)
            print(f"  Position: {status.position_steps}")
        except ValueError as e:
            print(f"  ✓ Limit enforced: {e}")

await test_limits()

---
## 6. DriverFactory Integration

Use `DriverFactory` for automatic driver selection based on config.

In [None]:
# ============================================
# DRIVER FACTORY
# Automatic driver/controller selection based on mode
# ============================================

from telescope_mcp.drivers.config import DriverFactory, DriverMode

# Create factory in DIGITAL_TWIN mode
factory = DriverFactory(mode=DriverMode.DIGITAL_TWIN)

# Create motor driver through factory
motor_driver = factory.create_motor_driver()
print(f"✓ Factory mode: {factory.mode.value}")
print(f"✓ Driver type: {type(motor_driver).__name__}")
print(f"  Available: {motor_driver.get_available_controllers()}")

In [None]:
# ============================================
# USE FACTORY-CREATED DRIVER WITH MOTOR DEVICE
# ============================================

async def demo_factory_motor():
    """Demonstrate Motor device with factory-created driver."""
    # Create device using factory driver
    factory_device = Motor(driver=motor_driver)
    
    async with factory_device:
        print(f"Connected via factory")
        print(f"  Type: {factory_device.info.type}")
        print(f"  Name: {factory_device.info.name}")
        
        # Quick movement test
        await factory_device.move_to(MotorType.ALTITUDE, 35000)
        status = factory_device.get_status(MotorType.ALTITUDE)
        print(f"  ALT position: {status.position_steps}")

await demo_factory_motor()

---
## 7. Fast Mode (No Timing Simulation)

Disable timing simulation for faster testing.

In [13]:
# ============================================
# FAST MODE - No timing simulation
# ============================================

fast_config = DigitalTwinMotorConfig(
    simulate_timing=False,  # Instant moves!
)

async def demo_fast_mode():
    """Demonstrate instant moves without timing simulation."""
    fast_driver = DigitalTwinMotorDriver(fast_config)
    fast_device = Motor(driver=fast_driver)
    
    import time
    
    async with fast_device:
        start = time.time()
        
        # Move across full range - should be instant
        await fast_device.move_to(MotorType.ALTITUDE, 140000)
        await fast_device.move_to(MotorType.ALTITUDE, 0)
        await fast_device.move_to(MotorType.AZIMUTH, 110000)
        await fast_device.move_to(MotorType.AZIMUTH, -110000)
        await fast_device.move_to(MotorType.AZIMUTH, 0)
        
        elapsed = time.time() - start
        print(f"✓ 5 full-range moves completed in {elapsed:.3f}s")
        print(f"  (With timing=True, this would take ~3+ minutes)")

await demo_fast_mode()

2026-01-01 22:03:53,464 - telescope_mcp.devices.motor - INFO - Motor device initialized
2026-01-01 22:03:53,465 - telescope_mcp.devices.motor - INFO - Connecting to motor controller | controller_id=0
2026-01-01 22:03:53,466 - telescope_mcp.devices.motor - INFO - Motor controller connected | type=digital_twin name="Digital Twin Motor Controller"
2026-01-01 22:03:53,467 - telescope_mcp.devices.motor - INFO - Moving motor | motor=altitude target_steps=140000 speed=100
2026-01-01 22:03:53,467 - telescope_mcp.devices.motor - INFO - Move complete | motor=altitude position=140000
2026-01-01 22:03:53,468 - telescope_mcp.devices.motor - INFO - Moving motor | motor=altitude target_steps=0 speed=100
2026-01-01 22:03:53,469 - telescope_mcp.devices.motor - INFO - Move complete | motor=altitude position=0
2026-01-01 22:03:53,469 - telescope_mcp.devices.motor - INFO - Moving motor | motor=azimuth target_steps=110000 speed=100
2026-01-01 22:03:53,470 - telescope_mcp.devices.motor - INFO - Move complet

✓ 5 full-range moves completed in 0.007s
  (With timing=True, this would take ~3+ minutes)


---
## 8. Protocol Compliance Verification

Verify that implementations satisfy the defined protocols.

In [14]:
# ============================================
# PROTOCOL COMPLIANCE
# Verify implementations satisfy protocols
# ============================================

from telescope_mcp.drivers.motors import MotorInstance, MotorDriver

# Create test instances
test_driver = DigitalTwinMotorDriver(fast_config)
test_instance = test_driver.open()

print("Protocol compliance checks:")
print(f"  MotorInstance: {isinstance(test_instance, MotorInstance)}")
print(f"  MotorDriver:   {isinstance(test_driver, MotorDriver)}")

# Verify required methods exist on instance
print("\nMotorInstance methods:")
instance_methods = ["get_info", "get_status", "move", "move_relative", "stop", "home"]
for method in instance_methods:
    has_method = hasattr(test_instance, method)
    print(f"  {method}(): {'✓' if has_method else '✗'}")

# Verify driver methods
print("\nMotorDriver methods:")
driver_methods = ["get_available_controllers", "open", "close"]
for method in driver_methods:
    has_method = hasattr(test_driver, method)
    print(f"  {method}(): {'✓' if has_method else '✗'}")

test_driver.close()

Protocol compliance checks:
  MotorInstance: True
  MotorDriver:   True

MotorInstance methods:
  get_info(): ✓
  get_status(): ✓
  move(): ✓
  move_relative(): ✓
  stop(): ✓
  home(): ✓

MotorDriver methods:
  get_available_controllers(): ✓
  open(): ✓
  close(): ✓


---
## 9. Azimuth Homing Sequence

Calibration procedure to find the full azimuth range of motion:

1. **Find CCW limit** - Move slowly CCW until stepper slips (stall detection)
2. **Back off** - Move CW 2-3 steps, mark as HOME (position 0)
3. **Traverse to CW limit** - Move CW until slip detected
4. **Back off** - Move CCW 2-3 steps, record total range

⚠️ **Hardware note:** Moving too hard into the end stop can damage the 3D printer belt mount. Use slow speed (20%) for limit detection.

In [None]:
# ============================================
# HOMING SEQUENCE - Data Classes
# ============================================

from dataclasses import dataclass
from typing import Callable

@dataclass
class HomingResult:
    """Result of homing sequence.
    
    Attributes:
        home_position: Position marked as 0 (after CCW limit backoff)
        ccw_limit: Raw CCW limit position found  
        cw_limit: Raw CW limit position found
        total_range: Total steps from home to CW limit
        backoff_steps: Steps backed off from limits
    """
    home_position: int
    ccw_limit: int
    cw_limit: int  
    total_range: int
    backoff_steps: int

print("✓ HomingResult class defined")

✓ Homing functions defined


In [21]:
# ============================================
# RUN HOMING SEQUENCE (with simulated magnetometer)
# ============================================

# Re-import to ensure consistent types after reload
from telescope_mcp.drivers.motors.types import MotorType as MT
from telescope_mcp.drivers.motors import DigitalTwinMotorDriver, DigitalTwinMotorConfig
from telescope_mcp.devices import Motor

# Magnetometer calibration callback
magnetometer_data: list[tuple[int, float, float, float]] = []

def start_magnetometer_calibration():
    """Start recording magnetometer data during traverse."""
    print("      [Magnetometer] Calibration recording started")
    magnetometer_data.clear()

async def run_homing_demo():
    """Demonstrate the homing sequence."""
    # Use fast config for demo (no timing delays)
    homing_config = DigitalTwinMotorConfig(
        simulate_timing=False,  # Fast for demo
        azimuth_min_steps=-110000,
        azimuth_max_steps=110000,
    )
    homing_driver = DigitalTwinMotorDriver(homing_config)
    homing_device = Motor(driver=homing_driver)
    
    async with homing_device:
        # Start from a random middle position
        homing_device._instance.set_position(MT.AZIMUTH, 25000)
        print(f"Starting position: {homing_device._instance.get_status(MT.AZIMUTH).position_steps}")
        
        # Run homing sequence (use MT for consistent enum reference)
        result = await home_azimuth_internal(
            homing_device,
            motor_type=MT,
            backoff_steps=300,
            homing_speed=20,
            step_size=500,
            on_traverse_start=start_magnetometer_calibration,
        )
        
        return result

async def home_azimuth_internal(device, motor_type, backoff_steps, homing_speed, step_size, on_traverse_start):
    """Internal homing with explicit motor type reference."""
    print("=" * 50)
    print("AZIMUTH HOMING SEQUENCE")
    print("=" * 50)
    
    instance = device._instance
    
    # Step 1: Find CCW limit
    print("\n[1/4] Finding CCW limit (moving slowly)...")
    ccw_limit = instance.move_until_stall(
        motor_type.AZIMUTH, 
        direction=-1,
        speed=homing_speed,
        step_size=step_size,
    )
    print(f"      CCW limit detected at {ccw_limit} steps")
    
    # Step 2: Back off from CCW limit and mark as home
    print(f"\n[2/4] Backing off {backoff_steps} steps from CCW limit...")
    instance.move_relative(motor_type.AZIMUTH, backoff_steps, speed=50)
    home_position = instance.get_status(motor_type.AZIMUTH).position_steps
    print(f"      HOME position: {home_position} steps")
    
    # Step 3: Traverse to CW limit
    print("\n[3/4] Traversing to CW limit...")
    if on_traverse_start:
        print("      Starting magnetometer calibration...")
        on_traverse_start()
    
    cw_limit = instance.move_until_stall(
        motor_type.AZIMUTH,
        direction=+1,
        speed=homing_speed,
        step_size=step_size,
    )
    print(f"      CW limit detected at {cw_limit} steps")
    
    # Step 4: Back off from CW limit
    print(f"\n[4/4] Backing off {backoff_steps} steps from CW limit...")
    instance.move_relative(motor_type.AZIMUTH, -backoff_steps, speed=50)
    final_cw = instance.get_status(motor_type.AZIMUTH).position_steps
    
    total_range = final_cw - home_position
    
    print("\n" + "=" * 50)
    print("HOMING COMPLETE")
    print("=" * 50)
    print(f"  CCW limit:    {ccw_limit} steps")
    print(f"  HOME:         {home_position} steps") 
    print(f"  CW limit:     {cw_limit} steps")
    print(f"  Usable range: {total_range} steps")
    
    return HomingResult(
        home_position=home_position,
        ccw_limit=ccw_limit,
        cw_limit=cw_limit,
        total_range=total_range,
        backoff_steps=backoff_steps,
    )

homing_result = await run_homing_demo()

2026-01-01 22:14:28,335 - telescope_mcp.devices.motor - INFO - Motor device initialized
2026-01-01 22:14:28,336 - telescope_mcp.devices.motor - INFO - Connecting to motor controller | controller_id=0
2026-01-01 22:14:28,336 - telescope_mcp.devices.motor - INFO - Motor controller connected | type=digital_twin name="Digital Twin Motor Controller"
2026-01-01 22:14:28,337 - telescope_mcp.drivers.motors.twin - INFO - Starting move_until_stall | motor=azimuth direction=CCW speed=20 step_size=500
2026-01-01 22:14:28,338 - telescope_mcp.drivers.motors.twin - INFO - Stall detected at limit | motor=azimuth position=-110000 limit=min
2026-01-01 22:14:28,339 - telescope_mcp.drivers.motors.twin - INFO - Starting move_until_stall | motor=azimuth direction=CW speed=20 step_size=500
2026-01-01 22:14:28,341 - telescope_mcp.drivers.motors.twin - INFO - Stall detected at limit | motor=azimuth position=110000 limit=max
2026-01-01 22:14:28,342 - telescope_mcp.devices.motor - INFO - Motor controller disconn

Starting position: 25000
AZIMUTH HOMING SEQUENCE

[1/4] Finding CCW limit (moving slowly)...
      CCW limit detected at -110000 steps

[2/4] Backing off 300 steps from CCW limit...
      HOME position: -109700 steps

[3/4] Traversing to CW limit...
      Starting magnetometer calibration...
      [Magnetometer] Calibration recording started
      CW limit detected at 110000 steps

[4/4] Backing off 300 steps from CW limit...

HOMING COMPLETE
  CCW limit:    -110000 steps
  HOME:         -109700 steps
  CW limit:     110000 steps
  Usable range: 219400 steps


In [22]:
# ============================================
# VERIFY HOMING RESULT
# ============================================

print("Homing Result Details:")
print(f"  home_position:  {homing_result.home_position}")
print(f"  ccw_limit:      {homing_result.ccw_limit}")
print(f"  cw_limit:       {homing_result.cw_limit}")
print(f"  total_range:    {homing_result.total_range}")
print(f"  backoff_steps:  {homing_result.backoff_steps}")

# Calculate degrees (using ~2444 steps/degree for azimuth)
steps_per_degree = 220000 / 90.0
degrees_range = homing_result.total_range / steps_per_degree
print(f"\n  Azimuth range: ~{degrees_range:.1f}°")

Homing Result Details:
  home_position:  -109700
  ccw_limit:      -110000
  cw_limit:       110000
  total_range:    219400
  backoff_steps:  300

  Azimuth range: ~89.8°


---
## Reference: Motor Physical Limits

### Altitude (ALT)
- **Range:** 0° (zenith) to ~90° (horizon)
- **Steps:** 0 - 140,000 (~1,556 steps/degree)
- **Home:** 0 steps (zenith)

### Azimuth (AZ)  
- **Range:** ±50° from center
- **Steps:** -110,000 to +110,000 (~2,444 steps/degree)
- **Home:** 0 steps (center)

### Architecture
```
MotorDriver (protocol)          ← Factory interface
    └── DigitalTwinMotorDriver  ← Test implementation
    └── SerialMotorDriver       ← Hardware (future)

MotorInstance (protocol)        ← Runtime interface  
    └── DigitalTwinMotorInstance

Motor (device class)            ← High-level async API
```