Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 92 additions & 88 deletions plugwise_usb/nodes/circle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from asyncio import Task, create_task
from collections.abc import Awaitable, Callable
from dataclasses import replace
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from functools import wraps
import logging
from typing import Any, Final, TypeVar, cast
Expand All @@ -24,6 +24,7 @@
)
from ..connection import StickController
from ..constants import (
DAY_IN_HOURS,
DEFAULT_CONS_INTERVAL,
MAX_TIME_DRIFT,
MINIMAL_POWER_UPDATE,
Expand Down Expand Up @@ -364,7 +365,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09

# Always request last energy log records at initial startup
if not self._last_energy_log_requested:
self._last_energy_log_requested, _ = await self.energy_log_update(
self._last_energy_log_requested = await self.energy_log_update(
self._current_log_address
)

Expand All @@ -378,7 +379,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return None

# Try collecting energy-stats for _current_log_address
result, _ = await self.energy_log_update(self._current_log_address)
result = await self.energy_log_update(self._current_log_address)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update failed",
Expand All @@ -392,7 +393,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
_prev_log_address, _ = calc_log_address(
self._current_log_address, 1, -4
)
result, _ = await self.energy_log_update(_prev_log_address)
result = await self.energy_log_update(_prev_log_address)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update %s failed",
Expand All @@ -413,7 +414,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return self._energy_counters.energy_statistics

if len(missing_addresses) == 1:
result, _ = await self.energy_log_update(missing_addresses[0])
result = await self.energy_log_update(missing_addresses[0])
if result:
await self.power_update()
_LOGGER.debug(
Expand Down Expand Up @@ -462,54 +463,16 @@ async def _get_initial_energy_logs(self) -> None:
)
total_addresses = 11
log_address = self._current_log_address
prev_address_timestamp: datetime | None = None
while total_addresses > 0:
result, empty_log = await self.energy_log_update(log_address)
if result and empty_log:
result = await self.energy_log_update(log_address)
if not result:
# Handle case with None-data in all address slots
_LOGGER.debug(
"Energy None-data collected from log address %s, stopping collection",
"All slots at log address %s are empty or outdated – stopping initial collection",
log_address,
)
break

# Check if the most recent timestamp of an earlier address is recent
# (within 2/4 * log_interval plus 5 mins margin)
log_interval = self.energy_consumption_interval
factor = 2 if self.energy_production_interval is not None else 4

if log_interval is not None:
max_gap_minutes = (factor * log_interval) + 5
if log_address == self._current_log_address:
if (
self._last_collected_energy_timestamp is not None
and (
datetime.now(tz=UTC) - self._last_collected_energy_timestamp
).total_seconds()
// 60
> max_gap_minutes
):
_LOGGER.debug(
"Energy data collected from the current log address is outdated, stopping collection"
)
break
elif (
prev_address_timestamp is not None
and self._last_collected_energy_timestamp is not None
and (
prev_address_timestamp - self._last_collected_energy_timestamp
).total_seconds()
// 60
> max_gap_minutes
):
_LOGGER.debug(
"Collected energy data is outdated, stopping collection"
)
break

if self._last_collected_energy_timestamp is not None:
prev_address_timestamp = self._last_collected_energy_timestamp

log_address, _ = calc_log_address(log_address, 1, -4)
total_addresses -= 1

Expand Down Expand Up @@ -544,30 +507,27 @@ async def get_missing_energy_logs(self) -> None:
if self._cache_enabled:
await self._energy_log_records_save_to_cache()

async def energy_log_update(self, address: int | None) -> tuple[bool, bool]:
"""Request energy log statistics from node. Returns true if successful."""
empty_log = False
result = False
async def energy_log_update(self, address: int | None) -> bool:
"""Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
any_record_stored = False
if address is None:
return result, empty_log
return False

_LOGGER.debug(
"Request of energy log at address %s for node %s",
str(address),
self.name,
"Requesting EnergyLogs from node %s address %s",
self._mac_in_str,
address,
)
request = CircleEnergyLogsRequest(self._send, self._mac_in_bytes, address)
if (response := await request.send()) is None:
_LOGGER.debug(
"Retrieving of energy log at address %s for node %s failed",
str(address),
"Retrieving EnergyLogs data from node %s failed",
self._mac_in_str,
)
return result, empty_log
return False

_LOGGER.debug("EnergyLogs data from %s, address=%s", self._mac_in_str, address)
_LOGGER.debug("EnergyLogs from node %s, address=%s:", self._mac_in_str, address)
await self._available_update_state(True, response.timestamp)
energy_record_update = False

# Forward historical energy log information to energy counters
# Each response message contains 4 log counters (slots) of the
Expand All @@ -578,17 +538,23 @@ async def energy_log_update(self, address: int | None) -> tuple[bool, bool]:
_LOGGER.debug(
"In slot=%s: pulses=%s, timestamp=%s", _slot, log_pulses, log_timestamp
)
if log_timestamp is None or log_pulses is None:
if (
log_timestamp is None
or log_pulses is None
# Don't store an old log-record, store am empty record instead
or not self._check_timestamp_is_recent(address, _slot, log_timestamp)
):
self._energy_counters.add_empty_log(response.log_address, _slot)
empty_log = True
elif await self._energy_log_record_update_state(
continue

if await self._energy_log_record_update_state(
response.log_address,
_slot,
log_timestamp.replace(tzinfo=UTC),
log_pulses,
import_only=True,
):
energy_record_update = True
any_record_stored = True
if not last_energy_timestamp_collected:
# Collect the timestamp of the most recent response
self._last_collected_energy_timestamp = log_timestamp.replace(
Expand All @@ -600,24 +566,44 @@ async def energy_log_update(self, address: int | None) -> tuple[bool, bool]:
)
last_energy_timestamp_collected = True

result = True
self._energy_counters.update()
if energy_record_update:
if any_record_stored:
_LOGGER.debug(
"Saving energy record update to cache for %s", self._mac_in_str
)
await self.save_cache()

return result, empty_log
return any_record_stored

def _check_timestamp_is_recent(
self, address: int, slot: int, timestamp: datetime
) -> bool:
"""Check if the timestamp of the received log-record is recent.

async def _energy_log_records_load_from_cache(self) -> bool:
A timestamp from within the last 24 hours is considered recent.
"""
age_seconds = (
datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)
).total_seconds()
if age_seconds > DAY_IN_HOURS * 3600:
_LOGGER.warning(
"EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...",
self._mac_in_str,
address,
slot,
timestamp,
)
return False
return True

async def _energy_log_records_load_from_cache(self) -> bool: # noqa: PLR0912
"""Load energy_log_record from cache."""
if (cache_data := self._get_cache(CACHE_ENERGY_COLLECTION)) is None:
_LOGGER.warning(
"Failed to restore energy log records from cache for node %s", self.name
)
return False
restored_logs: dict[int, list[int]] = {}
restored_logs: dict[int, dict[int, tuple[datetime, int]]] = {}
if cache_data == "":
_LOGGER.debug("Cache-record is empty")
return False
Expand All @@ -630,24 +616,41 @@ async def _energy_log_records_load_from_cache(self) -> bool:
if len(timestamp_energy_log) == 6:
address = int(log_fields[0])
slot = int(log_fields[1])
self._energy_counters.add_pulse_log(
address=address,
slot=slot,
timestamp=datetime(
year=int(timestamp_energy_log[0]),
month=int(timestamp_energy_log[1]),
day=int(timestamp_energy_log[2]),
hour=int(timestamp_energy_log[3]),
minute=int(timestamp_energy_log[4]),
second=int(timestamp_energy_log[5]),
tzinfo=UTC,
),
pulses=int(log_fields[3]),
import_only=True,
pulses = int(log_fields[3])
timestamp = datetime(
year=int(timestamp_energy_log[0]),
month=int(timestamp_energy_log[1]),
day=int(timestamp_energy_log[2]),
hour=int(timestamp_energy_log[3]),
minute=int(timestamp_energy_log[4]),
second=int(timestamp_energy_log[5]),
tzinfo=UTC,
)
if restored_logs.get(address) is None:
restored_logs[address] = []
restored_logs[address].append(slot)
restored_logs[address] = {}
restored_logs[address][slot] = (timestamp, pulses)

# Sort and prune the records loaded from cache
sorted_logs: dict[int, dict[int, tuple[datetime, int]]] = {}
skip_before = datetime.now(tz=UTC) - timedelta(hours=DAY_IN_HOURS)
sorted_addresses = sorted(restored_logs.keys(), reverse=True)
for address in sorted_addresses:
sorted_slots = sorted(restored_logs[address].keys(), reverse=True)
for slot in sorted_slots:
if restored_logs[address][slot][0] > skip_before:
if sorted_logs.get(address) is None:
sorted_logs[address] = {}
sorted_logs[address][slot] = restored_logs[address][slot]

for address, data in sorted_logs.items():
for slot, pulse_data in data.items():
self._energy_counters.add_pulse_log(
address=address,
slot=slot,
pulses=pulse_data[1],
timestamp=pulse_data[0],
import_only=True,
)

self._energy_counters.update()

Expand Down Expand Up @@ -677,9 +680,10 @@ async def _energy_log_records_save_to_cache(self) -> None:
self._energy_counters.get_pulse_logs()
)
cached_logs = ""
for address in sorted(logs.keys(), reverse=True):
for slot in sorted(logs[address].keys(), reverse=True):
log = logs[address][slot]
# logs is already sorted in reverse
for address, record in logs.items():
for slot in record:
log = record[slot]
if cached_logs != "":
cached_logs += "|"
cached_logs += f"{address}:{slot}:{log.timestamp.year}"
Expand Down Expand Up @@ -718,7 +722,7 @@ async def _energy_log_record_update_state(
self._mac_in_str,
)
self._set_cache(
CACHE_ENERGY_COLLECTION, cached_logs + "|" + log_cache_record
CACHE_ENERGY_COLLECTION, log_cache_record + "|" + cached_logs
)
return True

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "plugwise_usb"
version = "0.44.10"
version = "0.44.11a1"
license = "MIT"
keywords = ["home", "automation", "plugwise", "module", "usb"]
classifiers = [
Expand Down