Skip to content

Commit

Permalink
PipeLogger: non-blocking write to pipe (bug 709746)
Browse files Browse the repository at this point in the history
Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico@gentoo.org>
  • Loading branch information
zmedico committed Feb 24, 2020
1 parent 5c40c3e commit 2771265
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 14 deletions.
41 changes: 40 additions & 1 deletion lib/portage/tests/process/test_PopenProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures._asyncio.streams import _reader
from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from _emerge.PipeReader import PipeReader

class PopenPipeTestCase(TestCase):
Expand Down Expand Up @@ -73,8 +75,41 @@ def _testPipeLogger(self, test_string):

return content.decode('ascii', 'replace')

@coroutine
def _testPipeLoggerToPipe(self, test_string, loop=None):
"""
Test PipeLogger writing to a pipe connected to a PipeReader.
This verifies that PipeLogger does not deadlock when writing
to a pipe that's drained by a PipeReader running in the same
process (requires non-blocking write).
"""

producer = PopenProcess(proc=subprocess.Popen(
["bash", "-c", self._echo_cmd % test_string],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
scheduler=loop)

pr, pw = os.pipe()

consumer = producer.pipe_reader = PipeLogger(background=True,
input_fd=producer.proc.stdout,
log_file_path=os.fdopen(pw, 'wb', 0))

reader = _reader(pr, loop=loop)
yield producer.async_start()
content = yield reader
yield producer.async_wait()
yield consumer.async_wait()

self.assertEqual(producer.returncode, os.EX_OK)
self.assertEqual(consumer.returncode, os.EX_OK)

coroutine_return(content.decode('ascii', 'replace'))

def testPopenPipe(self):
for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
loop = global_event_loop()

for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16):
test_string = x * "a"
output = self._testPipeReader(test_string)
self.assertEqual(test_string, output,
Expand All @@ -83,3 +118,7 @@ def testPopenPipe(self):
output = self._testPipeLogger(test_string)
self.assertEqual(test_string, output,
"x = %s, len(output) = %s" % (x, len(output)))

output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
self.assertEqual(test_string, output,
"x = %s, len(output) = %s" % (x, len(output)))
67 changes: 54 additions & 13 deletions lib/portage/util/_async/PipeLogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

import portage
from portage import os, _encodings, _unicode_encode
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine
from portage.util.futures.unix_events import _set_nonblocking
from _emerge.AbstractPollTask import AbstractPollTask

class PipeLogger(AbstractPollTask):
Expand All @@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
"""

__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
("_log_file", "_log_file_real")
("_io_loop_task", "_log_file", "_log_file_real")

def _start(self):

log_file_path = self.log_file_path
if log_file_path is not None:

if hasattr(log_file_path, 'write'):
self._log_file = log_file_path
_set_nonblocking(self._log_file.fileno())
elif log_file_path is not None:
self._log_file = open(_unicode_encode(log_file_path,
encoding=_encodings['fs'], errors='strict'), mode='ab')
if log_file_path.endswith('.gz'):
Expand Down Expand Up @@ -57,16 +62,17 @@ def _start(self):
fcntl.fcntl(fd, fcntl.F_SETFD,
fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)

self.scheduler.add_reader(fd, self._output_handler, fd)
self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
self._io_loop_task.add_done_callback(self._io_loop_done)
self._registered = True

def _cancel(self):
self._unregister()
if self.returncode is None:
self.returncode = self._cancelled_returncode

def _output_handler(self, fd):

@coroutine
def _io_loop(self, fd):
background = self.background
stdout_fd = self.stdout_fd
log_file = self._log_file
Expand All @@ -76,14 +82,18 @@ def _output_handler(self, fd):

if buf is None:
# not a POLLIN event, EAGAIN, etc...
break
future = self.scheduler.create_future()
self.scheduler.add_reader(fd, future.set_result, None)
try:
yield future
finally:
self.scheduler.remove_reader(fd)
future.done() or future.cancel()
continue

if not buf:
# EOF
self._unregister()
self.returncode = self.returncode or os.EX_OK
self._async_wait()
break
return

else:
if not background and stdout_fd is not None:
Expand Down Expand Up @@ -120,8 +130,34 @@ def _output_handler(self, fd):
fcntl.F_GETFL) ^ os.O_NONBLOCK)

if log_file is not None:
log_file.write(buf)
log_file.flush()
write_buf = buf
while True:
try:
if write_buf is not None:
log_file.write(write_buf)
write_buf = None
log_file.flush()
except EnvironmentError as e:
if e.errno != errno.EAGAIN:
raise
future = self.scheduler.create_future()
self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
try:
yield future
finally:
self.scheduler.remove_writer(self._log_file.fileno())
future.done() or future.cancel()
else:
break

def _io_loop_done(self, future):
try:
future.result()
except asyncio.CancelledError:
self.cancel()
self._was_cancelled()
self.returncode = self.returncode or os.EX_OK
self._async_wait()

def _unregister(self):
if self.input_fd is not None:
Expand All @@ -133,11 +169,16 @@ def _unregister(self):
self.input_fd.close()
self.input_fd = None

if self._io_loop_task is not None:
self._io_loop_task.done() or self._io_loop_task.cancel()
self._io_loop_task = None

if self.stdout_fd is not None:
os.close(self.stdout_fd)
self.stdout_fd = None

if self._log_file is not None:
self.scheduler.remove_writer(self._log_file.fileno())
self._log_file.close()
self._log_file = None

Expand Down

0 comments on commit 2771265

Please sign in to comment.