Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
setup(
name = 'PySwitchbot',
packages = ['switchbot'],
install_requires=['bleak'],
install_requires=['bleak', 'bleak-retry-connector'],
version = '0.15.0',
description = 'A library to communicate with Switchbot',
author='Daniel Hjelseth Hoyer',
Expand Down
92 changes: 47 additions & 45 deletions switchbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import bleak
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak_retry_connector import BleakClient, establish_connection

DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_TIMEOUT = 1
Expand Down Expand Up @@ -285,61 +286,62 @@ def _commandkey(self, key: str) -> str:
async def _sendcommand(self, key: str, retry: int) -> bytes:
"""Send command to device and read response."""
command = bytearray.fromhex(self._commandkey(key))
notify_msg = b""
_LOGGER.debug("Sending command to switchbot %s", command)

max_attempts = retry + 1
async with CONNECT_LOCK:
try:
async with bleak.BleakClient(
address_or_ble_device=self._device,
timeout=float(self._scan_timeout),
) as client:
_LOGGER.debug("Connnected to switchbot: %s", client.is_connected)

_LOGGER.debug("Subscribe to notifications")
await client.start_notify(
_sb_uuid(comms_type="rx"), self._notification_handler
)

_LOGGER.debug("Sending command, %s", key)
await client.write_gatt_char(
_sb_uuid(comms_type="tx"), command, False
)

await asyncio.sleep(
1.0
) # Bot needs pause. Otherwise notification could be missed.

notify_msg = self._last_notification
_LOGGER.info("Notification received: %s", notify_msg)
for attempt in range(max_attempts):
try:
return await self._send_command_locked(key, command)
except (bleak.BleakError, asyncio.exceptions.TimeoutError):
if attempt == retry:
_LOGGER.error(
"Switchbot communication failed. Stopping trying",
exc_info=True,
)
return b"\x00"

_LOGGER.debug("Switchbot communication failed with:", exc_info=True)

raise RuntimeError("Unreachable")

async def _send_command_locked(self, key: str, command: bytes) -> bytes:
"""Send command to device and read response."""
client: BleakClient | None = None
try:
_LOGGER.debug("Connnecting to switchbot: %s", self._device.address)

_LOGGER.debug("UnSubscribe to notifications")
await client.stop_notify(_sb_uuid(comms_type="rx"))
client = await establish_connection(
BleakClient, self._device.address, self._device, max_attempts=1
)
_LOGGER.debug("Connnected to switchbot: %s", client.is_connected)

except (bleak.BleakError, asyncio.exceptions.TimeoutError):
_LOGGER.debug("Subscribe to notifications")
await client.start_notify(
_sb_uuid(comms_type="rx"), self._notification_handler
)

if retry < 1:
_LOGGER.error(
"Switchbot communication failed. Stopping trying", exc_info=True
)
return b"\x00"
_LOGGER.debug("Sending command, %s", key)
await client.write_gatt_char(_sb_uuid(comms_type="tx"), command, False)

_LOGGER.debug("Switchbot communication failed with:", exc_info=True)
await asyncio.sleep(
1.0
) # Bot needs pause. Otherwise notification could be missed.

if notify_msg:
if notify_msg == b"\x07":
_LOGGER.error("Password required")
elif notify_msg == b"\t":
_LOGGER.error("Password incorrect")
return notify_msg
notify_msg = self._last_notification
_LOGGER.info("Notification received: %s", notify_msg)

_LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry)
_LOGGER.debug("UnSubscribe to notifications")
await client.stop_notify(_sb_uuid(comms_type="rx"))

if retry < 1: # failsafe
return b"\x00"
finally:
if client:
await client.disconnect()

await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
return await self._sendcommand(key, retry - 1)
if notify_msg == b"\x07":
_LOGGER.error("Password required")
elif notify_msg == b"\t":
_LOGGER.error("Password incorrect")
return notify_msg

def get_address(self) -> str:
"""Return address of device."""
Expand Down