/
app_aria.py
122 lines (100 loc) · 3.37 KB
/
app_aria.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black
# Packet parser for App_ARIA (ARIA mode)
from datetime import datetime
from typing import Any, final
from overrides import override
from pydantic import Field, field_serializer
from .. import common
@final
class ParsedPacket(common.ParsedPacketBase):
"""Dataclass for parsed packets from App_ARIA
Attributes
----------
temp_100x: common.Int16
100x temperature in °C
humid_100x: common.UInt16
100x humidity in RH%
magnet_state: common.MagnetState
Magnet state
magnet_state_changed: bool
True if the magnet state was changed
"""
temp_100x: common.Int16 = Field(default=0, ge=-4000, le=12500)
humid_100x: common.UInt16 = Field(default=0, ge=0, le=10000)
magnet_state: common.MagnetState = Field(default=common.MagnetState.NOT_DETECTED)
magnet_state_changed: bool = Field(default=False)
@field_serializer("magnet_state")
def serialize_magnet_state(self, magnet_state: common.MagnetState) -> str:
"""Print magnet_state in readable names for JSON or something
Parameters
----------
magnet_state : common.MagnetState
Magnet state
Returns
-------
str
Serialized text for JSON or something
"""
return magnet_state.name
@final
class PacketParser(common.PacketParserBase):
"""Packet parser for App_ARIA"""
@staticmethod
@override
def is_valid(bare_packet: common.BarePacket) -> bool:
"""Check the given bare packet is valid or not
Parameters
----------
bare_packet : common.BarePacket
Bare packet content
Returns
-------
bool
True if valid
Notes
-----
Static overridden method
"""
if (
(bare_packet.u8_at(0) & 0x80) == 0x80
and (bare_packet.u8_at(7) & 0x80) == 0x80
and bare_packet.u8_at(12) == 0x80
and bare_packet.u8_at(13) == 0x06
and len(bare_packet.payload) == 60
):
return True
return False
@staticmethod
@override
def parse(bare_packet: common.BarePacket) -> ParsedPacket | None:
"""Try to parse the given bare packet
Parameters
----------
bare_packet : common.BarePacket
Bare packet content
Returns
-------
ParsedPacket | None
Parsed packet data if valid else None
Notes
-----
Static overridden method
"""
if not PacketParser.is_valid(bare_packet):
return None
parsed_packet_data: dict[str, Any] = {
"time_parsed": datetime.now(common.Timezone),
"packet_type": common.PacketType.APP_ARIA,
"sequence_number": bare_packet.u16_at(5),
"source_serial_id": bare_packet.u32_at(7),
"source_logical_id": bare_packet.u8_at(11),
"lqi": bare_packet.u8_at(4),
"supply_voltage": bare_packet.u16_at(34),
"temp_100x": bare_packet.i16_at(51),
"humid_100x": bare_packet.u16_at(57),
"magnet_state": common.MagnetState(bare_packet.u8_at(46) & 0x0F),
"magnet_state_changed": False if bare_packet.u8_at(46) & 0x80 else True,
}
return ParsedPacket(**parsed_packet_data)