# coroutinecommand.py
from functools import wraps
from typing import Any, Callable, Generator, List, Union, Optional, overload
from ._impl import CommandBase, Subsystem
import inspect
from typing_extensions import TypeGuard
# A coroutine here is a generator object: it yields None, is sent nothing,
# and returns nothing.
Coroutine = Generator[None, None, None]
# A zero-argument callable that produces such a generator when called.
CoroutineFunction = Callable[[], Coroutine]
# Anything that can serve as a command body: either a plain zero-argument
# callable returning None, or a CoroutineFunction.
Coroutineable = Union[Callable[[], None], CoroutineFunction]
def is_coroutine(func: Any) -> TypeGuard[Coroutine]:
    """
    Tell whether ``func`` is a generator object.

    :param func: Any object.
    :returns: The result of ``inspect.isgenerator(func)``.
    """
    generator_object_given = inspect.isgenerator(func)
    return generator_object_given
def is_coroutine_function(func: Any) -> TypeGuard[CoroutineFunction]:
    """
    Tell whether ``func`` is a generator function.

    :param func: Any object.
    :returns: The result of ``inspect.isgeneratorfunction(func)``.
    """
    makes_generators = inspect.isgeneratorfunction(func)
    return makes_generators
def is_coroutineable(func: Any) -> TypeGuard[Coroutineable]:
    """
    Tell whether ``func`` can be turned into a coroutine.

    :param func: Any object.
    :returns: True for generator functions and for any other callable.
    """
    if is_coroutine_function(func):
        return True
    return callable(func)
def ensure_generator_function(func: Coroutineable) -> Callable[..., Coroutine]:
    """
    Return a generator function standing in for ``func``.

    A generator function is handed back unchanged. Any other callable is
    wrapped in a generator function that calls it once, forwarding all
    arguments and ignoring its return value, and then yields a single time.

    :param func: A generator function or a plain callable.
    :returns: ``func`` itself, or a generator-function wrapper around it.
    """
    if not is_coroutine_function(func):

        @wraps(func)
        def call_then_yield(*args, **kwargs):
            func(*args, **kwargs)
            yield

        return call_then_yield

    return func
class CoroutineCommand(CommandBase):
    """
    A class that wraps a coroutine function into a command.

    Each call to :meth:`execute` advances the wrapped generator by one step;
    the command reports itself finished once the generator is exhausted.
    """

    # Generator currently being stepped. None until a generator object is
    # supplied to __init__ or one is created in initialize().
    coroutine: Optional[Coroutine]
    # Callable used to create a fresh generator on every initialize(). None
    # when a generator object was passed to __init__ directly.
    coroutine_function: Optional[Coroutineable]
    # True once the generator has raised StopIteration.
    is_finished: bool

    def __init__(
        self,
        coroutine: Union[Coroutine, Coroutineable],
        requirements: Optional[List[Subsystem]] = None,
        runs_when_disabled: bool = False,
    ) -> None:
        """
        Creates a CoroutineCommand that can be used as a command.

        :param coroutine: The coroutine or coroutine function to bind.
        :param requirements: The subsystems that this command requires.
        :param runs_when_disabled: Whether or not this command runs when the robot is disabled.
        :raises TypeError: If ``coroutine`` is neither a generator object nor callable.
        """
        # Initialise the base class before using it; addRequirements() below
        # is a base-class method.
        super().__init__()

        self.coroutine = None
        self.coroutine_function = None
        # Per-instance callable reporting the flag given to this constructor
        # (presumably shadows the CommandBase method of the same name).
        self.runsWhenDisabled = lambda: runs_when_disabled

        if is_coroutine(coroutine):
            self.coroutine = coroutine
        elif is_coroutineable(coroutine):
            self.coroutine_function = coroutine
        else:
            raise TypeError("The coroutine must be a coroutine or a coroutine function")

        if requirements is not None:
            self.addRequirements(requirements)

        self.is_finished = False

    def initialize(self) -> None:
        """
        Prepare the command for a new run.

        When a callable was bound, a fresh generator is created from it. When
        a generator object was bound directly it can only run once.

        :raises RuntimeError: If a directly bound generator has already finished.
        """
        if self.coroutine_function is not None:
            self.coroutine = ensure_generator_function(self.coroutine_function)()
        elif self.coroutine is not None and self.is_finished:
            # The original built this exception without raising it, so an
            # exhausted generator was silently accepted.
            raise RuntimeError("Generator objects cannot be reused.")

        self.is_finished = False

    def execute(self) -> None:
        """
        Advance the coroutine by one step.

        Does nothing once the command is finished.

        :raises TypeError: If no generator is available, i.e. a callable was
            bound and initialize() has not been called yet.
        """
        if self.is_finished:
            return
        if self.coroutine is None:
            raise TypeError("This command was not properly initialized")

        try:
            next(self.coroutine)
        except StopIteration:
            self.is_finished = True

    def isFinished(self) -> bool:
        """
        :returns: True once the coroutine has been exhausted.
        """
        return self.is_finished
@overload
def commandify(
    *, requirements: Optional[List[Subsystem]] = None, runs_when_disabled: bool = False
) -> Callable[[Coroutineable], Callable[..., CoroutineCommand]]:
    """
    A decorator that turns a coroutine function into a command.
    A def should be under this.

    :param requirements: The subsystems that this command requires.
    :param runs_when_disabled: Whether or not this command runs when the robot is disabled.
    """
    ...


@overload
def commandify(coroutine: Coroutineable, /) -> Callable[..., CoroutineCommand]:
    """
    A decorator that turns a coroutine function into a command.
    A def should be under this.
    """
    ...


def commandify(
    coroutine: Optional[Coroutineable] = None,
    /,
    *,
    requirements: Optional[List[Subsystem]] = None,
    runs_when_disabled: bool = False,
) -> Union[
    Callable[[Coroutineable], Callable[..., CoroutineCommand]],
    Callable[..., CoroutineCommand],
]:
    """
    A decorator that turns a coroutine function into a command factory.

    Usable bare (``@commandify``) or with keyword options
    (``@commandify(requirements=[...])``). Calling the decorated function
    returns a new CoroutineCommand bound to the arguments of that call.

    :param coroutine: The function being decorated when used bare; None when
        the decorator is called with keyword options.
    :param requirements: The subsystems that this command requires.
    :param runs_when_disabled: Whether or not this command runs when the robot is disabled.
    :returns: The command factory, or a decorator producing one when
        ``coroutine`` is None.
    """

    def wrapper(func: Coroutineable) -> Callable[..., CoroutineCommand]:
        generator_function = ensure_generator_function(func)

        @wraps(func)
        def arg_accepter(*args, **kwargs) -> CoroutineCommand:
            # Hand the command a real generator function. A plain lambda
            # returning a generator would be wrapped again by
            # ensure_generator_function, which discards the return value, so
            # the decorated body would never run.
            def bound() -> Coroutine:
                yield from generator_function(*args, **kwargs)

            # runs_when_disabled was previously accepted but never forwarded.
            return CoroutineCommand(bound, requirements, runs_when_disabled)

        return arg_accepter

    if coroutine is None:
        return wrapper

    return wrapper(coroutine)