diff --git a/CHANGELOG.md b/CHANGELOG.md index 713130353..1099fbbba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Ongoing + +- Improve energy-collection via PR [311](https://github.com/plugwise/python-plugwise-usb/pull/311) + ## v0.44.10 - 2025-08-11 - PR [302](https://github.com/plugwise/python-plugwise-usb/pull/302) Improve registry discovery and SED/SCAN configuration management diff --git a/plugwise_usb/nodes/circle.py b/plugwise_usb/nodes/circle.py index ea0150259..8d79d9c75 100644 --- a/plugwise_usb/nodes/circle.py +++ b/plugwise_usb/nodes/circle.py @@ -2,12 +2,13 @@ from __future__ import annotations -from asyncio import Task, create_task +from asyncio import Task, create_task, gather from collections.abc import Awaitable, Callable from dataclasses import replace from datetime import UTC, datetime from functools import wraps import logging +from math import ceil from typing import Any, Final, TypeVar, cast from ..api import ( @@ -24,6 +25,7 @@ ) from ..connection import StickController from ..constants import ( + DAY_IN_HOURS, DEFAULT_CONS_INTERVAL, MAX_TIME_DRIFT, MINIMAL_POWER_UPDATE, @@ -72,6 +74,8 @@ # Default firmware if not known DEFAULT_FIRMWARE: Final = datetime(2008, 8, 26, 15, 46, tzinfo=UTC) +MAX_LOG_HOURS = DAY_IN_HOURS + FuncT = TypeVar("FuncT", bound=Callable[..., Any]) _LOGGER = logging.getLogger(__name__) @@ -114,7 +118,6 @@ def __init__( self._energy_counters = EnergyCounters(mac) self._retrieve_energy_logs_task: None | Task[None] = None self._last_energy_log_requested: bool = False - self._last_collected_energy_timestamp: datetime | None = None self._group_member: list[int] = [] @@ -364,8 +367,8 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 # Always request last energy log records at initial startup if not self._last_energy_log_requested: - self._last_energy_log_requested, _ = await self.energy_log_update( - self._current_log_address + self._last_energy_log_requested = await self.energy_log_update( + self._current_log_address, save_cache=False ) if self._energy_counters.log_rollover: @@ -378,26 +381,25 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 return None # Try collecting energy-stats for _current_log_address - result, _ = await self.energy_log_update(self._current_log_address) + result = await self.energy_log_update(self._current_log_address, save_cache=True) if not result: _LOGGER.debug( - "async_energy_update | %s | Log rollover | energy_log_update failed", + "async_energy_update | %s | Log rollover | energy_log_update from address %s failed", self._mac_in_str, + self._current_log_address, ) return None if self._current_log_address is not None: # Retry with previous log address as Circle node pointer to self._current_log_address # could be rolled over while the last log is at previous address/slot - _prev_log_address, _ = calc_log_address( - self._current_log_address, 1, -4 - ) - result, _ = await self.energy_log_update(_prev_log_address) + prev_log_address, _ = calc_log_address(self._current_log_address, 1, -4) + result = await self.energy_log_update(prev_log_address, save_cache=True) if not result: _LOGGER.debug( - "async_energy_update | %s | Log rollover | energy_log_update %s failed", + "async_energy_update | %s | Log rollover | energy_log_update from address %s failed", self._mac_in_str, - _prev_log_address, + prev_log_address, ) return None @@ -413,7 +415,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 return self._energy_counters.energy_statistics if len(missing_addresses) == 1: - result, _ = await self.energy_log_update(missing_addresses[0]) + result = await self.energy_log_update(missing_addresses[0], save_cache=True) if result: await self.power_update() _LOGGER.debug( @@ -452,64 +454,38 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 return None async def _get_initial_energy_logs(self) -> None: - """Collect initial energy logs from the last 10 log addresses.""" + """Collect initial energy logs for recent hours up to MAX_LOG_HOURS.""" if self._current_log_address is None: return + if self.energy_consumption_interval is None: + return + _LOGGER.debug( - "Start collecting initial energy logs from the last 10 log addresses for node %s.", + "Start collecting today's energy logs for node %s.", self._mac_in_str, ) - total_addresses = 11 + + # When only consumption is measured, 1 address contains data from 4 hours + # When both consumption and production are measured, 1 address contains data from 2 hours + cons_only = self.energy_production_interval is None + factor = 4 if cons_only else 2 + max_addresses_to_collect = MAX_LOG_HOURS // factor + total_addresses = min( + max_addresses_to_collect, ceil(datetime.now(tz=UTC).hour / factor) + 1 + ) log_address = self._current_log_address - prev_address_timestamp: datetime | None = None while total_addresses > 0: - result, empty_log = await self.energy_log_update(log_address) - if result and empty_log: - # Handle case with None-data in all address slots + result = await self.energy_log_update(log_address, save_cache=False) + if not result: + # Stop initial log collection when an address contains no (None) or outdated data + # Outdated data can indicate a EnergyLog address rollover: from address 6014 to 0 _LOGGER.debug( - "Energy None-data collected from log address %s, stopping collection", + "All slots at log address %s are empty or outdated – stopping initial collection", log_address, ) break - # Check if the most recent timestamp of an earlier address is recent - # (within 2/4 * log_interval plus 5 mins margin) - log_interval = self.energy_consumption_interval - factor = 2 if self.energy_production_interval is not None else 4 - - if log_interval is not None: - max_gap_minutes = (factor * log_interval) + 5 - if log_address == self._current_log_address: - if ( - self._last_collected_energy_timestamp is not None - and ( - datetime.now(tz=UTC) - self._last_collected_energy_timestamp - ).total_seconds() - // 60 - > max_gap_minutes - ): - _LOGGER.debug( - "Energy data collected from the current log address is outdated, stopping collection" - ) - break - elif ( - prev_address_timestamp is not None - and self._last_collected_energy_timestamp is not None - and ( - prev_address_timestamp - self._last_collected_energy_timestamp - ).total_seconds() - // 60 - > max_gap_minutes - ): - _LOGGER.debug( - "Collected energy data is outdated, stopping collection" - ) - break - - if self._last_collected_energy_timestamp is not None: - prev_address_timestamp = self._last_collected_energy_timestamp - log_address, _ = calc_log_address(log_address, 1, -4) total_addresses -= 1 @@ -535,80 +511,98 @@ async def get_missing_energy_logs(self) -> None: ) missing_addresses = sorted(missing_addresses, reverse=True) tasks = [ - create_task(self.energy_log_update(address)) + create_task(self.energy_log_update(address, save_cache=False)) for address in missing_addresses ] - for task in tasks: - await task + for idx, task in enumerate(tasks): + result = await task + # When an energy log collection task returns False, stop and cancel the remaining tasks + if not result: + to_cancel = tasks[idx + 1 :] + for t in to_cancel: + t.cancel() + # Drain cancellations to avoid "Task exception was never retrieved" + await gather(*to_cancel, return_exceptions=True) + break if self._cache_enabled: await self._energy_log_records_save_to_cache() - async def energy_log_update(self, address: int | None) -> tuple[bool, bool]: - """Request energy log statistics from node. Returns true if successful.""" - empty_log = False - result = False + async def energy_log_update(self, address: int | None, save_cache: bool = True) -> bool: + """Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False.""" + any_record_stored = False if address is None: - return result, empty_log + return False _LOGGER.debug( - "Request of energy log at address %s for node %s", - str(address), - self.name, + "Requesting EnergyLogs from node %s address %s", + self._mac_in_str, + address, ) request = CircleEnergyLogsRequest(self._send, self._mac_in_bytes, address) if (response := await request.send()) is None: _LOGGER.debug( - "Retrieving of energy log at address %s for node %s failed", - str(address), + "Retrieving EnergyLogs data from node %s failed", self._mac_in_str, ) - return result, empty_log + return False - _LOGGER.debug("EnergyLogs data from %s, address=%s", self._mac_in_str, address) + _LOGGER.debug("EnergyLogs from node %s, address=%s:", self._mac_in_str, address) await self._available_update_state(True, response.timestamp) - energy_record_update = False # Forward historical energy log information to energy counters # Each response message contains 4 log counters (slots) of the # energy pulses collected during the previous hour of given timestamp - last_energy_timestamp_collected = False for _slot in range(4, 0, -1): log_timestamp, log_pulses = response.log_data[_slot] _LOGGER.debug( "In slot=%s: pulses=%s, timestamp=%s", _slot, log_pulses, log_timestamp ) - if log_timestamp is None or log_pulses is None: + if ( + log_timestamp is None + or log_pulses is None + # Don't store an old log record; store an empty record instead + or not self._check_timestamp_is_recent(address, _slot, log_timestamp) + ): self._energy_counters.add_empty_log(response.log_address, _slot) - empty_log = True - elif await self._energy_log_record_update_state( + continue + + await self._energy_log_record_update_state( response.log_address, _slot, log_timestamp.replace(tzinfo=UTC), log_pulses, import_only=True, - ): - energy_record_update = True - if not last_energy_timestamp_collected: - # Collect the timestamp of the most recent response - self._last_collected_energy_timestamp = log_timestamp.replace( - tzinfo=UTC - ) - _LOGGER.debug( - "Setting last_collected_energy_timestamp to %s", - self._last_collected_energy_timestamp, - ) - last_energy_timestamp_collected = True + ) + any_record_stored = True - result = True self._energy_counters.update() - if energy_record_update: + if any_record_stored and self._cache_enabled and save_cache: _LOGGER.debug( "Saving energy record update to cache for %s", self._mac_in_str ) await self.save_cache() - return result, empty_log + return any_record_stored + + def _check_timestamp_is_recent( + self, address: int, slot: int, timestamp: datetime + ) -> bool: + """Check if a log record timestamp is within the last MAX_LOG_HOURS hours.""" + age_seconds = max( + 0.0, + (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds() + ) + if age_seconds > MAX_LOG_HOURS * 3600: + _LOGGER.warning( + "EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...", + self._mac_in_str, + address, + slot, + timestamp, + ) + return False + return True async def _energy_log_records_load_from_cache(self) -> bool: """Load energy_log_record from cache."""