Add Marstek MQTT responder to answer CT002/CT003 polls locally#328
Conversation
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
WalkthroughThis PR adds an optional Marstek MQTT responder that answers CT002/CT003 poll traffic on the same broker. Changes include extending CT002 with consumer reporting APIs, introducing a raw powermeter watts hook across the wrapper hierarchy, implementing the Marstek MQTT protocol and binding, integrating responder handling into MQTT Insights service, wiring device registration in the main application, and documenting the new configuration options. ChangesMarstek MQTT Responder Integration
🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
When [MARSTEK] credentials are configured, the managed fake CT MAC returned by ensure_managed_fake_device() is now used to answer the Marstek CT002/CT003 MQTT poll protocol (cd=1 on hame_energy/marstek_energy topics) on the same broker as MQTT Insights. Combined with hame-relay, this makes the emulator's readings visible as a CT in the Marstek app. Enabled by default via MARSTEK_MQTT_ENABLED in [MQTT_INSIGHTS]; opt out by setting it to false. Without Marstek credentials the responder stays silent (one info log per CT device). https://claude.ai/code/session_01K5ypPxYASWXJewf7Lk9a1e
The MQTT Insights listener is a single async-for loop. When a Marstek poll handler awaited binding.get_values() inline it could block every subsequent message (other CT polls, Insights commands) for as long as the powermeter took to yield a reading. Spawn each response in its own task instead, and track/cancel those tasks on disconnect/shutdown. Also snapshot _marstek_bindings under the lock before scanning in _find_marstek_binding, and drop the type: ignore on the topic helpers by returning an explicit 2-tuple. Adds two integration tests: register-before-start populates subscriptions on first connect, and a slow handler on one binding doesn't block a concurrent fast poll on another binding. https://claude.ai/code/session_01K5ypPxYASWXJewf7Lk9a1e
Align cd=1/cd=4 payloads with HME-style parsing, propagate cloud ver_v into bindings, add optional MARSTEK_MQTT_INTERVAL for periodic broadcasts, and expose typed reporting_consumer_* data from CT002 while building slave CSV in the MQTT layer.
0cfc611 to
536d8ba
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/astrameter/main.py (1)
641-706:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRebuild managed Marstek mapping after config-driven restarts.
managed_marstekis computed once before the service loop, but Lines 736-739 reload config/device types without recomputing it. After a web “Save & Restart”, Marstek MAC/version wiring can stay stale until full process restart.Proposed fix
+def _build_managed_marstek( + cfg: configparser.ConfigParser, device_types: list[str] +) -> dict[str, tuple[str, int]]: + managed_marstek: dict[str, tuple[str, int]] = {} + marstek_enabled = cfg.getboolean("MARSTEK", "ENABLE", fallback=False) + if not marstek_enabled: + return managed_marstek + # existing registration logic from main() Lines 646-706... + return managed_marstek @@ - managed_marstek: dict[str, tuple[str, int]] = {} - marstek_enabled = cfg.getboolean("MARSTEK", "ENABLE", fallback=False) - if marstek_enabled: - ... + managed_marstek = _build_managed_marstek(cfg, device_types) @@ except KeyboardInterrupt: if not restart_requested: break logger.info("Restarting service…") cfg = configparser.ConfigParser(dict_type=OrderedDict, interpolation=None) cfg.read(args.config) _apply_cli_overrides(cfg, args) device_types, device_ids, skip_test = _resolve_device_config(cfg, args) + managed_marstek = _build_managed_marstek(cfg, device_types)Also applies to: 736-739
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/astrameter/main.py` around lines 641 - 706, managed_marstek is only built once (using marstek_enabled, MarstekConfig, ensure_managed_fake_device, normalize_mac, ver_v_from_marstek_api_version and device_types) but never recomputed after the config/device_types reload later, leaving MAC/version wiring stale after a config-driven restart; fix by extracting the registration logic into a helper (e.g., build_managed_marstek(marstek_cfg, device_types) that returns the managed_marstek dict) and call it both at initial startup and again immediately after the config/device_types are reloaded (the same place where cfg and device_types are refreshed), or simply re-run the existing loop there to rebuild managed_marstek when marstek_enabled is true.
🧹 Nitpick comments (1)
src/astrameter/powermeter/wrappers/transform_test.py (1)
11-13: ⚡ Quick winStrengthen raw-path dispatch verification in the fixture/test.
Line 11-Line 13 reuses one
AsyncMockfor both methods, so this test can’t proveget_powermeter_watts_raw()is actually used. Use separate mocks and assert awaited calls.Proposed test hardening
`@pytest.fixture` def mock_powermeter(): pm = Mock() - m = AsyncMock() - pm.get_powermeter_watts = m - pm.get_powermeter_watts_raw = m + pm.get_powermeter_watts = AsyncMock() + pm.get_powermeter_watts_raw = AsyncMock() pm.wait_for_message = AsyncMock() pm.wait_for_next_message = AsyncMock() return pm async def test_transformed_raw_matches_wrapped_without_offset(mock_powermeter): mock_powermeter.get_powermeter_watts.return_value = [100.0, 200.0, 300.0] + mock_powermeter.get_powermeter_watts_raw.return_value = [100.0, 200.0, 300.0] t = TransformedPowermeter(mock_powermeter, [10.0], [1.0]) assert await t.get_powermeter_watts() == [110.0, 210.0, 310.0] assert await t.get_powermeter_watts_raw() == [100.0, 200.0, 300.0] + mock_powermeter.get_powermeter_watts.assert_awaited_once() + mock_powermeter.get_powermeter_watts_raw.assert_awaited_once()Also applies to: 19-23
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/astrameter/powermeter/wrappers/transform_test.py` around lines 11 - 13, Replace the single shared AsyncMock with two distinct mocks so the test can verify raw-path dispatch: create separate AsyncMock instances and assign one to pm.get_powermeter_watts and the other to pm.get_powermeter_watts_raw, then update the assertions to assert that the specific mock for get_powermeter_watts_raw was awaited (and the other was/was not awaited as appropriate) to prove the raw method was actually called; ensure you do the same change for the second occurrence covering lines 19-23.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/astrameter/config/config_loader.py`:
- Around line 735-737: The MARSTEK_MQTT_ENABLED boolean should be parsed like
TLS and HA_DISCOVERY to treat an empty value as missing: first read the raw
string via config.get(section, "MARSTEK_MQTT_ENABLED", fallback=None) (or
similar), check if the raw value is None or empty/whitespace and only then call
config.getboolean(section, "MARSTEK_MQTT_ENABLED", fallback=True) if a non-empty
value exists; update the marstek_mqtt_enabled assignment in config_loader.py to
use this two-step pattern so an explicit blank key falls back to True instead of
raising.
In `@src/astrameter/main.py`:
- Around line 359-371: The _marstek_get_values coroutine can await
chosen.wait_for_next_message() indefinitely and block Marstek poll responder
tasks; wrap the call to chosen.wait_for_next_message() in an asyncio timeout
(e.g., using asyncio.wait_for with a small configurable timeout) and catch
asyncio.TimeoutError so the function falls back to safe behavior (returning
zeros or reading last-known values) before calling
chosen.get_powermeter_watts_raw(); update the _marstek_get_values implementation
to use the timeout and error handling around wait_for_next_message to prevent
hanging when the Powermeter (chosen) is quiet/offline.
In `@src/astrameter/mqtt_insights/marstek_mqtt.py`:
- Around line 196-201: The loop that builds CD4 CSV lines calls .strip() on
row.last_ip which can be None; change the host extraction in the loop that
contains parts.append so it safely handles optional last_ip (e.g., treat None as
empty string before stripping and then default to "0.0.0.0"), then pass that
safe host value into _cd4_escape_field in the parts.append call to avoid raising
on None.
In `@src/astrameter/mqtt_insights/service.py`:
- Around line 707-714: The polling loop currently spawns a new asyncio task for
every binding on each tick (see _serve_marstek_poll, self._marstek_tasks) which
allows concurrent overlapping reads for the same binding; change this to
serialize work per binding by tracking active work keyed by the binding (e.g.,
self._marstek_tasks_by_binding: Dict[binding, Task] or a per-binding
asyncio.Lock) and before creating a new task check if one already exists/running
for that binding and skip spawning (or await the lock) until it finishes; ensure
you add a done_callback to remove the entry from the per-binding map (mirror the
existing discard logic) and apply the same fix where similar spawning occurs
around lines 737-739.
- Around line 205-213: unregister_marstek currently removes the binding but
doesn't stop any running per-device poll tasks, allowing stale handlers to
publish after unregistration; update unregister_marstek to also stop or fence
those tasks by tracking per-device tasks (e.g., add or use a mapping like
_marstek_tasks), pop and cancel the task for device_id under the same
_marstek_lock before unsubscribing, and/or modify the poll handler to re-check
the current binding from _marstek_bindings (compare
self._marstek_bindings.get(device_id) to the captured binding) before any
publish to ensure it is still the active binding; ensure cancellation uses
task.cancel() and awaits it safely inside the _marstek_lock to avoid races.
---
Outside diff comments:
In `@src/astrameter/main.py`:
- Around line 641-706: managed_marstek is only built once (using
marstek_enabled, MarstekConfig, ensure_managed_fake_device, normalize_mac,
ver_v_from_marstek_api_version and device_types) but never recomputed after the
config/device_types reload later, leaving MAC/version wiring stale after a
config-driven restart; fix by extracting the registration logic into a helper
(e.g., build_managed_marstek(marstek_cfg, device_types) that returns the
managed_marstek dict) and call it both at initial startup and again immediately
after the config/device_types are reloaded (the same place where cfg and
device_types are refreshed), or simply re-run the existing loop there to rebuild
managed_marstek when marstek_enabled is true.
---
Nitpick comments:
In `@src/astrameter/powermeter/wrappers/transform_test.py`:
- Around line 11-13: Replace the single shared AsyncMock with two distinct mocks
so the test can verify raw-path dispatch: create separate AsyncMock instances
and assign one to pm.get_powermeter_watts and the other to
pm.get_powermeter_watts_raw, then update the assertions to assert that the
specific mock for get_powermeter_watts_raw was awaited (and the other was/was
not awaited as appropriate) to prove the raw method was actually called; ensure
you do the same change for the second occurrence covering lines 19-23.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c6558c90-d5ff-4d58-9e95-d27a95748186
📒 Files selected for processing (24)
CHANGELOG.mdREADME.mdconfig.ini.examplesrc/astrameter/config/config_loader.pysrc/astrameter/ct002/__init__.pysrc/astrameter/ct002/ct002.pysrc/astrameter/main.pysrc/astrameter/main_test.pysrc/astrameter/marstek_api.pysrc/astrameter/mqtt_insights/__init__.pysrc/astrameter/mqtt_insights/marstek_mqtt.pysrc/astrameter/mqtt_insights/marstek_mqtt_test.pysrc/astrameter/mqtt_insights/mqtt_insights_test.pysrc/astrameter/mqtt_insights/service.pysrc/astrameter/powermeter/base.pysrc/astrameter/powermeter/wrappers/base.pysrc/astrameter/powermeter/wrappers/hampel_test.pysrc/astrameter/powermeter/wrappers/pid_test.pysrc/astrameter/powermeter/wrappers/smoothing_test.pysrc/astrameter/powermeter/wrappers/throttling.pysrc/astrameter/powermeter/wrappers/throttling_test.pysrc/astrameter/powermeter/wrappers/transform_test.pysrc/astrameter/shelly/shelly_udp_test.pytests/test_ct002_protocol.py
| for row in rows: | ||
| host = row.last_ip.strip() or "0.0.0.0" | ||
| parts.append( | ||
| f"slv_t={_cd4_escape_field(row.device_type)},slv_id={_cd4_escape_field(row.consumer_id)}," | ||
| f"slv_ip={_cd4_escape_field(host)},slv_p={row.phase}" | ||
| ) |
There was a problem hiding this comment.
Handle optional last_ip safely in CD4 CSV formatting.
Line 197 calls .strip() on row.last_ip; if last_ip is None (optional in this API), the responder will throw and fail CD4 payload generation.
Proposed fix
parts: list[str] = []
for row in rows:
- host = row.last_ip.strip() or "0.0.0.0"
+ host = (row.last_ip or "").strip() or "0.0.0.0"
parts.append(
f"slv_t={_cd4_escape_field(row.device_type)},slv_id={_cd4_escape_field(row.consumer_id)},"
f"slv_ip={_cd4_escape_field(host)},slv_p={row.phase}"
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for row in rows: | |
| host = row.last_ip.strip() or "0.0.0.0" | |
| parts.append( | |
| f"slv_t={_cd4_escape_field(row.device_type)},slv_id={_cd4_escape_field(row.consumer_id)}," | |
| f"slv_ip={_cd4_escape_field(host)},slv_p={row.phase}" | |
| ) | |
| for row in rows: | |
| host = (row.last_ip or "").strip() or "0.0.0.0" | |
| parts.append( | |
| f"slv_t={_cd4_escape_field(row.device_type)},slv_id={_cd4_escape_field(row.consumer_id)}," | |
| f"slv_ip={_cd4_escape_field(host)},slv_p={row.phase}" | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/astrameter/mqtt_insights/marstek_mqtt.py` around lines 196 - 201, The
loop that builds CD4 CSV lines calls .strip() on row.last_ip which can be None;
change the host extraction in the loop that contains parts.append so it safely
handles optional last_ip (e.g., treat None as empty string before stripping and
then default to "0.0.0.0"), then pass that safe host value into
_cd4_escape_field in the parts.append call to avoid raising on None.
- Treat empty MARSTEK_MQTT_ENABLED as missing so a blank key doesn't raise. - Bound _marstek_get_values' wait_for_next_message with a 2 s timeout so a quiet/offline powermeter can't pin a poll responder task; fall back to last-known values on timeout. - Rebuild managed_marstek after a config-driven restart so MAC/version wiring tracks the reloaded config and device_types. - Serialize Marstek poll handlers per binding: spawn a task only when none is already in flight for that device_id, preventing overlapping reads on the same powermeter from broadcast and message paths. - Cancel any in-flight per-device poll task in unregister_marstek and re-check the active binding before publishing so a stale handler can't emit a reply for a removed device. - transform_test: use distinct AsyncMocks for raw vs. transformed paths and assert the raw mock was awaited.
Summary
Adds a Marstek MQTT responder feature to the MQTT Insights service that allows AstraMeter to answer Marstek CT002/CT003 poll requests on the local MQTT broker. When combined with hame-relay bridging to the Marstek cloud, this surfaces the emulator in the Marstek mobile app as if it were a real device.
Key Changes
New
marstek_mqtt.pymodule: Pure helper functions and dataclass for the Marstek MQTT protocolMarstekMqttBinding: Dataclass holding per-device registration (device_id, ct_type, MAC, get_values callback, wifi_rssi, ver_v)app_topics_for(),device_topics_for(),parse_app_topic()is_poll_payload(),build_response(),normalize_mac()marstek_mqtt_test.pyExtended
MqttInsightsService:marstek_mqtt_enabledconfig option (default: true)register_marstek()andunregister_marstek()for device lifecycle management_handle_marstek_message()that dispatches polls quickly_serve_marstek_poll()that offloads to async tasks to prevent slow powermeters from blocking the listener loopIntegration in
main.py:marstek_macthrough device lifecycleConfiguration:
MARSTEK_MQTT_ENABLEDconfig option to[MQTT_INSIGHTS]sectionImplementation Details
hame_energy/andmarstek_energy/topic prefixes for compatibilityhttps://claude.ai/code/session_01K5ypPxYASWXJewf7Lk9a1e
Summary by CodeRabbit
Release Notes
New Features
Documentation
MARSTEK_MQTT_ENABLEDandMARSTEK_MQTT_INTERVALsettings.Tests