# Nimbus Connection Test

In [None]:
import sys
import logging
from pathlib import Path

from pylabrobot.liquid_handling.backends.hamilton.hamilton_tcp_backend import HamiltonTCPBase
from pylabrobot.liquid_handling.backends.hamilton.tcp_introspection import HamiltonIntrospection
from pylabrobot.liquid_handling.backends.hamilton.packets import Address
from pylabrobot.liquid_handling.backends.hamilton.commands import HamiltonCommand
from pylabrobot.liquid_handling.backends.hamilton.protocol import HamiltonProtocol
from pylabrobot.liquid_handling.backends.hamilton.hoi_params import HoiParams, HoiParamsParser

# Optional: uncomment the lines below to route pylabrobot log output to this notebook.
# plr_logger = logging.getLogger('pylabrobot')
# plr_logger.setLevel(logging.INFO)  # INFO for normal use, DEBUG for troubleshooting
# Clear existing handlers and add a console handler
# plr_logger.handlers.clear()
# console_handler = logging.StreamHandler(sys.stdout)
# console_handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
# plr_logger.addHandler(console_handler)


In [2]:
# Connect and setup
backend = HamiltonTCPBase(
    host="192.168.100.100",
    port=2000,
    read_timeout=30,
    write_timeout=30
)

await backend.setup()

print("\n" + "="*60)
print("SETUP COMPLETE")
print("="*60)
print(f"Client ID: {backend._client_id}")
print(f"Client Address: {backend.client_address}")
print(f"Root Objects: {backend._discovered_objects.get('root', [])}")



SETUP COMPLETE
Client ID: 15
Client Address: 2:15:65535
Root Objects: [Address(module=1, node=1, object=48896)]


In [None]:
# Get name of root object
introspection = HamiltonIntrospection(backend)
root_address = backend._discovered_objects['root'][0]

root_info = await introspection.get_object(root_address)

print(f"✓ Object at {root_address}:")
print(f"  Name: {root_info.name}")
print(f"  Version: {root_info.version}")
print(f"  Methods: {root_info.method_count}")
print(f"  Subobjects: {root_info.subobject_count}")

✓ Object at 1:1:48896:
  Name: NimbusCORE
  Version: 1.0
  Methods: 41
  Subobjects: 31


In [None]:
# Test subobject traversal with fixed implementation
introspection = HamiltonIntrospection(backend)

# Test with NimbusCORE
root_address = backend._discovered_objects['root'][0]
root_info = await introspection.get_object(root_address)

print(f"🔍 Testing subobject traversal on {root_info.name} ({root_info.subobject_count} subobjects)...")

# Try getting subobjects
for i in range(root_info.subobject_count):
    try:
        sub_addr = await introspection.get_subobject_address(root_address, i)
        sub_info = await introspection.get_object(sub_addr)
        print(f"  [{i}] {sub_addr}: {sub_info.name}")
    except Exception as e:
        print(f"  [{i}] Error: {type(e).__name__}: {str(e)[:50]}")

print("\n✅ Subobject traversal test complete!")

🔍 Testing subobject traversal on NimbusCORE (31 subobjects)...
  [0] 1:1:259: Ethernet
  [1] 1:1:263: BoanduzCAN
  [2] 1:1:768: IoNotification
  [3] 1:1:260: XDrive
  [4] 1:1:257: Pipette
  [5] 1:1:262: ChannelCoord
  [6] 1:1:261: XYCoord
  [7] 1:1:265: GripperXYCoord
  [8] 1:1:266: HDDeck
  [9] 1:1:258: Calibration
  [10] 1:1:48880: Service
  [11] 1:1:270: PipetteTeach
  [12] 1:1:271: GripperTeach
  [13] 1:1:269: GantryScanner
  [14] 1:1:384: Configuration
  [15] 1:1:49152: CPU
  [16] 1:1:49408: CPU
  [17] 1:1:272: Channel
  [18] 1:1:49409: CPU
  [19] 1:1:273: Channel
  [20] 1:1:49410: CPU
  [21] 1:1:274: Channel
  [22] 1:1:49411: CPU
  [23] 1:1:275: Channel
  [24] 1:1:264: Gripper
  [25] 1:1:268: DoorLock
  [26] 1:128:48896: LeftDoorLockUnit
  [27] 1:129:48896: RightDoorLockUnit
  [28] 1:96:48896: BarcodeScanner0
  [29] 1:32:48896: DAC0
  [30] 96:1:48896: IoBoard

✅ Subobject traversal test complete!


In [5]:
door_addr = Address(1, 1, 268)

print(f"🔒 DoorLock Methods at {door_addr}")
print("=" * 50)

methods = await introspection.get_all_methods(door_addr)
introspection.print_method_signatures(methods)

🔒 DoorLock Methods at 1:1:268
Method Signatures:
  LockDoor(void) -> void
    Interface: 1, Method ID: 1

  UnlockDoor(void) -> void
    Interface: 1, Method ID: 2

  IsDoorLocked(#) -> Locked
    Interface: 1, Method ID: 3

  ObjectInfo() -> name,version,methods,subobjects
    Interface: 0, Method ID: 1

  MethodInfo() -> method,interfaceid,action,actionid,name,parametertypes,parameternames
    Interface: 0, Method ID: 2

  SubObjectInfo() -> subobject,moduleID,nodeID,objectID
    Interface: 0, Method ID: 3

  InterfaceDescriptors(L) -> interfaceIds,interfaceDescriptors
    Interface: 0, Method ID: 4

  EnumInfo(L73L) -> interfaceId,enumerationNames,numberEnumerationValues,enumerationValues,enumerationValueDescriptions
    Interface: 0, Method ID: 5

  StructInfo(L7L) -> interfaceId,structNames,numberStructureElements,structureElementTypes,structureElementDescriptions
    Interface: 0, Method ID: 6



In [None]:
# DoorLock commands (via HamiltonCommand) + lock-if-unlocked
class LockDoor(HamiltonCommand):
    """Command for the DoorLock ``LockDoor`` method (interface 1, method ID 1).

    The introspected signature is ``LockDoor(void) -> void``: no parameters are
    sent and nothing is decoded from the reply.
    """

    protocol = HamiltonProtocol.OBJECT_DISCOVERY
    interface_id = 1
    command_id = 1

    def build_parameters(self) -> HoiParams:
        """Return an empty parameter set; the method takes no arguments."""
        no_params = HoiParams()
        return no_params

    @classmethod
    def parse_response_parameters(cls, data: bytes) -> dict:
        """Ignore ``data`` and always report success."""
        outcome = {"success": True}
        return outcome

class UnlockDoor(HamiltonCommand):
    """Command for the DoorLock ``UnlockDoor`` method (interface 1, method ID 2).

    The introspected signature is ``UnlockDoor(void) -> void``: no parameters
    are sent and nothing is decoded from the reply.
    """

    protocol = HamiltonProtocol.OBJECT_DISCOVERY
    interface_id = 1
    command_id = 2

    def build_parameters(self) -> HoiParams:
        """Return an empty parameter set; the method takes no arguments."""
        no_params = HoiParams()
        return no_params

    @classmethod
    def parse_response_parameters(cls, data: bytes) -> dict:
        """Ignore ``data`` and always report success."""
        outcome = {"success": True}
        return outcome

class IsDoorLocked(HamiltonCommand):
    """Status query for the DoorLock ``IsDoorLocked`` method (interface 1, method ID 3).

    The parsed response is a dict with a single boolean ``"locked"`` entry.
    """

    protocol = HamiltonProtocol.OBJECT_DISCOVERY
    interface_id = 1
    command_id = 3
    # Must be 0 (STATUS_REQUEST); the default is 3 (COMMAND_REQUEST).
    action_code = 0

    def build_parameters(self) -> HoiParams:
        """Return an empty parameter set; the query sends no arguments."""
        no_params = HoiParams()
        return no_params

    @classmethod
    def parse_response_parameters(cls, data: bytes) -> dict:
        """Read the first parsed item from ``data`` and coerce its value to bool."""
        parser = HoiParamsParser(data)
        # parse_next() yields a pair; only the second element (the value) is used.
        _first, raw_value = parser.parse_next()
        return {"locked": bool(raw_value)}

door_addr = Address(1, 1, 268)

# Check and lock if needed
status = await backend.send_command(IsDoorLocked(door_addr))
print(f"Current: {'LOCKED' if status['locked'] else 'UNLOCKED'}")
if not status["locked"]:
    await backend.send_command(LockDoor(door_addr))
    status = await backend.send_command(IsDoorLocked(door_addr))
    print(f"After lock: {'LOCKED' if status['locked'] else 'UNLOCKED'}")
else:
    print("No action (already locked)")

Current: UNLOCKED
After lock: LOCKED


In [7]:
# Check and unlock if needed
status = await backend.send_command(IsDoorLocked(door_addr))
print(f"Current: {'LOCKED' if status['locked'] else 'UNLOCKED'}")
if status["locked"]:
    await backend.send_command(UnlockDoor(door_addr))
    status = await backend.send_command(IsDoorLocked(door_addr))
    print(f"After unlock: {'LOCKED' if status['locked'] else 'UNLOCKED'}")
else:
    print("No action (already unlocked)")

Current: LOCKED
After unlock: UNLOCKED


In [8]:
# Stop the backend now that the test cells are done.
# NOTE(review): presumably this closes the connection opened by setup() — confirm.
await backend.stop()

