-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathcmd.py
191 lines (144 loc) · 5.52 KB
/
cmd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
from typing import Any, Callable, Dict, Hashable
from .command import Command
from .conditionalcommand import ConditionalCommand
from .functionalcommand import FunctionalCommand
from .instantcommand import InstantCommand
from .parallelcommandgroup import ParallelCommandGroup
from .paralleldeadlinegroup import ParallelDeadlineGroup
from .parallelracegroup import ParallelRaceGroup
from .printcommand import PrintCommand
from .runcommand import RunCommand
from .selectcommand import SelectCommand
from .sequentialcommandgroup import SequentialCommandGroup
from .startendcommand import StartEndCommand
from .subsystem import Subsystem
from .waitcommand import WaitCommand
from .waituntilcommand import WaitUntilCommand
# Is this whole file just to avoid the `new` keyword in Java?
def none() -> Command:
    """
    Builds a no-op command that finishes immediately.

    The result is an ``InstantCommand`` created without any action or
    requirements.

    :returns: the command
    """
    noop = InstantCommand()
    return noop
def runOnce(action: Callable[[], Any], *requirements: Subsystem) -> Command:
    """
    Builds a command that invokes an action a single time and then finishes.

    :param action: the callable to invoke
    :param requirements: subsystems needed by the action
    :returns: the command
    """
    # Gather the positional arguments first, then hand them to InstantCommand.
    ctor_args = (action, *requirements)
    return InstantCommand(*ctor_args)
def run(action: Callable[[], Any], *requirements: Subsystem) -> Command:
    """
    Builds a command that invokes an action on every iteration until it is
    interrupted.

    :param action: the callable to invoke each iteration
    :param requirements: subsystems needed by the action
    :returns: the command
    """
    looping = RunCommand(action, *requirements)
    return looping
def startEnd(
    run: Callable[[], Any], end: Callable[[], Any], *requirements: Subsystem
) -> Command:
    """
    Builds a command that performs one action when it starts and a second
    action when it is interrupted.

    :param run: the action to run on start
    :param end: the action to run on interrupt
    :param requirements: subsystems needed by the actions
    :returns: the command
    """

    def _on_end() -> Any:
        # Zero-argument wrapper that forwards to the caller's end action.
        return end()

    return StartEndCommand(run, _on_end, *requirements)
def runEnd(
    run: Callable[[], Any], end: Callable[[], Any], *requirements: Subsystem
) -> Command:
    """
    Builds a command that performs an action on every iteration until it is
    interrupted, then performs a second action.

    :param run: the action to run every iteration
    :param end: the action to run on interrupt
    :param requirements: subsystems needed by the actions
    :returns: the command
    """

    def _do_nothing() -> None:
        # Placeholder callback: takes no arguments and returns None.
        return None

    def _on_end(interrupted) -> Any:
        # The flag handed to this callback is ignored; only ``end`` is called.
        return end()

    def _never_done() -> bool:
        # Always reports False.
        return False

    return FunctionalCommand(_do_nothing, run, _on_end, _never_done, *requirements)
def print_(message: str) -> Command:
    """
    Builds a command that prints a message and then finishes.

    :param message: the text to print
    :returns: the command
    """
    printer = PrintCommand(message)
    return printer
def waitSeconds(seconds: float) -> Command:
    """
    Builds a command that does nothing and finishes once the given duration
    has passed.

    :param seconds: how long to wait before the command finishes
    :returns: the command
    """
    timed_wait = WaitCommand(seconds)
    return timed_wait
def waitUntil(condition: Callable[[], bool]) -> Command:
    """
    Builds a command that does nothing and finishes as soon as a condition
    becomes true.

    :param condition: the condition to wait for
    :returns: the command
    """
    conditional_wait = WaitUntilCommand(condition)
    return conditional_wait
def either(onTrue: Command, onFalse: Command, selector: Callable[[], bool]) -> Command:
    """
    Builds a command that runs one of two commands, chosen by a boolean
    selector function.

    :param onTrue: the command to run when the selector returns true
    :param onFalse: the command to run when the selector returns false
    :param selector: the function that picks the branch
    :returns: the command
    """
    branches = (onTrue, onFalse)
    return ConditionalCommand(*branches, selector)
def select(
    commands: Dict[Hashable, Command], selector: Callable[[], Hashable]
) -> Command:
    """
    Builds a command that runs one of several commands, chosen by a selector
    function.

    :param commands: mapping of commands to choose from
    :param selector: the function that picks which command to run
    :returns: the command
    """
    chooser = SelectCommand(commands, selector)
    return chooser
def sequence(*commands: Command) -> Command:
    """
    Builds a group that runs the given commands in series, one after another.

    :param commands: the commands to include, in order
    :returns: the command group
    """
    in_series = SequentialCommandGroup(*commands)
    return in_series
def repeatingSequence(*commands: Command) -> Command:
    """
    Builds a group that runs the given commands in series and restarts from
    the beginning once the last command ends.

    :param commands: the commands to include, in order
    :returns: the command group
    """
    # Build the plain series first, then ask it for its repeating form.
    in_series = sequence(*commands)
    return in_series.repeatedly()
def parallel(*commands: Command) -> Command:
    """
    Builds a group that runs the given commands at the same time and ends
    once every command in the group has finished.

    :param commands: the commands to include
    :returns: the command
    """
    together = ParallelCommandGroup(*commands)
    return together
def race(*commands: Command) -> Command:
    """
    Builds a group that runs the given commands at the same time, ends as
    soon as any one of them finishes, and cancels the rest.

    :param commands: the commands to include
    :returns: the command group
    """
    racing = ParallelRaceGroup(*commands)
    return racing
def deadline(deadline: Command, *commands: Command) -> Command:
    """
    Builds a group that runs the given commands at the same time, ends once
    the designated deadline command finishes, and cancels the rest.

    :param deadline: the command whose completion ends the group
    :param commands: the other commands to include
    :returns: the command group
    """
    # The deadline command goes first, followed by the remaining commands.
    members = (deadline, *commands)
    return ParallelDeadlineGroup(*members)