Skip to content
3 changes: 3 additions & 0 deletions switchbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .adv_parser import SwitchbotSupportedType, parse_advertisement_data
from .const import SwitchbotModel
from .devices.bot import Switchbot
from .devices.bulb import ColorMode, SwitchbotBulb
from .devices.curtain import SwitchbotCurtain
from .devices.device import SwitchbotDevice
from .devices.plug import SwitchbotPlugMini
Expand All @@ -14,6 +15,8 @@
"parse_advertisement_data",
"GetSwitchbotDevices",
"SwitchBotAdvertisement",
"ColorMode",
"SwitchbotBulb",
"SwitchbotDevice",
"SwitchbotCurtain",
"Switchbot",
Expand Down
2 changes: 1 addition & 1 deletion switchbot/adv_parsers/bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool |
"brightness": mfr_data[7] & 0b01111111,
"delay": bool(mfr_data[8] & 0b10000000),
"preset": bool(mfr_data[8] & 0b00001000),
"light_state": mfr_data[8] & 0b00000111,
"color_mode": mfr_data[8] & 0b00000111,
"speed": mfr_data[9] & 0b01111111,
"loop_index": mfr_data[10] & 0b11111110,
}
28 changes: 9 additions & 19 deletions switchbot/devices/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def update(self, interface: int | None = None) -> None:

async def turn_on(self) -> bool:
"""Turn device on."""
result = await self._sendcommand(ON_KEY, self._retry_count)
result = await self._sendcommand(ON_KEY)

if result[0] == 1:
return True
Expand All @@ -47,7 +47,7 @@ async def turn_on(self) -> bool:

async def turn_off(self) -> bool:
"""Turn device off."""
result = await self._sendcommand(OFF_KEY, self._retry_count)
result = await self._sendcommand(OFF_KEY)
if result[0] == 1:
return True

Expand All @@ -61,7 +61,7 @@ async def turn_off(self) -> bool:

async def hand_up(self) -> bool:
"""Raise device arm."""
result = await self._sendcommand(UP_KEY, self._retry_count)
result = await self._sendcommand(UP_KEY)
if result[0] == 1:
return True

Expand All @@ -73,7 +73,7 @@ async def hand_up(self) -> bool:

async def hand_down(self) -> bool:
"""Lower device arm."""
result = await self._sendcommand(DOWN_KEY, self._retry_count)
result = await self._sendcommand(DOWN_KEY)
if result[0] == 1:
return True

Expand All @@ -85,7 +85,7 @@ async def hand_down(self) -> bool:

async def press(self) -> bool:
"""Press command to device."""
result = await self._sendcommand(PRESS_KEY, self._retry_count)
result = await self._sendcommand(PRESS_KEY)
if result[0] == 1:
return True

Expand All @@ -102,27 +102,17 @@ async def set_switch_mode(
mode_key = format(switch_mode, "b") + format(inverse, "b")
strength_key = f"{strength:0{2}x}" # to hex with padding to double digit

result = await self._sendcommand(
DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count
)
result = await self._sendcommand(DEVICE_SET_MODE_KEY + strength_key + mode_key)

if result[0] == 1:
return True

return False
return result[0] == 1

async def set_long_press(self, duration: int = 0) -> bool:
"""Set bot long press duration."""
duration_key = f"{duration:0{2}x}" # to hex with padding to double digit

result = await self._sendcommand(
DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count
)
result = await self._sendcommand(DEVICE_SET_EXTENDED_KEY + "08" + duration_key)

if result[0] == 1:
return True

return False
return result[0] == 1

async def get_basic_info(self) -> dict[str, Any] | None:
"""Get device basic settings."""
Expand Down
155 changes: 155 additions & 0 deletions switchbot/devices/bulb.py
Original file line number Diff line number Diff line change
@@ -1 +1,156 @@
from __future__ import annotations

import asyncio
import logging
from enum import Enum
from typing import Any

from switchbot.models import SwitchBotAdvertisement

from .device import SwitchbotDevice

REQ_HEADER = "570f"
BULB_COMMMAND_HEADER = "4701"
BULB_REQUEST = f"{REQ_HEADER}4801"

BULB_COMMAND = f"{REQ_HEADER}{BULB_COMMMAND_HEADER}"
# Bulb keys
BULB_ON_KEY = f"{BULB_COMMAND}01"
BULB_OFF_KEY = f"{BULB_COMMAND}02"
RGB_BRIGHTNESS_KEY = f"{BULB_COMMAND}12"
CW_BRIGHTNESS_KEY = f"{BULB_COMMAND}13"
BRIGHTNESS_KEY = f"{BULB_COMMAND}14"
RGB_KEY = f"{BULB_COMMAND}16"
CW_KEY = f"{BULB_COMMAND}17"

_LOGGER = logging.getLogger(__name__)


class ColorMode(Enum):

OFF = 0
COLOR_TEMP = 1
RGB = 2
EFFECT = 3


class SwitchbotBulb(SwitchbotDevice):
"""Representation of a Switchbot bulb."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Switchbot bulb constructor."""
super().__init__(*args, **kwargs)
self._state: dict[str, Any] = {}

@property
def on(self) -> bool | None:
"""Return if bulb is on."""
return self.is_on()

@property
def rgb(self) -> tuple[int, int, int] | None:
"""Return the current rgb value."""
if "r" not in self._state or "g" not in self._state or "b" not in self._state:
return None
return self._state["r"], self._state["g"], self._state["b"]

@property
def color_temp(self) -> int | None:
"""Return the current color temp value."""
return self._state.get("cw") or self.min_temp

@property
def brightness(self) -> int | None:
"""Return the current brightness value."""
return self._get_adv_value("brightness") or 0

@property
def color_mode(self) -> ColorMode:
"""Return the current color mode."""
return ColorMode(self._get_adv_value("color_mode") or 0)

@property
def min_temp(self) -> int:
"""Return minimum color temp."""
return 2700

@property
def max_temp(self) -> int:
"""Return maximum color temp."""
return 6500

async def update(self) -> None:
"""Update state of device."""
result = await self._sendcommand(BULB_REQUEST)
self._update_state(result)

async def turn_on(self) -> bool:
"""Turn device on."""
result = await self._sendcommand(BULB_ON_KEY)
self._update_state(result)
return result[1] == 0x80

async def turn_off(self) -> bool:
"""Turn device off."""
result = await self._sendcommand(BULB_OFF_KEY)
self._update_state(result)
return result[1] == 0x00

async def set_brightness(self, brightness: int) -> bool:
"""Set brightness."""
assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
result = await self._sendcommand(f"{BRIGHTNESS_KEY}{brightness:02X}")
self._update_state(result)
return result[1] == 0x80

async def set_color_temp(self, brightness: int, color_temp: int) -> bool:
"""Set color temp."""
assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
assert 2700 <= color_temp <= 6500, "Color Temp must be between 0 and 100"
result = await self._sendcommand(
f"{CW_BRIGHTNESS_KEY}{brightness:02X}{color_temp:04X}"
)
self._update_state(result)
return result[1] == 0x80

async def set_rgb(self, brightness: int, r: int, g: int, b: int) -> bool:
"""Set rgb."""
assert 0 <= brightness <= 100, "Brightness must be between 0 and 100"
assert 0 <= r <= 255, "r must be between 0 and 255"
assert 0 <= g <= 255, "g must be between 0 and 255"
assert 0 <= b <= 255, "b must be between 0 and 255"
result = await self._sendcommand(
f"{RGB_BRIGHTNESS_KEY}{brightness:02X}{r:02X}{g:02X}{b:02X}"
)
self._update_state(result)
return result[1] == 0x80

def is_on(self) -> bool | None:
"""Return bulb state from cache."""
return self._get_adv_value("isOn")

def _update_state(self, result: bytes) -> None:
"""Update device state."""
self._state["r"] = result[3]
self._state["g"] = result[4]
self._state["b"] = result[5]
self._state["cw"] = int(result[6:8].hex(), 16)
_LOGGER.debug(
"%s: Bulb update state: %s = %s", self.name, result.hex(), self._state
)
self._fire_callbacks()

def update_from_advertisement(self, advertisement: SwitchBotAdvertisement) -> None:
"""Update device data from advertisement."""
current_state = self._get_adv_value("sequence_number")
super().update_from_advertisement(advertisement)
new_state = self._get_adv_value("sequence_number")
_LOGGER.debug(
"%s: Bulb update advertisement: %s (seq before: %s) (seq after: %s)",
self.name,
advertisement,
current_state,
new_state,
)
if current_state != new_state:
asyncio.ensure_future(self.update())
36 changes: 10 additions & 26 deletions switchbot/devices/curtain.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,37 +41,25 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:

async def open(self) -> bool:
"""Send open command."""
result = await self._sendcommand(OPEN_KEY, self._retry_count)
if result[0] == 1:
return True

return False
result = await self._sendcommand(OPEN_KEY)
return result[0] == 1

async def close(self) -> bool:
"""Send close command."""
result = await self._sendcommand(CLOSE_KEY, self._retry_count)
if result[0] == 1:
return True

return False
result = await self._sendcommand(CLOSE_KEY)
return result[0] == 1

async def stop(self) -> bool:
"""Send stop command to device."""
result = await self._sendcommand(STOP_KEY, self._retry_count)
if result[0] == 1:
return True

return False
result = await self._sendcommand(STOP_KEY)
return result[0] == 1

async def set_position(self, position: int) -> bool:
"""Send position command (0-100) to device."""
position = (100 - position) if self._reverse else position
hex_position = "%0.2X" % position
result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count)
if result[0] == 1:
return True

return False
result = await self._sendcommand(POSITION_KEY + hex_position)
return result[0] == 1

async def update(self, interface: int | None = None) -> None:
"""Update position, battery percent and light level of device."""
Expand Down Expand Up @@ -107,9 +95,7 @@ async def get_basic_info(self) -> dict[str, Any] | None:

async def get_extended_info_summary(self) -> dict[str, Any] | None:
"""Get basic info for all devices in chain."""
_data = await self._sendcommand(
key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count
)
_data = await self._sendcommand(key=CURTAIN_EXT_SUM_KEY)

if _data in (b"\x07", b"\x00"):
_LOGGER.error("%s: Unsuccessful, please try again", self.name)
Expand Down Expand Up @@ -140,9 +126,7 @@ async def get_extended_info_summary(self) -> dict[str, Any] | None:
async def get_extended_info_adv(self) -> dict[str, Any] | None:
"""Get advance page info for device chain."""

_data = await self._sendcommand(
key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count
)
_data = await self._sendcommand(key=CURTAIN_EXT_ADV_KEY)

if _data in (b"\x07", b"\x00"):
_LOGGER.error("%s: Unsuccessful, please try again", self.name)
Expand Down
Loading