-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathsubsystem.py
181 lines (137 loc) · 7.04 KB
/
subsystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# Don't import stuff from the package here, it'll cause a circular import
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Optional
from typing_extensions import Self
if TYPE_CHECKING:
from .command import Command
from .commandscheduler import CommandScheduler
from wpiutil import Sendable, SendableBuilder, SendableRegistry
class Subsystem(Sendable):
    """
    A robot subsystem.

    Subsystems are the basic unit of robot organization in the Command-based
    framework; they encapsulate low-level hardware objects (motor controllers,
    sensors, etc.) and provide methods through which they can be used by
    Commands. Subsystems are used by the CommandScheduler's resource management
    system to ensure multiple robot actions are not "fighting" over the same
    hardware; Commands that use a subsystem should include that subsystem in
    their ``Command.getRequirements()`` method, and resources used within a
    subsystem should generally remain encapsulated and not be shared by other
    parts of the robot.

    Subsystems must be registered with the scheduler via
    ``CommandScheduler.registerSubsystem()`` in order for :meth:`periodic` to
    be called. This class performs that registration itself in :meth:`__new__`,
    so every instance is registered as soon as it is created.
    """

    def __new__(cls, *arg, **kwargs) -> Self:
        """
        Creates the instance and registers it with the SendableRegistry and the
        CommandScheduler.

        The class name is used as both the subsystem name and the sendable name.

        NOTE(review): registration happens here rather than in ``__init__``,
        presumably so that subclasses which override ``__init__`` without
        calling ``super().__init__()`` are still registered -- confirm.

        :param arg: positional constructor arguments (ignored here)
        :param kwargs: keyword constructor arguments (ignored here)
        :returns: the newly created, registered subsystem instance
        """
        instance = super().__new__(cls)
        # Initialize the Sendable base explicitly on the fresh instance.
        super().__init__(instance)
        SendableRegistry.addLW(instance, cls.__name__, cls.__name__)
        # add to the scheduler
        # (imported locally to avoid a circular import at module load time)
        from .commandscheduler import CommandScheduler

        CommandScheduler.getInstance().registerSubsystem(instance)
        return instance

    def __init__(self) -> None:
        """Does nothing; all base setup is performed in :meth:`__new__`."""
        pass

    def periodic(self) -> None:
        """
        This method is called periodically by the CommandScheduler. Useful for updating
        subsystem-specific state that you don't want to offload to a Command. Teams should try
        to be consistent within their own codebases about which responsibilities will be handled by
        Commands, and which will be handled here."""
        pass

    def simulationPeriodic(self) -> None:
        """
        This method is called periodically by the CommandScheduler. Useful for updating
        subsystem-specific state that needs to be maintained for simulations, such as for updating
        simulation classes and setting simulated sensor readings.
        """
        pass

    def setDefaultCommand(self, command: Command) -> None:
        """
        Sets the default Command of the subsystem. The default command will be automatically
        scheduled when no other commands are scheduled that require the subsystem. Default commands
        should generally not end on their own, i.e. their ``Command.isFinished()`` method should
        always return False. Will automatically register this subsystem with the
        CommandScheduler.

        :param command: the default command to associate with this subsystem"""
        from .commandscheduler import CommandScheduler

        CommandScheduler.getInstance().setDefaultCommand(self, command)

    def removeDefaultCommand(self) -> None:
        """
        Removes the default command for the subsystem. This will not cancel the default command if it
        is currently running."""
        # CommandScheduler is only imported at module level under TYPE_CHECKING,
        # so it must be imported here to exist at runtime.
        from .commandscheduler import CommandScheduler

        CommandScheduler.getInstance().removeDefaultCommand(self)

    def getDefaultCommand(self) -> Optional[Command]:
        """
        Gets the default command for this subsystem. Returns None if no default command is currently
        associated with the subsystem.

        :returns: the default command associated with this subsystem"""
        from .commandscheduler import CommandScheduler

        return CommandScheduler.getInstance().getDefaultCommand(self)

    def getCurrentCommand(self) -> Optional[Command]:
        """
        Returns the command currently running on this subsystem. Returns None if no command is
        currently scheduled that requires this subsystem.

        :returns: the scheduled command currently requiring this subsystem"""
        from .commandscheduler import CommandScheduler

        return CommandScheduler.getInstance().requiring(self)

    def runOnce(self, action: Callable[[], None]) -> Command:
        """
        Constructs a command that runs an action once and finishes. Requires this subsystem.

        :param action: the action to run
        :returns: the command"""
        from .cmd import runOnce

        return runOnce(action, self)

    def run(self, action: Callable[[], None]) -> Command:
        """
        Constructs a command that runs an action every iteration until interrupted. Requires this
        subsystem.

        :param action: the action to run
        :returns: the command"""
        from .cmd import run

        return run(action, self)

    def startEnd(self, start: Callable[[], None], end: Callable[[], None]) -> Command:
        """
        Constructs a command that runs an action once and another action when the command is
        interrupted. Requires this subsystem.

        :param start: the action to run on start
        :param end: the action to run on interrupt
        :returns: the command"""
        from .cmd import startEnd

        return startEnd(start, end, self)

    def runEnd(self, run: Callable[[], None], end: Callable[[], None]) -> Command:
        """
        Constructs a command that runs an action every iteration until interrupted, and then runs a
        second action. Requires this subsystem.

        :param run: the action to run every iteration
        :param end: the action to run on interrupt
        :returns: the command"""
        from .cmd import runEnd

        return runEnd(run, end, self)

    def getName(self) -> str:
        """
        Gets the name of this subsystem as recorded in the SendableRegistry.

        :returns: the registered name"""
        return SendableRegistry.getName(self)

    def setName(self, name: str) -> None:
        """
        Sets the name of this subsystem in the SendableRegistry.

        :param name: the new name"""
        SendableRegistry.setName(self, name)

    def getSubsystem(self) -> str:
        """
        Gets the subsystem name recorded for this object in the SendableRegistry.

        :returns: the registered subsystem name"""
        return SendableRegistry.getSubsystem(self)

    def addChild(self, name: str, child: Sendable) -> None:
        """
        Registers a child Sendable with the SendableRegistry under this
        subsystem's subsystem name.

        :param name: the name to register the child under
        :param child: the sendable to associate with this subsystem"""
        SendableRegistry.addLW(child, self.getSubsystem(), name)

    def initSendable(self, builder: SendableBuilder) -> None:
        """
        Publishes this subsystem's command state through the given builder.

        Four properties are added: ``.hasDefault`` / ``.default`` describe the
        default command and ``.hasCommand`` / ``.command`` describe the command
        currently requiring the subsystem. The name properties report ``"none"``
        when there is no such command. All setters are no-ops, so incoming
        writes to these properties are ignored.

        :param builder: the sendable builder to populate"""
        builder.setSmartDashboardType("Subsystem")

        builder.addBooleanProperty(
            ".hasDefault", lambda: self.getDefaultCommand() is not None, lambda _: None
        )

        def get_default() -> str:
            command = self.getDefaultCommand()
            if command is not None:
                return command.getName()
            return "none"

        builder.addStringProperty(".default", get_default, lambda _: None)

        builder.addBooleanProperty(
            ".hasCommand", lambda: self.getCurrentCommand() is not None, lambda _: None
        )

        def get_current() -> str:
            command = self.getCurrentCommand()
            if command is not None:
                return command.getName()
            return "none"

        builder.addStringProperty(".command", get_current, lambda _: None)