-
Notifications
You must be signed in to change notification settings - Fork 0
/
target.py
113 lines (87 loc) · 2.35 KB
/
target.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import abc
import atexit
from typing import Optional, Type, Dict, Any
from types import TracebackType
from mypy_extensions import trait
from typing_extensions import Self, dataclass_transform
from .util import register_target, register_target_method, mark_as_target
@register_target
@dataclass_transform()
@trait
class Target:
    """Base trait providing no-op lifecycle hooks for a target.

    The hooks are wired up as follows:

    * ``on_init`` is called from ``__post_init__``.
    * ``pre_startup`` / ``post_shutdown`` are called by the synchronous
      context-manager protocol (``with target: ...``).
    * ``on_startup`` / ``on_shutdown`` are awaited by the asynchronous
      context-manager protocol (``async with target: ...``).
    * ``on_exit`` is registered with ``atexit`` in ``__post_init__``.

    Every hook does nothing by default. Each one is tagged through
    ``register_target_method`` with either ``"forward"`` or ``"backward"``.
    NOTE(review): the meaning of those tags lives in ``.util`` and is not
    visible here -- presumably an invocation order; confirm before relying
    on it.
    """

    def __post_init__(self) -> None:
        """Register ``on_exit`` with ``atexit``, then call ``on_init``.

        NOTE(review): presumably invoked by a dataclass-style ``__init__``
        set up by ``register_target`` -- confirm in ``.util``.
        """
        # atexit stores the bound method, so the instance stays referenced
        # until the interpreter shuts down.
        atexit.register(self.on_exit)
        self.on_init()

    @register_target_method("forward")
    def on_init(self) -> None:
        """Hook called at the end of ``__post_init__``; no-op by default."""

    @register_target_method("forward")
    def pre_startup(self) -> None:
        """Hook called by ``__enter__``; no-op by default."""

    @register_target_method("forward")
    async def on_startup(self) -> None:
        """Hook awaited by ``__aenter__``; no-op by default."""

    @register_target_method("backward")
    async def on_shutdown(self) -> None:
        """Hook awaited by ``__aexit__``; no-op by default."""

    @register_target_method("backward")
    def post_shutdown(self) -> None:
        """Hook called by ``__exit__``; no-op by default."""

    @register_target_method("backward")
    def on_exit(self) -> None:
        """Hook registered with ``atexit``; no-op by default."""

    def __enter__(self: Self) -> Self:
        """Call ``pre_startup`` and return this instance."""
        self.pre_startup()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Call ``post_shutdown`` and report the exception as handled.

        The exception arguments are ignored. Because this always returns
        ``True``, any exception raised inside the ``with`` body is
        suppressed once ``post_shutdown`` returns.
        NOTE(review): confirm that blanket suppression is intended.
        """
        self.post_shutdown()
        return True

    async def __aenter__(self: Self) -> Self:
        """Await ``on_startup`` and return this instance."""
        await self.on_startup()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Await ``on_shutdown`` and report the exception as handled.

        The exception arguments are ignored. Because this always returns
        ``True``, any exception raised inside the ``async with`` body is
        suppressed once ``on_shutdown`` completes.
        NOTE(review): confirm that blanket suppression is intended.
        """
        await self.on_shutdown()
        return True
@mark_as_target
@trait
class ProcessTargetABC:  # (metaclass=abc.ABCMeta):
    """Interface for targets that are driven synchronously.

    Implementations provide ``main_sync``, ``run_sync`` and ``reload``.
    ``launch`` is a convenience constructor-and-run entry point.

    ``abc.ABCMeta`` is not set as the metaclass here (it is left commented
    out on the class line), so the ``abc`` machinery itself does not block
    instantiation of incomplete subclasses; the ``@abc.abstractmethod``
    markers still document the required overrides.
    NOTE(review): ``mark_as_target`` may add its own enforcement -- confirm
    in ``.util``.
    """

    @abc.abstractmethod
    def main_sync(self) -> None:
        """Abstract: to be provided by implementations."""

    @abc.abstractmethod
    def run_sync(self) -> None:
        """Abstract: to be provided by implementations; called by ``launch``."""

    @abc.abstractmethod
    def reload(self) -> None:
        """Abstract: to be provided by implementations."""

    @classmethod
    def launch(cls, **kwargs: Any) -> None:
        """Construct ``cls`` from ``kwargs`` and call its ``run_sync``.

        Args:
            **kwargs: Keyword arguments forwarded unchanged to ``cls(...)``.
                Each value may be of any type; the annotation describes a
                single keyword value, not the collected mapping.
        """
        instance = cls(**kwargs)
        instance.run_sync()
@mark_as_target
@trait
class DaemonTargetABC:  # (metaclass=abc.ABCMeta):
    """Interface for targets that are driven asynchronously.

    Implementations provide the coroutines ``main_async`` and ``run_async``
    and the synchronous ``stop``. All three are marked with
    ``@abc.abstractmethod``; ``abc.ABCMeta`` is not set as the metaclass
    here (it is left commented out on the class line).
    """

    @abc.abstractmethod
    async def main_async(self) -> None:
        """Abstract coroutine: to be provided by implementations."""

    @abc.abstractmethod
    async def run_async(self) -> None:
        """Abstract coroutine: to be provided by implementations."""

    @abc.abstractmethod
    def stop(self) -> None:
        """Abstract: to be provided by implementations."""
# Names exported by a star-import of this module.
__all__ = ("Target", "ProcessTargetABC", "DaemonTargetABC")