Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions exec_helpers/exec_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ def __poll_stream(
if log:
log.log(
level=logging.INFO if verbose else logging.DEBUG,
msg=line.decode(
'utf-8',
errors='backslashreplace'
).rstrip()
msg=line.decode('utf-8', errors='backslashreplace').rstrip()
)
except IOError:
pass
Expand Down
214 changes: 37 additions & 177 deletions exec_helpers/subprocess_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@
import errno
import logging
import os
import select
import sys
import subprocess # nosec # Expected usage
import threading
import time
import typing # noqa # pylint: disable=unused-import

import six
Expand All @@ -45,19 +42,6 @@
# noinspection PyUnresolvedReferences
devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+

_win = sys.platform == "win32" # type: bool
_posix = 'posix' in sys.builtin_module_names # type: bool

if _posix: # pragma: no cover
import fcntl # pylint: disable=import-error

elif _win: # pragma: no cover
import ctypes
from ctypes import wintypes # pylint: disable=import-error
from ctypes import windll # pylint: disable=import-error
# noinspection PyUnresolvedReferences
import msvcrt # pylint: disable=import-error


class SingletonMeta(type):
"""Metaclass for Singleton.
Expand All @@ -72,9 +56,8 @@ def __call__(cls, *args, **kwargs):
"""Singleton."""
with cls._lock:
if cls not in cls._instances:
cls._instances[cls] = super(
SingletonMeta, cls
).__call__(*args, **kwargs)
# noinspection PySuperArguments
cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs)
return cls._instances[cls]

@classmethod
Expand All @@ -91,77 +74,6 @@ def __prepare__(
return collections.OrderedDict() # pragma: no cover


def set_nonblocking_pipe(pipe): # type: (typing.Any) -> None
"""Set PIPE unblocked to allow polling of all pipes in parallel."""
if pipe is None: # pragma: no cover
return

descriptor = pipe.fileno() # pragma: no cover

if _posix: # pragma: no cover
# Get flags
flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)

# Set nonblock mode
fcntl.fcntl(descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)

elif _win: # pragma: no cover
# noinspection PyPep8Naming
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [
wintypes.HANDLE, # hNamedPipe
wintypes.LPDWORD, # lpMode
wintypes.LPDWORD, # lpMaxCollectionCount
wintypes.LPDWORD, # lpCollectDataTimeout
]
SetNamedPipeHandleState.restype = wintypes.BOOL
# noinspection PyPep8Naming
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
handle = msvcrt.get_osfhandle(descriptor)

windll.kernel32.SetNamedPipeHandleState(
handle,
ctypes.byref(PIPE_NOWAIT), None, None
)


def set_blocking_pipe(pipe): # type: (typing.Any) -> None
"""Set pipe blocking mode for final read on process close.

This will allow to read pipe until closed on remote side.
"""
if pipe is None: # pragma: no cover
return

descriptor = pipe.fileno() # pragma: no cover

if _posix: # pragma: no cover
# Get flags
flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)

# Set block mode
fcntl.fcntl(descriptor, fcntl.F_SETFL, flags & (flags ^ os.O_NONBLOCK))

elif _win: # pragma: no cover
# noinspection PyPep8Naming
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [
wintypes.HANDLE, # hNamedPipe
wintypes.LPDWORD, # lpMode
wintypes.LPDWORD, # lpMaxCollectionCount
wintypes.LPDWORD, # lpCollectDataTimeout
]
SetNamedPipeHandleState.restype = wintypes.BOOL
# noinspection PyPep8Naming
PIPE_WAIT = wintypes.DWORD(0x00000000)
handle = msvcrt.get_osfhandle(descriptor)

windll.kernel32.SetNamedPipeHandleState(
handle,
ctypes.byref(PIPE_WAIT), None, None
)


class Subprocess(six.with_metaclass(SingletonMeta, api.ExecHelper)):
"""Subprocess helper with timeouts and lock-free FIFO."""

Expand All @@ -179,10 +91,7 @@ def __init__(

.. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd
"""
super(Subprocess, self).__init__(
logger=logger,
log_mask_re=log_mask_re
)
super(Subprocess, self).__init__(logger=logger, log_mask_re=log_mask_re)
self.__process = None

def _exec_command(
Expand Down Expand Up @@ -218,98 +127,57 @@ def _exec_command(

.. versionadded:: 1.2.0
"""
def poll_streams():
"""Poll streams to the result object."""
if _win: # pragma: no cover
# select.select is not supported on windows
result.read_stdout(src=stdout, log=logger, verbose=verbose)
result.read_stderr(src=stderr, log=logger, verbose=verbose)
else: # pragma: no cover
rlist, _, _ = select.select(
[item for item in (stdout, stderr) if item is not None],
[],
[])
if rlist:
if stdout in rlist:
result.read_stdout(
src=stdout,
log=logger,
verbose=verbose
)
if stderr in rlist:
result.read_stderr(
src=stderr,
log=logger,
verbose=verbose
)
@threaded.threadpooled
def poll_stdout():
"""Sync stdout poll."""
result.read_stdout(
src=stdout,
log=logger,
verbose=verbose
)

@threaded.threadpooled
def poll_pipes(stop, ): # type: (threading.Event) -> None
"""Polling task for FIFO buffers.

:type stop: Event
"""
while not stop.is_set():
time.sleep(0.1)
if stdout or stderr:
poll_streams()

interface.poll()

if interface.returncode is not None:
set_blocking_pipe(stdout)
set_blocking_pipe(stderr)
result.read_stdout(
src=stdout,
log=logger,
verbose=verbose
)
result.read_stderr(
src=stderr,
log=logger,
verbose=verbose
)
result.exit_code = interface.returncode

stop.set()
def poll_stderr():
"""Sync stderr poll."""
result.read_stderr(
src=stderr,
log=logger,
verbose=verbose
)

# Store command with hidden data
cmd_for_log = self._mask_command(
cmd=command,
log_mask_re=log_mask_re
)
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)

result = exec_result.ExecResult(cmd=cmd_for_log)
stop_event = threading.Event()

# pylint: disable=assignment-from-no-return
future = poll_pipes(stop_event) # type: concurrent.futures.Future
stdout_future = poll_stdout() # type: concurrent.futures.Future
stderr_future = poll_stderr() # type: concurrent.futures.Future
# pylint: enable=assignment-from-no-return
# wait for process close

concurrent.futures.wait([future], timeout)
concurrent.futures.wait([stdout_future, stderr_future], timeout=timeout) # Wait real timeout here
exit_code = interface.poll() # Update exit code

# Process closed?
if stop_event.is_set():
if exit_code is not None:
result.exit_code = exit_code
return result
# Kill not ended process and wait for close
try:
interface.kill() # kill -9
stop_event.wait(5)
concurrent.futures.wait([stdout_future, stderr_future], timeout=5)
# Force stop cycle if no exit code after kill
stop_event.set()
future.cancel()
stdout_future.cancel()
stderr_future.cancel()
except OSError:
# Nothing to kill
logger.warning(
u"{!s} has been completed just after timeout: "
"please validate timeout.".format(command))
return result

wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(
result=result,
timeout=timeout
)
exit_code = interface.poll()
if exit_code is not None: # Nothing to kill
logger.warning("{!s} has been completed just after timeout: please validate timeout.".format(command))
result.exit_code = exit_code
return result
raise # Some other error

wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout)
logger.debug(wait_err_msg)
raise exceptions.ExecHelperTimeoutError(result=result, timeout=timeout)

Expand Down Expand Up @@ -347,10 +215,7 @@ def execute_async(

.. versionadded:: 1.2.0
"""
cmd_for_log = self._mask_command(
cmd=command,
log_mask_re=log_mask_re
)
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)

self.logger.log(
level=logging.INFO if verbose else logging.DEBUG,
Expand Down Expand Up @@ -395,9 +260,4 @@ def execute_async(
process.kill()
raise

if open_stdout:
set_nonblocking_pipe(process.stdout)
if open_stderr:
set_nonblocking_pipe(process.stderr)

return process, None, process.stderr, process.stdout
9 changes: 0 additions & 9 deletions exec_helpers/subprocess_runner.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ from exec_helpers import exec_result, api
logger: logging.Logger
devnull: typing.IO

_win: bool
_posix: bool

class SingletonMeta(type):
_instances: typing.Dict[typing.Type, typing.Any] = ...
_lock: threading.RLock = ...
Expand All @@ -26,12 +23,6 @@ class SingletonMeta(type):
**kwargs: typing.Dict
) -> collections.OrderedDict: ...


def set_nonblocking_pipe(pipe: typing.Any) -> None: ...

def set_blocking_pipe(pipe: typing.Any) -> None: ...


class Subprocess(api.ExecHelper, metaclass=SingletonMeta):
def __init__(self, log_mask_re: typing.Optional[str] = ...) -> None: ...

Expand Down
Loading