Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ venv.bak/
switchbot/.vs/slnx.sqlite
switchbot/.vs/switchbot/v16/.suo
switchbot/.vs/VSWorkspaceState.json
switchbot/.vs/ProjectSettings.json
62 changes: 16 additions & 46 deletions switchbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import binascii
import logging
from multiprocessing import Manager, Process
import threading
import time

import bluepy
Expand Down Expand Up @@ -31,6 +31,7 @@
PRESS_KEY_SUFFIX = "00"

_LOGGER = logging.getLogger(__name__)
CONNECT_LOCK = threading.Lock()


def _process_wohand(data) -> dict:
Expand Down Expand Up @@ -93,34 +94,6 @@ def _process_wosensorth(data) -> dict:
return _wosensorth_data


class BLEScanner:
"""Helper for bluepy device scanning."""

def __init__(self, scan_timeout=DEFAULT_SCAN_TIMEOUT, interface=None) -> None:
"""Init class constructor."""
self._scan_timeout = scan_timeout
self._interface = interface
self._devices = None

def start(self) -> dict | None:
"""Start scan in seperate process."""
with Manager() as manager:
_devices = manager.dict()
process = Process(target=self._scan, args=(_devices,))
process.start()
process.join()

return _devices.get(0, None)

def _scan(self, devices) -> dict | None:
"""Scan for advertisement data."""
try:
devices[0] = bluepy.btle.Scanner(self._interface).scan(self._scan_timeout)

except bluepy.btle.BTLEDisconnectError:
pass


class GetSwitchbotDevices:
"""Scan for all Switchbot devices and return by type."""

Expand All @@ -137,9 +110,7 @@ def discover(
devices = None

try:
devices = BLEScanner(
scan_timeout=scan_timeout, interface=self._interface
).start()
devices = bluepy.btle.Scanner(self._interface).scan(scan_timeout)

except bluepy.btle.BTLEManagementError:
_LOGGER.error("Error scanning for switchbot devices", exc_info=True)
Expand All @@ -165,7 +136,7 @@ def discover(
self._all_services_data[dev_id]["mac_address"] = dev.addr
for (adtype, desc, value) in dev.getScanData():
if adtype == 22:
_model = binascii.unhexlify(value[4:6]).decode()
_model = chr(binascii.unhexlify(value.encode())[2] & 0b01111111)
if _model == "H":
self._all_services_data[dev_id]["data"] = _process_wohand(
value[4:]
Expand Down Expand Up @@ -229,7 +200,7 @@ def get_device_data(self, mac) -> dict | None:
_switchbot_data = {}

for item in self._all_services_data:
if self._all_services_data[item]["mac_address"] == mac.replace("-", ":").lower():
if self._all_services_data[item]["mac_address"] == mac:
_switchbot_data = self._all_services_data[item]

return _switchbot_data
Expand All @@ -241,7 +212,7 @@ class SwitchbotDevice:
def __init__(self, mac, password=None, interface=None, **kwargs) -> None:
"""Switchbot base class constructor."""
self._interface = interface
self._mac = mac.replace("-", ":").lower()
self._mac = mac
self._device = None
self._switchbot_device_data = {}
self._scan_timeout = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT)
Expand Down Expand Up @@ -307,13 +278,14 @@ def _sendcommand(self, key, retry) -> bool:
send_success = False
command = self._commandkey(key)
_LOGGER.debug("Sending command to switchbot %s", command)
try:
self._connect()
send_success = self._writekey(command)
except bluepy.btle.BTLEException:
_LOGGER.warning("Error talking to Switchbot", exc_info=True)
finally:
self._disconnect()
with CONNECT_LOCK:
try:
self._connect()
send_success = self._writekey(command)
except bluepy.btle.BTLEException:
_LOGGER.warning("Error talking to Switchbot", exc_info=True)
finally:
self._disconnect()
if send_success:
return True
if retry < 1:
Expand Down Expand Up @@ -345,9 +317,7 @@ def get_device_data(self, retry=DEFAULT_RETRY_COUNT, interface=None) -> dict | N
devices = None

try:
devices = BLEScanner(
scan_timeout=self._scan_timeout, interface=_interface
).start()
devices = bluepy.btle.Scanner(_interface).scan(self._scan_timeout)

except bluepy.btle.BTLEManagementError:
_LOGGER.error("Error scanning for switchbot devices", exc_info=True)
Expand All @@ -371,7 +341,7 @@ def get_device_data(self, retry=DEFAULT_RETRY_COUNT, interface=None) -> dict | N
self._switchbot_device_data["mac_address"] = dev.addr
for (adtype, desc, value) in dev.getScanData():
if adtype == 22:
_model = binascii.unhexlify(value[4:6]).decode()
_model = chr(binascii.unhexlify(value.encode())[2] & 0b01111111)
if _model == "H":
self._switchbot_device_data["data"] = _process_wohand(
value[4:]
Expand Down