Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions switchbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
DEVICE_SET_MODE_KEY = "5703"
DEVICE_SET_EXTENDED_KEY = "570f"

# Plug Mini keys
PLUG_ON_KEY = "570f50010180"
PLUG_OFF_KEY = "570f50010100"

# Bot keys
PRESS_KEY = "570100"
ON_KEY = "570101"
Expand Down Expand Up @@ -56,7 +60,7 @@ def _sb_uuid(comms_type: str = "service") -> UUID | str:
return "Incorrect type, choose between: tx, rx or service"


def _process_wohand(data: bytes) -> dict[str, bool | int]:
def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process woHand/Bot services data."""
_switch_mode = bool(data[1] & 0b10000000)

Expand All @@ -69,7 +73,9 @@ def _process_wohand(data: bytes) -> dict[str, bool | int]:
return _bot_data


def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
def _process_wocurtain(
data: bytes, mfr_data: bytes | None, reverse: bool = True
) -> dict[str, bool | int]:
"""Process woCurtain/Curtain services data."""

_position = max(min(data[3] & 0b01111111, 100), 0)
Expand All @@ -86,7 +92,7 @@ def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | in
return _curtain_data


def _process_wosensorth(data: bytes) -> dict[str, object]:
def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
"""Process woSensorTH/Temp sensor services data."""

_temp_sign = 1 if data[4] & 0b10000000 else -1
Expand All @@ -104,7 +110,7 @@ def _process_wosensorth(data: bytes) -> dict[str, object]:
return _wosensorth_data


def _process_wocontact(data: bytes) -> dict[str, bool | int]:
def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process woContact Sensor services data."""
return {
"tested": bool(data[1] & 0b10000000),
Expand All @@ -117,6 +123,15 @@ def _process_wocontact(data: bytes) -> dict[str, bool | int]:
}


def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process plug mini."""
return {
"switchMode": True,
"isOn": mfr_data[7] == 0x80,
"wifi_rssi": mfr_data[9],
}


@dataclass
class SwitchBotAdvertisement:
"""Switchbot advertisement."""
Expand All @@ -131,9 +146,12 @@ def parse_advertisement_data(
) -> SwitchBotAdvertisement | None:
"""Parse advertisement data."""
_services = list(advertisement_data.service_data.values())
_mgr_datas = list(advertisement_data.manufacturer_data.values())

if not _services:
return
_service_data = _services[0]
_mfr_data = _mgr_datas[0] if _mgr_datas else None
_model = chr(_service_data[0] & 0b01111111)

supported_types: dict[str, dict[str, Any]] = {
Expand All @@ -142,6 +160,7 @@ def parse_advertisement_data(
"c": {"modelName": "WoCurtain", "func": _process_wocurtain},
"T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
"i": {"modelName": "WoSensorTH", "func": _process_wosensorth},
"g": {"modelName": "WoPlug", "func": _process_woplugmini},
}

data = {
Expand All @@ -159,7 +178,7 @@ def parse_advertisement_data(
"isEncrypted": bool(_service_data[0] & 0b10000000),
"model": _model,
"modelName": supported_types[_model]["modelName"],
"data": supported_types[_model]["func"](_service_data),
"data": supported_types[_model]["func"](_service_data, _mfr_data),
}
)

Expand Down Expand Up @@ -719,3 +738,34 @@ def is_calibrated(self) -> Any:
"""Return True curtain is calibrated."""
# To get actual light level call update() first.
return self._get_adv_value("calibration")


class SwitchbotPlugMini(SwitchbotDevice):
"""Representation of a Switchbot plug mini."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Switchbot plug mini constructor."""
super().__init__(*args, **kwargs)
self._settings: dict[str, Any] = {}

async def update(self, interface: int | None = None) -> None:
"""Update state of device."""
await self.get_device_data(retry=self._retry_count, interface=interface)

async def turn_on(self) -> bool:
"""Turn device on."""
result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
return result[1] == 0x80

async def turn_off(self) -> bool:
"""Turn device off."""
result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
return result[1] == 0x00

def is_on(self) -> Any:
"""Return switch state from cache."""
# To get actual position call update() first.
value = self._get_adv_value("isOn")
if value is None:
return None
return value