Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read time command for PVVX firmware #19

Merged
merged 1 commit into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ Bluetooth Clocks supports the following devices:
| (e.g. PineTime with | | | |
| InfiniTime firmware) | | | |
+--------------------------+------------+-------------------+-----------+
| `PVVX firmware`_ | Yes | No | No |
| (LYWSD03MMC, MHO-C401, | | | (not yet) |
| `PVVX firmware`_ | Yes | No | Yes |
| (LYWSD03MMC, MHO-C401, | | | |
| CGG1, CGDK2, MJWSD05MMC, | | | |
| MHO-C122) | | | |
+--------------------------+------------+-------------------+-----------+
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ select = ["ALL"]
ignore = [
"ANN101", # flake8-annotations
"ANN102",
"ARG002", # flake8-unused-arguments
"ARG001", # flake8-unused-arguments
"ARG002",
"ARG003",
"DTZ001", # flake8-datetimez
"DTZ006",
Expand Down
75 changes: 71 additions & 4 deletions src/bluetooth_clocks/devices/pvvx.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Bluetooth clock support for devices running the PVVX firmware."""
from __future__ import annotations

import asyncio
import logging
import struct
from time import localtime
from typing import TYPE_CHECKING
Expand All @@ -10,7 +12,12 @@
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from bleak import BleakClient, BleakGATTCharacteristic

from bluetooth_clocks import BluetoothClock
from bluetooth_clocks.exceptions import InvalidTimeBytesError

_logger = logging.getLogger(__name__)


class PVVX(BluetoothClock):
Expand All @@ -20,8 +27,8 @@ class PVVX(BluetoothClock):
SERVICE_UUID = UUID("00001f10-0000-1000-8000-00805f9b34fb")
CHAR_UUID = UUID("00001f1f-0000-1000-8000-00805f9b34fb")

TIME_GET_FORMAT = None
"""The PVVX firmware doesn't support reading the time."""
TIME_GET_FORMAT = "<BLL"
"""The format string to convert bytes read from the device to a time."""

TIME_SET_FORMAT = "<BL"
"""The format string to convert a time to bytes written to the PVVX device."""
Expand All @@ -32,6 +39,18 @@ class PVVX(BluetoothClock):
SERVICE_DATA_UUID = UUID("0000181a-0000-1000-8000-00805f9b34fb")
"""UUID of the service data the PVVX device is advertising."""

PVVX_GET_SET_TIME_COMMAND = 0x23
"""Command for PVVX firmware to Get/Set Time."""

def __init__(self, device: BLEDevice) -> None:
"""Create a PVVX object.

Args:
device (BLEDevice): The Bluetooth device.
"""
super().__init__(device)
self.notified_time = 0.0

@classmethod
def recognize(
cls,
Expand All @@ -53,6 +72,54 @@ def recognize(
"""
return str(cls.SERVICE_DATA_UUID) in advertisement_data.service_data

async def get_time(self) -> float:
"""Get the time of the PVVX device.

Returns:
float: The time of the Bluetooth clock.
"""
_logger.info("Connecting to device...")
async with BleakClient(self.address) as client:
service = client.services.get_service(self.SERVICE_UUID)
characteristic = service.get_characteristic(self.CHAR_UUID)

# Define callback function for notifications and subscribe
def time_callback(sender: BleakGATTCharacteristic, data: bytearray) -> None:
"""Convert bytes read from a notification to a timestamp."""
self.notified_time = self.get_time_from_bytes(data)

await client.start_notify(characteristic, time_callback)

# Write Get/Set Time command
await client.write_gatt_char(
characteristic,
[self.PVVX_GET_SET_TIME_COMMAND],
response=self.WRITE_WITH_RESPONSE,
)

# Wait for a few seconds so a notification is received.
await asyncio.sleep(5)

return self.notified_time

def get_time_from_bytes(self, time_bytes: bytes) -> float:
"""Convert bytes read from the PVVX device to a timestamp.

Args:
time_bytes (bytes): The raw bytes read from the device.

Raises:
InvalidTimeBytesError: If `time_bytes` don't have the right format.

Returns:
float: The time encoded as a Unix timestamp.
"""
try:
_, time_time, _ = struct.unpack(self.TIME_GET_FORMAT, time_bytes)
except struct.error as exception:
raise InvalidTimeBytesError(time_bytes) from exception
return float(time_time)

def get_bytes_from_time(
self,
timestamp: float,
Expand All @@ -70,9 +137,9 @@ def get_bytes_from_time(
bytes: The bytes needed to set the time of the device to `timestamp`.
"""
# Convert timestamp to little-endian unsigned long integer
# And add header byte
# And add header byte for Get/Set Time command.
return struct.pack(
self.TIME_SET_FORMAT,
0x23,
self.PVVX_GET_SET_TIME_COMMAND,
int(timestamp + localtime(timestamp).tm_gmtoff),
)
11 changes: 7 additions & 4 deletions tests/test_pvvx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from bluetooth_clocks import supported_devices
from bluetooth_clocks.devices.pvvx import PVVX
from bluetooth_clocks.exceptions import TimeNotReadableError

if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
Expand Down Expand Up @@ -67,15 +66,19 @@ def test_recognize() -> None:

def test_readable() -> None:
"""Test whether the time is readable on the device."""
assert not PVVX.is_readable()
assert PVVX.is_readable()


@pytest.mark.skipif(sys.platform.startswith("win"), reason="timezone problem")
@time_machine.travel(datetime(2023, 7, 4, 17, 4, 5, tzinfo=CET_TZ), tick=False)
def test_get_time_from_bytes() -> None:
"""Test the conversion from bytes to a timestamp."""
with pytest.raises(TimeNotReadableError):
assert (
PVVX(BLEDevice("A4:C1:38:D9:01:10", "", {}, -67)).get_time_from_bytes(
bytes([0x23, 0xDD, 0xBC, 0xB9, 0x63]),
bytes([0x23, 0xE5, 0x34, 0xA4, 0x64, 0x33, 0x3B, 0xA3, 0x64]),
)
== time()
)


@pytest.mark.skipif(sys.platform.startswith("win"), reason="timezone problem")
Expand Down