diff --git a/pydecentscale/__init__.py b/pydecentscale/__init__.py index 3a0cfdd..e003d6d 100644 --- a/pydecentscale/__init__.py +++ b/pydecentscale/__init__.py @@ -11,6 +11,12 @@ from threading import Thread from itertools import cycle from bleak import BleakScanner,BleakClient +import logging +import functools +import operator + + +logger = logging.getLogger(__name__) class AsyncioEventLoopThread(threading.Thread): @@ -155,9 +161,40 @@ async def _reset_time(self): await asyncio.sleep(0.2) await self.client.write_gatt_char(self.CHAR_WRITE,self.reset_time_command) + def notification_handler(self, sender, data): + if data[0] != 0x03 or len(data) != 7: + # Basic sanity check + logger.info("Invalid notification: not a Decent Scale?") + return + + # Calculate XOR + xor_msg = functools.reduce(operator.xor, data[:-1]) + if xor_msg != data[-1]: + logger.warning("XOR verification failed for notification") + return - def notification_handler(self,sender, data): - self.weight=int.from_bytes(data[2:4], byteorder='big', signed=True)/10 + # Have to decide by type of the package + type_ = data[1] + + if type_ in [0xCA, 0xCE]: + # Weight information + self.weight = int.from_bytes(data[2:4], byteorder='big', signed=True) / 10 + elif type_ == 0xAA: + # Button press + # NOTE: Despite the API documentation saying the XOR field is 0x00, it actually contains the XOR + logger.debug(f"Button press: {data[2]}, duration: {data[3]}") + elif type_ == 0x0F: + # tare increment + pass + elif type_ == 0x0A: + # LED on/off -> returns units and battery level + logger.debug(f"Unit of scale: {'g' if data[3] == 0 else 'oz'}, battery level: {data[4]}%") + elif type_ == 0x0B: + # Timer + # NOTE: The API documentation says there is a section on "Receiving Timer Info" but this is missing + pass + else: + logger.warning(f"Unknown Notification Type received: 0x{type_:02x}") async def _enable_notification(self): await self.client.start_notify(self.CHAR_READ, self.notification_handler) @@ -201,7 +238,8 @@ def disconnect(self): return self.connected def auto_connect(self,n_retries=3): - + address = None + for i in range(n_retries): address=self.find_address() if address: