Skip to content

Commit

Permalink
Paint it black
Browse files Browse the repository at this point in the history
  • Loading branch information
felipediel committed Apr 17, 2024
1 parent 855254e commit bcde9f6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 46 deletions.
2 changes: 1 addition & 1 deletion broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@
0xA64D: ("S3", "Broadlink"),
},
hvac: {
0X4E2A: ("HVAC", "Licensed manufacturer"),
0x4E2A: ("HVAC", "Licensed manufacturer"),
},
hysen: {
0x4EAD: ("HY02/HY03", "Hysen"),
Expand Down
104 changes: 59 additions & 45 deletions broadlink/climate.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ def send_request(self, request: Sequence[int]) -> bytes:
"hysen_response_error", "first byte of response is not length"
)

nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
if nom_crc != real_crc:
raise ValueError("hysen_response_error", "CRC check on response failed")
raise ValueError(
"hysen_response_error", "CRC check on response failed"
)

return payload[0x02:p_len]

Expand Down Expand Up @@ -77,7 +79,7 @@ def get_full_status(self) -> dict:
data["heating_cooling"] = (payload[4] >> 7) & 1
data["room_temp"] = self._decode_temp(payload, 5)
data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0xF
data["auto_mode"] = payload[7] & 0x0F
data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8]
data["osv"] = payload[9]
Expand Down Expand Up @@ -128,7 +130,9 @@ def get_full_status(self) -> dict:
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
def set_mode(
self, auto_mode: int, loop_mode: int, sensor: int = 0
) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
Expand Down Expand Up @@ -255,6 +259,7 @@ class hvac(Device):
@enum.unique
class Mode(enum.IntEnum):
"""Enumerates modes."""

AUTO = 0
COOL = 1
DRY = 2
Expand All @@ -264,6 +269,7 @@ class Mode(enum.IntEnum):
@enum.unique
class Speed(enum.IntEnum):
"""Enumerates fan speed."""

HIGH = 1
MID = 2
LOW = 3
Expand All @@ -272,19 +278,22 @@ class Speed(enum.IntEnum):
@enum.unique
class Preset(enum.IntEnum):
"""Enumerates presets."""

NORMAL = 0
TURBO = 1
MUTE = 2

@enum.unique
class SwHoriz(enum.IntEnum):
"""Enumerates horizontal swing."""

ON = 0
OFF = 7

@enum.unique
class SwVert(enum.IntEnum):
"""Enumerates vertical swing."""

ON = 0
POS1 = 1
POS2 = 2
Expand Down Expand Up @@ -315,26 +324,27 @@ def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x2], "little")
p_len = int.from_bytes(payload[:0x02], "little")
checksum = int.from_bytes(payload[p_len:p_len+2], "little")

if checksum != self._crc(payload[0x2:p_len]):
if checksum != self._crc(payload[0x02:p_len]):
logging.debug(
"Checksum incorrect (calculated %s actual %s).",
checksum.hex(), payload[p_len:p_len+2].hex()
checksum.hex(),
payload[p_len:p_len+2].hex(),
)

d_len = int.from_bytes(payload[0x8:0xA], "little")
return payload[0xA:0xA+d_len]
d_len = int.from_bytes(payload[0x08:0x0A], "little")
return payload[0x0A:0x0A+d_len]

def _send(self, command: int, data: bytes = b'') -> bytes:
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a command to the unit."""
command = bytes([((command << 4) | 1), 1])
packet = self._encode(command + data)
logging.debug("Payload:\n%s", packet.hex(' '))
response = self.send_packet(0x6a, packet)
logging.debug("Payload:\n%s", packet.hex(" "))
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)[0x2:]
return self._decode(response)[0x02:]

def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.
Expand All @@ -356,26 +366,30 @@ def get_state(self) -> dict:
"""
resp = self._send(0x1)

if (len(resp) != 0xF):
if len(resp) != 0x0F:
raise ValueError(f"unexpected resp size: {len(resp)}")

logging.debug("Received resp:\n%s", resp.hex(' '))
logging.debug("0b[R] mask: %x, 0c[R] mask: %x, cmnd_16: %x",
resp[0x3] & 0xF, resp[0x4] & 0xF, resp[0x4])
logging.debug("Received resp:\n%s", resp.hex(" "))
logging.debug(
"0b[R] mask: %x, 0c[R] mask: %x, cmnd_16: %x",
resp[0x03] & 0x0F,
resp[0x04] & 0x0F,
resp[0x04],
)

state = {}
state['power'] = resp[0x8] & 1 << 5
state['target_temp'] = 8 + (resp[0x0] >> 3) + (resp[0x4] >> 7) * 0.5
state['swing_v'] = self.SwVert(resp[0x0] & 0b111)
state['swing_h'] = self.SwHoriz(resp[0x1] >> 5)
state['mode'] = self.Mode(resp[0x5] >> 5)
state['speed'] = self.Speed(resp[0x3] >> 5)
state['preset'] = self.Preset(resp[0x4] >> 6)
state['sleep'] = bool(resp[0x5] & 1 << 2)
state['health'] = bool(resp[0x8] & 1 << 1)
state['clean'] = bool(resp[0x8] & 1 << 2)
state['display'] = bool(resp[0xA] & 1 << 4)
state['mildew'] = bool(resp[0xA] & 1 << 3)
state["power"] = resp[0x08] & 1 << 5
state["target_temp"] = 8 + (resp[0x00] >> 3) + (resp[0x04] >> 7) * 0.5
state["swing_v"] = self.SwVert(resp[0x00] & 0b111)
state["swing_h"] = self.SwHoriz(resp[0x01] >> 5)
state["mode"] = self.Mode(resp[0x05] >> 5)
state["speed"] = self.Speed(resp[0x03] >> 5)
state["preset"] = self.Preset(resp[0x04] >> 6)
state["sleep"] = bool(resp[0x05] & 1 << 2)
state["health"] = bool(resp[0x08] & 1 << 1)
state["clean"] = bool(resp[0x08] & 1 << 2)
state["display"] = bool(resp[0x0A] & 1 << 4)
state["mildew"] = bool(resp[0x0A] & 1 << 3)

logging.debug("State: %s", state)

Expand All @@ -390,15 +404,15 @@ def get_ac_info(self) -> dict:
ambient_temp (float): ambient temperature
"""
resp = self._send(2)
if (len(resp) != 0x18):
if len(resp) != 0x18:
raise ValueError(f"unexpected resp size: {len(resp)}")

logging.debug("Received resp:\n%s", resp.hex(' '))
logging.debug("Received resp:\n%s", resp.hex(" "))

ac_info = {}
ac_info["power"] = resp[0x1] & 1

ambient_temp = resp[0x5] & 0b11111, resp[0x15] & 0b11111
ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0

Expand Down Expand Up @@ -427,7 +441,7 @@ def set_state(
UNK2 = 0b101

target_temp = round(target_temp * 2) / 2
if not (16 <= target_temp <= 32):
if not 16 <= target_temp <= 32:
raise ValueError(f"target_temp out of range: {target_temp}")

if preset == self.Preset.MUTE:
Expand All @@ -440,18 +454,18 @@ def set_state(
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH

data = bytearray(0xD)
data[0x0] = (int(target_temp) - 8 << 3) | swing_v
data[0x1] = (swing_h << 5) | UNK0
data[0x2] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x3] = speed << 5
data[0x4] = preset << 6
data[0x5] = mode << 5 | (sleep << 2)
data[0x8] = (power << 5 | clean << 2 | health * 0b11)
data[0xA] = display << 4 | mildew << 3
data[0xC] = UNK2
data = bytearray(0x0D)
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
data[0x01] = (swing_h << 5) | UNK0
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x03] = speed << 5
data[0x04] = preset << 6
data[0x05] = mode << 5 | (sleep << 2)
data[0x08] = power << 5 | clean << 2 | health and 0b11
data[0x0A] = display << 4 | mildew << 3
data[0x0C] = UNK2

logging.debug("Constructed payload data:\n%s", data.hex(' '))
logging.debug("Constructed payload data:\n%s", data.hex(" "))

response_payload = self._send(0, data)
logging.debug("Response payload:\n%s", response_payload.hex(' '))
logging.debug("Response payload:\n%s", response_payload.hex(" "))

0 comments on commit bcde9f6

Please sign in to comment.