Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make bind() optional and implement a generator for device discovery #427

Merged
merged 1 commit into from Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
133 changes: 45 additions & 88 deletions broadlink/__init__.py
@@ -1,15 +1,13 @@
#!/usr/bin/python3
"""The python-broadlink library."""
import socket
import time
from datetime import datetime
from typing import Dict, List, Union, Tuple, Type

from .alarm import S1C
from .climate import hysen
from .cover import dooya
from .device import device
from .helpers import get_local_ip
from .device import device, scan
from .exceptions import exception
from .light import lb1
from .remote import rm, rm2, rm4
from .sensor import a1
Expand Down Expand Up @@ -94,11 +92,11 @@ def get_devices() -> Dict[int, Tuple[Type[device], str, str]]:


def gendevice(
dev_type: int,
host: Tuple[str, int],
mac: Union[bytes, str],
name: str = None,
is_locked: bool = None,
dev_type: int,
host: Tuple[str, int],
mac: Union[bytes, str],
name: str = None,
is_locked: bool = None,
) -> device:
"""Generate a device."""
try:
Expand All @@ -118,91 +116,50 @@ def gendevice(
)


def hello(
host: str,
port: int = 80,
timeout: int = 10,
local_ip_address: str = None,
) -> device:
"""Direct device discovery.

Useful if the device is locked.
"""
try:
return next(xdiscover(timeout, local_ip_address, host, port))
except StopIteration:
raise exception(-4000) # Network timeout.


def discover(
timeout: int = None,
timeout: int = 10,
local_ip_address: str = None,
discover_ip_address: str = '255.255.255.255',
discover_ip_port: int = 80,
) -> List[device]:
"""Discover devices connected to the local network."""
local_ip_address = local_ip_address or get_local_ip()
address = local_ip_address.split('.')
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
cs.bind((local_ip_address, 0))
port = cs.getsockname()[1]
starttime = time.time()

devices = []

timezone = int(time.timezone / -3600)
packet = bytearray(0x30)

year = datetime.now().year

if timezone < 0:
packet[0x08] = 0xff + timezone - 1
packet[0x09] = 0xff
packet[0x0a] = 0xff
packet[0x0b] = 0xff
else:
packet[0x08] = timezone
packet[0x09] = 0
packet[0x0a] = 0
packet[0x0b] = 0

packet[0x0c] = year & 0xff
packet[0x0d] = year >> 8
packet[0x0e] = datetime.now().minute
packet[0x0f] = datetime.now().hour
subyear = str(year)[2:]
packet[0x10] = int(subyear)
packet[0x11] = datetime.now().isoweekday()
packet[0x12] = datetime.now().day
packet[0x13] = datetime.now().month
packet[0x18] = int(address[0])
packet[0x19] = int(address[1])
packet[0x1a] = int(address[2])
packet[0x1b] = int(address[3])
packet[0x1c] = port & 0xff
packet[0x1d] = port >> 8
packet[0x26] = 6

checksum = sum(packet, 0xbeaf) & 0xffff
packet[0x20] = checksum & 0xff
packet[0x21] = checksum >> 8

cs.sendto(packet, (discover_ip_address, discover_ip_port))
if timeout is None:
response = cs.recvfrom(1024)
responsepacket = bytearray(response[0])
host = response[1]
devtype = responsepacket[0x34] | responsepacket[0x35] << 8
mac = responsepacket[0x3f:0x39:-1]
name = responsepacket[0x40:].split(b'\x00')[0].decode('utf-8')
is_locked = bool(responsepacket[-1])
device = gendevice(devtype, host, mac, name=name, is_locked=is_locked)
cs.close()
return device
felipediel marked this conversation as resolved.
Show resolved Hide resolved

while (time.time() - starttime) < timeout:
cs.settimeout(timeout - (time.time() - starttime))
try:
response = cs.recvfrom(1024)
except socket.timeout:
cs.close()
return devices
responsepacket = bytearray(response[0])
host = response[1]
devtype = responsepacket[0x34] | responsepacket[0x35] << 8
mac = responsepacket[0x3f:0x39:-1]
name = responsepacket[0x40:].split(b'\x00')[0].decode('utf-8')
is_locked = bool(responsepacket[-1])
device = gendevice(devtype, host, mac, name=name, is_locked=is_locked)
devices.append(device)
cs.close()
return devices
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
return [gendevice(*resp) for resp in responses]


def xdiscover(
timeout: int = 10,
local_ip_address: str = None,
discover_ip_address: str = '255.255.255.255',
discover_ip_port: int = 80,
) -> Generator[device, None, None]:
"""Discover devices connected to the local network.

This function returns a generator that yields devices instantly.
"""
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
for resp in responses:
yield gendevice(*resp)


# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode.
Expand Down
105 changes: 104 additions & 1 deletion broadlink/device.py
Expand Up @@ -2,13 +2,93 @@
import threading
import random
import time
from typing import Tuple, Union
from datetime import datetime
from typing import Generator, Tuple, Union

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from .exceptions import check_error, exception

HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]


def scan(
timeout: int = 10,
local_ip_address: str = None,
discover_ip_address: str = '255.255.255.255',
discover_ip_port: int = 80,
) -> Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

if local_ip_address:
cs.bind((local_ip_address, 0))
felipediel marked this conversation as resolved.
Show resolved Hide resolved
port = cs.getsockname()[1]
else:
local_ip_address = "0.0.0.0"
port = 0

address = local_ip_address.split('.')
starttime = time.time()

timezone = int(time.timezone / -3600)
packet = bytearray(0x30)

year = datetime.now().year

if timezone < 0:
packet[0x08] = 0xff + timezone - 1
packet[0x09] = 0xff
packet[0x0a] = 0xff
packet[0x0b] = 0xff
else:
packet[0x08] = timezone
packet[0x09] = 0
packet[0x0a] = 0
packet[0x0b] = 0

packet[0x0c] = year & 0xff
packet[0x0d] = year >> 8
packet[0x0e] = datetime.now().minute
packet[0x0f] = datetime.now().hour
subyear = str(year)[2:]
packet[0x10] = int(subyear)
packet[0x11] = datetime.now().isoweekday()
packet[0x12] = datetime.now().day
packet[0x13] = datetime.now().month
packet[0x18] = int(address[3])
packet[0x19] = int(address[2])
packet[0x1a] = int(address[1])
packet[0x1b] = int(address[0])
packet[0x1c] = port & 0xff
packet[0x1d] = port >> 8
felipediel marked this conversation as resolved.
Show resolved Hide resolved
packet[0x26] = 6

checksum = sum(packet, 0xbeaf) & 0xffff
packet[0x20] = checksum & 0xff
packet[0x21] = checksum >> 8

cs.sendto(packet, (discover_ip_address, discover_ip_port))

try:
while (time.time() - starttime) < timeout:
cs.settimeout(timeout - (time.time() - starttime))
try:
response, host = cs.recvfrom(1024)
except socket.timeout:
break

devtype = response[0x34] | response[0x35] << 8
mac = bytes(reversed(response[0x3a:0x40]))
name = response[0x40:].split(b'\x00')[0].decode('utf-8')
is_locked = bool(response[-1])
yield devtype, host, mac, name, is_locked
finally:
cs.close()


class device:
"""Controls a Broadlink device."""
Expand Down Expand Up @@ -101,6 +181,29 @@ def auth(self) -> bool:
self.update_aes(key)
return True

def hello(self, local_ip_address=None) -> bool:
"""Send a hello message to the device.

Device information is checked before updating name and lock status.
"""
responses = scan(
timeout=self.timeout,
local_ip_address=local_ip_address,
discover_ip_address=self.host[0],
discover_ip_port=self.host[1],
)
try:
devtype, host, mac, name, is_locked = next(responses)
except StopIteration:
raise exception(-4000) # Network timeout.

if (devtype, host, mac) != (self.devtype, self.host, self.mac):
raise exception(-2040) # Device information is not intact.

self.name = name
self.is_locked = is_locked
return True

def get_fwversion(self) -> int:
"""Get firmware version."""
packet = bytearray([0x68])
Expand Down
12 changes: 6 additions & 6 deletions broadlink/exceptions.py
Expand Up @@ -80,6 +80,10 @@ class SDKException(BroadlinkException):
"""Common base class for all SDK exceptions."""


class DeviceInformationError(SDKException):
"""Device information is not intact."""


class ChecksumError(SDKException):
"""Received data packet check error."""

Expand All @@ -88,10 +92,6 @@ class LengthError(SDKException):
"""Received data packet length error."""


class DNSLookupError(SDKException):
"""Failed to obtain local IP address."""
felipediel marked this conversation as resolved.
Show resolved Hide resolved


class NetworkTimeoutError(SDKException):
"""Network timeout error."""

Expand All @@ -113,11 +113,11 @@ class UnknownError(BroadlinkException):
-9: (WriteError, "Write error"),
-10: (ReadError, "Read error"),
-11: (SSIDNotFoundError, "SSID could not be found in AP configuration"),
# DNASDK related errors are generated by this module.
# SDK related errors are generated by this module.
-2040: (DeviceInformationError, "Device information is not intact"),
-4000: (NetworkTimeoutError, "Network timeout"),
-4007: (LengthError, "Received data packet length error"),
-4008: (ChecksumError, "Received data packet check error"),
-4013: (DNSLookupError, "Failed to obtain local IP address"),
}


Expand Down
19 changes: 0 additions & 19 deletions broadlink/helpers.py
@@ -1,24 +1,5 @@
"""Helper functions."""
from ctypes import c_ushort
import socket

from .exceptions import exception


def get_local_ip() -> str:
"""Try to determine the local IP address of the machine."""
# Useful for VPNs.
try:
local_ip_address = socket.gethostbyname(socket.gethostname())
if not local_ip_address.startswith('127.'):
return local_ip_address
except socket.gaierror:
raise exception(-4013) # DNS Error

# Connecting to UDP address does not send packets.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('8.8.8.8', 53))
return s.getsockname()[0]


def calculate_crc16(input_data: bytes) -> int:
Expand Down