diff --git a/switchbot/__init__.py b/switchbot/__init__.py index df2fd832..de79f214 100644 --- a/switchbot/__init__.py +++ b/switchbot/__init__.py @@ -8,6 +8,7 @@ DEFAULT_RETRY_COUNT = 3 DEFAULT_RETRY_TIMEOUT = 0.2 +DEFAULT_TIME_BETWEEN_UPDATE_COMMAND = 10 UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b" HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b" @@ -34,11 +35,14 @@ class SwitchbotDevice: # pylint: disable=too-few-public-methods """Base Representation of a Switchbot Device.""" - def __init__(self, mac, retry_count=DEFAULT_RETRY_COUNT, password=None, interface=None) -> None: + def __init__(self, mac, password=None, interface=None, **kwargs) -> None: self._interface = interface self._mac = mac self._device = None - self._retry_count = retry_count + self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT) + self._time_between_update_command = kwargs.pop("time_between_update_command", + DEFAULT_TIME_BETWEEN_UPDATE_COMMAND) + self._last_time_command_send = time.time() if password is None or password == "": self._password_encoded = None else: @@ -85,6 +89,7 @@ def _writekey(self, key) -> bool: hand = hand_service.getCharacteristics(HANDLE)[0] _LOGGER.debug("Sending command, %s", key) write_result = hand.write(binascii.a2b_hex(key), withResponse=True) + self._last_time_command_send = time.time() if not write_result: _LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. " "Please check the Switchbot.") @@ -112,6 +117,14 @@ def _sendcommand(self, key, retry) -> bool: time.sleep(DEFAULT_RETRY_TIMEOUT) return self._sendcommand(key, retry - 1) + def get_mac(self) -> str: + """Returns the mac address of the device.""" + return self._mac + + def get_min_time_update(self): + """Returns the first time an update can be executed.""" + return self._last_time_command_send + self._time_between_update_command + class Switchbot(SwitchbotDevice): """Representation of a Switchbot.""" @@ -132,12 +145,28 @@ def press(self) -> bool: class SwitchbotCurtain(SwitchbotDevice): """Representation of a Switchbot Curtain.""" + def __init__(self, *args, **kwargs) -> None: + """Constructor for a Switchbot Curtain. + The position of the curtain is saved in self._pos with 0 = open and 100 = closed. + This is independent of the calibration of the curtain bot (Open left to right/ + Open right to left/Open from the middle). + The parameter 'reverse_mode' reverse these values, if 'reverse_mode' = True, position = 0 equals close + and position = 100 equals open. The parameter is default set to True so that + the definition of position is the same as in Home Assistant.""" + self._reverse = kwargs.pop('reverse_mode', True) + self._pos = 0 + self._light_level = 0 + self._battery_percent = 0 + super().__init__(*args, **kwargs) + def open(self) -> bool: """Send open command.""" + self._pos = (100 if self._reverse else 0) return self._sendcommand(OPEN_KEY, self._retry_count) def close(self) -> bool: """Send close command.""" + self._pos = (0 if self._reverse else 100) return self._sendcommand(CLOSE_KEY, self._retry_count) def stop(self) -> bool: @@ -146,5 +175,44 @@ def stop(self) -> bool: def set_position(self, position: int) -> bool: """Send position command (0-100) to device.""" - hex_position = "%0.2X" % (100 - position) # curtain position in reverse mode + position = ((100 - position) if self._reverse else position) + self._pos = position + hex_position = "%0.2X" % position return self._sendcommand(POSITION_KEY + hex_position, self._retry_count) + + def update(self, scan_timeout=5) -> None: + """Updates the current position, battery percent and light level of the device. + Returns after the given timeout period in seconds.""" + waiting_time = self.get_min_time_update() - time.time() + if waiting_time > 0: + time.sleep(waiting_time) + devices = bluepy.btle.Scanner().scan(scan_timeout) + + for device in devices: + if self.get_mac().lower() == device.addr.lower(): + for (adtype, _, value) in device.getScanData(): + if adtype == 22: + barray = bytearray(value, 'ascii') + self._battery_percent = int(barray[-6:-4], 16) + position = max(min(int(barray[-4:-2], 16), 100), 0) + self._pos = ((100 - position) if self._reverse else position) + self._light_level = int(barray[-2:], 16) + + def get_position(self) -> int: + """Returns the current cached position (0-100), the actual position could vary. + To get the actual position call update() first.""" + return self._pos + + def get_battery_percent(self) -> int: + """Returns the current cached battery percent (0-100), the actual battery percent could vary. + To get the actual battery percent call update() first.""" + return self._battery_percent + + def get_light_level(self) -> int: + """Returns the current cached light level, the actual light level could vary. + To get the actual light level call update() first.""" + return self._light_level + + def is_reversed(self) -> bool: + """Returns True if the curtain open from left to right.""" + return self._reverse