-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathparallelcommandgroup.py
95 lines (76 loc) · 3.43 KB
/
parallelcommandgroup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# validated: 2024-01-19 DS aaea85ff1656 ParallelCommandGroup.java
from __future__ import annotations
from typing import Dict
from .command import Command, InterruptionBehavior
from .commandscheduler import CommandScheduler
from .exceptions import IllegalCommandUse
from .util import flatten_args_commands
class ParallelCommandGroup(Command):
"""
A command composition that runs a set of commands in parallel, ending when the last command ends.
The rules for command compositions apply: command instances that are passed to it cannot be
added to any other composition or scheduled individually, and the composition requires all
subsystems its components require.
"""
def __init__(self, *commands: Command):
"""
Creates a new ParallelCommandGroup. The given commands will be executed simultaneously. The
command composition will finish when the last command finishes. If the composition is
interrupted, only the commands that are still running will be interrupted.
:param commands: the commands to include in this composition.
"""
super().__init__()
self._commands: Dict[Command, bool] = {}
self._runsWhenDisabled = True
self._interruptBehavior = InterruptionBehavior.kCancelIncoming
self.addCommands(*commands)
def addCommands(self, *commands: Command):
"""
Adds the given commands to the group.
:param commands: Commands to add to the group
"""
commands = flatten_args_commands(commands)
if True in self._commands.values():
raise IllegalCommandUse(
"Commands cannot be added to a composition while it is running"
)
CommandScheduler.getInstance().registerComposedCommands(commands)
for command in commands:
in_common = command.getRequirements().intersection(self.requirements)
if in_common:
raise IllegalCommandUse(
"Multiple commands in a parallel composition cannot require the same subsystems.",
common=in_common,
)
self._commands[command] = False
self.requirements.update(command.getRequirements())
self._runsWhenDisabled = (
self._runsWhenDisabled and command.runsWhenDisabled()
)
if command.getInterruptionBehavior() == InterruptionBehavior.kCancelSelf:
self._interruptBehavior = InterruptionBehavior.kCancelSelf
def initialize(self):
for command in self._commands:
command.initialize()
self._commands[command] = True
def execute(self):
for command, isRunning in self._commands.items():
if not isRunning:
continue
command.execute()
if command.isFinished():
command.end(False)
self._commands[command] = False
def end(self, interrupted: bool):
if interrupted:
for command, isRunning in self._commands.items():
if not isRunning:
continue
command.end(True)
self._commands[command] = False
def isFinished(self) -> bool:
return True not in self._commands.values()
def runsWhenDisabled(self) -> bool:
return self._runsWhenDisabled
def getInterruptionBehavior(self) -> InterruptionBehavior:
return self._interruptBehavior