Skip to content

Commit

Permalink
Add read time command for PVVX firmware (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvervloesem committed Jul 4, 2023
1 parent 878cb77 commit 1d4b054
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 11 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ Bluetooth Clocks supports the following devices:
| (e.g. PineTime with | | | |
| InfiniTime firmware) | | | |
+--------------------------+------------+-------------------+-----------+
| `PVVX firmware`_ | Yes | No | No |
| (LYWSD03MMC, MHO-C401, | | | (not yet) |
| `PVVX firmware`_ | Yes | No | Yes |
| (LYWSD03MMC, MHO-C401, | | | |
| CGG1, CGDK2, MJWSD05MMC, | | | |
| MHO-C122) | | | |
+--------------------------+------------+-------------------+-----------+
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ select = ["ALL"]
ignore = [
"ANN101", # flake8-annotations
"ANN102",
"ARG002", # flake8-unused-arguments
"ARG001", # flake8-unused-arguments
"ARG002",
"ARG003",
"DTZ001", # flake8-datetimez
"DTZ006",
Expand Down
75 changes: 71 additions & 4 deletions src/bluetooth_clocks/devices/pvvx.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Bluetooth clock support for devices running the PVVX firmware."""
from __future__ import annotations

import asyncio
import logging
import struct
from time import localtime
from typing import TYPE_CHECKING
Expand All @@ -10,7 +12,12 @@
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from bleak import BleakClient, BleakGATTCharacteristic

from bluetooth_clocks import BluetoothClock
from bluetooth_clocks.exceptions import InvalidTimeBytesError

_logger = logging.getLogger(__name__)


class PVVX(BluetoothClock):
Expand All @@ -20,8 +27,8 @@ class PVVX(BluetoothClock):
SERVICE_UUID = UUID("00001f10-0000-1000-8000-00805f9b34fb")
CHAR_UUID = UUID("00001f1f-0000-1000-8000-00805f9b34fb")

TIME_GET_FORMAT = None
"""The PVVX firmware doesn't support reading the time."""
TIME_GET_FORMAT = "<BLL"
"""The format string to convert bytes read from the device to a time."""

TIME_SET_FORMAT = "<BL"
"""The format string to convert a time to bytes written to the PVVX device."""
Expand All @@ -32,6 +39,18 @@ class PVVX(BluetoothClock):
SERVICE_DATA_UUID = UUID("0000181a-0000-1000-8000-00805f9b34fb")
"""UUID of the service data the PVVX device is advertising."""

PVVX_GET_SET_TIME_COMMAND = 0x23
"""Command for PVVX firmware to Get/Set Time."""

def __init__(self, device: BLEDevice) -> None:
"""Create a PVVX object.
Args:
device (BLEDevice): The Bluetooth device.
"""
super().__init__(device)
self.notified_time = 0.0

@classmethod
def recognize(
cls,
Expand All @@ -53,6 +72,54 @@ def recognize(
"""
return str(cls.SERVICE_DATA_UUID) in advertisement_data.service_data

async def get_time(self) -> float:
"""Get the time of the PVVX device.
Returns:
float: The time of the Bluetooth clock.
"""
_logger.info("Connecting to device...")
async with BleakClient(self.address) as client:
service = client.services.get_service(self.SERVICE_UUID)
characteristic = service.get_characteristic(self.CHAR_UUID)

# Define callback function for notifications and subscribe
def time_callback(sender: BleakGATTCharacteristic, data: bytearray) -> None:
"""Convert bytes read from a notification to a timestamp."""
self.notified_time = self.get_time_from_bytes(data)

await client.start_notify(characteristic, time_callback)

# Write Get/Set Time command
await client.write_gatt_char(
characteristic,
[self.PVVX_GET_SET_TIME_COMMAND],
response=self.WRITE_WITH_RESPONSE,
)

# Wait for a few seconds so a notification is received.
await asyncio.sleep(5)

return self.notified_time

def get_time_from_bytes(self, time_bytes: bytes) -> float:
"""Convert bytes read from the PVVX device to a timestamp.
Args:
time_bytes (bytes): The raw bytes read from the device.
Raises:
InvalidTimeBytesError: If `time_bytes` don't have the right format.
Returns:
float: The time encoded as a Unix timestamp.
"""
try:
_, time_time, _ = struct.unpack(self.TIME_GET_FORMAT, time_bytes)
except struct.error as exception:
raise InvalidTimeBytesError(time_bytes) from exception
return float(time_time)

def get_bytes_from_time(
self,
timestamp: float,
Expand All @@ -70,9 +137,9 @@ def get_bytes_from_time(
bytes: The bytes needed to set the time of the device to `timestamp`.
"""
# Convert timestamp to little-endian unsigned long integer
# And add header byte
# And add header byte for Get/Set Time command.
return struct.pack(
self.TIME_SET_FORMAT,
0x23,
self.PVVX_GET_SET_TIME_COMMAND,
int(timestamp + localtime(timestamp).tm_gmtoff),
)
11 changes: 7 additions & 4 deletions tests/test_pvvx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from bluetooth_clocks import supported_devices
from bluetooth_clocks.devices.pvvx import PVVX
from bluetooth_clocks.exceptions import TimeNotReadableError

if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
Expand Down Expand Up @@ -67,15 +66,19 @@ def test_recognize() -> None:

def test_readable() -> None:
"""Test whether the time is readable on the device."""
assert not PVVX.is_readable()
assert PVVX.is_readable()


@pytest.mark.skipif(sys.platform.startswith("win"), reason="timezone problem")
@time_machine.travel(datetime(2023, 7, 4, 17, 4, 5, tzinfo=CET_TZ), tick=False)
def test_get_time_from_bytes() -> None:
"""Test the conversion from bytes to a timestamp."""
with pytest.raises(TimeNotReadableError):
assert (
PVVX(BLEDevice("A4:C1:38:D9:01:10", "", {}, -67)).get_time_from_bytes(
bytes([0x23, 0xDD, 0xBC, 0xB9, 0x63]),
bytes([0x23, 0xE5, 0x34, 0xA4, 0x64, 0x33, 0x3B, 0xA3, 0x64]),
)
== time()
)


@pytest.mark.skipif(sys.platform.startswith("win"), reason="timezone problem")
Expand Down

0 comments on commit 1d4b054

Please sign in to comment.