-
Notifications
You must be signed in to change notification settings - Fork 814
/
cancellable.py
136 lines (110 loc) · 4.61 KB
/
cancellable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
import psutil
import subprocess
from patroni.exceptions import PostgresException
from patroni.utils import polling_loop
from threading import Lock
from typing import Any, Dict, List, Optional, Union
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class CancellableExecutor(object):
    """Base class that starts one external process and knows how to kill it.

    There must be only one such process so that AsyncExecutor can easily cancel it.

    The kill helpers below take ``_lock`` themselves; ``_start_process()``
    expects the caller to already hold it.
    """

    def __init__(self) -> None:
        # Handle of the process started by `_start_process()`, if any.
        self._process: Optional[psutil.Popen] = None
        # Command passed to the last `_start_process()` call; used in log messages.
        self._process_cmd: Optional[List[str]] = None
        # Descendants recorded by `_kill_process()` for `_kill_children()` to kill.
        self._process_children: List[psutil.Process] = []
        self._lock = Lock()

    def _start_process(self, cmd: List[str], *args: Any, **kwargs: Any) -> Optional[bool]:
        """Start *cmd* with :class:`psutil.Popen` and remember the process handle.

        This method must be executed only when the `_lock` is acquired.

        :param cmd: the command to execute.
        :param args: positional arguments passed through to :class:`psutil.Popen`.
        :param kwargs: keyword arguments passed through to :class:`psutil.Popen`.

        :returns: ``True`` when the process was started, ``None`` when starting it
                  raised an exception (the exception is logged, not propagated).
        """
        try:
            # Forget children recorded for a previous process before starting a new one.
            self._process_children = []
            self._process_cmd = cmd
            self._process = psutil.Popen(cmd, *args, **kwargs)
        except Exception:
            logger.exception('Failed to execute %s', cmd)
            return None
        return True

    def _kill_process(self) -> None:
        """Kill the remembered process after recording its descendants.

        Nothing happens when there is no process, when it is not running anymore,
        or when a list of children has already been recorded.
        """
        with self._lock:
            if self._process is None or not self._process.is_running() or self._process_children:
                return

            try:
                self._process.suspend()  # Suspend the process before getting list of children
            except psutil.Error as e:
                logger.info('Failed to suspend the process: %s', e.msg)

            try:
                self._process_children = self._process.children(recursive=True)
            except psutil.Error:
                # Best effort: without the list we simply have no children to kill later.
                pass

            try:
                self._process.kill()
                logger.warning('Killed %s because it was still running', self._process_cmd)
            except psutil.NoSuchProcess:
                # The process is already gone, which is what we wanted.
                pass
            except psutil.AccessDenied as e:
                logger.warning('Failed to kill the process: %s', e.msg)

    def _kill_children(self) -> None:
        """Kill every recorded child process and wait for them to terminate.

        Children that no longer exist are skipped. Children that could not be
        killed because of :class:`psutil.AccessDenied` are still waited for.
        """
        waitlist: List[psutil.Process] = []
        with self._lock:
            for child in self._process_children:
                try:
                    child.kill()
                except psutil.NoSuchProcess:
                    continue
                except psutil.AccessDenied as e:
                    logger.info('Failed to kill child process: %s', e.msg)
                waitlist.append(child)
        # Wait outside of the lock so that other threads are not blocked meanwhile.
        psutil.wait_procs(waitlist)
class CancellableSubprocess(CancellableExecutor):
    """Run a command and wait for it, while allowing it to be cancelled.

    Once :meth:`cancel` was called, :meth:`call` refuses to start anything
    until :meth:`reset_is_cancelled` clears the flag.
    """

    def __init__(self) -> None:
        super(CancellableSubprocess, self).__init__()
        self._is_cancelled = False

    def call(self, *args: Any, **kwargs: Union[Any, Dict[str, str]]) -> Optional[int]:
        """Execute a command and wait until it finishes.

        Any ``stdin``, ``stdout`` and ``stderr`` keyword arguments supplied by the
        caller are discarded. If a ``communicate`` keyword argument holding a dict
        is given, all three streams are piped: the value of its ``input`` key (if
        not empty) is sent to the process encoded as UTF-8 with a trailing newline
        ensured, and the captured output is stored under the ``stdout`` and
        ``stderr`` keys of the same dict.

        :param args: positional arguments passed to ``_start_process()``.
        :param kwargs: keyword arguments passed to ``_start_process()``, except
                       for the ones described above.

        :returns: the value returned by the process' ``wait()``, or ``None`` if
                  the process could not be started.

        :raises PostgresException: if the cancelled flag is set.
        """
        for s in ('stdin', 'stdout', 'stderr'):
            kwargs.pop(s, None)

        communicate: Optional[Dict[str, str]] = kwargs.pop('communicate', None)
        input_data = None
        if isinstance(communicate, dict):
            input_data = communicate.get('input')
            if input_data:
                if input_data[-1] != '\n':
                    input_data += '\n'
                input_data = input_data.encode('utf-8')
            kwargs['stdin'] = subprocess.PIPE
            kwargs['stdout'] = subprocess.PIPE
            kwargs['stderr'] = subprocess.PIPE

        try:
            with self._lock:
                if self._is_cancelled:
                    raise PostgresException('cancelled')

                started = self._start_process(*args, **kwargs)

            if started and self._process is not None:
                if isinstance(communicate, dict):
                    communicate['stdout'], communicate['stderr'] = \
                        self._process.communicate(input_data)  # pyright: ignore [reportGeneralTypeIssues]
                return self._process.wait()
        finally:
            with self._lock:
                self._process = None
            # `_kill_children()` takes the lock itself, hence it is called outside of it.
            self._kill_children()

    def reset_is_cancelled(self) -> None:
        """Clear the cancelled flag so that :meth:`call` may start processes again."""
        with self._lock:
            self._is_cancelled = False

    @property
    def is_cancelled(self) -> bool:
        """The current value of the cancelled flag."""
        with self._lock:
            return self._is_cancelled

    def cancel(self, kill: bool = False) -> None:
        """Set the cancelled flag and stop the running process, if there is one.

        The process is first asked to terminate and is then polled. If it is still
        running when polling ends, it is killed with ``_kill_process()``.

        :param kill: when ``True`` polling stops after the first check that finds
                     the process still running, so it is killed without waiting
                     for the rest of the polling loop.
        """
        with self._lock:
            self._is_cancelled = True
            if self._process is None or not self._process.is_running():
                return

            logger.info('Terminating %s', self._process_cmd)
            try:
                self._process.terminate()
            except psutil.NoSuchProcess:
                # The process exited between the `is_running()` check and the signal.
                return
            except psutil.AccessDenied as e:
                # Same policy as `_kill_process()`: report it and carry on to the kill step.
                logger.warning('Failed to terminate the process: %s', e.msg)

        # NOTE(review): presumably polls for up to 10 seconds - confirm against polling_loop().
        for _ in polling_loop(10):
            with self._lock:
                if self._process is None or not self._process.is_running():
                    return
            if kill:
                break

        self._kill_process()