diff --git a/TheengsGateway/__init__.py b/TheengsGateway/__init__.py index 2474c012..27ba3760 100644 --- a/TheengsGateway/__init__.py +++ b/TheengsGateway/__init__.py @@ -51,6 +51,7 @@ "time_sync": [], "time_format": 0, "publish_advdata": 0, + "bindkeys": {}, } conf_path = os.path.expanduser("~") + "/theengsgw.conf" @@ -210,6 +211,15 @@ def main() -> None: type=int, help="Publish advertising and advanced data (1) or not (0) (default: 0)", ) + parser.add_argument( + "-bk", + "--bindkeys", + nargs="+", + metavar=("ADDRESS", "BINDKEY"), + dest="bindkeys", + default={}, + help="Device addresses and their bindkeys: ADDR1 KEY1 ADDR2 KEY2", + ) args = parser.parse_args() @@ -304,6 +314,11 @@ def main() -> None: if args.publish_advdata is not None: config["publish_advdata"] = args.publish_advdata + if args.bindkeys: + config["bindkeys"].update( + dict(zip(args.bindkeys[::2], args.bindkeys[1::2])) + ) + if not config["host"]: sys.exit("Invalid MQTT host") diff --git a/TheengsGateway/ble_gateway.py b/TheengsGateway/ble_gateway.py index ee0c7802..512e046b 100644 --- a/TheengsGateway/ble_gateway.py +++ b/TheengsGateway/ble_gateway.py @@ -21,6 +21,7 @@ # mypy: disable-error-code="name-defined,attr-defined" import asyncio +import binascii import json import logging import platform @@ -41,6 +42,7 @@ from bluetooth_clocks.scanners import find_clock from bluetooth_numbers import company from bluetooth_numbers.exceptions import UnknownCICError +from Cryptodome.Cipher import AES from paho.mqtt import client as mqtt_client from TheengsDecoder import decodeBLE @@ -362,6 +364,69 @@ def detection_callback( if gw.presence: self.hass_presence(decoded_json) + # Handle encrypted payload + if ( + decoded_json.get("encr", False) + and decoded_json["model_id"] == "SBBT_002C_ENCR" + ): + try: + bindkey = bytes.fromhex( + gw.bindkeys[device.address] + ) + nonce = binascii.unhexlify( + "".join( + [ + device.address.replace(":", ""), + "d2fc", + decoded_json["servicedata"][:2], + decoded_json["ctr"], + ] + ) + ) + cipher = AES.new( + bindkey, AES.MODE_CCM, nonce=nonce, mac_len=4 + ) + payload = bytes.fromhex(decoded_json["cipher"]) + mic = bytes.fromhex(decoded_json["mic"]) + decrypted_data = cipher.decrypt_and_verify( + payload, mic + ) + + # Clear encryption and MAC included bits in device info + # See https://bthome.io/format/ + device_info = bytes.fromhex( + decoded_json["servicedata"][:2] + ) + mask = 0b11111100 + masked_device_info = ( + int.from_bytes(device_info, "big") & mask + ) + bthome_service_data = bytearray( + masked_device_info.to_bytes(1, "big") + ) + + # Replace encrypted data by decrypted payload + bthome_service_data.extend(decrypted_data) + data_json[ + "servicedata" + ] = bthome_service_data.hex() + + decoded_json = decodeBLE(json.dumps(data_json)) + if decoded_json: + decoded_json = json.loads(decoded_json) + else: + logger.exception( + "Decrypted payload not supported: `%s`", + data_json["servicedata"], + ) + + except KeyError: + logger.exception( + "Can't find bindkey for %s.", device.address + ) + except ValueError: + logger.exception("Decryption failed") + # Remove advanced data if not gw.pubadvdata: for key in ( @@ -372,6 +437,7 @@ def detection_callback( "acts", "cont", "track", + "encr", ): decoded_json.pop(key, None) @@ -470,6 +536,7 @@ def run(arg: str) -> None: gw.time_sync = config["time_sync"] gw.time_format = bool(config["time_format"]) gw.pubadvdata = bool(config["publish_advdata"]) + gw.bindkeys = config["bindkeys"] logging.basicConfig() logger.setLevel(log_level) diff --git a/TheengsGateway/diagnose.py b/TheengsGateway/diagnose.py index 941bbbb0..c0da64a9 100644 --- a/TheengsGateway/diagnose.py +++ b/TheengsGateway/diagnose.py @@ -36,6 +36,11 @@ def _anonymize_addresses(addresses: List[str]) -> List[str]: return [_anonymize_address(address) for address in addresses] +def _anonymize_bindkeys(bindkeys: Dict[str, str]) -> Dict[str, str]: + """Anonymize the addresses and bindkeys in a dictionary.""" + return {_anonymize_address(address): "***" for address in bindkeys} + + # This function is taken from Textual def _section(title: str, values: Dict[str, str]) -> None: """Print a collection of named values within a titled section.""" @@ -117,6 +122,7 @@ def _config() -> None: config = json.load(config_file) _anonymize_strings(["user", "pass"], config) config["time_sync"] = _anonymize_addresses(config["time_sync"]) + config["bindkeys"] = _anonymize_bindkeys(config["bindkeys"]) print("```") print(json.dumps(config, sort_keys=True, indent=4)) print("```") diff --git a/docs/use/use.md b/docs/use/use.md index e1b88882..67eca927 100644 --- a/docs/use/use.md +++ b/docs/use/use.md @@ -51,6 +51,7 @@ usage: -m [-h] [-H HOST] [-P PORT] [-u USER] [-p PWD] [-pt PUB_TOPIC] [-Lt LWT_T [-prt PRESENCE_TOPIC] [-pr PUBLISH_PRESENCE] [-a ADAPTER] [-s {active,passive}] [-ts TIME_SYNC [TIME_SYNC ...]] [-tf TIME_FORMAT] [-padv PUBLISH_ADVDATA] + [-bk ADDRESS [BINDKEY ...]] optional arguments: -h, --help show this help message and exit @@ -100,6 +101,8 @@ optional arguments: -padv PUBLISH_ADVDATA, --publish_advdata PUBLISH_ADVDATA Publish advertising and advanced data (1) or not (0) (default: 0) + -bk ADDRESS [BINDKEY ...], --bindkeys ADDRESS [BINDKEY ...] + Device addresses and their bindkeys: ADDR1 KEY1 ADDR2 KEY2 ``` ### For a Docker container @@ -219,3 +222,12 @@ bluetooth-clocks discover ``` The `bluetooth-clocks` command is installed as part of Theengs Gateway. + +## Reading encrypted advertisements +If you want to read encrypted advertisements, you need to add a bindkey for each device address with the `--bindkeys` argument. For example: + +``` +TheengsGateway --bindkeys 00:11:22:33:44:55:66 0dc540f3025b474b9ef1085e051b1add AA:BB:CC:DD:EE:FF 6385424e1b0341109942ad2a6bb42e58 +``` + +Theengs Gateway will then use the bindkey 0dc540f3025b474b9ef1085e051b1add to decrypt all advertisements from device 00:11:22:33:44:55:66 and bindkey 6385424e1b0341109942ad2a6bb42e58 for all advertisements from device AA:BB:CC:DD:EE:FF. diff --git a/pyproject.toml b/pyproject.toml index a4bc21ae..5dda7621 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,6 +3,7 @@ line-length = 79 include = 'TheengsGateway\/.*\.pyi?$' [[tool.mypy.overrides]] module = [ + "Cryptodome.Cipher", "paho.mqtt", "setuptools", "TheengsDecoder", diff --git a/setup.py b/setup.py index 0ca06016..0a316f0c 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,7 @@ "bluetooth-numbers>=1.0,<2.0", "importlib-metadata", "paho-mqtt>=1.6.1", - "TheengsDecoder>=1.5.0", + "pycryptodomex>=3.18.0", + "TheengsDecoder>=1.5.5", ], )