diff --git a/plugwise_usb/nodes/circle.py b/plugwise_usb/nodes/circle.py index ea0150259..bdf555688 100644 --- a/plugwise_usb/nodes/circle.py +++ b/plugwise_usb/nodes/circle.py @@ -5,7 +5,7 @@ from asyncio import Task, create_task from collections.abc import Awaitable, Callable from dataclasses import replace -from datetime import UTC, datetime +from datetime import UTC, datetime, timedelta from functools import wraps import logging from typing import Any, Final, TypeVar, cast @@ -24,6 +24,7 @@ ) from ..connection import StickController from ..constants import ( + DAY_IN_HOURS, DEFAULT_CONS_INTERVAL, MAX_TIME_DRIFT, MINIMAL_POWER_UPDATE, @@ -364,7 +365,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 # Always request last energy log records at initial startup if not self._last_energy_log_requested: - self._last_energy_log_requested, _ = await self.energy_log_update( + self._last_energy_log_requested = await self.energy_log_update( self._current_log_address ) @@ -378,7 +379,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 return None # Try collecting energy-stats for _current_log_address - result, _ = await self.energy_log_update(self._current_log_address) + result = await self.energy_log_update(self._current_log_address) if not result: _LOGGER.debug( "async_energy_update | %s | Log rollover | energy_log_update failed", @@ -392,7 +393,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 _prev_log_address, _ = calc_log_address( self._current_log_address, 1, -4 ) - result, _ = await self.energy_log_update(_prev_log_address) + result = await self.energy_log_update(_prev_log_address) if not result: _LOGGER.debug( "async_energy_update | %s | Log rollover | energy_log_update %s failed", @@ -413,7 +414,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09 return self._energy_counters.energy_statistics if len(missing_addresses) == 1: - result, _ = await self.energy_log_update(missing_addresses[0]) + result = await self.energy_log_update(missing_addresses[0]) if result: await self.power_update() _LOGGER.debug( @@ -462,54 +463,16 @@ async def _get_initial_energy_logs(self) -> None: ) total_addresses = 11 log_address = self._current_log_address - prev_address_timestamp: datetime | None = None while total_addresses > 0: - result, empty_log = await self.energy_log_update(log_address) - if result and empty_log: + result = await self.energy_log_update(log_address) + if not result: # Handle case with None-data in all address slots _LOGGER.debug( - "Energy None-data collected from log address %s, stopping collection", + "All slots at log address %s are empty or outdated – stopping initial collection", log_address, ) break - # Check if the most recent timestamp of an earlier address is recent - # (within 2/4 * log_interval plus 5 mins margin) - log_interval = self.energy_consumption_interval - factor = 2 if self.energy_production_interval is not None else 4 - - if log_interval is not None: - max_gap_minutes = (factor * log_interval) + 5 - if log_address == self._current_log_address: - if ( - self._last_collected_energy_timestamp is not None - and ( - datetime.now(tz=UTC) - self._last_collected_energy_timestamp - ).total_seconds() - // 60 - > max_gap_minutes - ): - _LOGGER.debug( - "Energy data collected from the current log address is outdated, stopping collection" - ) - break - elif ( - prev_address_timestamp is not None - and self._last_collected_energy_timestamp is not None - and ( - prev_address_timestamp - self._last_collected_energy_timestamp - ).total_seconds() - // 60 - > max_gap_minutes - ): - _LOGGER.debug( - "Collected energy data is outdated, stopping collection" - ) - break - - if self._last_collected_energy_timestamp is not None: - prev_address_timestamp = self._last_collected_energy_timestamp - log_address, _ = calc_log_address(log_address, 1, -4) total_addresses -= 1 @@ -544,30 +507,27 @@ async def get_missing_energy_logs(self) -> None: if self._cache_enabled: await self._energy_log_records_save_to_cache() - async def energy_log_update(self, address: int | None) -> tuple[bool, bool]: - """Request energy log statistics from node. Returns true if successful.""" - empty_log = False - result = False + async def energy_log_update(self, address: int | None) -> bool: + """Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False.""" + any_record_stored = False if address is None: - return result, empty_log + return False _LOGGER.debug( - "Request of energy log at address %s for node %s", - str(address), - self.name, + "Requesting EnergyLogs from node %s address %s", + self._mac_in_str, + address, ) request = CircleEnergyLogsRequest(self._send, self._mac_in_bytes, address) if (response := await request.send()) is None: _LOGGER.debug( - "Retrieving of energy log at address %s for node %s failed", - str(address), + "Retrieving EnergyLogs data from node %s failed", self._mac_in_str, ) - return result, empty_log + return False - _LOGGER.debug("EnergyLogs data from %s, address=%s", self._mac_in_str, address) + _LOGGER.debug("EnergyLogs from node %s, address=%s:", self._mac_in_str, address) await self._available_update_state(True, response.timestamp) - energy_record_update = False # Forward historical energy log information to energy counters # Each response message contains 4 log counters (slots) of the @@ -578,17 +538,23 @@ async def energy_log_update(self, address: int | None) -> tuple[bool, bool]: _LOGGER.debug( "In slot=%s: pulses=%s, timestamp=%s", _slot, log_pulses, log_timestamp ) - if log_timestamp is None or log_pulses is None: + if ( + log_timestamp is None + or log_pulses is None + # Don't store an old log-record, store am empty record instead + or not self._check_timestamp_is_recent(address, _slot, log_timestamp) + ): self._energy_counters.add_empty_log(response.log_address, _slot) - empty_log = True - elif await self._energy_log_record_update_state( + continue + + if await self._energy_log_record_update_state( response.log_address, _slot, log_timestamp.replace(tzinfo=UTC), log_pulses, import_only=True, ): - energy_record_update = True + any_record_stored = True if not last_energy_timestamp_collected: # Collect the timestamp of the most recent response self._last_collected_energy_timestamp = log_timestamp.replace( @@ -600,24 +566,44 @@ async def energy_log_update(self, address: int | None) -> tuple[bool, bool]: ) last_energy_timestamp_collected = True - result = True self._energy_counters.update() - if energy_record_update: + if any_record_stored: _LOGGER.debug( "Saving energy record update to cache for %s", self._mac_in_str ) await self.save_cache() - return result, empty_log + return any_record_stored + + def _check_timestamp_is_recent( + self, address: int, slot: int, timestamp: datetime + ) -> bool: + """Check if the timestamp of the received log-record is recent. - async def _energy_log_records_load_from_cache(self) -> bool: + A timestamp from within the last 24 hours is considered recent. + """ + age_seconds = ( + datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC) + ).total_seconds() + if age_seconds > DAY_IN_HOURS * 3600: + _LOGGER.warning( + "EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...", + self._mac_in_str, + address, + slot, + timestamp, + ) + return False + return True + + async def _energy_log_records_load_from_cache(self) -> bool: # noqa: PLR0912 """Load energy_log_record from cache.""" if (cache_data := self._get_cache(CACHE_ENERGY_COLLECTION)) is None: _LOGGER.warning( "Failed to restore energy log records from cache for node %s", self.name ) return False - restored_logs: dict[int, list[int]] = {} + restored_logs: dict[int, dict[int, tuple[datetime, int]]] = {} if cache_data == "": _LOGGER.debug("Cache-record is empty") return False @@ -630,24 +616,41 @@ async def _energy_log_records_load_from_cache(self) -> bool: if len(timestamp_energy_log) == 6: address = int(log_fields[0]) slot = int(log_fields[1]) - self._energy_counters.add_pulse_log( - address=address, - slot=slot, - timestamp=datetime( - year=int(timestamp_energy_log[0]), - month=int(timestamp_energy_log[1]), - day=int(timestamp_energy_log[2]), - hour=int(timestamp_energy_log[3]), - minute=int(timestamp_energy_log[4]), - second=int(timestamp_energy_log[5]), - tzinfo=UTC, - ), - pulses=int(log_fields[3]), - import_only=True, + pulses = int(log_fields[3]) + timestamp = datetime( + year=int(timestamp_energy_log[0]), + month=int(timestamp_energy_log[1]), + day=int(timestamp_energy_log[2]), + hour=int(timestamp_energy_log[3]), + minute=int(timestamp_energy_log[4]), + second=int(timestamp_energy_log[5]), + tzinfo=UTC, ) if restored_logs.get(address) is None: - restored_logs[address] = [] - restored_logs[address].append(slot) + restored_logs[address] = {} + restored_logs[address][slot] = (timestamp, pulses) + + # Sort and prune the records loaded from cache + sorted_logs: dict[int, dict[int, tuple[datetime, int]]] = {} + skip_before = datetime.now(tz=UTC) - timedelta(hours=DAY_IN_HOURS) + sorted_addresses = sorted(restored_logs.keys(), reverse=True) + for address in sorted_addresses: + sorted_slots = sorted(restored_logs[address].keys(), reverse=True) + for slot in sorted_slots: + if restored_logs[address][slot][0] > skip_before: + if sorted_logs.get(address) is None: + sorted_logs[address] = {} + sorted_logs[address][slot] = restored_logs[address][slot] + + for address, data in sorted_logs.items(): + for slot, pulse_data in data.items(): + self._energy_counters.add_pulse_log( + address=address, + slot=slot, + pulses=pulse_data[1], + timestamp=pulse_data[0], + import_only=True, + ) self._energy_counters.update() @@ -677,9 +680,10 @@ async def _energy_log_records_save_to_cache(self) -> None: self._energy_counters.get_pulse_logs() ) cached_logs = "" - for address in sorted(logs.keys(), reverse=True): - for slot in sorted(logs[address].keys(), reverse=True): - log = logs[address][slot] + # logs is already sorted in reverse + for address, record in logs.items(): + for slot in record: + log = record[slot] if cached_logs != "": cached_logs += "|" cached_logs += f"{address}:{slot}:{log.timestamp.year}" @@ -718,7 +722,7 @@ async def _energy_log_record_update_state( self._mac_in_str, ) self._set_cache( - CACHE_ENERGY_COLLECTION, cached_logs + "|" + log_cache_record + CACHE_ENERGY_COLLECTION, log_cache_record + "|" + cached_logs ) return True diff --git a/pyproject.toml b/pyproject.toml index cb97c1a36..3779f53df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.10" +version = "0.44.11a1" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [