forked from robotpy/robotpy-commands-v2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproxycommand.py
90 lines (68 loc) · 2.79 KB
/
proxycommand.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from __future__ import annotations
from typing import Callable, overload
from .command import Command
from .commandgroup import *
from .util import format_args_kwargs
class ProxyCommand(Command):
"""
Schedules the given command when this command is initialized, and ends when it ends. Useful for
forking off from CommandGroups. If this command is interrupted, it will cancel the command.
"""
_supplier: Callable[[], Command]
@overload
def __init__(self, supplier: Callable[[], Command]):
"""
Creates a new ProxyCommand that schedules the supplied command when initialized, and ends when
it is no longer scheduled. Useful for lazily creating commands at runtime.
:param supplier: the command supplier"""
...
@overload
def __init__(self, command: Command):
"""
Creates a new ProxyCommand that schedules the given command when initialized, and ends when it
is no longer scheduled.
:param command: the command to run by proxy"""
...
def __init__(self, *args, **kwargs):
super().__init__()
def init_supplier(supplier: Callable[[], Command]):
self._supplier = supplier
def init_command(command: Command):
self._supplier = lambda: command
num_args = len(args) + len(kwargs)
if num_args == 1 and len(kwargs) == 1:
if "supplier" in kwargs:
return init_supplier(kwargs["supplier"])
elif "command" in kwargs:
return init_command(kwargs["command"])
elif num_args == 1 and len(args) == 1:
if isinstance(args[0], Command):
return init_command(args[0])
elif callable(args[0]):
return init_supplier(args[0])
raise TypeError(
f"""
TypeError: ProxyCommand(): incompatible function arguments. The following argument types are supported:
1. (self: ProxyCommand, supplier: () -> Command)
2. (self: ProxyCommand, command: Command)
Invoked with: {format_args_kwargs(self, *args, **kwargs)}
"""
)
def initialize(self):
self._command = self._supplier()
self._command.schedule()
def end(self, interrupted: bool):
if interrupted and self._command is not None:
self._command.cancel()
self._command = None
def execute(self):
pass
def isFinished(self) -> bool:
return self._command is None or not self._command.isScheduled()
def runsWhenDisabled(self) -> bool:
"""
Whether the given command should run when the robot is disabled. Override to return true if the
command should run when disabled.
:returns: true. Otherwise, this proxy would cancel commands that do run when disabled.
"""
return True