1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## Ongoing

- PR [341](https://github.com/plugwise/python-plugwise-usb/pull/341): Schedule clock synchronization every 3600 seconds
- PR [342](https://github.com/plugwise/python-plugwise-usb/pull/342): Improve node_type caching.

## 0.46.0 - 2025-09-12
8 changes: 6 additions & 2 deletions plugwise_usb/network/cache.py
@@ -30,7 +30,9 @@ async def save_cache(self) -> None:
mac: node_type.name for mac, node_type in self._nodetypes.items()
}
_LOGGER.debug("Save NodeTypes for %s Nodes", len(cache_data_to_save))
await self.write_cache(cache_data_to_save, rewrite=True) # Make sure the cache-contents is actual
await self.write_cache(
cache_data_to_save, rewrite=True
) # Make sure the cache contents are up to date

async def clear_cache(self) -> None:
"""Clear current cache."""
@@ -54,7 +56,9 @@ async def restore_cache(self) -> None:
node_type = None

if node_type is None:
_LOGGER.warning("Invalid NodeType in cache for mac %s: %s", mac, node_value)
_LOGGER.warning(
"Invalid NodeType in cache for mac %s: %s", mac, node_value
)
continue
self._nodetypes[mac] = node_type
_LOGGER.debug(
44 changes: 36 additions & 8 deletions plugwise_usb/nodes/circle.py
@@ -2,12 +2,13 @@

from __future__ import annotations

from asyncio import Task, create_task, gather
from asyncio import CancelledError, Task, create_task, gather, sleep
from collections.abc import Awaitable, Callable
from dataclasses import replace
from datetime import UTC, datetime, timedelta
from functools import wraps
import logging
import random
from math import ceil
from typing import Any, Final, TypeVar, cast

@@ -74,7 +75,9 @@
# Default firmware if not known
DEFAULT_FIRMWARE: Final = datetime(2008, 8, 26, 15, 46, tzinfo=UTC)

MAX_LOG_HOURS = DAY_IN_HOURS
MAX_LOG_HOURS: Final = DAY_IN_HOURS

CLOCK_SYNC_PERIOD: Final = 3600

FuncT = TypeVar("FuncT", bound=Callable[..., Any])
_LOGGER = logging.getLogger(__name__)
@@ -141,6 +144,8 @@ def __init__(
"""Initialize base class for Sleeping End Device."""
super().__init__(mac, node_type, controller, loaded_callback)

# Clock
self._clock_synchronize_task: Task[None] | None = None
# Relay
self._relay_lock: RelayLock = RelayLock()
self._relay_state: RelayState = RelayState()
@@ -852,6 +857,21 @@ async def _relay_update_lock(
)
await self.save_cache()

async def _clock_synchronize_scheduler(self) -> None:
"""Background task: periodically synchronize the clock until cancelled."""
try:
while True:
await sleep(CLOCK_SYNC_PERIOD + random.uniform(-5, 5))
try:
await self.clock_synchronize()
except Exception:
_LOGGER.exception(
"Clock synchronization failed for %s", self._mac_in_str
)
except CancelledError:
_LOGGER.debug("Clock sync scheduler cancelled for %s", self._mac_in_str)
raise

async def clock_synchronize(self) -> bool:
"""Synchronize clock. Returns true if successful."""
get_clock_request = CircleClockGetRequest(self._send, self._mac_in_bytes)
@@ -866,18 +886,17 @@ async def clock_synchronize(self) -> bool:
tzinfo=UTC,
)
clock_offset = clock_response.timestamp.replace(microsecond=0) - _dt_of_circle
if (clock_offset.seconds < MAX_TIME_DRIFT) or (
clock_offset.seconds > -(MAX_TIME_DRIFT)
):
if abs(clock_offset.total_seconds()) < MAX_TIME_DRIFT:
return True
_LOGGER.info(
"Reset clock of node %s because time has drifted %s sec",
"Reset clock of node %s because time drifted %s seconds (max %s seconds)",
self._mac_in_str,
str(clock_offset.seconds),
str(int(abs(clock_offset.total_seconds()))),
str(MAX_TIME_DRIFT),
)
if self._node_protocols is None:
raise NodeError(
"Unable to synchronize clock en when protocol version is unknown"
"Unable to synchronize clock when protocol version is unknown"
)
set_clock_request = CircleClockSetRequest(
self._send,
@@ -992,6 +1011,10 @@ async def initialize(self) -> bool:
)
self._initialized = False
return False
if self._clock_synchronize_task is None or self._clock_synchronize_task.done():
self._clock_synchronize_task = create_task(
self._clock_synchronize_scheduler()
)

if not self._calibration and not await self.calibration_update():
_LOGGER.debug(
@@ -1082,6 +1105,11 @@ async def unload(self) -> None:
if self._cache_enabled:
await self._energy_log_records_save_to_cache()

if self._clock_synchronize_task:
self._clock_synchronize_task.cancel()
await gather(self._clock_synchronize_task, return_exceptions=True)
self._clock_synchronize_task = None

await super().unload()

@raise_not_loaded
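Note on the scheduler added above: a minimal, self-contained sketch of the same pattern, using placeholder names (PERIOD, do_sync) rather than the module's own. The loop sleeps with a little jitter, logs and survives a failed sync, and re-raises CancelledError so the unload path can cancel the task and await it with gather(..., return_exceptions=True).

import asyncio
import logging
import random

_LOGGER = logging.getLogger(__name__)
PERIOD = 3600  # seconds; mirrors CLOCK_SYNC_PERIOD in the diff

async def scheduler(do_sync) -> None:
    """Call do_sync() roughly every PERIOD seconds until cancelled."""
    try:
        while True:
            # Jitter spreads the sync moments of many nodes apart.
            await asyncio.sleep(PERIOD + random.uniform(-5, 5))
            try:
                await do_sync()
            except Exception:  # one failed sync must not stop the loop
                _LOGGER.exception("Sync failed")
    except asyncio.CancelledError:
        raise  # propagate so the awaiting side sees the cancellation

async def main() -> None:
    async def do_sync() -> None:
        print("synced")

    task = asyncio.create_task(scheduler(do_sync))
    await asyncio.sleep(0.1)  # application lifetime; demo never reaches a sync
    task.cancel()             # same shape as unload() in the diff
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())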
6 changes: 2 additions & 4 deletions plugwise_usb/nodes/circle_plus.py
@@ -86,14 +86,12 @@ async def clock_synchronize(self) -> bool:
tzinfo=UTC,
)
clock_offset = clock_response.timestamp.replace(microsecond=0) - _dt_of_circle
if (clock_offset.seconds < MAX_TIME_DRIFT) or (
clock_offset.seconds > -(MAX_TIME_DRIFT)
):
if abs(clock_offset.total_seconds()) < MAX_TIME_DRIFT:
return True
_LOGGER.info(
"Reset realtime clock of node %s because time has drifted %s seconds while max drift is set to %s seconds)",
self._node_info.mac,
str(clock_offset.seconds),
str(int(abs(clock_offset.total_seconds()))),
str(MAX_TIME_DRIFT),
)
clock_set_request = CirclePlusRealTimeClockSetRequest(
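Background on the drift check replaced in both files: timedelta.seconds is always in the range 0..86399, because the sign of a negative offset moves into the days field. With a positive MAX_TIME_DRIFT the second clause of the old or-check was therefore always true, so the early return always fired and the clock was never reset; total_seconds() keeps the sign, which is why the new code compares abs(total_seconds()). A small illustration (the MAX_TIME_DRIFT value of 5 is arbitrary, chosen only for this example):

from datetime import timedelta

MAX_TIME_DRIFT = 5  # arbitrary threshold for this illustration

offset = timedelta(seconds=-30)  # node clock runs 30 seconds ahead
print(offset.seconds)            # 86370 -- the sign was folded into days=-1
print(offset.total_seconds())    # -30.0
print(abs(offset.total_seconds()) < MAX_TIME_DRIFT)  # False -> clock gets reset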
130 changes: 130 additions & 0 deletions tests/stick_test_data.py
@@ -1,7 +1,19 @@
"""Stick Test Program."""

import asyncio
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
import importlib
import logging
import random

import crcmod

crc_fun = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0x0000, xorOut=0x0000)


_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)

pw_constants = importlib.import_module("plugwise_usb.constants")

@@ -12,6 +24,124 @@
# generate energy log timestamps with fixed hour timestamp used in tests
hour_timestamp = utc_now.replace(minute=0, second=0, microsecond=0)


def construct_message(data: bytes, seq_id: bytes = b"0000") -> bytes:
"""Construct plugwise message."""
body = data[:4] + seq_id + data[4:]
return bytes(
pw_constants.MESSAGE_HEADER
+ body
+ bytes(f"{crc_fun(body):04X}", pw_constants.UTF8)
+ pw_constants.MESSAGE_FOOTER
)


class DummyTransport:
"""Dummy transport class."""

protocol_data_received: Callable[[bytes], None]

def __init__(
self,
loop: asyncio.AbstractEventLoop,
test_data: dict[bytes, tuple[str, bytes, bytes | None]] | None = None,
) -> None:
"""Initialize dummy transport class."""
self._loop = loop
self._msg = 0
self._seq_id = b"1233"
self._processed: list[bytes] = []
self._first_response = test_data
self._second_response = test_data
if test_data is None:
self._first_response = RESPONSE_MESSAGES
self._second_response = SECOND_RESPONSE_MESSAGES
self.random_extra_byte = 0
self._closing = False

def inc_seq_id(self, seq_id: bytes | None) -> bytes:
"""Increment sequence id."""
if seq_id is None:
return b"0000"
temp_int = int(seq_id, 16) + 1
if temp_int >= 65532:
temp_int = 0
temp_str = str(hex(temp_int)).lstrip("0x").upper()
while len(temp_str) < 4:
temp_str = "0" + temp_str
return temp_str.encode()

def is_closing(self) -> bool:
"""Close connection."""
return self._closing

def write(self, data: bytes) -> None:
"""Write data back to system."""
log = None
ack = None
response = None
if data in self._processed and self._second_response is not None:
log, ack, response = self._second_response.get(data, (None, None, None))
if log is None and self._first_response is not None:
log, ack, response = self._first_response.get(data, (None, None, None))
if log is None:
resp = PARTLY_RESPONSE_MESSAGES.get(data[:24], (None, None, None))
if resp is None:
_LOGGER.debug("No msg response for %s", str(data))
return
log, ack, response = resp
if ack is None:
_LOGGER.debug("No ack response for %s", str(data))
return

self._seq_id = self.inc_seq_id(self._seq_id)
if response and self._msg == 0:
self.message_response_at_once(ack, response, self._seq_id)
self._processed.append(data)
else:
self.message_response(ack, self._seq_id)
self._processed.append(data)
if response is None or self._closing:
return
self._loop.create_task(self._delayed_response(response, self._seq_id))
self._msg += 1

async def _delayed_response(self, data: bytes, seq_id: bytes) -> None:
delay = random.uniform(0.005, 0.025)
await asyncio.sleep(delay)
self.message_response(data, seq_id)

def message_response(self, data: bytes, seq_id: bytes) -> None:
"""Handle message response."""
self.random_extra_byte += 1
if self.random_extra_byte > 25:
self.protocol_data_received(b"\x83")
self.random_extra_byte = 0
self.protocol_data_received(construct_message(data, seq_id) + b"\x83")
else:
self.protocol_data_received(construct_message(data, seq_id))

def message_response_at_once(self, ack: bytes, data: bytes, seq_id: bytes) -> None:
"""Full message."""
self.random_extra_byte += 1
if self.random_extra_byte > 25:
self.protocol_data_received(b"\x83")
self.random_extra_byte = 0
self.protocol_data_received(
construct_message(ack, seq_id)
+ construct_message(data, seq_id)
+ b"\x83"
)
else:
self.protocol_data_received(
construct_message(ack, seq_id) + construct_message(data, seq_id)
)

def close(self) -> None:
"""Close connection."""
self._closing = True


LOG_TIMESTAMPS = {}
_one_hour = timedelta(hours=1)
for x in range(168):
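For reference on construct_message above: the crcmod parameters (polynomial 0x11021, rev=False, initCrc=0, xorOut=0) describe CRC-16/XMODEM, and the frame appends the checksum as four uppercase hex characters. A standalone check, independent of the plugwise_usb constants; the frame body below is a hypothetical value, not taken from the test data:

import crcmod

crc_fun = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0x0000, xorOut=0x0000)

# Well-known CRC-16/XMODEM check value: CRC over b"123456789" is 0x31C3.
assert f"{crc_fun(b'123456789'):04X}" == "31C3"

# Same formatting the dummy transport uses when it appends the checksum.
body = b"000A" + b"1234" + b"B43C"  # hypothetical command + seq_id + payload
print(f"{crc_fun(body):04X}")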