-
Notifications
You must be signed in to change notification settings - Fork 9
/
worker.py
228 lines (186 loc) · 7.54 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# -*- coding: utf-8 -*-
# (The MIT License)
#
# Copyright (c) 2013-2020 Kura
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Provides functionality to manage child processes from the supervisor."""
import asyncio
import contextlib
import logging
import os
import signal
import time

from . import protocols
from .child import Child
from .control import setgid, setuid
from .streams import StreamProtocol

try:
    import setproctitle
except ImportError:
    setproctitle = None
# Public API of this module: only the Worker class is exported.
__all__ = ("Worker",)
# Module-level logger, registered under the "blackhole.worker" name.
logger = logging.getLogger("blackhole.worker")
class Worker:
    """
    A worker.

    Provides functionality to manage a single child process. The worker is
    responsible for communicating heartbeat information with its child
    process and starting, stopping and restarting a child as required or
    instructed.
    """

    # Seconds the heartbeat loop sleeps between checks.
    HEARTBEAT_INTERVAL = 15
    # Seconds without a PONG after which the child is restarted.
    HEARTBEAT_TIMEOUT = 30
    # Seconds the chat loop pauses between reads from the child.
    CHAT_INTERVAL = 5

    _started = False
    ping_count = 0

    # Populated by start() and connect(). The ``None`` defaults allow stop()
    # and kill_child() to run safely before the worker is fully connected.
    pid = None
    chat_task = None
    heartbeat_task = None
    rtransport = None
    wtransport = None

    def __init__(self, idx, socks, loop=None):
        """
        Initialise the worker and immediately fork its child.

        :param str idx: The number reference of the worker and child.
        :param list socks: Sockets to listen for connections on.
        :param loop: The event loop to use.
        :type loop: :py:class:`asyncio.unix_events._UnixSelectorEventLoop` or
                    :py:obj:`None` to get the current event loop using
                    :py:func:`asyncio.get_event_loop`.
        """
        self.loop = loop if loop is not None else asyncio.get_event_loop()
        self.socks = socks
        self.idx = idx
        self.start()

    def start(self):
        """
        Create and fork off a child process for the current worker.

        Two pipes are created: the parent writes to ``up_write`` and reads
        from ``down_read``, while the child is handed ``up_read`` and
        ``down_write``.
        """
        assert not self._started
        self._started = True
        # All four descriptors stay open in the parent so restart_child() can
        # hand the same pipe ends to a replacement child.
        self.up_read, self.up_write = os.pipe()
        self.down_read, self.down_write = os.pipe()
        self.pid = os.fork()
        if self.pid > 0:  # Parent
            # connect() drives self.loop, so schedule it on that same loop
            # rather than on whichever loop happens to be current.
            asyncio.ensure_future(self.connect(), loop=self.loop)
        else:  # Child
            self.setup_child()

    def setup_child(self):
        """Basic setup for the child process and starting it."""
        setgid()
        setuid()
        # Detach the forked process from the event loop set in the parent.
        asyncio.set_event_loop(None)
        if setproctitle:
            setproctitle.setproctitle("blackhole: worker")
        process = Child(self.up_read, self.down_write, self.socks, self.idx)
        process.start()

    def restart_child(self):
        """
        Restart the child process.

        The current child is killed and a new one is forked. In the parent
        the heartbeat bookkeeping is reset; the new child re-runs
        :meth:`setup_child` on the existing pipes.
        """
        self.kill_child()
        self.pid = os.fork()
        if self.pid > 0:
            self.ping_count = 0
            self.ping = time.monotonic()
        else:
            self.setup_child()

    def kill_child(self):
        """
        Kill the child process and reap it.

        Does nothing when there is no child to signal. A child that has
        already exited or been reaped is silently ignored.
        """
        pid = self.pid
        # A pid of 0 (the value seen inside the forked child) or a negative
        # pid would make os.kill() signal a whole process group.
        if pid is None or pid <= 0:
            return
        with contextlib.suppress(ProcessLookupError, ChildProcessError):
            os.kill(pid, signal.SIGTERM)
            # Wait for *this* child only; a bare os.wait() could reap a
            # sibling worker's child and leave ours as a zombie.
            os.waitpid(pid, 0)

    async def heartbeat(self, writer):
        """
        Handle heartbeat between a worker and child.

        If the child has not answered within :attr:`HEARTBEAT_TIMEOUT`
        seconds it is killed and a replacement is forked via
        :meth:`restart_child`.

        :param asyncio.StreamWriter writer: An object for writing data to the
                                            pipe.

        .. note::

           3 bytes are used in the communication channel.

           - b'x01' -- :const:`blackhole.protocols.PING`
           - b'x02' -- :const:`blackhole.protocols.PONG`

           The worker sleeps for :attr:`HEARTBEAT_INTERVAL` seconds (15 by
           default) before requesting a ping from the child. If more than
           :attr:`HEARTBEAT_TIMEOUT` seconds (30 by default) pass without a
           reply, the child is restarted.

           These message values are defined in the :mod:`blackhole.protocols`
           schema. Documentation is available at --
           https://kura.github.io/blackhole/api-protocols.html
        """
        while self._started:
            await asyncio.sleep(self.HEARTBEAT_INTERVAL)
            if (time.monotonic() - self.ping) < self.HEARTBEAT_TIMEOUT:
                writer.write(protocols.PING)
            elif self._started:
                # Re-check the flag: the worker may have been stopped while
                # this coroutine was sleeping.
                logger.debug(
                    f"worker.{self.idx}.heartbeat: Communication failed. "
                    "Restarting worker"
                )
                self.restart_child()

    async def chat(self, reader):
        """
        Communicate between a worker and child.

        If reading from the child fails the worker is shut down and the
        child is killed. Cancellation of this coroutine is propagated
        untouched.

        :param asyncio.StreamReader reader: An object for reading data from
                                            the pipe.

        .. note::

           3 bytes are used in the communication channel.

           - b'x01' -- :const:`blackhole.protocols.PING`
           - b'x02' -- :const:`blackhole.protocols.PONG`

           Read data coming in from the child. If a PONG is received, we'll
           update the worker, setting this PONG as a 'PING' from the child.

           These message values are defined in the :mod:`blackhole.protocols`
           schema. Documentation is available at --
           https://kura.github.io/blackhole/api-protocols.html
        """
        while self._started:
            try:
                msg = await reader.read(3)
            except asyncio.CancelledError:
                # stop() cancels this task; swallowing the cancellation here
                # would re-enter stop() and signal the child a second time.
                raise
            except Exception:
                logger.debug(
                    f"worker.{self.idx}.chat: Communication failed. "
                    "Stopping worker",
                    exc_info=True,
                )
                self.stop()
            else:
                if msg == protocols.PONG:
                    logger.debug(
                        f"worker.{self.idx}.chat: Pong received from child"
                    )
                    self.ping = time.monotonic()
                    self.ping_count += 1
            await asyncio.sleep(self.CHAT_INTERVAL)

    async def connect(self):
        """
        Connect the child and worker so they can communicate.

        Wraps the parent's pipe ends (``down_read`` for reading, ``up_write``
        for writing) in asyncio transports on ``self.loop``, records the
        current time as the last ping and starts the :meth:`chat` and
        :meth:`heartbeat` tasks.
        """
        read_fd = os.fdopen(self.down_read, "rb")
        r_trans, r_proto = await self.loop.connect_read_pipe(
            StreamProtocol, read_fd
        )
        write_fd = os.fdopen(self.up_write, "wb")
        w_trans, w_proto = await self.loop.connect_write_pipe(
            StreamProtocol, write_fd
        )
        reader = r_proto.reader
        writer = asyncio.StreamWriter(w_trans, w_proto, reader, self.loop)
        # Start the heartbeat clock now so the first check does not see a
        # stale (or missing) timestamp.
        self.ping = time.monotonic()
        self.rtransport = r_trans
        self.wtransport = w_trans
        self.chat_task = asyncio.ensure_future(self.chat(reader))
        self.heartbeat_task = asyncio.ensure_future(self.heartbeat(writer))

    def stop(self):
        """
        Terminate the worker and its respective child process.

        Safe to call before :meth:`connect` has completed: tasks and
        transports that were never created are skipped.
        """
        self.kill_child()
        self._started = False
        for task in (self.chat_task, self.heartbeat_task):
            if task is not None:
                task.cancel()
        for transport in (self.rtransport, self.wtransport):
            if transport is not None:
                transport.close()