diff --git a/setup.py b/setup.py index 99cd67ae..9e0807a5 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ name = 'PySwitchbot', packages = ['switchbot'], install_requires=['bluepy'], - version = '0.10.1', + version = '0.11.0', description = 'A library to communicate with Switchbot', author='Daniel Hjelseth Hoyer', url='https://github.com/Danielhiversen/pySwitchbot/', diff --git a/switchbot/__init__.py b/switchbot/__init__.py index 69dc05fe..4aab1014 100644 --- a/switchbot/__init__.py +++ b/switchbot/__init__.py @@ -1,13 +1,16 @@ -"""Library to handle connection with Switchbot""" +"""Library to handle connection with Switchbot.""" +from __future__ import annotations + import binascii import logging +import threading import time import bluepy DEFAULT_RETRY_COUNT = 3 -DEFAULT_RETRY_TIMEOUT = 0.2 -DEFAULT_TIME_BETWEEN_UPDATE_COMMAND = 10 +DEFAULT_RETRY_TIMEOUT = 1 +DEFAULT_SCAN_TIMEOUT = 5 UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b" HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b" @@ -28,23 +31,191 @@ PRESS_KEY_SUFFIX = "00" _LOGGER = logging.getLogger(__name__) +CONNECT_LOCK = threading.Lock() + + +def _process_wohand(data) -> dict: + """Process woHand/Bot services data.""" + _bot_data = {} + + _sensor_data = binascii.unhexlify(data.encode()) + + # 128 switch or 0 press. + _bot_data["switchMode"] = bool(_sensor_data[1] & 0b10000000) + + # 64 off or 0 for on, if not inversed in app. + if _bot_data["switchMode"]: + _bot_data["isOn"] = not bool(_sensor_data[1] & 0b01000000) + + else: + _bot_data["isOn"] = False + + _bot_data["battery"] = _sensor_data[2] & 0b01111111 + + return _bot_data + + +def _process_wocurtain(data, reverse=True) -> dict: + """Process woCurtain/Curtain services data.""" + _curtain_data = {} + + _sensor_data = binascii.unhexlify(data.encode()) + + _curtain_data["calibration"] = bool(_sensor_data[1] & 0b01000000) + _curtain_data["battery"] = _sensor_data[2] & 0b01111111 + _position = max(min(_sensor_data[3] & 0b01111111, 100), 0) + _curtain_data["position"] = (100 - _position) if reverse else _position + + # light sensor level (1-10) + _curtain_data["lightLevel"] = (_sensor_data[4] >> 4) & 0b00001111 + + return _curtain_data + + +def _process_wosensorth(data) -> dict: + """Process woSensorTH/Temp sensor services data.""" + _wosensorth_data = {} + + _sensor_data = binascii.unhexlify(data.encode()) + + _temp_sign = _sensor_data[4] & 0b10000000 + _temp_c = _temp_sign * ((_sensor_data[4] & 0b01111111) + (_sensor_data[3] / 10)) + _temp_f = (_temp_c * 9 / 5) + 32 + _temp_f = (_temp_f * 10) / 10 + + _wosensorth_data["temp"]["c"] = _temp_c + _wosensorth_data["temp"]["f"] = _temp_f + + _wosensorth_data["fahrenheit"] = bool(_sensor_data[5] & 0b10000000) + _wosensorth_data["humidity"] = _sensor_data[5] & 0b01111111 + _wosensorth_data["battery"] = _sensor_data[2] & 0b01111111 + + return _wosensorth_data + + +class GetSwitchbotDevices: + """Scan for all Switchbot devices and return by type.""" + + def __init__(self, interface=None) -> None: + """Get switchbot devices class constructor.""" + self._interface = interface + self._all_services_data = {} + + def discover( + self, retry=DEFAULT_RETRY_COUNT, scan_timeout=DEFAULT_SCAN_TIMEOUT + ) -> dict | None: + """Find switchbot devices and their advertisement data.""" + + devices = None + + try: + devices = bluepy.btle.Scanner(self._interface).scan(scan_timeout) + + except bluepy.btle.BTLEManagementError: + _LOGGER.error("Error scanning for switchbot devices", exc_info=True) + + if devices is None: + if retry < 1: + _LOGGER.error( + "Scanning for Switchbot devices failed. Stop trying", exc_info=True + ) + return None + + _LOGGER.warning( + "Error scanning for Switchbot devices. Retrying (remaining: %d)", + retry, + ) + time.sleep(DEFAULT_RETRY_TIMEOUT) + return self.discover(retry - 1, scan_timeout) + + for dev in devices: + if dev.getValueText(7) == UUID: + dev_id = dev.addr.replace(":", "") + self._all_services_data[dev_id] = {} + self._all_services_data[dev_id]["mac_address"] = dev.addr + for (adtype, desc, value) in dev.getScanData(): + if adtype == 22: + _model = binascii.unhexlify(value[4:6]).decode() + if _model == "H": + self._all_services_data[dev_id]["data"] = _process_wohand( + value[4:] + ) + self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi + self._all_services_data[dev_id]["model"] = _model + self._all_services_data[dev_id]["modelName"] = "WoHand" + elif _model == "c": + self._all_services_data[dev_id][ + "data" + ] = _process_wocurtain(value[4:]) + self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi + self._all_services_data[dev_id]["model"] = _model + self._all_services_data[dev_id]["modelName"] = "WoCurtain" + elif _model == "T": + self._all_services_data[dev_id][ + "data" + ] = _process_wosensorth(value[4:]) + self._all_services_data[dev_id]["data"]["rssi"] = dev.rssi + self._all_services_data[dev_id]["model"] = _model + self._all_services_data[dev_id]["modelName"] = "WoSensorTH" + + else: + continue + else: + self._all_services_data[dev_id][desc] = value + + return self._all_services_data + + def get_curtains(self) -> dict | None: + """Return all WoCurtain/Curtains devices with services data.""" + if not self._all_services_data: + self.discover() + + _curtain_devices = {} + + for item in self._all_services_data: + if self._all_services_data[item]["model"] == "c": + _curtain_devices[item] = self._all_services_data[item] + + return _curtain_devices + + def get_bots(self) -> dict | None: + """Return all WoHand/Bot devices with services data.""" + if not self._all_services_data: + self.discover() + + _bot_devices = {} + + for item in self._all_services_data: + if self._all_services_data[item]["model"] == "H": + _bot_devices[item] = self._all_services_data[item] + + return _bot_devices + + def get_device_data(self, mac) -> dict | None: + """Return data for specific device.""" + if not self._all_services_data: + self.discover() + + _switchbot_data = {} + + for item in self._all_services_data: + if self._all_services_data[item]["mac_address"] == mac: + _switchbot_data = self._all_services_data[item] + + return _switchbot_data class SwitchbotDevice: - # pylint: disable=too-few-public-methods - # pylint: disable=too-many-instance-attributes """Base Representation of a Switchbot Device.""" def __init__(self, mac, password=None, interface=None, **kwargs) -> None: + """Switchbot base class constructor.""" self._interface = interface self._mac = mac self._device = None - self._battery_percent = 0 + self._switchbot_device_data = {} + self._scan_timeout = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT) self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT) - self._time_between_update_command = kwargs.pop( - "time_between_update_command", DEFAULT_TIME_BETWEEN_UPDATE_COMMAND - ) - self._last_time_command_send = time.time() if password is None or password == "": self._password_encoded = None else: @@ -56,13 +227,13 @@ def _connect(self) -> None: if self._device is not None: return try: - _LOGGER.debug("Connecting to Switchbot...") + _LOGGER.debug("Connecting to Switchbot") self._device = bluepy.btle.Peripheral( self._mac, bluepy.btle.ADDR_TYPE_RANDOM, self._interface ) - _LOGGER.debug("Connected to Switchbot.") + _LOGGER.debug("Connected to Switchbot") except bluepy.btle.BTLEException: - _LOGGER.debug("Failed connecting to Switchbot.", exc_info=True) + _LOGGER.debug("Failed connecting to Switchbot", exc_info=True) self._device = None raise @@ -73,7 +244,7 @@ def _disconnect(self) -> None: try: self._device.disconnect() except bluepy.btle.BTLEException: - _LOGGER.warning("Error disconnecting from Switchbot.", exc_info=True) + _LOGGER.warning("Error disconnecting from Switchbot", exc_info=True) finally: self._device = None @@ -93,119 +264,124 @@ def _writekey(self, key) -> bool: hand = hand_service.getCharacteristics(HANDLE)[0] _LOGGER.debug("Sending command, %s", key) write_result = hand.write(binascii.a2b_hex(key), withResponse=True) - self._last_time_command_send = time.time() if not write_result: _LOGGER.error( "Sent command but didn't get a response from Switchbot confirming command was sent." - " Please check the Switchbot." + " Please check the Switchbot" ) else: - _LOGGER.info("Successfully sent command to Switchbot (MAC: %s).", self._mac) + _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac) return write_result def _sendcommand(self, key, retry) -> bool: send_success = False command = self._commandkey(key) _LOGGER.debug("Sending command to switchbot %s", command) - try: - self._connect() - send_success = self._writekey(command) - except bluepy.btle.BTLEException: - _LOGGER.warning("Error talking to Switchbot.", exc_info=True) - finally: - self._disconnect() + with CONNECT_LOCK: + try: + self._connect() + send_success = self._writekey(command) + except bluepy.btle.BTLEException: + _LOGGER.warning("Error talking to Switchbot", exc_info=True) + finally: + self._disconnect() if send_success: return True if retry < 1: _LOGGER.error( - "Switchbot communication failed. Stopping trying.", exc_info=True + "Switchbot communication failed. Stopping trying", exc_info=True ) return False - _LOGGER.warning( - "Cannot connect to Switchbot. Retrying (remaining: %d)...", retry - ) + _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry) time.sleep(DEFAULT_RETRY_TIMEOUT) return self._sendcommand(key, retry - 1) - def get_servicedata(self, retry=DEFAULT_RETRY_COUNT, scan_timeout=5) -> bytearray: - """Get BTLE 16b Service Data, - returns after the given timeout period in seconds.""" + def get_mac(self) -> str: + """Return mac address of device.""" + return self._mac + + def get_battery_percent(self) -> int: + """Return device battery level in percent.""" + if not self._switchbot_device_data: + return None + return self._switchbot_device_data["data"]["battery"] + + def get_device_data(self, retry=DEFAULT_RETRY_COUNT, interface=None) -> dict | None: + """Find switchbot devices and their advertisement data.""" + if interface: + _interface = interface + else: + _interface = self._interface + devices = None - waiting_time = self._time_between_update_command - time.time() - if waiting_time > 0: - time.sleep(waiting_time) try: - devices = bluepy.btle.Scanner().scan(scan_timeout) + devices = bluepy.btle.Scanner(_interface).scan(self._scan_timeout) except bluepy.btle.BTLEManagementError: - _LOGGER.warning("Error updating Switchbot.", exc_info=True) + _LOGGER.error("Error scanning for switchbot devices", exc_info=True) if devices is None: if retry < 1: _LOGGER.error( - "Switchbot update failed. Stopping trying.", exc_info=True + "Scanning for Switchbot devices failed. Stop trying", exc_info=True ) return None _LOGGER.warning( - "Cannot update Switchbot. Retrying (remaining: %d)...", retry + "Error scanning for Switchbot devices. Retrying (remaining: %d)", + retry, ) time.sleep(DEFAULT_RETRY_TIMEOUT) - return self.get_servicedata(retry - 1, scan_timeout) + return self.get_device_data(retry=retry - 1, interface=_interface) - for device in devices: - if self._mac.lower() == device.addr.lower(): - for (adtype, _, value) in device.getScanData(): + for dev in devices: + if self._mac.lower() == dev.addr.lower(): + self._switchbot_device_data["mac_address"] = dev.addr + for (adtype, desc, value) in dev.getScanData(): if adtype == 22: - service_data = value[4:].encode() - service_data = binascii.unhexlify(service_data) - - return service_data - - return None - - def get_mac(self) -> str: - """Returns the mac address of the device.""" - return self._mac - - def get_min_time_update(self): - """Returns the first time an update can be executed.""" - return self._last_time_command_send + self._time_between_update_command - - def get_battery_percent(self) -> int: - """Returns device battery level in percent.""" - return self._battery_percent + _model = binascii.unhexlify(value[4:6]).decode() + if _model == "H": + self._switchbot_device_data["data"] = _process_wohand( + value[4:] + ) + self._switchbot_device_data["data"]["rssi"] = dev.rssi + self._switchbot_device_data["model"] = _model + self._switchbot_device_data["modelName"] = "WoHand" + elif _model == "c": + self._switchbot_device_data["data"] = _process_wocurtain( + value[4:] + ) + self._switchbot_device_data["data"]["rssi"] = dev.rssi + self._switchbot_device_data["model"] = _model + self._switchbot_device_data["modelName"] = "WoCurtain" + elif _model == "T": + self._switchbot_device_data["data"] = _process_wosensorth( + value[4:] + ) + self._switchbot_device_data["data"]["rssi"] = dev.rssi + self._switchbot_device_data["model"] = _model + self._switchbot_device_data["modelName"] = "WoSensorTH" + + else: + continue + else: + self._switchbot_device_data[desc] = value + + return self._switchbot_device_data class Switchbot(SwitchbotDevice): """Representation of a Switchbot.""" def __init__(self, *args, **kwargs) -> None: - self._is_on = None - self._mode = None + """Switchbot Bot/WoHand constructor.""" super().__init__(*args, **kwargs) + self._inverse = kwargs.pop("inverse_mode", False) - def update(self, scan_timeout=5) -> None: - """Updates the mode, battery percent and state of the device.""" - barray = self.get_servicedata(scan_timeout=scan_timeout) - - if barray is None: - return - - _mode = barray[1] & 0b10000000 # 128 switch or 0 toggle - if _mode != 0: - self._mode = "switch" - else: - self._mode = "toggle" - - _is_on = barray[1] & 0b01000000 # 64 on or 0 for off - if _is_on == 0 and self._mode == "switch": - self._is_on = True - else: - self._is_on = False - - self._battery_percent = barray[2] & 0b01111111 + def update(self, interface=None) -> None: + """Update mode, battery percent and state of device.""" + self.get_device_data(retry=self._retry_count, interface=interface) def turn_on(self) -> bool: """Turn device on.""" @@ -220,42 +396,47 @@ def press(self) -> bool: return self._sendcommand(PRESS_KEY, self._retry_count) def switch_mode(self) -> str: - """Return Toggle or Switch from cache. - Run update first.""" - return self._mode + """Return true or false from cache.""" + # To get actual position call update() first. + if not self._switchbot_device_data: + return None + return self._switchbot_device_data["data"]["switchMode"] def is_on(self) -> bool: - """Return switch state from cache. - Run update first.""" - return self._is_on + """Return switch state from cache.""" + # To get actual position call update() first. + if not self._switchbot_device_data: + return None + + if self._inverse: + return not self._switchbot_device_data["data"]["isOn"] + + return self._switchbot_device_data["data"]["isOn"] class SwitchbotCurtain(SwitchbotDevice): """Representation of a Switchbot Curtain.""" def __init__(self, *args, **kwargs) -> None: - """Constructor for a Switchbot Curtain. - The position of the curtain is saved in self._pos with 0 = open and 100 = closed. - This is independent of the calibration of the curtain bot (Open left to right/ - Open right to left/Open from the middle). - The parameter 'reverse_mode' reverse these values, - if 'reverse_mode' = True, position = 0 equals close - and position = 100 equals open. The parameter is default set to True so that - the definition of position is the same as in Home Assistant.""" - self._reverse = kwargs.pop("reverse_mode", True) - self._pos = 0 - self._light_level = 0 - self._is_calibrated = 0 + """Switchbot Curtain/WoCurtain constructor.""" + + # The position of the curtain is saved returned with 0 = open and 100 = closed. + # This is independent of the calibration of the curtain bot (Open left to right/ + # Open right to left/Open from the middle). + # The parameter 'reverse_mode' reverse these values, + # if 'reverse_mode' = True, position = 0 equals close + # and position = 100 equals open. The parameter is default set to True so that + # the definition of position is the same as in Home Assistant. + super().__init__(*args, **kwargs) + self._reverse = kwargs.pop("reverse_mode", True) def open(self) -> bool: """Send open command.""" - self._pos = 100 if self._reverse else 0 return self._sendcommand(OPEN_KEY, self._retry_count) def close(self) -> bool: """Send close command.""" - self._pos = 0 if self._reverse else 100 return self._sendcommand(CLOSE_KEY, self._retry_count) def stop(self) -> bool: @@ -265,33 +446,34 @@ def stop(self) -> bool: def set_position(self, position: int) -> bool: """Send position command (0-100) to device.""" position = (100 - position) if self._reverse else position - self._pos = position hex_position = "%0.2X" % position return self._sendcommand(POSITION_KEY + hex_position, self._retry_count) - def update(self, scan_timeout=5) -> None: - """Updates the current position, battery percent and light level of the device.""" - barray = self.get_servicedata(scan_timeout=scan_timeout) - - if barray is None: - return - - self._is_calibrated = barray[1] & 0b01000000 - self._battery_percent = barray[2] & 0b01111111 - position = max(min(barray[3] & 0b01111111, 100), 0) - self._pos = (100 - position) if self._reverse else position - self._light_level = (barray[4] >> 4) & 0b00001111 # light sensor level (1-10) + def update(self, interface=None) -> None: + """Update position, battery percent and light level of device.""" + self.get_device_data(retry=self._retry_count, interface=interface) def get_position(self) -> int: - """Returns the current cached position (0-100), the actual position could vary. - To get the actual position call update() first.""" - return self._pos + """Return cached position (0-100) of Curtain.""" + # To get actual position call update() first. + if not self._switchbot_device_data: + return None + return self._switchbot_device_data["data"]["position"] def get_light_level(self) -> int: - """Returns the current cached light level, the actual light level could vary. - To get the actual light level call update() first.""" - return self._light_level + """Return cached light level.""" + # To get actual light level call update() first. + if not self._switchbot_device_data: + return None + return self._switchbot_device_data["data"]["lightLevel"] def is_reversed(self) -> bool: - """Returns True if the curtain open from left to right.""" + """Return True if the curtain open from left to right.""" return self._reverse + + def is_calibrated(self) -> bool: + """Return True curtain is calibrated.""" + # To get actual light level call update() first. + if not self._switchbot_device_data: + return None + return self._switchbot_device_data["data"]["calibration"]