|
2 | 2 | import struct
|
3 | 3 | import threading
|
4 | 4 | import time
|
5 |
| -from typing import Callable, Optional, TYPE_CHECKING |
| 5 | +from typing import Callable, Dict, Final, List, Optional, TYPE_CHECKING |
6 | 6 |
|
7 | 7 | import canopen.network
|
8 | 8 |
|
|
12 | 12 |
|
13 | 13 | logger = logging.getLogger(__name__)
|
14 | 14 |
|
15 |
| -NMT_STATES = { |
| 15 | +NMT_STATES: Final[Dict[int, str]] = { |
16 | 16 | 0: 'INITIALISING',
|
17 | 17 | 4: 'STOPPED',
|
18 | 18 | 5: 'OPERATIONAL',
|
|
21 | 21 | 127: 'PRE-OPERATIONAL'
|
22 | 22 | }
|
23 | 23 |
|
24 |
| -NMT_COMMANDS = { |
| 24 | +NMT_COMMANDS: Final[Dict[str, int]] = { |
25 | 25 | 'OPERATIONAL': 1,
|
26 | 26 | 'STOPPED': 2,
|
27 | 27 | 'SLEEP': 80,
|
|
32 | 32 | 'RESET COMMUNICATION': 130
|
33 | 33 | }
|
34 | 34 |
|
35 |
| -COMMAND_TO_STATE = { |
| 35 | +COMMAND_TO_STATE: Final[Dict[int, int]] = { |
36 | 36 | 1: 5,
|
37 | 37 | 2: 4,
|
38 | 38 | 80: 80,
|
@@ -117,7 +117,7 @@ def __init__(self, node_id: int):
|
117 | 117 | #: Timestamp of last heartbeat message
|
118 | 118 | self.timestamp: Optional[float] = None
|
119 | 119 | self.state_update = threading.Condition()
|
120 |
| - self._callbacks = [] |
| 120 | + self._callbacks: List[Callable[[int], None]] = [] |
121 | 121 |
|
122 | 122 | def on_heartbeat(self, can_id, data, timestamp):
|
123 | 123 | with self.state_update:
|
@@ -186,7 +186,8 @@ def start_node_guarding(self, period: float):
|
186 | 186 | :param period:
|
187 | 187 | Period (in seconds) at which the node guarding should be advertised to the slave node.
|
188 | 188 | """
|
189 |
| - if self._node_guarding_producer : self.stop_node_guarding() |
| 189 | + if self._node_guarding_producer: |
| 190 | + self.stop_node_guarding() |
190 | 191 | self._node_guarding_producer = self.network.send_periodic(0x700 + self.id, None, period, True)
|
191 | 192 |
|
192 | 193 | def stop_node_guarding(self):
|
|
0 commit comments