Skip to content

Commit

Permalink
Use process tree and kill tree instead of parent only. Fix #90 (#92)
Browse files Browse the repository at this point in the history
* Use process tree and kill tree instead of parent only. Fix #90

Bump to 1.4.1

Signed-off-by: Alexey Stepanov <penguinolog@gmail.com>

* Force to close FIFO buffers instead of waiting for garbage collector

Signed-off-by: Alexey Stepanov <penguinolog@gmail.com>
  • Loading branch information
penguinolog committed Oct 23, 2018
1 parent c4ecd03 commit 0e3879d
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 60 deletions.
2 changes: 1 addition & 1 deletion exec_helpers/__init__.py
Expand Up @@ -51,7 +51,7 @@
"ExecResult",
)

__version__ = "1.4.0"
__version__ = "1.4.1"
__author__ = "Alexey Stepanov"
__author_email__ = "penguinolog@gmail.com"
__maintainers__ = {
Expand Down
19 changes: 3 additions & 16 deletions exec_helpers/_ssh_client_base.py
Expand Up @@ -22,7 +22,7 @@

import abc
import base64
import collections

# noinspection PyCompatibility
import concurrent.futures
import copy
Expand Down Expand Up @@ -98,19 +98,6 @@ class _MemorizedSSH(abc.ABCMeta):

__cache = {} # type: typing.Dict[typing.Tuple[str, int], SSHClientBase]

@classmethod
def __prepare__(
mcs, # type: typing.Type[_MemorizedSSH]
name, # type: str
bases, # type: typing.Iterable[typing.Type]
**kwargs # type: typing.Any
): # type: (...) -> collections.OrderedDict # pylint: disable=unused-argument
"""Metaclass magic for object storage.
.. versionadded:: 1.2.0
"""
return collections.OrderedDict() # pragma: no cover

def __call__( # type: ignore
cls, # type: _MemorizedSSH
host, # type: str
Expand Down Expand Up @@ -650,7 +637,7 @@ def poll_streams(): # type: () -> None
if stderr and interface.recv_stderr_ready():
result.read_stderr(src=stderr, log=self.logger, verbose=verbose)

@threaded.threadpooled # type: ignore
@threaded.threadpooled
def poll_pipes(stop,): # type: (threading.Event) -> None
"""Polling task for FIFO buffers.
Expand Down Expand Up @@ -807,7 +794,7 @@ def execute_together(
.. versionchanged:: 1.2.0 default timeout 1 hour
.. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd
"""
@threaded.threadpooled # type: ignore
@threaded.threadpooled
def get_result(remote): # type: (SSHClientBase) -> exec_result.ExecResult
"""Get result from remote call."""
async_result = remote.execute_async(command, **kwargs) # type: SshExecuteAsyncResult
Expand Down
2 changes: 1 addition & 1 deletion exec_helpers/proc_enums.py
Expand Up @@ -155,7 +155,7 @@ def exit_code_to_enum(code): # type: (typing.Union[int, ExitCodes]) -> typing.U
"""Convert exit code to enum if possible."""
if isinstance(code, int) and code in ExitCodes.__members__.values():
return ExitCodes(code)
return code
return code # pragma: no cover


def exit_codes_to_enums(
Expand Down
6 changes: 2 additions & 4 deletions exec_helpers/ssh_auth.py
Expand Up @@ -209,15 +209,13 @@ def __ne__(self, other): # type: (typing.Any) -> bool

def __deepcopy__(self, memo): # type: (typing.Any) -> SSHAuth
"""Helper for copy.deepcopy."""
return self.__class__( # type: ignore
return self.__class__(
username=self.username, password=self.__password, key=self.__key, keys=copy.deepcopy(self.__keys)
)

def __copy__(self): # type: () -> SSHAuth
"""Copy self."""
return self.__class__( # type: ignore
username=self.username, password=self.__password, key=self.__key, keys=self.__keys
)
return self.__class__(username=self.username, password=self.__password, key=self.__key, keys=self.__keys)

def __repr__(self): # type: (...) -> str
"""Representation for debug purposes."""
Expand Down
69 changes: 52 additions & 17 deletions exec_helpers/subprocess_runner.py
Expand Up @@ -21,17 +21,19 @@
from __future__ import unicode_literals

import abc
import collections

# noinspection PyCompatibility
import concurrent.futures
import errno
import logging
import os
import platform
import subprocess # nosec # Expected usage
import threading
import typing # noqa: F401 # pylint: disable=unused-import

import six
import psutil # type: ignore
import threaded

from exec_helpers import api
Expand All @@ -44,6 +46,39 @@
devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+


# Adopt from:
# https://stackoverflow.com/questions/1230669/subprocess-deleting-child-processes-in-windows
def kill_proc_tree(pid, including_parent=True): # type: (int, bool) -> None # pragma: no cover
"""Kill process tree.
:param pid: PID of parent process to kill
:type pid: int
:param including_parent: kill also parent process
:type including_parent: bool
"""
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children: # type: psutil.Process
child.kill()
_, alive = psutil.wait_procs(children, timeout=5)
for proc in alive: # type: psutil.Process
proc.kill() # 2nd shot
if including_parent:
parent.kill()
parent.wait(5)


# Subprocess extra arguments.
# Flags from:
# https://stackoverflow.com/questions/13243807/popen-waiting-for-child-process-even-when-the-immediate-child-has-terminated
kw = {} # type: typing.Dict[str, typing.Any]
if "Windows" == platform.system(): # pragma: no cover
kw["creationflags"] = 0x00000200
else: # pragma: no cover
kw["preexec_fn"] = os.setsid


# noinspection PyTypeHints
class SubprocessExecuteAsyncResult(api.ExecuteAsyncResult):
"""Override original NamedTuple with proper typing."""

Expand All @@ -70,19 +105,6 @@ def __call__(cls, *args, **kwargs): # type: (SingletonMeta, typing.Any, typing.
cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs)
return cls._instances[cls]

@classmethod
def __prepare__(
mcs,
name, # type: str
bases, # type: typing.Iterable[typing.Type]
**kwargs # type: typing.Any
): # type: (...) -> collections.OrderedDict # pylint: disable=unused-argument
"""Metaclass magic for object storage.
.. versionadded:: 1.2.0
"""
return collections.OrderedDict() # pragma: no cover


class Subprocess(six.with_metaclass(SingletonMeta, api.ExecHelper)):
"""Subprocess helper with timeouts and lock-free FIFO."""
Expand Down Expand Up @@ -138,17 +160,24 @@ def _exec_command(
.. versionadded:: 1.2.0
"""
@threaded.threadpooled # type: ignore
@threaded.threadpooled
def poll_stdout(): # type: () -> None
"""Sync stdout poll."""
result.read_stdout(src=stdout, log=logger, verbose=verbose)
interface.wait() # wait for the end of execution

@threaded.threadpooled # type: ignore
@threaded.threadpooled
def poll_stderr(): # type: () -> None
"""Sync stderr poll."""
result.read_stderr(src=stderr, log=logger, verbose=verbose)

def close_streams(): # type: () -> None
"""Enforce FIFO closure."""
if stdout is not None and not stdout.closed:
stdout.close()
if stderr is not None and not stderr.closed:
stderr.close()

# Store command with hidden data
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)

Expand All @@ -167,11 +196,13 @@ def poll_stderr(): # type: () -> None
# Process closed?
if exit_code is not None:
result.exit_code = exit_code
close_streams()
return result
# Kill not ended process and wait for close
try:
kill_proc_tree(interface.pid, including_parent=False) # kill -9 for all subprocesses
interface.kill() # kill -9
concurrent.futures.wait([stdout_future, stderr_future], timeout=5)
concurrent.futures.wait([stdout_future, stderr_future], timeout=0.5)
# Force stop cycle if no exit code after kill
stdout_future.cancel()
stderr_future.cancel()
Expand All @@ -182,6 +213,8 @@ def poll_stderr(): # type: () -> None
result.exit_code = exit_code
return result
raise # Some other error
finally:
close_streams()

wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout)
logger.debug(wait_err_msg)
Expand Down Expand Up @@ -244,6 +277,7 @@ def execute_async(
cwd=kwargs.get("cwd", None),
env=kwargs.get("env", None),
universal_newlines=False,
**kw
)

if stdin is None:
Expand All @@ -264,6 +298,7 @@ def execute_async(
elif exc.errno in (errno.EPIPE, errno.ESHUTDOWN): # pragma: no cover
self.logger.warning("STDIN Send failed: broken PIPE")
else:
kill_proc_tree(process.pid)
process.kill()
raise
try:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -7,3 +7,4 @@ advanced-descriptors>=1.0 # Apache-2.0
typing >= 3.6 # PSF
futures>=3.1; python_version == "2.7"
enum34>=1.1; python_version == "2.7"
psutil >= 5.0

0 comments on commit 0e3879d

Please sign in to comment.