diff --git a/README.md b/README.md index 175793d..87322cc 100644 --- a/README.md +++ b/README.md @@ -180,8 +180,8 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file - [Documentation](https://pymc-dev.github.io/pyMC_core/) - [Issues](https://github.com/pymc-dev/pyMC_Core/issues) - [Discussions](https://github.com/pymc-dev/pyMC_Core/discussions) -- [pyMC Discord](https://discord.gg/SMHkUDwf) -- [Meshcore Discord](https://discord.gg/fThwBrRc3Q) +- [pyMC Discord](https://discord.gg/3s8MMaSTzq) +- [Meshcore Discord](https://meshcore.gg/) --- diff --git a/docs/docs/companion.md b/docs/docs/companion.md index d838a65..a31262d 100644 --- a/docs/docs/companion.md +++ b/docs/docs/companion.md @@ -345,10 +345,27 @@ companion.set_flood_scope(key) companion.set_flood_region(None) ``` +For MeshCore v1.15 parity, the frame protocol also supports a persisted default flood scope: + +- `CMD_SET_DEFAULT_FLOOD_SCOPE` (`63`) stores `name(31)` + `key(16)` +- `CMD_GET_DEFAULT_FLOOD_SCOPE` (`64`) returns the stored value (or empty if unset) + +pyMC_core resolves effective flood scope as: + +1. transient key set by `CMD_SET_FLOOD_SCOPE` / `set_flood_scope()` +2. otherwise persisted default key (if configured) +3. otherwise unscoped flood + When a flood scope is active, all flood packets are tagged with a 16-bit transport code (HMAC-SHA256 derived) and sent as `ROUTE_TYPE_TRANSPORT_FLOOD`. Direct-routed packets are unaffected. +### Firmware v1.15 Note (nRF BLE DFU) + +MeshCore v1.15 may expose Nordic DFU service on the normal nRF companion BLE stack. +This does not change pyMC_core TCP frame protocol behavior, but BLE clients should not +assume only UART service is present on companion devices. + ### Cryptographic Signing ```python @@ -613,6 +630,17 @@ The frame server handles the following companion radio protocol commands: | `CMD_GET_STATS` | 56 | Get statistics | | `CMD_SET_AUTOADD_CONFIG` | 58 | Set auto-add configuration | | `CMD_GET_AUTOADD_CONFIG` | 59 | Get auto-add configuration | +| `CMD_SEND_CHANNEL_DATA` | 62 | Send binary channel datagram (`data_type + payload`) | +| `CMD_SET_DEFAULT_FLOOD_SCOPE` | 63 | Persist default flood scope (`name(31) + key(16)`) | +| `CMD_GET_DEFAULT_FLOOD_SCOPE` | 64 | Get persisted default flood scope | + +`CMD_SEND_CHANNEL_DATA` payload after command byte matches firmware (`MyMesh.cpp`): + +`[channel_idx][path_len][path...][data_type_le16][payload...]` + +- `path_len=0xFF` means flood (no path bytes present) +- otherwise `path_len` uses normal encoded path-length semantics (1/2/3-byte path hashes) +- `data_type=0` is reserved and rejected ### Push Notifications @@ -636,6 +664,19 @@ The frame server sends unsolicited push frames to the companion app when events | `PUSH_CODE_BINARY_RESPONSE` | 0x95 | Binary request response | | `PUSH_CODE_PATH_DISCOVERY_RESPONSE` | 0x96 | Path discovery response | +### Sync Message Responses + +`CMD_SYNC_NEXT_MESSAGE` can return: + +- `RESP_CODE_CONTACT_MSG_RECV` / `RESP_CODE_CONTACT_MSG_RECV_V3` for direct text messages +- `RESP_CODE_CHANNEL_MSG_RECV` / `RESP_CODE_CHANNEL_MSG_RECV_V3` for channel text messages +- `RESP_CODE_CHANNEL_DATA_RECV` (`27`) for channel binary datagrams +- `RESP_CODE_NO_MORE_MESSAGES` when the queue is empty + +`RESP_CODE_CHANNEL_DATA_RECV` payload layout: + +`[code=27][snr][0][0][channel_idx][path_len][data_type_le16][data_len][data...]` + ### Host-Callable Push Methods The frame server exposes methods for the host application to push data to the connected companion app: @@ -926,6 +967,8 @@ class NodePrefs: airtime_factor: float = 0.0 client_repeat: int = 0 # reported in CMD_DEVICE_QUERY device info frame (byte 80) path_hash_mode: int = 0 # 0=1-byte, 1=2-byte, 2=3-byte path hashes for flood packets (byte 81) + default_scope_name: str = "" # persisted default scope label (v1.15) + default_scope_key: bytes = b"" # persisted default scope key (16 bytes) ``` --- diff --git a/examples/common.py b/examples/common.py index 9db8ff5..a444557 100644 --- a/examples/common.py +++ b/examples/common.py @@ -39,9 +39,16 @@ def create_radio( """Create a radio instance with configuration for specified hardware. Args: - radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", - "kiss-tnc", "kiss-modem", or "ch341") - serial_port: Serial port for KISS devices (only used with "kiss-tnc" or "kiss-modem") + radio_type: Type of radio hardware. Supported values: + "waveshare" — Waveshare SX1262 HAT (SPI) + "uconsole" — uConsole LoRa module (SPI) + "meshadv-mini" — MeshAdv Mini (SPI) + "kiss-tnc" — KISS TNC over serial + "kiss-modem" — MeshCore KISS modem over serial + "ch341" — SX1262 via CH341 USB-to-SPI adapter + "pymc_usb" — pymc_usb firmware over USB-CDC + "pymc_tcp" — pymc_usb firmware over Wi-Fi/TCP + serial_port: Serial port path. Used by "kiss-tnc", "kiss-modem", and "pymc_usb". Returns: Radio instance configured for the specified hardware @@ -157,6 +164,73 @@ def create_radio( ) return radio + # ── pymc_tcp (pymc_usb firmware over Wi-Fi/TCP) ───────── + if radio_type == "pymc_tcp": + from pymc_core.hardware.tcp_radio import TCPLoRaRadio + + logger.debug("Using TCP LoRa Radio (pymc_usb firmware over Wi-Fi)") + + tcp_config = { + "host": os.environ.get("PYMC_TCP_HOST", ""), + "port": int(os.environ.get("PYMC_TCP_PORT", 5055)), + "token": os.environ.get("PYMC_TCP_TOKEN", ""), + "connect_timeout": float( + os.environ.get("PYMC_TCP_CONNECT_TIMEOUT", 5.0) + ), + "frequency": int(os.environ.get("LORA_FREQ", 869618000)), + "bandwidth": int(os.environ.get("LORA_BW", 62500)), + "spreading_factor": int(os.environ.get("LORA_SF", 8)), + "coding_rate": int(os.environ.get("LORA_CR", 8)), + "tx_power": int(os.environ.get("LORA_POWER", 22)), + "sync_word": int(os.environ.get("LORA_SYNCWORD", "0x12"), 0), + "preamble_length": int(os.environ.get("LORA_PREAMBLE", 16)), + "lbt_enabled": True, + "lbt_max_attempts": 5, + } + + if not tcp_config["host"]: + raise ValueError( + "pymc_tcp radio requires PYMC_TCP_HOST env var — " + "modem hostname or LAN IP." + ) + + radio = TCPLoRaRadio(**tcp_config) + logger.info( + f"pymc_tcp radio created at {tcp_config['host']}:{tcp_config['port']}: " + f"{tcp_config['frequency']/1e6:.1f}MHz SF{tcp_config['spreading_factor']} " + f"BW{tcp_config['bandwidth']/1000:.0f}kHz {tcp_config['tx_power']}dBm" + ) + return radio + + # ── pymc_usb (pymc_usb firmware over USB-CDC) ─────────── + if radio_type == "pymc_usb": + from pymc_core.hardware.usb_radio import USBLoRaRadio + + logger.debug("Using USB LoRa Radio (pymc_usb firmware)") + + # Default: EU/UK (Narrow), Switzerland preset + usb_config = { + "port": serial_port, # e.g. /dev/ttyACM0 + "baudrate": 921600, + "frequency": int(os.environ.get("LORA_FREQ", 869618000)), + "bandwidth": int(os.environ.get("LORA_BW", 62500)), + "spreading_factor": int(os.environ.get("LORA_SF", 8)), + "coding_rate": int(os.environ.get("LORA_CR", 8)), + "tx_power": int(os.environ.get("LORA_POWER", 22)), + "sync_word": int(os.environ.get("LORA_SYNCWORD", "0x12"), 0), + "preamble_length": int(os.environ.get("LORA_PREAMBLE", 16)), + "lbt_enabled": True, + "lbt_max_attempts": 5, + } + + radio = USBLoRaRadio(**usb_config) + logger.info( + f"pymc_usb radio created on {serial_port}: " + f"{usb_config['frequency']/1e6:.1f}MHz SF{usb_config['spreading_factor']} " + f"BW{usb_config['bandwidth']/1000:.0f}kHz {usb_config['tx_power']}dBm" + ) + return radio + # Direct SX1262 radio for other types from pymc_core.hardware.sx1262_wrapper import SX1262Radio @@ -218,7 +292,8 @@ def create_radio( if radio_type not in configs: raise ValueError( f"Unknown radio type: {radio_type}. " - "Use 'waveshare', 'meshadv-mini', 'uconsole', 'kiss-tnc', 'kiss-modem', or 'ch341'" + "Use 'waveshare', 'meshadv-mini', 'uconsole', 'kiss-tnc', " + "'kiss-modem', 'ch341', 'pymc_usb', or 'pymc_tcp'" ) radio_kwargs = configs[radio_type] @@ -250,8 +325,9 @@ def create_mesh_node( Args: node_name: Name for the mesh node radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", - "kiss-tnc", "kiss-modem", or "ch341") - serial_port: Serial port for KISS devices (only used with "kiss-tnc" or "kiss-modem") + "kiss-tnc", "kiss-modem", "ch341", "pymc_usb", or "pymc_tcp") + serial_port: Serial port for KISS devices or pymc_usb + (e.g. "/dev/ttyUSB0" for KISS, "/dev/ttyACM0" for pymc_usb) use_modem_identity: If True and radio_type is "kiss-modem", use the modem's cryptographic identity instead of generating a local one. This keeps the private key secure on the modem hardware. @@ -301,10 +377,11 @@ def create_mesh_node( logger.info("CH341 radio initialized successfully") print("CH341 USB adapter radio initialized") else: + # waveshare/uconsole/meshadv-mini/pymc_usb/pymc_tcp all use begin() logger.debug("Calling radio.begin()...") ok = radio.begin() if ok is False: - raise RuntimeError("SX1262 radio begin() returned False") + raise RuntimeError("Radio begin() returned False") logger.info("Radio initialized successfully") # Create identity - use modem identity if requested and available diff --git a/pyproject.toml b/pyproject.toml index b48fede..ffc5384 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pymc_core" -version = "1.0.10" +version = "1.0.11" authors = [ {name = "Lloyd Newton", email = "lloyd@rightup.co.uk"}, ] diff --git a/src/pymc_core/__init__.py b/src/pymc_core/__init__.py index f9e4d57..27efc05 100644 --- a/src/pymc_core/__init__.py +++ b/src/pymc_core/__init__.py @@ -3,7 +3,7 @@ Clean, simple API for building mesh network applications. """ -__version__ = "1.0.10" +__version__ = "1.0.11" # Core mesh functionality from .node.node import MeshNode diff --git a/src/pymc_core/companion/companion_base.py b/src/pymc_core/companion/companion_base.py index 3fa8b93..b98bd8c 100644 --- a/src/pymc_core/companion/companion_base.py +++ b/src/pymc_core/companion/companion_base.py @@ -9,6 +9,7 @@ import asyncio import copy +import hashlib import logging import random import struct @@ -30,6 +31,7 @@ MAX_PATH_SIZE, PAYLOAD_TYPE_ADVERT, PAYLOAD_TYPE_CONTROL, + PAYLOAD_TYPE_GRP_DATA, PH_ROUTE_MASK, PUB_KEY_SIZE, REQ_TYPE_GET_STATUS, @@ -39,6 +41,7 @@ ROUTE_TYPE_TRANSPORT_FLOOD, TELEM_PERM_BASE, ) +from ..protocol.crypto import CryptoUtils from ..protocol.packet_utils import PathUtils from ..protocol.transport_keys import calc_transport_code, get_auto_key_for from .channel_store import ChannelStore @@ -80,6 +83,7 @@ PUSH_CALLBACK_KEYS = [ "message_received", "channel_message_received", + "channel_data_received", "advert_received", "contact_path_updated", "send_confirmed", @@ -213,6 +217,10 @@ def _init_companion_stores( self._seen_txt: OrderedDict[str, float] = OrderedDict() self._seen_txt_ttl = 300 self._seen_txt_max = 1000 + # GRP_DATA dedup by packet hash so sync_next_message only returns one entry per packet. + self._seen_grp_data: OrderedDict[str, float] = OrderedDict() + self._seen_grp_data_ttl = 300 + self._seen_grp_data_max = 1000 # Allow subclasses to restore persisted preferences on startup. self._load_prefs() @@ -546,6 +554,34 @@ def set_flood_scope(self, transport_key: Optional[bytes] = None) -> None: else: self._flood_transport_key = None + def set_default_flood_scope( + self, + scope_name: Optional[str], + transport_key: Optional[bytes], + ) -> bool: + """Persist default flood scope (v1.15 companion protocol semantics).""" + if not scope_name or not transport_key or len(transport_key) < 16: + self.prefs.default_scope_name = "" + self.prefs.default_scope_key = b"" + self._save_prefs() + return True + normalized = scope_name[:30].strip() + if not normalized: + return False + self.prefs.default_scope_name = normalized + self.prefs.default_scope_key = bytes(transport_key[:16]) + self._save_prefs() + return True + + def get_default_flood_scope(self) -> Optional[tuple[str, bytes]]: + """Return (name, key) for persisted default scope, or None if unset.""" + name = (getattr(self.prefs, "default_scope_name", "") or "").strip() + key = getattr(self.prefs, "default_scope_key", b"") or b"" + key = bytes(key[:16]).ljust(16, b"\x00") if key else b"" + if not name or key == b"\x00" * 16: + return None + return (name, key) + def set_flood_region(self, region_name: Optional[str] = None) -> None: """Set flood scope from a region name (e.g., ``'#usa'``) or clear it. @@ -560,6 +596,15 @@ def set_flood_region(self, region_name: Optional[str] = None) -> None: else: self._flood_transport_key = None + def _resolve_flood_transport_key(self) -> Optional[bytes]: + """Resolve effective flood key: transient key first, then persisted default.""" + if self._flood_transport_key is not None: + return self._flood_transport_key + default_scope = self.get_default_flood_scope() + if default_scope is None: + return None + return default_scope[1] + def _apply_flood_scope(self, pkt: Packet) -> None: """Apply flood scope transport codes to a packet in-place. @@ -569,12 +614,13 @@ def _apply_flood_scope(self, pkt: Packet) -> None: Matches firmware ``sendFloodScoped()`` in ``BaseChatMesh.cpp``. """ - if self._flood_transport_key is None: + effective_key = self._resolve_flood_transport_key() + if effective_key is None: return route_type = pkt.get_route_type() if route_type != ROUTE_TYPE_FLOOD: return # only scope flood packets, not direct - code = calc_transport_code(self._flood_transport_key, pkt) + code = calc_transport_code(effective_key, pkt) pkt.transport_codes[0] = code pkt.transport_codes[1] = 0 # reserved for home region (firmware TODO) # Switch route type from FLOOD -> TRANSPORT_FLOOD @@ -749,6 +795,9 @@ def on_message_received(self, callback: Callable) -> None: def on_channel_message_received(self, callback: Callable) -> None: self._push_callbacks["channel_message_received"].append(callback) + def on_channel_data_received(self, callback: Callable) -> None: + self._push_callbacks["channel_data_received"].append(callback) + def on_advert_received(self, callback: Callable) -> None: self._push_callbacks["advert_received"].append(callback) @@ -1283,6 +1332,63 @@ async def send_channel_message(self, channel_idx: int, text: str) -> bool: self.stats.record_tx_error() return False + async def send_channel_data( + self, + channel_idx: int, + data_type: int, + payload: bytes, + *, + path: Optional[bytes] = None, + path_len_encoded: Optional[int] = None, + ) -> bool: + """Send a group binary datagram (PAYLOAD_TYPE_GRP_DATA).""" + channel = self.channels.get(channel_idx) + if not channel or data_type <= 0 or data_type > 0xFFFF: + return False + payload = bytes(payload or b"") + if len(payload) > 255: + return False + try: + secret_bytes = bytes(channel.secret or b"") + if len(secret_bytes) < 32: + secret_bytes = secret_bytes + b"\x00" * (32 - len(secret_bytes)) + else: + secret_bytes = secret_bytes[:32] + + hash_input = ( + secret_bytes[:16] + if len(secret_bytes) >= 32 and secret_bytes[16:32] == b"\x00" * 16 + else secret_bytes + ) + channel_hash = hashlib.sha256(hash_input).digest()[0] + plaintext = struct.pack(" None: rssi, ) + def _get_channel_candidates_by_hash(self, channel_hash: int) -> list[tuple[int, Channel]]: + """Return channel candidates that match the 1-byte channel hash.""" + matches: list[tuple[int, Channel]] = [] + max_channels = getattr(self.channels, "max_channels", 40) + for idx in range(max_channels): + channel = self.channels.get(idx) + if channel is None: + continue + secret = bytes(channel.secret or b"") + if len(secret) < 32: + secret = secret + b"\x00" * (32 - len(secret)) + else: + secret = secret[:32] + hash_input = secret[:16] if secret[16:32] == b"\x00" * 16 else secret + if hashlib.sha256(hash_input).digest()[0] == channel_hash: + matches.append((idx, channel)) + return matches + + async def _handle_group_data_packet(self, packet: Packet) -> None: + """Parse and queue incoming PAYLOAD_TYPE_GRP_DATA for sync_next_message.""" + payload = packet.get_payload() + if len(payload) < 4: + return + packet_hash = packet.calculate_packet_hash().hex().upper() + if self._check_dedup( + self._seen_grp_data, + packet_hash, + self._seen_grp_data_ttl, + self._seen_grp_data_max, + ): + return + + channel_hash = payload[0] + cipher_mac = payload[1:3] + ciphertext = payload[3:] + selected_idx: Optional[int] = None + plaintext: Optional[bytes] = None + + for idx, channel in self._get_channel_candidates_by_hash(channel_hash): + secret = bytes(channel.secret or b"") + if len(secret) < 32: + secret = secret + b"\x00" * (32 - len(secret)) + else: + secret = secret[:32] + try: + plaintext = CryptoUtils.mac_then_decrypt(hashlib.sha256(secret).digest(), secret, cipher_mac + ciphertext) + except Exception: + plaintext = None + if plaintext is not None: + selected_idx = idx + break + + if selected_idx is None or plaintext is None or len(plaintext) < 3: + return + data_type = struct.unpack_from(" None: for callback in self._push_callbacks.get(event_name, []): try: diff --git a/src/pymc_core/companion/companion_bridge.py b/src/pymc_core/companion/companion_bridge.py index fe9bda7..855ebe9 100644 --- a/src/pymc_core/companion/companion_bridge.py +++ b/src/pymc_core/companion/companion_bridge.py @@ -19,6 +19,7 @@ PAYLOAD_TYPE_ACK, PAYLOAD_TYPE_ADVERT, PAYLOAD_TYPE_ANON_REQ, + PAYLOAD_TYPE_GRP_DATA, PAYLOAD_TYPE_GRP_TXT, PAYLOAD_TYPE_PATH, PAYLOAD_TYPE_RAW_CUSTOM, @@ -304,6 +305,11 @@ async def process_received_packet(self, packet: Packet) -> None: await handler(packet) except Exception as e: logger.error(f"Handler error for type {ptype:02X}: {e}") + elif ptype == PAYLOAD_TYPE_GRP_DATA: + try: + await self._handle_group_data_packet(packet) + except Exception as e: + logger.error(f"Group data handler error: {e}") # NOTE: PATH packets are already delivered to protocol_response_handler # via PathHandler.__call__ (path.py), which runs as the handler above. diff --git a/src/pymc_core/companion/companion_radio.py b/src/pymc_core/companion/companion_radio.py index 2bc4625..81e4e55 100644 --- a/src/pymc_core/companion/companion_radio.py +++ b/src/pymc_core/companion/companion_radio.py @@ -15,7 +15,7 @@ from ..node.node import MeshNode from ..protocol import LocalIdentity, Packet -from ..protocol.constants import ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD +from ..protocol.constants import PAYLOAD_TYPE_GRP_DATA, ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD from .companion_base import CompanionBase from .constants import ( ADV_TYPE_CHAT, @@ -274,6 +274,8 @@ async def _on_packet_received(self, pkt: Any) -> None: route_type = pkt.get_route_type() is_flood = route_type in (ROUTE_TYPE_FLOOD, ROUTE_TYPE_TRANSPORT_FLOOD) self.stats.record_rx(is_flood=is_flood) + if pkt.get_payload_type() == PAYLOAD_TYPE_GRP_DATA: + await self._handle_group_data_packet(pkt) async def _on_raw_packet_rx_log(self, pkt: Any, data: bytes, analysis: Any) -> None: """Dispatcher raw-packet subscriber: fire rx_log_data(snr, rssi, raw_bytes).""" diff --git a/src/pymc_core/companion/constants.py b/src/pymc_core/companion/constants.py index f914f47..63cd292 100644 --- a/src/pymc_core/companion/constants.py +++ b/src/pymc_core/companion/constants.py @@ -97,7 +97,8 @@ class BinaryReqType(IntEnum): # Protocol version reported in RESP_CODE_DEVICE_INFO; phone uses 9+ to infer # CMD_SEND_ANON_REQ (owner requests, etc.) is supported. # 10+ provides support for multi-byte path lengths. -FIRMWARE_VER_CODE = 10 +# 11+ adds channel binary datagrams and default flood scope commands. +FIRMWARE_VER_CODE = 11 # --------------------------------------------------------------------------- # Commands (app -> radio) @@ -155,6 +156,9 @@ class BinaryReqType(IntEnum): CMD_SET_AUTOADD_CONFIG = 58 CMD_GET_AUTOADD_CONFIG = 59 CMD_SET_PATH_HASH_MODE = 61 +CMD_SEND_CHANNEL_DATA = 62 +CMD_SET_DEFAULT_FLOOD_SCOPE = 63 +CMD_GET_DEFAULT_FLOOD_SCOPE = 64 # --------------------------------------------------------------------------- # Response codes (radio -> app) @@ -185,6 +189,8 @@ class BinaryReqType(IntEnum): RESP_CODE_TUNING_PARAMS = 23 RESP_CODE_STATS = 24 RESP_CODE_AUTOADD_CONFIG = 25 +RESP_CODE_CHANNEL_DATA_RECV = 27 +RESP_CODE_DEFAULT_FLOOD_SCOPE = 28 # --------------------------------------------------------------------------- # Push codes (radio -> app, unsolicited) @@ -226,6 +232,9 @@ class BinaryReqType(IntEnum): # is set to this (e.g. BLEDevice::setMTU(MAX_FRAME_SIZE)). Frame = prefix(1) + len(2) + payload. MAX_FRAME_SIZE = 172 MAX_PAYLOAD_SIZE = MAX_FRAME_SIZE - 3 # max bytes after prefix + 2-byte length +# Firmware companion command parser uses MAX_FRAME_SIZE - 9 for channel binary payloads. +MAX_CHANNEL_DATA_LENGTH = MAX_FRAME_SIZE - 9 +OUT_PATH_UNKNOWN = 0xFF PUB_KEY_SIZE = 32 MAX_PATH_SIZE = 64 diff --git a/src/pymc_core/companion/frame_server.py b/src/pymc_core/companion/frame_server.py index 2b636ae..646bf3a 100644 --- a/src/pymc_core/companion/frame_server.py +++ b/src/pymc_core/companion/frame_server.py @@ -44,6 +44,7 @@ CMD_RESET_PATH, CMD_SEND_ANON_REQ, CMD_SEND_BINARY_REQ, + CMD_SEND_CHANNEL_DATA, CMD_SEND_CHANNEL_TXT_MSG, CMD_SEND_CONTROL_DATA, CMD_SEND_LOGIN, @@ -60,7 +61,9 @@ CMD_SET_CHANNEL, CMD_SET_CUSTOM_VAR, CMD_SET_DEVICE_TIME, + CMD_SET_DEFAULT_FLOOD_SCOPE, CMD_SET_FLOOD_SCOPE, + CMD_GET_DEFAULT_FLOOD_SCOPE, CMD_SET_OTHER_PARAMS, CMD_SET_PATH_HASH_MODE, CMD_SET_RADIO_PARAMS, @@ -77,8 +80,10 @@ FRAME_INBOUND_PREFIX, FRAME_OUTBOUND_PREFIX, MAX_FRAME_SIZE, + MAX_CHANNEL_DATA_LENGTH, MAX_PATH_SIZE, MAX_PAYLOAD_SIZE, + OUT_PATH_UNKNOWN, PUB_KEY_SIZE, PUSH_CODE_ADVERT, PUSH_CODE_BINARY_RESPONSE, @@ -100,7 +105,9 @@ RESP_CODE_ADVERT_PATH, RESP_CODE_AUTOADD_CONFIG, RESP_CODE_BATT_AND_STORAGE, + RESP_CODE_CHANNEL_DATA_RECV, RESP_CODE_CHANNEL_INFO, + RESP_CODE_DEFAULT_FLOOD_SCOPE, RESP_CODE_CHANNEL_MSG_RECV, RESP_CODE_CHANNEL_MSG_RECV_V3, RESP_CODE_CONTACT, @@ -222,6 +229,7 @@ def __init__( CMD_GET_CONTACT_BY_KEY: self._cmd_get_contact_by_key, CMD_SEND_TXT_MSG: self._cmd_send_txt_msg, CMD_SEND_CHANNEL_TXT_MSG: self._cmd_send_channel_txt_msg, + CMD_SEND_CHANNEL_DATA: self._cmd_send_channel_data, CMD_SYNC_NEXT_MESSAGE: self._cmd_sync_next_message, CMD_SEND_LOGIN: self._cmd_send_login, CMD_SEND_STATUS_REQ: self._cmd_send_status_req, @@ -244,6 +252,8 @@ def __init__( CMD_SEND_CONTROL_DATA: self._cmd_send_control_data, CMD_SEND_TRACE_PATH: self._cmd_send_trace_path, CMD_SET_FLOOD_SCOPE: self._cmd_set_flood_scope, + CMD_SET_DEFAULT_FLOOD_SCOPE: self._cmd_set_default_flood_scope, + CMD_GET_DEFAULT_FLOOD_SCOPE: self._cmd_get_default_flood_scope, CMD_GET_DEVICE_TIME: self._cmd_get_device_time, CMD_SET_DEVICE_TIME: self._cmd_set_device_time, CMD_SET_RADIO_PARAMS: self._cmd_set_radio_params, @@ -457,6 +467,32 @@ async def on_channel_message_received( await self._persist_companion_message(msg_dict) _write_push(bytes([PUSH_CODE_MSG_WAITING])) + async def on_channel_data_received( + channel_idx, + path_len, + data_type, + payload, + packet_hash=None, + snr=None, + rssi=None, + ): + msg_dict = { + "sender_key": b"", + "text": "", + "timestamp": 0, + "txt_type": 0, + "is_channel": True, + "channel_idx": channel_idx, + "path_len": path_len, + "packet_hash": packet_hash, + "snr": snr, + "rssi": rssi, + "channel_data_type": data_type, + "channel_data_payload": bytes(payload or b""), + } + await self._persist_companion_message(msg_dict) + _write_push(bytes([PUSH_CODE_MSG_WAITING])) + def on_binary_response(tag_bytes, response_data, parsed=None, request_type=None): frame = ( bytes([PUSH_CODE_BINARY_RESPONSE, 0]) @@ -505,6 +541,7 @@ def on_raw_data_received(payload_bytes: bytes, snr: float, rssi: int) -> None: self.bridge.on_message_received(on_message_received) self.bridge.on_channel_message_received(on_channel_message_received) + self.bridge.on_channel_data_received(on_channel_data_received) self.bridge.on_send_confirmed(on_send_confirmed) self.bridge.on_advert_received(on_advert_received) self.bridge.on_contact_path_updated(on_contact_path_updated) @@ -1105,6 +1142,52 @@ async def _cmd_send_channel_txt_msg(self, data: bytes) -> None: channel_idx, ) + async def _cmd_send_channel_data(self, data: bytes) -> None: + """Handle CMD_SEND_CHANNEL_DATA (62).""" + if len(data) < 4: + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + channel_idx = data[0] + path_len = data[1] + if self.bridge.get_channel(channel_idx) is None: + self._write_err(ERR_CODE_NOT_FOUND) + return + offset = 2 + path = b"" + if path_len != OUT_PATH_UNKNOWN: + if not PathUtils.is_valid_path_len(path_len): + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + path_byte_len = PathUtils.get_path_byte_len(path_len) + if len(data) < offset + path_byte_len + 2: + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + path = data[offset : offset + path_byte_len] + offset += path_byte_len + if len(data) < offset + 2: + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + data_type = int.from_bytes(data[offset : offset + 2], "little") + payload = data[offset + 2 :] + if data_type == 0 or len(payload) > MAX_CHANNEL_DATA_LENGTH: + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + send_channel_data = getattr(self.bridge, "send_channel_data", None) + if not send_channel_data: + self._write_err(ERR_CODE_UNSUPPORTED_CMD) + return + ok = await send_channel_data( + channel_idx, + data_type, + payload, + path=path if path_len != OUT_PATH_UNKNOWN else None, + path_len_encoded=path_len, + ) + if ok: + self._write_ok() + else: + self._write_err(ERR_CODE_TABLE_FULL) + async def _cmd_send_binary_req(self, data: bytes) -> None: if len(data) < 33: self._write_err(ERR_CODE_ILLEGAL_ARG) @@ -1264,6 +1347,24 @@ def _build_message_frame(self, msg: "QueuedMessage") -> bytes: snr_byte += 256 if msg.is_channel: path_len_byte = msg.path_len if msg.path_len < 256 else 0xFF + if getattr(msg, "channel_data_type", 0): + payload = bytes(getattr(msg, "channel_data_payload", b"") or b"") + payload = payload[:MAX_CHANNEL_DATA_LENGTH] + return ( + bytes( + [ + RESP_CODE_CHANNEL_DATA_RECV, + snr_byte & 0xFF, + 0, + 0, + msg.channel_idx, + path_len_byte, + ] + ) + + struct.pack("= 3: @@ -1667,6 +1768,34 @@ async def _cmd_set_flood_scope(self, data: bytes) -> None: self.bridge.set_flood_scope(None) self._write_ok() + async def _cmd_set_default_flood_scope(self, data: bytes) -> None: + """Handle CMD_SET_DEFAULT_FLOOD_SCOPE (63).""" + if len(data) < 31 + 16: + self.bridge.set_default_flood_scope(None, None) + self._write_ok() + return + name_raw = data[:31] + key = data[31 : 31 + 16] + scope_name = name_raw.split(b"\x00", 1)[0].decode("utf-8", errors="replace").strip() + if not scope_name or len(scope_name) >= 31: + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + if not self.bridge.set_default_flood_scope(scope_name, key): + self._write_err(ERR_CODE_ILLEGAL_ARG) + return + self._write_ok() + + async def _cmd_get_default_flood_scope(self, data: bytes) -> None: + """Handle CMD_GET_DEFAULT_FLOOD_SCOPE (64).""" + scope = self.bridge.get_default_flood_scope() + if scope is None: + self._write_frame(bytes([RESP_CODE_DEFAULT_FLOOD_SCOPE])) + return + name, key = scope + name_field = name.encode("utf-8", errors="replace")[:30].ljust(31, b"\x00") + key_field = bytes(key[:16]).ljust(16, b"\x00") + self._write_frame(bytes([RESP_CODE_DEFAULT_FLOOD_SCOPE]) + name_field + key_field) + # ------------------------------------------------------------------------- # Time, radio, tuning, share/export, logout, custom vars, autoadd # ------------------------------------------------------------------------- @@ -1728,12 +1857,48 @@ async def _cmd_share_contact(self, data: bytes) -> None: async def _cmd_export_contact(self, data: bytes) -> None: if len(data) < PUB_KEY_SIZE: raw = self.bridge.export_contact(None) + if raw is None: + self._write_err(ERR_CODE_NOT_FOUND) + return + pubkey = raw[:32] + name_raw = raw[33:65].split(b"\x00")[0] + lat, lon = struct.unpack_from(" None: """Export private/signing key as 64-byte MeshCore format (RESP_CODE_PRIVATE_KEY + 64 bytes). diff --git a/src/pymc_core/companion/models.py b/src/pymc_core/companion/models.py index 2aa28f0..2b81dba 100644 --- a/src/pymc_core/companion/models.py +++ b/src/pymc_core/companion/models.py @@ -142,6 +142,8 @@ class NodePrefs: # Reported in CMD_DEVICE_QUERY device info frame (byte 80). client_repeat: int = 0 path_hash_mode: int = 0 # 0=1-byte, 1=2-byte, 2=3-byte hashes + default_scope_name: str = "" + default_scope_key: bytes = b"" @dataclass @@ -189,3 +191,5 @@ class QueuedMessage: path_len: int = 0 snr: float = 0.0 rssi: int = 0 + channel_data_type: int = 0 + channel_data_payload: bytes = b"" diff --git a/src/pymc_core/hardware/__init__.py b/src/pymc_core/hardware/__init__.py index c865506..3b9c1ad 100644 --- a/src/pymc_core/hardware/__init__.py +++ b/src/pymc_core/hardware/__init__.py @@ -40,6 +40,24 @@ _KISS_MODEM_AVAILABLE = False KissModemWrapper = None +# Conditional import for USBLoRaRadio (requires pyserial) +try: + from .usb_radio import USBLoRaRadio + + _USB_AVAILABLE = True +except ImportError: + _USB_AVAILABLE = False + USBLoRaRadio = None + +# Conditional import for TCPLoRaRadio (stdlib only — socket/threading/asyncio) +try: + from .tcp_radio import TCPLoRaRadio + + _TCP_AVAILABLE = True +except ImportError: + _TCP_AVAILABLE = False + TCPLoRaRadio = None + __all__ = ["LoRaRadio"] # Add WsRadio to exports if available @@ -57,3 +75,11 @@ # Add KissModemWrapper to exports if available if _KISS_MODEM_AVAILABLE: __all__.append("KissModemWrapper") + +# Add USBLoRaRadio to exports if available +if _USB_AVAILABLE: + __all__.append("USBLoRaRadio") + +# Add TCPLoRaRadio to exports if available +if _TCP_AVAILABLE: + __all__.append("TCPLoRaRadio") diff --git a/src/pymc_core/hardware/protocol_constants.py b/src/pymc_core/hardware/protocol_constants.py new file mode 100644 index 0000000..74d53ce --- /dev/null +++ b/src/pymc_core/hardware/protocol_constants.py @@ -0,0 +1,152 @@ +""" +Wire-protocol constants and framing helpers shared by USBLoRaRadio and +TCPLoRaRadio. + +Mirrors the on-the-wire format defined in the pymc_usb firmware +(`firmware/include/protocol.h`). Keep this file and the firmware header +in sync — every command code, struct layout, CRC vector and sync byte +must match bit-for-bit, otherwise both drivers will silently lose frames. + +Frame format: + + SYNC (0xAA) | CMD (1B) | LEN (2B LE) | PAYLOAD (0..N B) | CRC-16/CCITT (2B LE) + +CRC is computed over `CMD + LEN + PAYLOAD` (the SYNC byte is excluded); +polynomial 0x1021, initial 0xFFFF, no reflect, no xor-out (CRC-16/CCITT-FALSE). +""" + +import struct + +# ─── Framing ───────────────────────────────────────────────────────── +PROTO_SYNC = 0xAA + +# Maximum LoRa payload size accepted by the firmware (single-byte LEN +# optimisation in the modem's RX buffer). The wire LEN field itself is +# u16, so the framing helpers below handle anything that fits in 65535 B. +MAX_LORA_PAYLOAD = 255 + +# ─── Host → Modem ──────────────────────────────────────────────────── +CMD_TX_REQUEST = 0x01 +CMD_SET_CONFIG = 0x10 +CMD_GET_CONFIG = 0x11 +CMD_STATUS_REQ = 0x20 +CMD_NOISE_REQ = 0x22 +CMD_CAD_REQUEST = 0x30 +CMD_RX_START = 0x31 +CMD_SET_CAD_PARAMS = 0x34 # v0.5.4 +CMD_SET_WIFI = 0x41 # v0.5 +CMD_AUTH = 0x50 +CMD_WIFI_RESET = 0x60 +CMD_GET_WIFI = 0x61 # v0.5 +CMD_GET_VERSION = 0x70 # v0.5.3 +CMD_PING = 0xFF + +# ─── Modem → Host ──────────────────────────────────────────────────── +CMD_TX_DONE = 0x02 +CMD_TX_FAIL = 0x03 +CMD_RX_PACKET = 0x04 +CMD_CONFIG_RESP = 0x12 +CMD_STATUS_RESP = 0x21 +CMD_NOISE_RESP = 0x23 +CMD_CAD_RESP = 0x32 +CMD_RX_STARTED = 0x33 +CMD_CAD_PARAMS_RESP = 0x35 # v0.5.4 +CMD_AUTH_OK = 0x51 +CMD_WIFI_STATUS = 0x62 # v0.5 +CMD_VERSION_RESP = 0x71 # v0.5.3 +CMD_ERROR = 0xFE +CMD_PONG = 0xFF + +# ─── Error codes (CMD_ERROR payload[0]) ────────────────────────────── +ERR_CRC_MISMATCH = 0x01 +ERR_INVALID_CMD = 0x02 +ERR_RADIO_BUSY = 0x03 +ERR_TX_TIMEOUT = 0x04 +ERR_PAYLOAD_TOO_BIG = 0x05 +ERR_INVALID_CONFIG = 0x06 +ERR_CAD_FAILED = 0x07 +ERR_RADIO_INIT = 0x08 +ERR_UNAUTHORIZED = 0x09 + +# ─── WIFI_STATUS mode codes ────────────────────────────────────────── +# Matches firmware main.cpp::buildWifiStatusPayload +WIFI_MODE_OFFLINE = 0 +WIFI_MODE_STA_CONNECTING = 1 +WIFI_MODE_STA_CONNECTED = 2 +WIFI_MODE_AP_CONFIG = 3 + +# ─── Packed struct layouts ─────────────────────────────────────────── +# RadioConfig (14 B): freq_hz(u32) | bandwidth_hz(u32) | sf(u8) | cr(u8) +# | power_dbm(i8) | syncword(u16) | preamble_len(u8) +RADIO_CONFIG_FMT = " int: + """CRC-16/CCITT-FALSE. Matches the firmware reference implementation.""" + crc = 0xFFFF + for byte in data: + crc ^= byte << 8 + for _ in range(8): + if crc & 0x8000: + crc = (crc << 1) ^ 0x1021 + else: + crc <<= 1 + crc &= 0xFFFF + return crc + + +def build_frame(cmd: int, payload: bytes = b"") -> bytes: + """Build a wire frame: SYNC | CMD | LEN | PAYLOAD | CRC. + + Raises ValueError if the payload does not fit in the 16-bit LEN + field. The firmware caps individual LoRa payloads at + MAX_LORA_PAYLOAD, but the framing itself accepts anything up to + 0xFFFF — exceeding that is a programming error in the caller. + """ + length = len(payload) + if length > 0xFFFF: + raise ValueError( + f"payload length {length} exceeds 16-bit LEN field " + f"(max 65535; firmware caps at MAX_LORA_PAYLOAD={MAX_LORA_PAYLOAD})" + ) + hdr = struct.pack(" list[int]: + normalized_pins = [] + + if en_pins: + normalized_pins.extend(en_pins) + elif en_pin != -1: + normalized_pins.append(en_pin) + + deduped_pins = [] + for pin in normalized_pins: + if pin == -1 or pin in deduped_pins: + continue + deduped_pins.append(pin) + + return deduped_pins + def _get_rx_irq_mask(self) -> int: """Get the standard RX interrupt mask""" return ( @@ -235,9 +259,9 @@ def _safe_radio_operation( def _basic_radio_setup(self, use_busy_check: bool = False) -> bool: """Common radio setup: reset, standby, and LoRa packet type""" self.lora.reset() - time.sleep(self.RADIO_TIMING_DELAY) # Give hardware time to complete reset + time.sleep(self._RADIO_TIMING_DELAY) # Give hardware time to complete reset self.lora.setStandby(self.lora.STANDBY_RC) - time.sleep(self.RADIO_TIMING_DELAY) # Give hardware time to enter standby mode + time.sleep(self._RADIO_TIMING_DELAY) # Give hardware time to enter standby mode # Check if standby mode was set correctly (different methods for different boards) if use_busy_check: @@ -614,13 +638,16 @@ def begin(self) -> bool: else: logger.warning(f"Could not setup RX LED pin {self.rxled_pin}") - # Setup EN pin if specified (powers up the radio when goes HIGH) - if self.en_pin != -1 and not self._en_pin_setup: - if self._gpio_manager.setup_output_pin(self.en_pin, initial_value=True): - self._en_pin_setup = True - logger.debug(f"EN pin {self.en_pin} configured and set HIGH") - else: - logger.warning(f"Could not setup EN pin {self.en_pin}") + # Setup EN pin(s) if specified (powers up the radio when set HIGH) + if self.en_pins and not self._en_pins_setup: + all_en_pins_configured = True + for en_pin in self.en_pins: + if self._gpio_manager.setup_output_pin(en_pin, initial_value=True): + logger.debug(f"EN pin {en_pin} configured and set HIGH") + else: + all_en_pins_configured = False + logger.warning(f"Could not setup EN pin {en_pin}") + self._en_pins_setup = all_en_pins_configured # Basic radio setup if not self._basic_radio_setup(use_busy_check=True): @@ -687,9 +714,9 @@ def begin(self) -> bool: logger.debug("Image calibration for 902-928MHz band") self.lora.calibrateImage(calFreqMin, calFreqMax) - time.sleep(0.01) # Allow calibration to settle self.lora.setDio2RfSwitch(self.use_dio2_rf) + time.sleep(self._RADIO_TIMING_DELAY) if self.use_dio2_rf: logger.info("DIO2 RF switch control enabled") @@ -753,6 +780,7 @@ def begin(self) -> bool: logger.warning(f"Failed to write CAD thresholds: {e}") self.lora.request(self.lora.RX_CONTINUOUS) + time.sleep(self._RADIO_TIMING_DELAY) self._initialized = True logger.info("SX1262 radio initialized successfully") diff --git a/src/pymc_core/hardware/tcp_radio.py b/src/pymc_core/hardware/tcp_radio.py new file mode 100644 index 0000000..71bbd6a --- /dev/null +++ b/src/pymc_core/hardware/tcp_radio.py @@ -0,0 +1,1060 @@ +""" +TCP LoRa Radio Driver for pymc_core + +Implements the LoRaRadio interface using any board running the pymc_usb +firmware in Wi-Fi/TCP mode. Same binary protocol as USBLoRaRadio — only +the transport differs. + +Drop-in replacement for SX1262Radio in pymc_core's hardware layer. + +Default sync word is 0x12 (MeshCore private syncword), matching the +firmware default — change only if the deployment uses a custom value. + +Usage: + from pymc_core.hardware.tcp_radio import TCPLoRaRadio + + radio = TCPLoRaRadio( + host="192.168.1.50", + port=5055, + token="", # empty = no auth (matches firmware NVS) + frequency=869618000, + bandwidth=62500, + spreading_factor=8, + coding_rate=8, + tx_power=22, + sync_word=0x12, + preamble_length=16, + ) + radio.begin() + radio.set_rx_callback(my_callback) + await radio.send(packet_bytes) +""" + +import asyncio +import logging +import random +import socket +import struct +import threading +import time +from typing import Callable, Optional + +from .protocol_constants import ( + PROTO_SYNC, + MAX_LORA_PAYLOAD, + CMD_TX_REQUEST, CMD_SET_CONFIG, + CMD_STATUS_REQ, CMD_NOISE_REQ, + CMD_CAD_REQUEST, CMD_RX_START, CMD_SET_CAD_PARAMS, + CMD_AUTH, CMD_PING, + CMD_TX_DONE, CMD_TX_FAIL, CMD_RX_PACKET, + CMD_CONFIG_RESP, CMD_STATUS_RESP, CMD_NOISE_RESP, + CMD_CAD_RESP, CMD_RX_STARTED, CMD_CAD_PARAMS_RESP, + CMD_AUTH_OK, CMD_ERROR, CMD_PONG, + ERR_UNAUTHORIZED, + RADIO_CONFIG_FMT, + STATUS_RESP_FMT, STATUS_RESP_SIZE, + crc16_ccitt, build_frame, +) + +logger = logging.getLogger("TCPLoRaRadio") + + +# Import LoRaRadio base conditionally — allows standalone testing +try: + from pymc_core.hardware.base import LoRaRadio + _HAS_BASE = True +except ImportError: + _HAS_BASE = False + +if _HAS_BASE: + class _RadioBase(LoRaRadio): + pass +else: + class _RadioBase: + pass + + +class TCPLoRaRadio(_RadioBase): + """TCP LoRa Radio — pymc_core LoRaRadio interface over Wi-Fi/TCP. + + Communicates with any board running the pymc_usb firmware in Wi-Fi + STA mode. Provides the same interface as SX1262Radio and USBLoRaRadio + for transparent integration with pymc_core. + """ + + def __init__( + self, + host: str, + port: int = 5055, + token: str = "", + frequency: int = 869618000, + bandwidth: int = 62500, + spreading_factor: int = 8, + coding_rate: int = 8, + tx_power: int = 22, + sync_word: int = 0x12, + preamble_length: int = 16, + lbt_enabled: bool = True, + lbt_max_attempts: int = 5, + connect_timeout: float = 5.0, + ): + self.host = host + self.port = port + self.token = token or "" + self.connect_timeout = connect_timeout + + # Radio config — matches SX1262Radio/USBLoRaRadio constructor params + self.frequency = frequency + self.bandwidth = bandwidth + self.spreading_factor = spreading_factor + self.coding_rate = coding_rate + self.tx_power = tx_power + self.sync_word = sync_word + self.preamble_length = preamble_length + + # LBT (Listen Before Talk) via CAD + self.lbt_enabled = lbt_enabled + self.lbt_max_attempts = lbt_max_attempts + + # State + self._sock: Optional[socket.socket] = None + self._sock_lock = threading.Lock() # guards socket writes + self._initialized = False + self._rx_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._event_loop: Optional[asyncio.AbstractEventLoop] = None + + # Custom CAD thresholds. Set lazily by set_custom_cad_thresholds() + # or perform_cad(det_peak=..., det_min=...); read by _reopen_socket + # to re-push to the firmware after a reconnect, so they must exist + # from construction time even when never customised. + self._custom_cad_peak: Optional[int] = None + self._custom_cad_min: Optional[int] = None + + # Signal metrics + self.last_rssi: int = -99 + self.last_snr: float = 0.0 + self.last_signal_rssi: int = -99 + self._noise_floor: float = -99.0 + + # RX callback + self.rx_callback: Optional[Callable[[bytes], None]] = None + + # Response synchronization + self._response_events: dict[int, asyncio.Event] = {} + self._response_data: dict[int, Optional[bytes]] = {} + self._response_lock = threading.Lock() + + # TX lock + self._tx_lock = asyncio.Lock() + + # Stats + self._tx_count = 0 + self._rx_count = 0 + + logger.info( + f"TCPLoRaRadio configured: {host}:{port} " + f"(auth={'token' if self.token else 'open'}), " + f"freq={frequency/1e6:.1f}MHz, sf={spreading_factor}, " + f"bw={bandwidth/1000:.0f}kHz, power={tx_power}dBm, " + f"syncword=0x{sync_word:04X}" + ) + + # ══════════════════════════════════════════════════════════ + # LoRaRadio interface + # ══════════════════════════════════════════════════════════ + + def begin(self) -> bool: + """Open TCP connection, authenticate (if token set), push config. + + If the initial connect/handshake fails, the radio is still marked + initialised and the RX worker is started in deferred-connect mode — + it will keep retrying the connection with exponential backoff until + the modem appears. This lets the repeater's HTTP server / setup + wizard come up even when the modem is offline or its host has not + been set yet (e.g. placeholder hostname after a fresh install). + """ + if self._initialized: + return True + + connected = False + try: + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.settimeout(self.connect_timeout) + self._sock.connect((self.host, self.port)) + # TCP_NODELAY mirrors the firmware side and avoids Nagle delay + self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + logger.info(f"TCP connected to {self.host}:{self.port}") + connected = True + + if self.token and not self._auth_sync(timeout=3.0): + logger.error("Modem rejected AUTH token — entering deferred mode") + self._close_sock() + connected = False + elif not self._ping_sync(timeout=3.0): + logger.error("Modem not responding to PING — entering deferred mode") + self._close_sock() + connected = False + elif not self._apply_config_sync(): + logger.error("Failed to configure radio — entering deferred mode") + self._close_sock() + connected = False + except Exception as e: + logger.warning( + f"Could not reach modem at {self.host}:{self.port} ({e}); " + f"starting in deferred-connect mode — RX worker will keep retrying" + ) + self._close_sock() + connected = False + + # Always start the RX worker. When connected it processes traffic + # immediately; otherwise _reconnect_with_backoff drives reconnection + # so the repeater UI can stay up while the user sets pymc_tcp.host + # via /api/setup_wizard or /api/update_radio_config. + self._stop_event.clear() + self._rx_thread = threading.Thread( + target=self._rx_worker, daemon=True, name="tcp-lora-rx" + ) + self._rx_thread.start() + + self._initialized = True + if connected: + logger.info("TCPLoRaRadio initialized successfully") + else: + logger.info( + "TCPLoRaRadio initialised in deferred-connect mode " + "(no modem yet — radio commands will return None until reachable)" + ) + return True + + async def send(self, data: bytes) -> Optional[dict]: + """Send a LoRa packet asynchronously with LBT (CAD).""" + if not self._initialized: + logger.error("Radio not initialized") + return None + + async with self._tx_lock: + lbt_backoff_delays: list[float] = [] + + if self.lbt_enabled: + for attempt in range(self.lbt_max_attempts): + try: + channel_busy = await self._perform_cad(timeout=1.0) + if not channel_busy: + logger.debug( + f"CAD clear after {attempt + 1} attempt(s)" + ) + break + else: + logger.debug("CAD busy — channel activity detected") + if attempt < self.lbt_max_attempts - 1: + base_delay = random.randint(50, 200) + backoff_ms = min( + base_delay * (2 ** attempt), 5000 + ) + lbt_backoff_delays.append(float(backoff_ms)) + logger.debug( + f"CAD backoff {backoff_ms}ms " + f"(attempt {attempt+1}/{self.lbt_max_attempts})" + ) + await asyncio.sleep(backoff_ms / 1000.0) + else: + logger.warning( + "CAD max attempts — transmitting anyway" + ) + except Exception as e: + logger.warning(f"CAD failed: {e}, proceeding with TX") + break + + try: + resp = await self._send_command( + CMD_TX_REQUEST, data, + expect_cmd=CMD_TX_DONE, + timeout=10.0, + ) + + if resp is not None: + self._tx_count += 1 + airtime_us = 0 + if len(resp) >= 4: + airtime_us = struct.unpack(" 0, + } + else: + logger.error("TX failed — no TX_DONE response") + await self._send_command( + CMD_RX_START, b"", + expect_cmd=CMD_RX_STARTED, + timeout=2.0, + ) + return None + + except Exception as e: + logger.error(f"TX error: {e}") + return None + + async def wait_for_rx(self) -> bytes: + """Not used — packets arrive via set_rx_callback().""" + raise NotImplementedError( + "Use set_rx_callback(callback) to receive packets asynchronously." + ) + + def set_event_loop(self, loop) -> None: + # Called by repeater.main so radio can schedule live config pushes. + self._event_loop = loop + + def set_rx_callback(self, callback: Callable[[bytes], None]): + """Set RX callback — called by Dispatcher.""" + self.rx_callback = callback + logger.info("RX callback registered") + try: + self._event_loop = asyncio.get_running_loop() + except RuntimeError: + pass + + def sleep(self): + logger.debug("Sleep not applicable for TCP modem") + + def get_last_rssi(self) -> int: + return self.last_rssi + + def get_last_snr(self) -> float: + return self.last_snr + + def get_last_signal_rssi(self) -> int: + return self.last_signal_rssi + + # ══════════════════════════════════════════════════════════ + # Extended interface + # ══════════════════════════════════════════════════════════ + + def check_radio_health(self) -> bool: + if not self._initialized: + return False + + if self._rx_thread is None or not self._rx_thread.is_alive(): + logger.warning("RX thread dead — restarting") + self._stop_event.clear() + self._rx_thread = threading.Thread( + target=self._rx_worker, daemon=True, name="tcp-lora-rx" + ) + self._rx_thread.start() + return False + + if self._event_loop: + self._event_loop.call_soon_threadsafe( + lambda: self._event_loop.create_task(self.refresh_noise_floor()) + ) + return True + + def get_status(self) -> dict: + return { + "initialized": self._initialized, + "frequency": self.frequency, + "tx_power": self.tx_power, + "spreading_factor": self.spreading_factor, + "bandwidth": self.bandwidth, + "coding_rate": self.coding_rate, + "last_rssi": self.last_rssi, + "last_snr": self.last_snr, + "last_signal_rssi": self.last_signal_rssi, + "hardware_ready": self._initialized, + "driver": "pymc_tcp", + "host": self.host, + "port": self.port, + "tx_count": self._tx_count, + "rx_count": self._rx_count, + } + + async def get_modem_status(self) -> Optional[dict]: + resp = await self._send_command( + CMD_STATUS_REQ, b"", + expect_cmd=CMD_STATUS_RESP, + timeout=2.0, + ) + if resp and len(resp) >= STATUS_RESP_SIZE: + fields = struct.unpack(STATUS_RESP_FMT, resp[:STATUS_RESP_SIZE]) + return { + "uptime_sec": fields[0], + "rx_count": fields[1], + "tx_count": fields[2], + "crc_errors": fields[3], + "last_rssi": fields[4], + "last_snr": fields[5] / 10.0, + "noise_floor": fields[6] / 10.0, + "temp_c": fields[7], + "radio_state": ["idle/rx", "tx", "error"][min(fields[8], 2)], + } + return None + + def get_noise_floor(self) -> Optional[float]: + if not self._initialized: + return 0.0 + if self._tx_lock.locked(): + return 0.0 + return self._noise_floor + + async def refresh_noise_floor(self) -> Optional[float]: + resp = await self._send_command( + CMD_NOISE_REQ, b"", + expect_cmd=CMD_NOISE_RESP, + timeout=2.0, + ) + if resp and len(resp) >= 2: + nf_x10 = struct.unpack(" bool: + """Public CAD interface compatible with sx1262_wrapper.perform_cad(). + + When det_peak/det_min are supplied (e.g. by the repeater's CAD + calibration tool), program the chip with those thresholds via + CMD_SET_CAD_PARAMS before running the scan. Without this each + calibration sample would silently fall back to whatever thresholds + were last installed, defeating the sweep. + """ + if det_peak is not None and det_min is not None: + new_peak = int(det_peak) + new_min = int(det_min) + # Skip the firmware roundtrip when thresholds haven't changed + # since the previous call. Saves ~50-80 ms per CAD during the + # repeated-sample phase of the calibration sweep. + if new_peak != self._custom_cad_peak or new_min != self._custom_cad_min: + payload = bytes([ + 0x01, # symNum: CAD_ON_2_SYMB + new_peak & 0xFF, + new_min & 0xFF, + 0x00, # exitMode: STDBY + ]) + await self._send_command( + CMD_SET_CAD_PARAMS, payload, + expect_cmd=CMD_CAD_PARAMS_RESP, timeout=2.0, + ) + self._custom_cad_peak = new_peak + self._custom_cad_min = new_min + + # Calibration engine passes 0.3s, which is tight for TCP transport + # with the firmware's CAD-prime delay; floor it so the sweep gets + # real samples instead of "no response → assumed clear". + effective = max(timeout, 0.6) + return await self._perform_cad(effective) + + def cleanup(self): + self._initialized = False + self._stop_event.set() + + if self._rx_thread and self._rx_thread.is_alive(): + self._rx_thread.join(timeout=2.0) + + self._close_sock() + logger.info("TCPLoRaRadio cleanup complete") + + # ══════════════════════════════════════════════════════════ + # Private — socket I/O + # ══════════════════════════════════════════════════════════ + + def _close_sock(self): + if self._sock is not None: + try: + self._sock.close() + except Exception: + pass + self._sock = None + + def _sock_write(self, data: bytes): + """Thread-safe write to the TCP socket.""" + if self._sock is None: + raise ConnectionError("socket not open") + with self._sock_lock: + self._sock.sendall(data) + + def _auth_sync(self, timeout: float = 3.0) -> bool: + """Send CMD_AUTH with token payload, expect CMD_AUTH_OK.""" + frame = build_frame(CMD_AUTH, self.token.encode("utf-8")) + self._sock_write(frame) + + deadline = time.time() + timeout + while time.time() < deadline: + resp = self._read_frame_sync(timeout=max(0.1, deadline - time.time())) + if resp is None: + continue + cmd, payload = resp + if cmd == CMD_AUTH_OK: + logger.info("Modem accepted AUTH token") + return True + if cmd == CMD_ERROR: + err = payload[0] if payload else 0xFF + if err == ERR_UNAUTHORIZED: + logger.error("AUTH rejected: ERR_UNAUTHORIZED") + return False + logger.warning(f"Unexpected error during AUTH: 0x{err:02X}") + # Ignore other frames (e.g. broadcast RX) during auth handshake + return False + + def _ping_sync(self, timeout: float = 3.0) -> bool: + frame = build_frame(CMD_PING) + self._sock_write(frame) + + deadline = time.time() + timeout + while time.time() < deadline: + resp = self._read_frame_sync(timeout=max(0.1, deadline - time.time())) + if resp and resp[0] == CMD_PONG: + logger.info("Modem PONG received — alive") + return True + return False + + def _apply_config_sync(self) -> bool: + payload = struct.pack( + RADIO_CONFIG_FMT, + self.frequency, + self.bandwidth, + self.spreading_factor, + self.coding_rate, + self.tx_power, + self.sync_word, + self.preamble_length, + ) + frame = build_frame(CMD_SET_CONFIG, payload) + self._sock_write(frame) + + # The firmware may emit RX_PACKET broadcasts concurrently; filter by cmd. + deadline = time.time() + 3.0 + while time.time() < deadline: + resp = self._read_frame_sync(timeout=max(0.1, deadline - time.time())) + if resp is None: + continue + cmd, payload = resp + if cmd == CMD_CONFIG_RESP: + logger.info( + f"Radio configured: {self.frequency/1e6:.1f}MHz " + f"SF{self.spreading_factor} BW{self.bandwidth/1000:.0f}kHz " + f"{self.tx_power}dBm sync=0x{self.sync_word:04X} " + f"pre={self.preamble_length}" + ) + return True + if cmd == CMD_ERROR: + err = payload[0] if payload else 0xFF + logger.error(f"Config rejected by modem: error 0x{err:02X}") + return False + # Ignore RX_PACKET or other unrelated frames during config + logger.error("No config response from modem") + return False + + def _read_frame_sync(self, timeout: float = 2.0) -> Optional[tuple]: + """Read one frame synchronously with a single timeout budget.""" + if self._sock is None: + return None + try: + self._sock.settimeout(timeout) + + # Look for SYNC byte + sync_found = False + deadline = time.time() + timeout + while time.time() < deadline: + b = self._sock.recv(1) + if not b: + return None + if b[0] == PROTO_SYNC: + sync_found = True + break + if not sync_found: + return None + + # Header (cmd + len) + hdr = b"" + while len(hdr) < 3: + chunk = self._sock.recv(3 - len(hdr)) + if not chunk: + return None + hdr += chunk + + cmd = hdr[0] + length = struct.unpack(" bool: + """Open a fresh TCP socket + auth + push config. Returns True on success. + Does NOT start a new RX worker (we are already in one); does NOT flip + self._initialized (the radio is still the same instance from the host's + point of view). Used by _reconnect_with_backoff after a dropped link.""" + try: + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.settimeout(self.connect_timeout) + self._sock.connect((self.host, self.port)) + self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + logger.info(f"TCP re-connected to {self.host}:{self.port}") + if self.token and not self._auth_sync(timeout=3.0): + logger.error("Reconnect: AUTH rejected") + self._close_sock() + return False + if not self._ping_sync(timeout=3.0): + logger.error("Reconnect: PING failed") + self._close_sock() + return False + if not self._apply_config_sync(): + logger.error("Reconnect: SET_CONFIG failed") + self._close_sock() + return False + # Re-apply custom CAD if host had programmed any. + if self._custom_cad_peak is not None and self._custom_cad_min is not None: + try: + self.set_custom_cad_thresholds(peak=self._custom_cad_peak, + min_val=self._custom_cad_min) + except Exception as e: + logger.warning(f"Reconnect: re-push CAD failed: {e}") + return True + except Exception as e: + logger.error(f"Reconnect socket open failed: {e}") + self._close_sock() + return False + + def _reconnect_with_backoff(self) -> bool: + """Exponential backoff (1, 2, 5, 10, 30 s) until socket is up or stop_event fires.""" + for delay in (1.0, 2.0, 5.0, 10.0, 30.0): + if self._stop_event.is_set(): + return False + logger.info(f"Reconnect attempt in {delay:.0f}s…") + if self._stop_event.wait(delay): + return False + if self._reopen_socket(): + return True + # Last resort: keep trying at 30 s intervals forever. + while not self._stop_event.is_set(): + if self._stop_event.wait(30.0): + return False + if self._reopen_socket(): + return True + return False + + def _rx_worker(self): + """Background thread: reads socket, dispatches RX packets and responses.""" + logger.debug("RX worker thread started") + buf = bytearray() + + while not self._stop_event.is_set(): + try: + if self._sock is None: + # Deferred-connect mode: begin() couldn't reach the + # modem, so we drive the reconnect ourselves with the + # same exponential backoff used after a runtime drop. + # Returns False if stop_event fires while waiting. + if not self._reconnect_with_backoff(): + return + buf.clear() + continue + + # Short recv timeout so stop_event is honoured promptly + self._sock.settimeout(0.1) + try: + chunk = self._sock.recv(4096) + except socket.timeout: + continue + except (OSError, ConnectionError) as e: + logger.error(f"Socket read error in RX worker: {e} — reconnecting") + self._close_sock() + buf.clear() + if not self._reconnect_with_backoff(): + return + continue + + if not chunk: + # Remote closed — try to reopen instead of dying; the + # previous behaviour left tcp_radio permanently offline + # until systemctl restart (known_issue_tcp_no_reconnect). + logger.warning("TCP connection closed by peer — attempting reconnect") + self._close_sock() + buf.clear() + if self._reconnect_with_backoff(): + continue + return + + buf.extend(chunk) + + # Parse complete frames from buffer + while len(buf) >= 6: + sync_idx = buf.find(PROTO_SYNC) + if sync_idx < 0: + buf.clear() + break + if sync_idx > 0: + buf = buf[sync_idx:] + + if len(buf) < 4: + break + + cmd = buf[1] + length = buf[2] | (buf[3] << 8) + # Sanity-cap the LEN field. The largest legitimate frame + # is RX_PACKET (6 B header + MAX_LORA_PAYLOAD); anything + # bigger means we're parsing garbage from a desync. + # Drop the SYNC byte we just consumed and resync rather + # than waiting indefinitely for a phantom 64 KB frame. + if length > MAX_LORA_PAYLOAD + 32: + logger.warning( + f"RX frame LEN={length} exceeds sanity bound — " + f"dropping SYNC byte and resyncing" + ) + buf = buf[1:] + continue + frame_size = 1 + 1 + 2 + length + 2 + + if len(buf) < frame_size: + break + + hdr = bytes(buf[1:4]) + payload = bytes(buf[4 : 4 + length]) + crc_recv = buf[4 + length] | (buf[5 + length] << 8) + crc_comp = crc16_ccitt(hdr + payload) + + buf = buf[frame_size:] + + if crc_recv != crc_comp: + logger.warning( + f"RX CRC mismatch, cmd=0x{cmd:02X}, dropping" + ) + continue + + self._dispatch_frame(cmd, payload) + + except Exception as e: + logger.error(f"RX worker error: {e}") + time.sleep(0.1) + + logger.debug("RX worker thread exiting") + + def _dispatch_frame(self, cmd: int, payload: bytes): + """Route a received frame.""" + + if cmd == CMD_RX_PACKET: + if len(payload) < 6: + logger.warning(f"RX_PACKET too short: {len(payload)}B") + return + + rssi = struct.unpack(" Optional[bytes]: + if self._sock is None: + return None + + if self._event_loop is None: + try: + self._event_loop = asyncio.get_running_loop() + except RuntimeError: + pass + + evt = asyncio.Event() + with self._response_lock: + self._response_events[expect_cmd] = evt + self._response_data.pop(expect_cmd, None) + + try: + frame = build_frame(cmd, payload) + try: + self._sock_write(frame) + except (OSError, ConnectionError) as e: + logger.error(f"TCP write failed: {e}") + return None + + try: + await asyncio.wait_for(evt.wait(), timeout=timeout) + except asyncio.TimeoutError: + logger.warning( + f"Timeout: cmd=0x{cmd:02X} → expected 0x{expect_cmd:02X}" + ) + return None + + return self._response_data.get(expect_cmd) + + finally: + with self._response_lock: + self._response_events.pop(expect_cmd, None) + self._response_data.pop(expect_cmd, None) + + async def _perform_cad(self, timeout: float = 1.0) -> bool: + resp = await self._send_command( + CMD_CAD_REQUEST, b"", + expect_cmd=CMD_CAD_RESP, + timeout=timeout, + ) + if resp and len(resp) >= 1: + busy = resp[0] != 0 + logger.debug(f"CAD: {'BUSY' if busy else 'CLEAR'}") + return busy + logger.warning("CAD no response — assuming clear") + return False + + # ── Config setters — live push to firmware ─────────────── + # + # Callers (repeater HTTP UI) invoke these from threads that are not + # the asyncio loop, so we bridge with run_coroutine_threadsafe(). + # Local state is always updated first; firmware ack is best-effort — + # a timeout downgrades to False but keeps the in-memory state current + # so the next begin()/_apply_config_sync will flush it. + def _run_async_safe(self, coro, wait_timeout: float = 6.0): + """Run *coro* on self._event_loop, whether we're on that loop or not. + On-loop caller: schedule with ensure_future (fire-and-forget, logs in bg). + Off-loop caller: block with run_coroutine_threadsafe and return its result. + Returns True on success/scheduled, False on timeout/exception.""" + try: + running = asyncio.get_running_loop() + except RuntimeError: + running = None + if running is self._event_loop: + # Same thread/loop — can't block; schedule and log outcome asynchronously. + async def _bg(): + try: + res = await coro + logger.info(f"Async config push result: ok={res is not None}") + except Exception as e: + logger.error(f"Async config push error: {e}", exc_info=True) + asyncio.ensure_future(_bg()) + return True + # Different thread — blocking wait is fine. + try: + fut = asyncio.run_coroutine_threadsafe(coro, self._event_loop) + resp = fut.result(timeout=wait_timeout) + return resp is not None + except Exception as e: + logger.error(f"Cross-thread config push error: {e}", exc_info=True) + return False + + def _push_config_live(self, changed: str) -> bool: + if not self._initialized: + return True + if self._event_loop is None or not self._event_loop.is_running(): + return True + payload = struct.pack( + RADIO_CONFIG_FMT, + self.frequency, self.bandwidth, self.spreading_factor, + self.coding_rate, self.tx_power, self.sync_word, + self.preamble_length, + ) + ok = self._run_async_safe( + self._send_command(CMD_SET_CONFIG, payload, + expect_cmd=CMD_CONFIG_RESP, timeout=5.0), + wait_timeout=6.0, + ) + logger.info(f"Live config push ({changed}): {'OK' if ok else 'TIMEOUT'}") + return ok + + def set_frequency(self, frequency: int) -> bool: + self.frequency = frequency + return self._push_config_live(f"freq={frequency}") + + def set_tx_power(self, power: int) -> bool: + self.tx_power = power + return self._push_config_live(f"power={power}") + + def set_spreading_factor(self, sf: int) -> bool: + self.spreading_factor = sf + return self._push_config_live(f"sf={sf}") + + def set_bandwidth(self, bw: int) -> bool: + self.bandwidth = bw + return self._push_config_live(f"bw={bw}") + + def set_coding_rate(self, cr: int) -> bool: + self.coding_rate = cr + return self._push_config_live(f"cr={cr}") + + def set_preamble_length(self, preamble: int) -> bool: + self.preamble_length = preamble + return self._push_config_live(f"preamble={preamble}") + + def set_sync_word(self, sync_word: int) -> bool: + self.sync_word = sync_word + return self._push_config_live(f"syncword=0x{sync_word:04X}") + + # CAD thresholds — v0.5.4 firmware exposes them via CMD_SET_CAD_PARAMS. + # symNum=0x01 (2 symbols) and exitMode=0x00 (STDBY) match pymc_core SX1262 defaults. + def set_tcp_target(self, host: Optional[str] = None, + port: Optional[int] = None, + token: Optional[str] = None) -> bool: + """Change the modem TCP endpoint at runtime. + + Updates self.host/port/token in place, closes the current socket, + and lets _rx_worker re-establish the connection via the existing + backoff path. Returns True if any field actually changed. + + Use case: a fresh repeater install starts in deferred-connect mode + with a placeholder host; the user supplies the real one later + through a dedicated config panel, and we apply it without a + service restart. + """ + changed = [] + if host is not None and host != self.host: + self.host = host + changed.append(f"host={host}") + if port is not None: + try: + p = int(port) + except (TypeError, ValueError): + p = self.port + if p != self.port and 1 <= p <= 65535: + self.port = p + changed.append(f"port={p}") + if token is not None and token != self.token: + self.token = token + changed.append("token=***") + + if not changed: + return False + + logger.info(f"TCP target changed: {', '.join(changed)} — forcing reconnect") + # Drop the socket; _rx_worker sees self._sock is None and runs + # _reconnect_with_backoff against the new host/port. + self._close_sock() + return True + + def set_custom_cad_thresholds(self, peak: int, min_val: int) -> bool: + self._custom_cad_peak = int(peak) + self._custom_cad_min = int(min_val) + if not self._initialized: + return True + if self._event_loop is None or not self._event_loop.is_running(): + return True + payload = bytes([0x01, peak & 0xFF, min_val & 0xFF, 0x00]) + ok = self._run_async_safe( + self._send_command(CMD_SET_CAD_PARAMS, payload, + expect_cmd=CMD_CAD_PARAMS_RESP, timeout=3.0), + wait_timeout=4.0, + ) + logger.info(f"CAD thresholds pushed peak={peak} min={min_val}: {'OK' if ok else 'TIMEOUT'}") + return ok + + def clear_custom_cad_thresholds(self) -> None: + self._custom_cad_peak = None + self._custom_cad_min = None + # Firmware retains last programmed values until reboot; explicit + # reset would need a dedicated command — out of scope for v0.5.4. + + def __del__(self): + try: + self.cleanup() + except Exception: + pass diff --git a/src/pymc_core/hardware/usb_radio.py b/src/pymc_core/hardware/usb_radio.py new file mode 100644 index 0000000..c22c754 --- /dev/null +++ b/src/pymc_core/hardware/usb_radio.py @@ -0,0 +1,994 @@ +""" +USB LoRa Radio Driver for pymc_core + +Implements the LoRaRadio interface using a pymc_usb modem connected via +USB-CDC. The modem acts as a "dumb" SX1262 transceiver — all MeshCore +protocol logic runs on the host in pymc_core. + +Drop-in replacement for SX1262Radio in pymc_core's hardware layer. + +Default sync word is 0x12 (MeshCore private syncword), matching the +firmware default — change only if the deployment uses a custom value. + +Usage: + from pymc_core.hardware.usb_radio import USBLoRaRadio + + radio = USBLoRaRadio( + port="/dev/ttyACM0", + frequency=869618000, + bandwidth=62500, + spreading_factor=8, + coding_rate=8, + tx_power=22, + sync_word=0x12, + preamble_length=16, + ) + radio.begin() + radio.set_rx_callback(my_callback) + await radio.send(packet_bytes) +""" + +import asyncio +import logging +import random +import struct +import threading +import time +from typing import Callable, Optional + +import serial + +from .protocol_constants import ( + PROTO_SYNC, + MAX_LORA_PAYLOAD, + CMD_TX_REQUEST, CMD_SET_CONFIG, + CMD_STATUS_REQ, CMD_NOISE_REQ, + CMD_CAD_REQUEST, CMD_RX_START, CMD_SET_CAD_PARAMS, + CMD_SET_WIFI, CMD_WIFI_RESET, + CMD_GET_WIFI, CMD_GET_VERSION, CMD_PING, + CMD_TX_DONE, CMD_TX_FAIL, CMD_RX_PACKET, + CMD_CONFIG_RESP, CMD_STATUS_RESP, CMD_NOISE_RESP, + CMD_CAD_RESP, CMD_RX_STARTED, CMD_CAD_PARAMS_RESP, + CMD_WIFI_STATUS, CMD_VERSION_RESP, + CMD_ERROR, CMD_PONG, + WIFI_MODE_OFFLINE, WIFI_MODE_STA_CONNECTING, + WIFI_MODE_STA_CONNECTED, WIFI_MODE_AP_CONFIG, + RADIO_CONFIG_FMT, + STATUS_RESP_FMT, STATUS_RESP_SIZE, + crc16_ccitt, build_frame, +) + +logger = logging.getLogger("USBLoRaRadio") + + +# Import LoRaRadio base conditionally — allows standalone testing +try: + from pymc_core.hardware.base import LoRaRadio + _HAS_BASE = True +except ImportError: + _HAS_BASE = False + +# Define the class with or without the ABC base +if _HAS_BASE: + class _RadioBase(LoRaRadio): + pass +else: + class _RadioBase: + pass + + +class USBLoRaRadio(_RadioBase): + """USB LoRa Radio — pymc_core LoRaRadio interface over USB-CDC serial. + + Communicates with any board running the pymc_usb firmware over a + USB-CDC serial link. Provides the same interface as SX1262Radio for + transparent integration with pymc_core's Dispatcher and MeshNode. + """ + + def __init__( + self, + port: str = "/dev/ttyACM0", + baudrate: int = 921600, + frequency: int = 869618000, + bandwidth: int = 62500, + spreading_factor: int = 8, + coding_rate: int = 8, + tx_power: int = 22, + sync_word: int = 0x12, + preamble_length: int = 16, + lbt_enabled: bool = True, + lbt_max_attempts: int = 5, + ): + self.port = port + self.baudrate = baudrate + + # Radio config — matches SX1262Radio constructor params + self.frequency = frequency + self.bandwidth = bandwidth + self.spreading_factor = spreading_factor + self.coding_rate = coding_rate + self.tx_power = tx_power + self.sync_word = sync_word + self.preamble_length = preamble_length + + # LBT (Listen Before Talk) via CAD + self.lbt_enabled = lbt_enabled + self.lbt_max_attempts = lbt_max_attempts + + # State + self._serial: Optional[serial.Serial] = None + self._initialized = False + self._rx_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._event_loop: Optional[asyncio.AbstractEventLoop] = None + + # Signal metrics — matches SX1262Radio interface + self.last_rssi: int = -99 + self.last_snr: float = 0.0 + self.last_signal_rssi: int = -99 + self._noise_floor: float = -99.0 + + # RX callback — set by Dispatcher via set_rx_callback() + self.rx_callback: Optional[Callable[[bytes], None]] = None + + # Response synchronization (command → response matching) + self._response_events: dict[int, asyncio.Event] = {} + self._response_data: dict[int, Optional[bytes]] = {} + self._response_lock = threading.Lock() + + # Custom CAD thresholds. Set lazily by set_custom_cad_thresholds() + # or perform_cad(det_peak=..., det_min=...); kept in attributes from + # construction so the calibration-sample fast-path doesn't trip on + # AttributeError before the first call. + self._custom_cad_peak: Optional[int] = None + self._custom_cad_min: Optional[int] = None + + # TX lock to serialize transmissions (matches SX1262Radio) + self._tx_lock = asyncio.Lock() + + # Stats + self._tx_count = 0 + self._rx_count = 0 + + logger.info( + f"USBLoRaRadio configured: port={port}, freq={frequency/1e6:.1f}MHz, " + f"sf={spreading_factor}, bw={bandwidth/1000:.0f}kHz, " + f"power={tx_power}dBm, syncword=0x{sync_word:04X}" + ) + + # ══════════════════════════════════════════════════════════ + # LoRaRadio interface implementation + # ══════════════════════════════════════════════════════════ + + def begin(self) -> bool: + """Initialize USB serial connection and configure the modem radio.""" + if self._initialized: + return True + + try: + # dsrdtr=True tells pyserial to leave DTR/DSR alone on open + # rather than pulsing them. On boards with a CP2102 (Heltec + # V3 et al.), pyserial's default toggles DTR and the CP2102 + # in turn pulls EN low, rebooting the ESP32 every time we + # (re-)open the port. dsrdtr=True is the workaround; rtscts + # stays off because the firmware does not implement hardware + # flow control on the RX pipe. + self._serial = serial.Serial() + self._serial.port = self.port + self._serial.baudrate = self.baudrate + self._serial.timeout = 0.1 + self._serial.write_timeout = 2.0 + self._serial.dsrdtr = True + self._serial.rtscts = False + self._serial.open() + + # Short settle in case the caller just power-cycled the device. + time.sleep(0.3) + self._serial.reset_input_buffer() + logger.info(f"Serial connected to {self.port}") + + # 35 s covers the worst-case WifiManager::begin() STA-timeout if + # the host opens the port right after a cold boot. + # Skipping PING here on purpose: SET_CONFIG is itself a + # liveness check (firmware replies with CONFIG_RESP), and on + # macOS a PING immediately followed by SET_CONFIG confuses the + # CDC bulk pipe — CONFIG_RESP never arrives. Keep `_ping_sync` + # available for ad-hoc health probes, just don't chain it with + # SET_CONFIG at boot. + if not self._apply_config_sync(): + logger.error("Failed to configure radio") + self._serial.close() + return False + + # Start RX background thread + self._stop_event.clear() + self._rx_thread = threading.Thread( + target=self._rx_worker, daemon=True, name="usb-lora-rx" + ) + self._rx_thread.start() + + self._initialized = True + logger.info("USBLoRaRadio initialized successfully") + return True + + except Exception as e: + logger.error(f"Failed to initialize USBLoRaRadio: {e}") + if self._serial and self._serial.is_open: + self._serial.close() + return False + + async def send(self, data: bytes) -> Optional[dict]: + """Send a LoRa packet asynchronously with LBT (CAD). + + Returns transmission metadata dict matching SX1262Radio.send(): + { + "airtime_ms": float, + "lbt_attempts": int, + "lbt_backoff_delays_ms": list[float], + "lbt_channel_busy": bool, + } + Returns None on failure. + """ + if not self._initialized: + logger.error("Radio not initialized") + return None + + async with self._tx_lock: + lbt_backoff_delays: list[float] = [] + + # ── Listen Before Talk (CAD) ───────────────────── + if self.lbt_enabled: + for attempt in range(self.lbt_max_attempts): + try: + channel_busy = await self._perform_cad(timeout=1.0) + if not channel_busy: + logger.debug( + f"CAD clear after {attempt + 1} attempt(s)" + ) + break + else: + logger.debug("CAD busy — channel activity detected") + if attempt < self.lbt_max_attempts - 1: + base_delay = random.randint(50, 200) + backoff_ms = min( + base_delay * (2 ** attempt), 5000 + ) + lbt_backoff_delays.append(float(backoff_ms)) + logger.debug( + f"CAD backoff {backoff_ms}ms " + f"(attempt {attempt+1}/{self.lbt_max_attempts})" + ) + await asyncio.sleep(backoff_ms / 1000.0) + else: + logger.warning( + "CAD max attempts — transmitting anyway" + ) + except Exception as e: + logger.warning(f"CAD failed: {e}, proceeding with TX") + break + + # ── Transmit ───────────────────────────────────── + try: + resp = await self._send_command( + CMD_TX_REQUEST, data, + expect_cmd=CMD_TX_DONE, + timeout=10.0, + ) + + if resp is not None: + self._tx_count += 1 + + # Parse airtime from TX_DONE payload (uint32 LE, microseconds) + airtime_us = 0 + if len(resp) >= 4: + airtime_us = struct.unpack(" 0, + } + else: + logger.error("TX failed — no TX_DONE response") + # Try to restore RX anyway + await self._send_command( + CMD_RX_START, b"", + expect_cmd=CMD_RX_STARTED, + timeout=2.0, + ) + return None + + except Exception as e: + logger.error(f"TX error: {e}") + return None + + async def wait_for_rx(self) -> bytes: + """Not used — packets arrive via set_rx_callback().""" + raise NotImplementedError( + "Use set_rx_callback(callback) to receive packets asynchronously." + ) + + def set_rx_callback(self, callback: Callable[[bytes], None]): + """Set RX callback — called by Dispatcher to register _on_packet_received.""" + self.rx_callback = callback + logger.info("RX callback registered") + + # Capture event loop for thread-safe async event dispatch + try: + self._event_loop = asyncio.get_running_loop() + except RuntimeError: + pass + + def sleep(self): + """Put radio into low-power mode (not typically used with USB modem).""" + logger.debug("Sleep not applicable for USB modem") + + def get_last_rssi(self) -> int: + return self.last_rssi + + def get_last_snr(self) -> float: + return self.last_snr + + def get_last_signal_rssi(self) -> int: + return self.last_signal_rssi + + # ══════════════════════════════════════════════════════════ + # Extended interface (matching SX1262Radio extras used by pymc_core) + # ══════════════════════════════════════════════════════════ + + def check_radio_health(self) -> bool: + """Health check — verify RX thread is alive, restart if dead. + + Called by Dispatcher.run_forever() every 60 seconds. + Also triggers a noise floor refresh from the modem. + """ + if not self._initialized: + return False + + # Check RX thread + if self._rx_thread is None or not self._rx_thread.is_alive(): + logger.warning("RX thread dead — restarting") + self._stop_event.clear() + self._rx_thread = threading.Thread( + target=self._rx_worker, daemon=True, name="usb-lora-rx" + ) + self._rx_thread.start() + return False + + # Schedule noise floor refresh (non-blocking) + if self._event_loop: + self._event_loop.call_soon_threadsafe( + lambda: self._event_loop.create_task(self.refresh_noise_floor()) + ) + + return True + + def get_status(self) -> dict: + """Get radio status dict (matches SX1262Radio.get_status()).""" + return { + "initialized": self._initialized, + "frequency": self.frequency, + "tx_power": self.tx_power, + "spreading_factor": self.spreading_factor, + "bandwidth": self.bandwidth, + "coding_rate": self.coding_rate, + "last_rssi": self.last_rssi, + "last_snr": self.last_snr, + "last_signal_rssi": self.last_signal_rssi, + "hardware_ready": self._initialized, + "driver": "pymc_usb", + "port": self.port, + "tx_count": self._tx_count, + "rx_count": self._rx_count, + } + + async def get_modem_status(self) -> Optional[dict]: + """Query detailed status from the modem firmware.""" + resp = await self._send_command( + CMD_STATUS_REQ, b"", + expect_cmd=CMD_STATUS_RESP, + timeout=2.0, + ) + if resp and len(resp) >= STATUS_RESP_SIZE: + fields = struct.unpack(STATUS_RESP_FMT, resp[:STATUS_RESP_SIZE]) + return { + "uptime_sec": fields[0], + "rx_count": fields[1], + "tx_count": fields[2], + "crc_errors": fields[3], + "last_rssi": fields[4], + "last_snr": fields[5] / 10.0, + "noise_floor": fields[6] / 10.0, + "temp_c": fields[7], + "radio_state": ["idle/rx", "tx", "error"][ + min(fields[8], 2) + ], + } + return None + + def get_noise_floor(self) -> Optional[float]: + """Get current noise floor in dBm. + + Matches SX1262Radio.get_noise_floor() interface. + The noise floor is continuously sampled by the modem firmware + using the same algorithm as SX1262Radio._sample_noise_floor(): + - 20 instantaneous RSSI samples during quiet periods + - 500ms quiet time after last packet before sampling + - Samples above floor + 10dB threshold rejected + - Averaged and clamped to -150...-50 dBm + + This is a synchronous method returning the cached value. + The firmware updates it autonomously. For a fresh read, + use await get_modem_status() which returns noise_floor. + """ + if not self._initialized: + return 0.0 + if self._tx_lock.locked(): + return 0.0 + return self._noise_floor + + async def refresh_noise_floor(self) -> Optional[float]: + """Query fresh noise floor from modem firmware. + + Returns noise floor in dBm, or None on failure. + Also updates the cached self._noise_floor value. + """ + resp = await self._send_command( + CMD_NOISE_REQ, b"", + expect_cmd=CMD_NOISE_RESP, + timeout=2.0, + ) + if resp and len(resp) >= 2: + nf_x10 = struct.unpack(" bool: + """Public CAD interface matching SX1262Radio.perform_cad(). + + When det_peak/det_min are supplied (used by the repeater CAD + calibration tool to sweep different thresholds), program the chip + with those values via CMD_SET_CAD_PARAMS before running the scan. + """ + if det_peak is not None and det_min is not None: + new_peak = int(det_peak) + new_min = int(det_min) + # Same caching rationale as tcp_radio: avoid re-sending unchanged + # thresholds during the per-sample inner loop of calibration. + if new_peak != self._custom_cad_peak or new_min != self._custom_cad_min: + payload = bytes([ + 0x01, + new_peak & 0xFF, + new_min & 0xFF, + 0x00, + ]) + await self._send_command( + CMD_SET_CAD_PARAMS, payload, + expect_cmd=CMD_CAD_PARAMS_RESP, timeout=2.0, + ) + self._custom_cad_peak = new_peak + self._custom_cad_min = new_min + + # Calibration tool sends timeout=0.3 which is tight for our stack; + # raise the floor so we don't lose samples to "no response". + effective = max(timeout, 0.6) + return await self._perform_cad(effective) + + # ── Wi-Fi / OTA provisioning (v0.5) ─────────────────────── + + async def set_wifi_credentials( + self, + ssid: str, + password: str, + tcp_port: int = 5055, + tcp_token: str = "", + ) -> Optional[dict]: + """Provision Wi-Fi credentials over USB and reboot into STA mode. + + After this call the modem saves SSID/password/port/token to NVS, + responds with a WIFI_STATUS frame showing the pending config, then + reboots. USB link drops during reboot — caller should re-open the + serial port and call `begin()` again, then verify with + `get_wifi_status()` that STA came up. + + tcp_token: shared secret. Empty = open LAN (TCP + HTTP OTA). Non-empty + enforces CMD_AUTH on TCP and Basic-auth (user='heltec') on HTTP /update. + + Returns the pending WIFI_STATUS dict, or None on timeout/error. + """ + ssid_b = ssid.encode("utf-8") + pass_b = password.encode("utf-8") + tok_b = tcp_token.encode("utf-8") + if len(ssid_b) == 0 or len(ssid_b) > 32: + raise ValueError("SSID must be 1..32 bytes") + if len(pass_b) > 64: + raise ValueError("password must be <=64 bytes") + if len(tok_b) > 64: + raise ValueError("token must be <=64 bytes") + if not (1 <= tcp_port <= 65535): + raise ValueError("tcp_port out of range") + + payload = ( + bytes([len(ssid_b)]) + ssid_b + + bytes([len(pass_b)]) + pass_b + + struct.pack(" Optional[dict]: + """Query current Wi-Fi/OTA status. + + Returns a dict: + { + "mode": int, # 0=off 1=connecting 2=STA 3=AP + "mode_name": str, + "ip": str, # "192.168.1.42" or "0.0.0.0" + "port": int, # TCP port + "ssid": str, + "hostname": str, # "pymc-ab12cd" — append ".local" for mDNS + "mdns": str, # "pymc-ab12cd.local" + } + or None on timeout. + """ + resp = await self._send_command( + CMD_GET_WIFI, b"", expect_cmd=CMD_WIFI_STATUS, timeout=2.0 + ) + if resp is None: + return None + return self._parse_wifi_status(resp) + + async def get_version(self) -> Optional[str]: + """Return firmware version string (e.g. 'v0.5.3'). + + None if the modem doesn't implement CMD_GET_VERSION (pre-v0.5.3 firmware) + or the request times out. + """ + resp = await self._send_command( + CMD_GET_VERSION, b"", expect_cmd=CMD_VERSION_RESP, timeout=2.0 + ) + if resp is None: + return None + try: + return resp.decode("ascii", errors="replace") + except Exception: + return None + + async def wifi_reset(self) -> bool: + """Wipe Wi-Fi NVS and reboot into AP config portal. + + Device will reboot and drop the serial link. USB OTA/provisioning + remains possible via `set_wifi_credentials()` after re-connecting. + """ + resp = await self._send_command( + CMD_WIFI_RESET, b"", expect_cmd=CMD_WIFI_RESET, timeout=2.0 + ) + return resp is not None + + @staticmethod + def _parse_wifi_status(payload: bytes) -> Optional[dict]: + """Parse WIFI_STATUS payload (see protocol.h for layout).""" + try: + i = 0 + mode = payload[i] + i += 1 + ip = f"{payload[i]}.{payload[i+1]}.{payload[i+2]}.{payload[i+3]}" + i += 4 + port = payload[i] | (payload[i+1] << 8) + i += 2 + slen = payload[i] + i += 1 + ssid = payload[i:i+slen].decode("utf-8", errors="replace") + i += slen + hlen = payload[i] + i += 1 + host = payload[i:i+hlen].decode("utf-8", errors="replace") + except (IndexError, UnicodeDecodeError) as e: + logger.error(f"Malformed WIFI_STATUS: {e}") + return None + + mode_names = { + WIFI_MODE_OFFLINE: "offline", + WIFI_MODE_STA_CONNECTING: "connecting", + WIFI_MODE_STA_CONNECTED: "sta", + WIFI_MODE_AP_CONFIG: "ap", + } + return { + "mode": mode, + "mode_name": mode_names.get(mode, "unknown"), + "ip": ip, + "port": port, + "ssid": ssid, + "hostname": host, + "mdns": f"{host}.local" if host else "", + } + + def cleanup(self): + """Clean up resources.""" + self._initialized = False + self._stop_event.set() + + if self._rx_thread and self._rx_thread.is_alive(): + self._rx_thread.join(timeout=2.0) + + if self._serial and self._serial.is_open: + self._serial.close() + + logger.info("USBLoRaRadio cleanup complete") + + # ══════════════════════════════════════════════════════════ + # Private — serial I/O + # ══════════════════════════════════════════════════════════ + + def _ping_sync(self, timeout: float = 3.0) -> bool: + """Synchronous ping — ad-hoc liveness probe. + + Not used from begin() on purpose: on macOS, chaining PING and + SET_CONFIG into the same CDC bulk stream causes SET_CONFIG to be + silently dropped. Keep this for explicit health checks. + """ + frame = build_frame(CMD_PING) + self._serial.write(frame) + + resp = self._read_frame_sync(timeout=timeout, expect_cmd=CMD_PONG) + logger.debug(f"PING _read_frame_sync returned: {resp}") + if resp and resp[0] == CMD_PONG: + logger.info("Modem PONG received — alive") + return True + return False + + def _apply_config_sync(self) -> bool: + """Synchronous config push — used during begin().""" + payload = struct.pack( + RADIO_CONFIG_FMT, + self.frequency, + self.bandwidth, + self.spreading_factor, + self.coding_rate, + self.tx_power, + self.sync_word, + self.preamble_length, + ) + frame = build_frame(CMD_SET_CONFIG, payload) + self._serial.write(frame) + + # 10 s covers the worst case where a burst of RX_PACKET frames from + # nearby MeshCore nodes arrives ahead of CONFIG_RESP and the filter + # has to skip them. + resp = self._read_frame_sync(timeout=10.0, expect_cmd=CMD_CONFIG_RESP) + if resp and resp[0] == CMD_CONFIG_RESP: + logger.info( + f"Radio configured: {self.frequency/1e6:.1f}MHz SF{self.spreading_factor} " + f"BW{self.bandwidth/1000:.0f}kHz {self.tx_power}dBm " + f"sync=0x{self.sync_word:04X} pre={self.preamble_length}" + ) + return True + elif resp and resp[0] == CMD_ERROR: + err = resp[1][0] if len(resp) > 1 and len(resp[1]) > 0 else 0xFF + logger.error(f"Config rejected by modem: error 0x{err:02X}") + return False + else: + logger.error("No config response from modem") + return False + + def _read_frame_sync( + self, + timeout: float = 2.0, + expect_cmd: Optional[int] = None, + ) -> Optional[tuple]: + """Read one frame synchronously. Returns (cmd, payload) or None. + + When `expect_cmd` is set, frames with a different cmd byte are + discarded silently (common case: RX_PACKET from nearby MeshCore + nodes arriving between our command and its response). The single + `timeout` applies to the whole wait, not per-frame. + """ + old_timeout = self._serial.timeout + # Short per-read timeout keeps us responsive; the overall wait is + # bounded by `deadline`. Longer timeouts (e.g. 35s) were observed + # to swallow incoming bytes on macOS + CP2102, so we poll instead. + self._serial.timeout = 0.2 + deadline = time.time() + timeout + try: + while time.time() < deadline: + b = self._serial.read(1) + if len(b) == 0: + continue + if b[0] != PROTO_SYNC: + continue + + # A loose 0xAA byte can appear inside an RX_PACKET payload; + # if we can't complete the frame (short read / bad CRC), + # go back to scanning for the next SYNC instead of bailing. + hdr = self._serial.read(3) + if len(hdr) < 3: + continue + cmd = hdr[0] + length = struct.unpack(" MAX_LORA_PAYLOAD + 16: + continue # garbage length — must be a fake SYNC + + payload = self._serial.read(length) if length > 0 else b"" + if len(payload) < length: + continue + + crc_bytes = self._serial.read(2) + if len(crc_bytes) < 2: + continue + + received_crc = struct.unpack(" 0: + buf.extend(self._serial.read(waiting)) + else: + time.sleep(0.001) + continue + + # Parse complete frames from buffer + while len(buf) >= 6: # Min frame: SYNC+CMD+LEN(2)+CRC(2) + # Find sync + sync_idx = buf.find(PROTO_SYNC) + if sync_idx < 0: + buf.clear() + break + if sync_idx > 0: + buf = buf[sync_idx:] + + if len(buf) < 4: + break + + cmd = buf[1] + length = buf[2] | (buf[3] << 8) + # Sanity-cap the LEN field. The largest legitimate frame + # is RX_PACKET (6 B header + MAX_LORA_PAYLOAD); anything + # bigger means we're parsing garbage from a desync. + # Drop the SYNC byte we just consumed and resync rather + # than waiting indefinitely for a phantom 64 KB frame. + if length > MAX_LORA_PAYLOAD + 32: + logger.warning( + f"RX frame LEN={length} exceeds sanity bound — " + f"dropping SYNC byte and resyncing" + ) + buf = buf[1:] + continue + frame_size = 1 + 1 + 2 + length + 2 + + if len(buf) < frame_size: + break # Incomplete frame, wait for more + + # Extract frame + hdr = bytes(buf[1:4]) + payload = bytes(buf[4 : 4 + length]) + crc_recv = buf[4 + length] | (buf[5 + length] << 8) + crc_comp = crc16_ccitt(hdr + payload) + + # Consume frame + buf = buf[frame_size:] + + if crc_recv != crc_comp: + logger.warning( + f"RX CRC mismatch, cmd=0x{cmd:02X}, dropping" + ) + continue + + self._dispatch_frame(cmd, payload) + + except serial.SerialException as e: + logger.error(f"Serial error in RX worker: {e}") + time.sleep(1.0) + except Exception as e: + logger.error(f"RX worker error: {e}") + time.sleep(0.1) + + logger.debug("RX worker thread exiting") + + def _dispatch_frame(self, cmd: int, payload: bytes): + """Route a received frame to the right handler.""" + + if cmd == CMD_RX_PACKET: + # ── LoRa packet received ───────────────────────── + if len(payload) < 6: + logger.warning(f"RX_PACKET too short: {len(payload)}B") + return + + rssi = struct.unpack(" 0 else 0xFF + logger.warning(f"Modem error: 0x{err_code:02X}") + # Also signal any waiting command in case the error + # is a response to our command + with self._response_lock: + for evt_cmd, evt in list(self._response_events.items()): + self._response_data[evt_cmd] = None + if self._event_loop: + self._event_loop.call_soon_threadsafe(evt.set) + + elif cmd == CMD_TX_FAIL: + # The TX request reached the radio but the chip never asserted + # TX_DONE before the firmware's own timeout. Wake up whoever is + # blocked on CMD_TX_DONE so the caller doesn't sit on the full + # driver timeout. + logger.warning("Modem TX_FAIL — radio did not assert TX_DONE") + with self._response_lock: + evt = self._response_events.get(CMD_TX_DONE) + if evt is not None: + self._response_data[CMD_TX_DONE] = None + if self._event_loop: + self._event_loop.call_soon_threadsafe(evt.set) + + else: + # ── Command response ───────────────────────────── + with self._response_lock: + if cmd in self._response_events: + self._response_data[cmd] = payload + evt = self._response_events[cmd] + if self._event_loop: + self._event_loop.call_soon_threadsafe(evt.set) + + # ── Async command/response ──────────────────────────────── + + async def _send_command( + self, + cmd: int, + payload: bytes, + expect_cmd: int, + timeout: float = 5.0, + ) -> Optional[bytes]: + """Send a command frame and wait for a specific response frame.""" + if not self._serial or not self._serial.is_open: + return None + + # Ensure event loop is captured + if self._event_loop is None: + try: + self._event_loop = asyncio.get_running_loop() + except RuntimeError: + pass + + # Register response expectation + evt = asyncio.Event() + with self._response_lock: + self._response_events[expect_cmd] = evt + self._response_data.pop(expect_cmd, None) + + try: + frame = build_frame(cmd, payload) + self._serial.write(frame) + self._serial.flush() + + try: + await asyncio.wait_for(evt.wait(), timeout=timeout) + except asyncio.TimeoutError: + logger.warning( + f"Timeout: cmd=0x{cmd:02X} → expected 0x{expect_cmd:02X}" + ) + return None + + return self._response_data.get(expect_cmd) + + finally: + with self._response_lock: + self._response_events.pop(expect_cmd, None) + self._response_data.pop(expect_cmd, None) + + async def _perform_cad(self, timeout: float = 1.0) -> bool: + """Perform Channel Activity Detection. Returns True if busy.""" + resp = await self._send_command( + CMD_CAD_REQUEST, b"", + expect_cmd=CMD_CAD_RESP, + timeout=timeout, + ) + if resp and len(resp) >= 1: + busy = resp[0] != 0 + logger.debug(f"CAD: {'BUSY' if busy else 'CLEAR'}") + return busy + else: + logger.warning("CAD no response — assuming clear") + return False + + # ── Config setters (for runtime reconfiguration) ────────── + + def set_frequency(self, frequency: int) -> bool: + self.frequency = frequency + return True + + def set_tx_power(self, power: int) -> bool: + self.tx_power = power + return True + + def set_spreading_factor(self, sf: int) -> bool: + self.spreading_factor = sf + return True + + def set_bandwidth(self, bw: int) -> bool: + self.bandwidth = bw + return True + + def __del__(self): + try: + self.cleanup() + except Exception: + pass diff --git a/src/pymc_core/protocol/constants.py b/src/pymc_core/protocol/constants.py index a3421ed..8736ca2 100644 --- a/src/pymc_core/protocol/constants.py +++ b/src/pymc_core/protocol/constants.py @@ -40,6 +40,8 @@ # --------------------------------------------------------------------------- PAYLOAD_VER_1 = 0x00 # Currently supported PAYLOAD_VER_2 = 0x01 # Reserved for future use +PAYLOAD_VER_3 = 0x02 # Reserved for future use +PAYLOAD_VER_4 = 0x03 # Reserved for future use MAX_SUPPORTED_PAYLOAD_VERSION = PAYLOAD_VER_2 # Accept versions 0-1 # --------------------------------------------------------------------------- diff --git a/tests/hardware/__init__.py b/tests/hardware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/hardware/test_protocol_constants.py b/tests/hardware/test_protocol_constants.py new file mode 100644 index 0000000..4490651 --- /dev/null +++ b/tests/hardware/test_protocol_constants.py @@ -0,0 +1,150 @@ +""" +Offline tests for the shared pymc_usb wire-protocol primitives. + +Verifies CRC-16/CCITT-FALSE (poly 0x1021, init 0xFFFF, no reflect, +no xor-out) and frame layout produced by `crc16_ccitt` / `build_frame` +against fixed reference vectors so the firmware and both Python drivers +(USB + TCP) cannot drift apart silently. + +No network, no hardware — these run in CI on a bare interpreter. + +Place this file at: + tests/hardware/test_protocol_constants.py +""" + +import struct + +import pytest + +from pymc_core.hardware.protocol_constants import ( + CMD_PING, + CMD_TX_REQUEST, + PROTO_SYNC, + build_frame, + crc16_ccitt, +) + + +# ───────────────────────── CRC ───────────────────────────────────── + + +def test_crc_empty_buffer_is_init_value(): + """Empty input → CRC stays at the init value 0xFFFF.""" + assert crc16_ccitt(b"") == 0xFFFF + + +def test_crc_ccitt_false_canonical_vector(): + """ + Canonical CRC-16/CCITT-FALSE check vector: CRC("123456789") == 0x29B1. + + If this fails, the polynomial, initial value, or bit ordering have + drifted from the firmware's `crc16_ccitt` in `protocol.h`. + """ + assert crc16_ccitt(b"123456789") == 0x29B1 + + +def test_crc_single_byte_zero(): + """CRC(0x00) is a stable spot-check: 0xE1F0.""" + assert crc16_ccitt(b"\x00") == 0xE1F0 + + +# ───────────────────────── Frame layout ──────────────────────────── + + +def test_ping_frame_layout(): + """ + PING frame is the simplest case: SYNC | CMD | LEN(2) | CRC(2). + Empty payload → 6 bytes total, no payload field in between. + """ + frame = build_frame(CMD_PING) + assert len(frame) == 6 + assert frame[0] == PROTO_SYNC + assert frame[1] == CMD_PING + assert frame[2:4] == b"\x00\x00" # length = 0, little-endian + + +@pytest.mark.parametrize("n", [1, 16, 100, 200, 255]) +def test_frame_with_payload_total_length(n): + """Total frame length = 1 (SYNC) + 1 (CMD) + 2 (LEN) + N + 2 (CRC).""" + payload = b"\xAB" * n + frame = build_frame(CMD_TX_REQUEST, payload) + assert len(frame) == 6 + n + + +def test_payload_passes_through_unchanged(): + """Build a frame and confirm the payload bytes are at the expected offset.""" + payload = bytes(range(100)) + frame = build_frame(CMD_TX_REQUEST, payload) + assert frame[4 : 4 + 100] == payload + + +def test_length_field_is_little_endian(): + """ + 300-byte payload exercises both LEN bytes. LE encoding of 300 = 0x012C + → bytes [0x2C, 0x01]. If we serialized big-endian, this would be [0x01, 0x2C]. + """ + payload = b"\x55" * 300 + frame = build_frame(CMD_TX_REQUEST, payload) + assert frame[2:4] == b"\x2C\x01" + + +def test_crc_covers_cmd_len_payload_but_not_sync(): + """ + Per protocol.h: CRC is computed over CMD + LEN + PAYLOAD, *not* over SYNC. + Verify by rebuilding the CRC from the raw bytes we expect it to cover. + """ + payload = b"hello world" + frame = build_frame(CMD_TX_REQUEST, payload) + + # Last 2 bytes of the frame are the CRC, little-endian. + crc_in_frame = struct.unpack(" _write_ok.""" @@ -139,6 +170,51 @@ async def test_cmd_send_raw_data_valid_writes_ok(): server._write_err.assert_not_called() +@pytest.mark.asyncio +async def test_cmd_send_channel_data_valid_direct_path(): + """CMD_SEND_CHANNEL_DATA parses path/data_type/payload and delegates to bridge.""" + from pymc_core.protocol.packet_utils import PathUtils + + bridge = _MockBridgeChannelData(send_ok=True) + server = CompanionFrameServer(bridge, "hash", port=0) + server._write_ok = Mock() + server._write_err = Mock() + + path_len = PathUtils.encode_path_len(1, 2) # two 1-byte hops + payload = b"\xDE\xAD\xBE" + data = bytes([1, path_len, 0x10, 0x20, 0x34, 0x12]) + payload + await server._cmd_send_channel_data(data) + + assert bridge.calls == [(1, 0x1234, payload, b"\x10\x20", path_len)] + server._write_ok.assert_called_once() + server._write_err.assert_not_called() + + +@pytest.mark.asyncio +async def test_cmd_send_channel_data_invalid_type_zero(): + """CMD_SEND_CHANNEL_DATA rejects DATA_TYPE_RESERVED (0).""" + bridge = _MockBridgeChannelData(send_ok=True) + server = CompanionFrameServer(bridge, "hash", port=0) + server._write_ok = Mock() + server._write_err = Mock() + + # channel=1, flood path (0xFF), data_type=0x0000, payload=b"x" + await server._cmd_send_channel_data(bytes([1, 0xFF, 0x00, 0x00, 0x78])) + server._write_err.assert_called_once_with(ERR_CODE_ILLEGAL_ARG) + server._write_ok.assert_not_called() + + +@pytest.mark.asyncio +async def test_cmd_send_channel_data_unknown_channel(): + """CMD_SEND_CHANNEL_DATA returns NOT_FOUND for unknown channel index.""" + bridge = _MockBridgeChannelData(send_ok=True) + server = CompanionFrameServer(bridge, "hash", port=0) + server._write_err = Mock() + + await server._cmd_send_channel_data(bytes([2, 0xFF, 0x34, 0x12])) + server._write_err.assert_called_once_with(ERR_CODE_NOT_FOUND) + + @pytest.mark.asyncio async def test_cmd_add_update_contact_writes_single_ok_response(): """CMD_ADD_UPDATE_CONTACT should emit one response frame (OK only).""" @@ -261,6 +337,63 @@ async def test_cmd_send_raw_data_truncated_multibyte_path(): server._write_err.assert_called_once_with(ERR_CODE_UNSUPPORTED_CMD) +@pytest.mark.asyncio +async def test_default_flood_scope_set_get_and_clear(): + """Default flood scope commands encode/decode firmware-compatible payloads.""" + bridge = _MockBridgeChannelData() + server = CompanionFrameServer(bridge, "hash", port=0) + frames = [] + server._write_frame = lambda f: frames.append(f) + server._write_ok = Mock() + server._write_err = Mock() + + scope_name = "regionA" + name_field = scope_name.encode("utf-8").ljust(31, b"\x00") + key = bytes(range(16)) + + await server._cmd_set_default_flood_scope(name_field + key) + server._write_ok.assert_called_once() + assert bridge.default_scope == (scope_name, key) + + await server._cmd_get_default_flood_scope(b"") + assert frames[-1][0] == RESP_CODE_DEFAULT_FLOOD_SCOPE + assert frames[-1][1:32].split(b"\x00", 1)[0] == scope_name.encode("utf-8") + assert frames[-1][32:48] == key + + await server._cmd_set_default_flood_scope(b"") + assert bridge.default_scope is None + await server._cmd_get_default_flood_scope(b"") + assert frames[-1] == bytes([RESP_CODE_DEFAULT_FLOOD_SCOPE]) + + +def test_build_message_frame_channel_data_v15(): + """Queued binary channel data encodes as RESP_CODE_CHANNEL_DATA_RECV.""" + bridge = Mock() + bridge.get_time = Mock(return_value=0) + server = CompanionFrameServer(bridge, "hash", port=0) + server._app_target_ver = 3 + msg = QueuedMessage( + sender_key=b"", + txt_type=0, + timestamp=0, + text="", + is_channel=True, + channel_idx=4, + path_len=0xFF, + snr=2.0, + rssi=-90, + channel_data_type=0x1234, + channel_data_payload=b"\xAA\xBB", + ) + frame = server._build_message_frame(msg) + assert frame[0] == RESP_CODE_CHANNEL_DATA_RECV + assert frame[4] == 4 + assert frame[5] == 0xFF + assert frame[6:8] == b"\x34\x12" + assert frame[8] == 2 + assert frame[9:11] == b"\xAA\xBB" + + @pytest.mark.asyncio async def test_push_trace_data_enqueues_frame(): """push_trace_data enqueues a correctly formatted trace frame.""" diff --git a/tests/test_sx1262_wrapper.py b/tests/test_sx1262_wrapper.py index 288c462..e167975 100644 --- a/tests/test_sx1262_wrapper.py +++ b/tests/test_sx1262_wrapper.py @@ -1,5 +1,6 @@ import pytest +from pymc_core.hardware.sx1262_wrapper import SX1262Radio from pymc_core.hardware.signal_utils import snr_register_to_db @@ -35,3 +36,11 @@ def test_16bit_negative_conversion(): def test_invalid_bit_width_raises(): with pytest.raises(ValueError): snr_register_to_db(0x00, bits=0) + + +def test_normalize_en_pins_uses_single_pin_when_list_missing(): + assert SX1262Radio._normalize_en_pins(en_pin=26, en_pins=None) == [26] + + +def test_normalize_en_pins_prefers_list_and_filters_disabled_entries(): + assert SX1262Radio._normalize_en_pins(en_pin=26, en_pins=[26, -1, 23, 26]) == [26, 23]