Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Tornado 16X SQ air conditioner (0x4E2A) #520

Merged
merged 15 commits into from
Apr 17, 2024
13 changes: 10 additions & 3 deletions broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen
from .climate import hvac, hysen
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
Expand Down Expand Up @@ -177,6 +177,9 @@
0xA59C: ("S3", "Broadlink"),
0xA64D: ("S3", "Broadlink"),
},
hvac: {
0x4E2A: ("HVAC", "Licensed manufacturer"),
},
hysen: {
0x4EAD: ("HY02/HY03", "Hysen"),
},
Expand Down Expand Up @@ -258,7 +261,9 @@ def discover(
discover_ip_port: int = DEFAULT_PORT,
) -> t.List[Device]:
"""Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
return [gendevice(*resp) for resp in responses]


Expand All @@ -272,7 +277,9 @@ def xdiscover(

This function returns a generator that yields devices instantly.
"""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
for resp in responses:
yield gendevice(*resp)

Expand Down
262 changes: 248 additions & 14 deletions broadlink/climate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Support for HVAC units."""
import typing as t
"""Support for climate control."""
import enum
import struct
from typing import List, Sequence

from . import exceptions as e
from .device import Device
Expand All @@ -19,7 +21,7 @@ class hysen(Device):

TYPE = "HYS"

def send_request(self, request: t.Sequence[int]) -> bytes:
def send_request(self, request: Sequence[int]) -> bytes:
"""Send a request to the device."""
packet = bytearray()
packet.extend((len(request) + 2).to_bytes(2, "little"))
Expand All @@ -31,15 +33,15 @@ def send_request(self, request: t.Sequence[int]) -> bytes:
payload = self.decrypt(response[0x38:])

p_len = int.from_bytes(payload[:0x02], "little")
if p_len + 2 > len(payload):
raise ValueError(
"hysen_response_error", "first byte of response is not length"
)

nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])

if nom_crc != real_crc:
raise ValueError("hysen_response_error", "CRC check on response failed")
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)

return payload[0x02:p_len]

Expand Down Expand Up @@ -74,7 +76,7 @@ def get_full_status(self) -> dict:
data["heating_cooling"] = (payload[4] >> 7) & 1
data["room_temp"] = self._decode_temp(payload, 5)
data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0xF
data["auto_mode"] = payload[7] & 0x0F
data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8]
data["osv"] = payload[9]
Expand Down Expand Up @@ -125,7 +127,9 @@ def get_full_status(self) -> dict:
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
def set_mode(
self, auto_mode: int, loop_mode: int, sensor: int = 0
) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
Expand Down Expand Up @@ -206,7 +210,19 @@ def set_power(
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time."""
self.send_request(
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
[
0x01,
0x10,
0x00,
0x08,
0x00,
0x02,
0x04,
hour,
minute,
second,
day
]
)

# Set timer schedule
Expand All @@ -215,7 +231,7 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
# {'start_hour':17, 'start_minute':30, 'temp': 22 }
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None:
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
"""Set timer schedule."""
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]

Expand All @@ -238,3 +254,221 @@ def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None:
request.append(int(weekend[i]["temp"] * 2))

self.send_request(request)


class hvac(Device):
"""Controls a HVAC.

Supported models:
- Tornado SMART X SQ series
- Aux ASW-H12U3/JIR1DI-US
- Aux ASW-H36U2/LFR1DI-US
"""

TYPE = "HVAC"

@enum.unique
class Mode(enum.IntEnum):
"""Enumerates modes."""

AUTO = 0
COOL = 1
DRY = 2
HEAT = 3
FAN = 4

@enum.unique
class Speed(enum.IntEnum):
"""Enumerates fan speed."""

HIGH = 1
MID = 2
LOW = 3
AUTO = 5

@enum.unique
class Preset(enum.IntEnum):
"""Enumerates presets."""

NORMAL = 0
TURBO = 1
MUTE = 2

@enum.unique
class SwHoriz(enum.IntEnum):
"""Enumerates horizontal swing."""

ON = 0
OFF = 7

@enum.unique
class SwVert(enum.IntEnum):
"""Enumerates vertical swing."""

ON = 0
POS1 = 1
POS2 = 2
POS3 = 3
POS4 = 4
POS5 = 5
OFF = 7

def _encode(self, data: bytes) -> bytes:
"""Encode data for transport."""
packet = bytearray(10)
p_len = 10 + len(data)
struct.pack_into(
"<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data)
)
packet += data
crc = CRC16.calculate(packet[0x02:], polynomial=0x9BE4)
packet += crc.to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)

if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)

d_len = int.from_bytes(payload[0x08:0x0A], "little")
return payload[0x0A:0x0A+d_len]

def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a command to the unit."""
prefix = bytes([((command << 4) | 1), 1])
packet = self._encode(prefix + data)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)[0x02:]

def _parse_state(self, data: bytes) -> dict:
"""Parse state."""
state = {}
state["power"] = bool(data[0x08] & 1 << 5)
state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5
state["swing_v"] = self.SwVert(data[0x00] & 0b111)
state["swing_h"] = self.SwHoriz(data[0x01] >> 5)
state["mode"] = self.Mode(data[0x05] >> 5)
state["speed"] = self.Speed(data[0x03] >> 5)
state["preset"] = self.Preset(data[0x04] >> 6)
state["sleep"] = bool(data[0x05] & 1 << 2)
state["ifeel"] = bool(data[0x05] & 1 << 3)
state["health"] = bool(data[0x08] & 1 << 1)
state["clean"] = bool(data[0x08] & 1 << 2)
state["display"] = bool(data[0x0A] & 1 << 4)
state["mildew"] = bool(data[0x0A] & 1 << 3)
return state

def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: Mode,
speed: Speed,
preset: Preset,
swing_h: SwHoriz,
swing_v: SwVert,
sleep: bool,
ifeel: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> dict:
"""Set the state of the device."""
# TODO: decode unknown bits
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101

target_temp = round(target_temp * 2) / 2

if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW

elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH

data = bytearray(0x0D)
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
data[0x01] = (swing_h << 5) | UNK0
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x03] = speed << 5
data[0x04] = preset << 6
data[0x05] = mode << 5 | sleep << 2 | ifeel << 3
data[0x08] = power << 5 | clean << 2 | (health and 0b11)
data[0x0A] = display << 4 | mildew << 3
data[0x0C] = UNK2

resp = self._send(0, data)
return self._parse_state(resp)

def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.

Returns:
dict:
power (bool):
target_temp (float): temperature set point 16<n<32
mode (hvac.Mode):
speed (hvac.Speed):
preset (hvac.Preset):
swing_h (hvac.SwHoriz):
swing_v (hvac.SwVert):
sleep (bool):
ifeel (bool):
display (bool):
health (bool):
clean (bool):
mildew (bool):
"""
resp = self._send(1)

if len(resp) < 13:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 15 bytes and received {len(resp) + 2}",
)

return self._parse_state(resp)

def get_ac_info(self) -> dict:
"""Returns dictionary with AC info.

Returns:
dict:
power (bool): power
ambient_temp (float): ambient temperature
"""
resp = self._send(2)

if len(resp) < 22:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 24 bytes and received {len(resp) + 2}",
)

ac_info = {}
ac_info["power"] = resp[0x1] & 1

ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0

return ac_info
Loading