/
jk232.py
155 lines (138 loc) · 5.85 KB
/
jk232.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import logging
from .abstractprotocol import AbstractProtocol
from .protocol_helpers import crcJK232 as crc
log = logging.getLogger("jk232")
# Read basic information and status
# DD A5 03 00 FF FD 77
# start bit 0xDD
# status 0xA5 means read, status 0x5A means write.
# command code 0x03
# Data length: 1 byte, indicating the effective length of the data carried in the frame.
# Data content: N bytes, the content carried by the frame data, when the data length is 0, there is no such part.
# Verification: 2 bytes,
# the verification field is "command code + length byte + data segment content",
# the verification method is thesum of the above fields and then the inverse plus 1, the high bit is in the front and the low bit is in the back.
# Stop bit: 1 byte, indicating the end of a frame of data, fixed as 0x77;
COMMANDS = {
"getBalancerData": {
"name": "getBalancerData",
"command_code": "03",
"description": "Get Balancer Data",
"help": " -- Get Balancer Data",
"type": "QUERY",
"checksum_required": "True",
"response_type": "POSITIONAL",
"response": [
["Hex2Str", 1, "Start Byte", ""],
["Hex2Str", 1, "Command Code", ""],
["Hex2Str", 1, "Status", ""],
["Hex2Int", 1, "Data Length", ""],
["BigHex2Short:r/100", 2, "Total Battery Voltage", "V"],
["BigHex2Short:r/100", 2, "Total Current", "A"],
["BigHex2Short:r/100", 2, "Remaining Capacity", "Ah"],
["BigHex2Short:r/100", 2, "Nominal Capacity", "Ah"],
["BigHex2Short", 2, "Cycles", "cycles"],
["Hex2Str", 2, "Production Date", ""],
["Hex2Str", 2, "Equilibrium State (TODO)", ""],
["Hex2Str", 2, "Equilibrium State 2 (TODO)", ""],
["Hex2Str", 2, "Protection State (TODO)", ""],
["Hex2Str", 1, "Keep", ""],
["Hex2Int", 1, "Remaining Battery", "%"],
["Hex2Str", 1, "FET Control Status", ""],
["Hex2Int", 1, "Number of Battery Strings", ""],
["Hex2Int", 1, "Number of NTC", ""],
["BigHex2Short:(r-2731)/10", 2, "NTC 1", "°C"],
["BigHex2Short:(r-2731)/10", 2, "NTC 2", "°C"],
["Hex2Str", 2, "Checksum", ""],
["Hex2Str", 1, "End Byte", ""],
],
"test_responses": [
bytes.fromhex(
"DD 03 00 1B 17 00 00 00 02 D0 03 E8 00 00 20 78 00 00 00 00 00 00 10 48 03 0F 02 0B 76 0B 82 FB FF 77"
),
],
},
}
class jk232(AbstractProtocol):
def __str__(self):
return "JKBMS RS232 serial communication protocol handler"
def __init__(self, *args, **kwargs) -> None:
super().__init__()
self._protocol_id = b"JK232"
self.COMMANDS = COMMANDS
self.STATUS_COMMANDS = [
"getBalancerData",
]
self.SETTINGS_COMMANDS = [
"",
]
self.DEFAULT_COMMAND = "getBalancerData"
def get_full_command(self, command) -> bytes:
"""
Override the default get_full_command as its different
"""
log.info(f"Using protocol {self._protocol_id} with {len(self.COMMANDS)} commands")
# These need to be set to allow other functions to work`
self._command = command
self._command_defn = self.get_command_defn(command)
# End of required variables setting
if self._command_defn is None:
# Maybe return a default here?
return None
if "command_code" in self._command_defn:
# Read basic information and status
# DD A5 03 00 FF FD 77
# full command is 7 bytes long
cmd = bytearray(7)
# start bit 0xDD
cmd[0] = 0xDD
log.debug(f"cmd with start bit: {cmd}")
# status 0xA5 means read, status 0x5A means write.
if self._command_defn["type"] == "SETTER":
cmd[1] = 0x5A
else:
cmd[1] = 0xA5
# command code 0x03
command_code = int(self._command_defn["command_code"], 16)
# Data length: 1 byte, indicating the effective length of the data carried in the frame.
# Data content: N bytes, the content carried by the frame data, when the data length is 0, there is no such part.
data = ""
# TODO: data stuff here
data_len = len(data)
if data_len == 0:
crc_high, crc_low = crc([command_code, data_len])
cmd[2] = command_code
cmd[3] = data_len
cmd[4] = crc_high
cmd[5] = crc_low
cmd[6] = 0x77
log.debug(f"cmd with crc: {cmd}")
return cmd
def get_responses(self, response):
"""
Override the default get_responses as its different
"""
responses = []
# remove \n
# response = response.replace(b"\n", b"")
if self._command_defn is not None and self._command_defn["response_type"] == "POSITIONAL":
# Have a POSITIONAL type response, so need to break it up...
# example defn :
# "response": [
# ["discard", 1, "start flag", ""],
# ["discard", 1, "module address", ""],
# ["discard", 1, "command id", ""],
# ["discard", 1, "data length", ""],
# ]
# example response data b"\xa5\x01\x90\x08\x02\x10\x00\x00uo\x03\xbc\xf3",
for defn in self._command_defn["response"]:
size = defn[1]
item = response[:size]
responses.append(item)
response = response[size:]
if response:
responses.append(response)
log.debug(f"get_responses: responses {responses}")
return responses
else:
return bytearray(response)