Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions switchbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_TIMEOUT = 0.2
DEFAULT_TIME_BETWEEN_UPDATE_COMMAND = 10

UUID = "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
HANDLE = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
Expand All @@ -34,11 +35,14 @@ class SwitchbotDevice:
# pylint: disable=too-few-public-methods
"""Base Representation of a Switchbot Device."""

def __init__(self, mac, retry_count=DEFAULT_RETRY_COUNT, password=None, interface=None) -> None:
def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
self._interface = interface
self._mac = mac
self._device = None
self._retry_count = retry_count
self._retry_count = kwargs.pop("retry_count", DEFAULT_RETRY_COUNT)
self._time_between_update_command = kwargs.pop("time_between_update_command",
DEFAULT_TIME_BETWEEN_UPDATE_COMMAND)
self._last_time_command_send = time.time()
if password is None or password == "":
self._password_encoded = None
else:
Expand Down Expand Up @@ -85,6 +89,7 @@ def _writekey(self, key) -> bool:
hand = hand_service.getCharacteristics(HANDLE)[0]
_LOGGER.debug("Sending command, %s", key)
write_result = hand.write(binascii.a2b_hex(key), withResponse=True)
self._last_time_command_send = time.time()
if not write_result:
_LOGGER.error("Sent command but didn't get a response from Switchbot confirming command was sent. "
"Please check the Switchbot.")
Expand Down Expand Up @@ -112,6 +117,14 @@ def _sendcommand(self, key, retry) -> bool:
time.sleep(DEFAULT_RETRY_TIMEOUT)
return self._sendcommand(key, retry - 1)

def get_mac(self) -> str:
"""Returns the mac address of the device."""
return self._mac

def get_min_time_update(self):
"""Returns the first time an update can be executed."""
return self._last_time_command_send + self._time_between_update_command


class Switchbot(SwitchbotDevice):
"""Representation of a Switchbot."""
Expand All @@ -132,12 +145,28 @@ def press(self) -> bool:
class SwitchbotCurtain(SwitchbotDevice):
"""Representation of a Switchbot Curtain."""

def __init__(self, *args, **kwargs) -> None:
"""Constructor for a Switchbot Curtain.
The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
This is independent of the calibration of the curtain bot (Open left to right/
Open right to left/Open from the middle).
The parameter 'reverse_mode' reverse these values, if 'reverse_mode' = True, position = 0 equals close
and position = 100 equals open. The parameter is default set to True so that
the definition of position is the same as in Home Assistant."""
self._reverse = kwargs.pop('reverse_mode', True)
self._pos = 0
self._light_level = 0
self._battery_percent = 0
super().__init__(*args, **kwargs)

def open(self) -> bool:
"""Send open command."""
self._pos = (100 if self._reverse else 0)
return self._sendcommand(OPEN_KEY, self._retry_count)

def close(self) -> bool:
"""Send close command."""
self._pos = (0 if self._reverse else 100)
return self._sendcommand(CLOSE_KEY, self._retry_count)

def stop(self) -> bool:
Expand All @@ -146,5 +175,44 @@ def stop(self) -> bool:

def set_position(self, position: int) -> bool:
"""Send position command (0-100) to device."""
hex_position = "%0.2X" % (100 - position) # curtain position in reverse mode
position = ((100 - position) if self._reverse else position)
self._pos = position
hex_position = "%0.2X" % position
return self._sendcommand(POSITION_KEY + hex_position, self._retry_count)

def update(self, scan_timeout=5) -> None:
"""Updates the current position, battery percent and light level of the device.
Returns after the given timeout period in seconds."""
waiting_time = self.get_min_time_update() - time.time()
if waiting_time > 0:
time.sleep(waiting_time)
devices = bluepy.btle.Scanner().scan(scan_timeout)

for device in devices:
if self.get_mac().lower() == device.addr.lower():
for (adtype, _, value) in device.getScanData():
if adtype == 22:
barray = bytearray(value, 'ascii')
self._battery_percent = int(barray[-6:-4], 16)
position = max(min(int(barray[-4:-2], 16), 100), 0)
self._pos = ((100 - position) if self._reverse else position)
self._light_level = int(barray[-2:], 16)

def get_position(self) -> int:
"""Returns the current cached position (0-100), the actual position could vary.
To get the actual position call update() first."""
return self._pos

def get_battery_percent(self) -> int:
"""Returns the current cached battery percent (0-100), the actual battery percent could vary.
To get the actual battery percent call update() first."""
return self._battery_percent

def get_light_level(self) -> int:
"""Returns the current cached light level, the actual light level could vary.
To get the actual light level call update() first."""
return self._light_level

def is_reversed(self) -> bool:
"""Returns True if the curtain open from left to right."""
return self._reverse