diff --git a/exec_helpers/__init__.py b/exec_helpers/__init__.py index 06f3003..b4a4012 100644 --- a/exec_helpers/__init__.py +++ b/exec_helpers/__init__.py @@ -51,7 +51,7 @@ "ExecResult", ) -__version__ = "1.4.0" +__version__ = "1.4.1" __author__ = "Alexey Stepanov" __author_email__ = "penguinolog@gmail.com" __maintainers__ = { diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index 5eeacfe..b8111dc 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -22,7 +22,7 @@ import abc import base64 -import collections + # noinspection PyCompatibility import concurrent.futures import copy @@ -98,19 +98,6 @@ class _MemorizedSSH(abc.ABCMeta): __cache = {} # type: typing.Dict[typing.Tuple[str, int], SSHClientBase] - @classmethod - def __prepare__( - mcs, # type: typing.Type[_MemorizedSSH] - name, # type: str - bases, # type: typing.Iterable[typing.Type] - **kwargs # type: typing.Any - ): # type: (...) -> collections.OrderedDict # pylint: disable=unused-argument - """Metaclass magic for object storage. - - .. versionadded:: 1.2.0 - """ - return collections.OrderedDict() # pragma: no cover - def __call__( # type: ignore cls, # type: _MemorizedSSH host, # type: str @@ -650,7 +637,7 @@ def poll_streams(): # type: () -> None if stderr and interface.recv_stderr_ready(): result.read_stderr(src=stderr, log=self.logger, verbose=verbose) - @threaded.threadpooled # type: ignore + @threaded.threadpooled def poll_pipes(stop,): # type: (threading.Event) -> None """Polling task for FIFO buffers. @@ -807,7 +794,7 @@ def execute_together( .. versionchanged:: 1.2.0 default timeout 1 hour .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd """ - @threaded.threadpooled # type: ignore + @threaded.threadpooled def get_result(remote): # type: (SSHClientBase) -> exec_result.ExecResult """Get result from remote call.""" async_result = remote.execute_async(command, **kwargs) # type: SshExecuteAsyncResult diff --git a/exec_helpers/proc_enums.py b/exec_helpers/proc_enums.py index 4d4656d..7be3b36 100644 --- a/exec_helpers/proc_enums.py +++ b/exec_helpers/proc_enums.py @@ -155,7 +155,7 @@ def exit_code_to_enum(code): # type: (typing.Union[int, ExitCodes]) -> typing.U """Convert exit code to enum if possible.""" if isinstance(code, int) and code in ExitCodes.__members__.values(): return ExitCodes(code) - return code + return code # pragma: no cover def exit_codes_to_enums( diff --git a/exec_helpers/ssh_auth.py b/exec_helpers/ssh_auth.py index 8507017..67ba5d4 100644 --- a/exec_helpers/ssh_auth.py +++ b/exec_helpers/ssh_auth.py @@ -209,15 +209,13 @@ def __ne__(self, other): # type: (typing.Any) -> bool def __deepcopy__(self, memo): # type: (typing.Any) -> SSHAuth """Helper for copy.deepcopy.""" - return self.__class__( # type: ignore + return self.__class__( username=self.username, password=self.__password, key=self.__key, keys=copy.deepcopy(self.__keys) ) def __copy__(self): # type: () -> SSHAuth """Copy self.""" - return self.__class__( # type: ignore - username=self.username, password=self.__password, key=self.__key, keys=self.__keys - ) + return self.__class__(username=self.username, password=self.__password, key=self.__key, keys=self.__keys) def __repr__(self): # type: (...) -> str """Representation for debug purposes.""" diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index 3598b9d..df1d77d 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -21,17 +21,19 @@ from __future__ import unicode_literals import abc -import collections + # noinspection PyCompatibility import concurrent.futures import errno import logging import os +import platform import subprocess # nosec # Expected usage import threading import typing # noqa: F401 # pylint: disable=unused-import import six +import psutil # type: ignore import threaded from exec_helpers import api @@ -44,6 +46,39 @@ devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+ +# Adopt from: +# https://stackoverflow.com/questions/1230669/subprocess-deleting-child-processes-in-windows +def kill_proc_tree(pid, including_parent=True): # type: (int, bool) -> None # pragma: no cover + """Kill process tree. + + :param pid: PID of parent process to kill + :type pid: int + :param including_parent: kill also parent process + :type including_parent: bool + """ + parent = psutil.Process(pid) + children = parent.children(recursive=True) + for child in children: # type: psutil.Process + child.kill() + _, alive = psutil.wait_procs(children, timeout=5) + for proc in alive: # type: psutil.Process + proc.kill() # 2nd shot + if including_parent: + parent.kill() + parent.wait(5) + + +# Subprocess extra arguments. +# Flags from: +# https://stackoverflow.com/questions/13243807/popen-waiting-for-child-process-even-when-the-immediate-child-has-terminated +kw = {} # type: typing.Dict[str, typing.Any] +if "Windows" == platform.system(): # pragma: no cover + kw["creationflags"] = 0x00000200 +else: # pragma: no cover + kw["preexec_fn"] = os.setsid + + +# noinspection PyTypeHints class SubprocessExecuteAsyncResult(api.ExecuteAsyncResult): """Override original NamedTuple with proper typing.""" @@ -70,19 +105,6 @@ def __call__(cls, *args, **kwargs): # type: (SingletonMeta, typing.Any, typing. cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs) return cls._instances[cls] - @classmethod - def __prepare__( - mcs, - name, # type: str - bases, # type: typing.Iterable[typing.Type] - **kwargs # type: typing.Any - ): # type: (...) -> collections.OrderedDict # pylint: disable=unused-argument - """Metaclass magic for object storage. - - .. versionadded:: 1.2.0 - """ - return collections.OrderedDict() # pragma: no cover - class Subprocess(six.with_metaclass(SingletonMeta, api.ExecHelper)): """Subprocess helper with timeouts and lock-free FIFO.""" @@ -138,17 +160,24 @@ def _exec_command( .. versionadded:: 1.2.0 """ - @threaded.threadpooled # type: ignore + @threaded.threadpooled def poll_stdout(): # type: () -> None """Sync stdout poll.""" result.read_stdout(src=stdout, log=logger, verbose=verbose) interface.wait() # wait for the end of execution - @threaded.threadpooled # type: ignore + @threaded.threadpooled def poll_stderr(): # type: () -> None """Sync stderr poll.""" result.read_stderr(src=stderr, log=logger, verbose=verbose) + def close_streams(): # type: () -> None + """Enforce FIFO closure.""" + if stdout is not None and not stdout.closed: + stdout.close() + if stderr is not None and not stderr.closed: + stderr.close() + # Store command with hidden data cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re) @@ -167,11 +196,13 @@ def poll_stderr(): # type: () -> None # Process closed? if exit_code is not None: result.exit_code = exit_code + close_streams() return result # Kill not ended process and wait for close try: + kill_proc_tree(interface.pid, including_parent=False) # kill -9 for all subprocesses interface.kill() # kill -9 - concurrent.futures.wait([stdout_future, stderr_future], timeout=5) + concurrent.futures.wait([stdout_future, stderr_future], timeout=0.5) # Force stop cycle if no exit code after kill stdout_future.cancel() stderr_future.cancel() @@ -182,6 +213,8 @@ def poll_stderr(): # type: () -> None result.exit_code = exit_code return result raise # Some other error + finally: + close_streams() wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout) logger.debug(wait_err_msg) @@ -244,6 +277,7 @@ def execute_async( cwd=kwargs.get("cwd", None), env=kwargs.get("env", None), universal_newlines=False, + **kw ) if stdin is None: @@ -264,6 +298,7 @@ def execute_async( elif exc.errno in (errno.EPIPE, errno.ESHUTDOWN): # pragma: no cover self.logger.warning("STDIN Send failed: broken PIPE") else: + kill_proc_tree(process.pid) process.kill() raise try: diff --git a/requirements.txt b/requirements.txt index 746aefe..3f2b2bd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ advanced-descriptors>=1.0 # Apache-2.0 typing >= 3.6 # PSF futures>=3.1; python_version == "2.7" enum34>=1.1; python_version == "2.7" +psutil >= 5.0 diff --git a/test/test_subprocess_runner.py b/test/test_subprocess_runner.py index 144c58d..f3a3c44 100644 --- a/test/test_subprocess_runner.py +++ b/test/test_subprocess_runner.py @@ -22,6 +22,8 @@ import errno import logging +import os +import platform import subprocess import unittest @@ -40,6 +42,7 @@ class FakeFileStream(object): def __init__(self, *args): self.__src = list(args) + self.closed = False def __iter__(self): for _ in range(len(self.__src)): @@ -48,9 +51,23 @@ def __iter__(self): def fileno(self): return hash(tuple(self.__src)) + def close(self): + self.closed = True -@mock.patch('exec_helpers.subprocess_runner.logger', autospec=True) -@mock.patch('subprocess.Popen', autospec=True, name='subprocess.Popen') + +# Subprocess extra arguments. +# Flags from: +# https://stackoverflow.com/questions/13243807/popen-waiting-for-child-process-even-when-the-immediate-child-has-terminated +kw = {} +if 'Windows' == platform.system(): + kw['creationflags'] = 0x00000200 +else: + kw["preexec_fn"] = os.setsid + + +@mock.patch("psutil.Process", autospec=True) +@mock.patch("exec_helpers.subprocess_runner.logger", autospec=True) +@mock.patch("subprocess.Popen", autospec=True, name="subprocess.Popen") class TestSubprocessRunner(unittest.TestCase): def setUp(self): subprocess_runner.SingletonMeta._instances.clear() @@ -108,7 +125,8 @@ def gen_cmd_result_log_message(result): def test_001_call( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): # type: (...) -> None popen_obj, exp_result = self.prepare_close(popen) @@ -127,6 +145,7 @@ def test_001_call( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -145,7 +164,8 @@ def test_001_call( def test_002_call_verbose( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): # type: (...) -> None popen_obj, _ = self.prepare_close(popen) @@ -166,7 +186,7 @@ def test_002_call_verbose( def test_003_context_manager( self, popen, # type: mock.MagicMock - _ # type: mock.MagicMock + *args # type: mock.MagicMock ): # type: (...) -> None popen_obj, exp_result = self.prepare_close(popen) @@ -193,7 +213,7 @@ def test_004_execute_timeout_fail( self, _, # type: mock.MagicMock popen, # type: mock.MagicMock - __ # type: mock.MagicMock + *args # type: mock.MagicMock ): popen_obj, exp_result = self.prepare_close(popen) popen_obj.poll.return_value = None @@ -222,13 +242,15 @@ def test_004_execute_timeout_fail( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) def test_005_execute_no_stdout( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): popen_obj, exp_result = self.prepare_close(popen, open_stdout=False) @@ -247,6 +269,7 @@ def test_005_execute_no_stdout( stdin=subprocess.PIPE, stdout=subprocess_runner.devnull, universal_newlines=False, + **kw ), )) logger.assert_has_calls( @@ -269,7 +292,8 @@ def test_005_execute_no_stdout( def test_006_execute_no_stderr( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): popen_obj, exp_result = self.prepare_close(popen, open_stderr=False) @@ -288,6 +312,7 @@ def test_006_execute_no_stderr( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) logger.assert_has_calls( @@ -311,7 +336,8 @@ def test_006_execute_no_stderr( def test_007_execute_no_stdout_stderr( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): popen_obj, exp_result = self.prepare_close( popen, @@ -334,6 +360,7 @@ def test_007_execute_no_stdout_stderr( stdin=subprocess.PIPE, stdout=subprocess_runner.devnull, universal_newlines=False, + **kw ), )) logger.assert_has_calls( @@ -351,7 +378,8 @@ def test_007_execute_no_stdout_stderr( def test_008_execute_mask_global( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): cmd = "USE='secret=secret_pass' do task" log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" @@ -381,6 +409,7 @@ def test_008_execute_mask_global( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -400,7 +429,8 @@ def test_008_execute_mask_global( def test_009_execute_mask_local( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): cmd = "USE='secret=secret_pass' do task" log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" @@ -428,6 +458,7 @@ def test_009_execute_mask_local( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) self.assertEqual( @@ -445,7 +476,7 @@ def test_009_execute_mask_local( def test_004_check_stdin_str( self, popen, # type: mock.MagicMock - _ # type: mock.MagicMock + *args # type: mock.MagicMock ): # type: (...) -> None stdin = u'this is a line' @@ -470,6 +501,7 @@ def test_004_check_stdin_str( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -481,7 +513,7 @@ def test_004_check_stdin_str( def test_005_check_stdin_bytes( self, popen, # type: mock.MagicMock - _ # type: mock.MagicMock + *args # type: mock.MagicMock ): # type: (...) -> None stdin = b'this is a line' @@ -506,6 +538,7 @@ def test_005_check_stdin_bytes( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -517,7 +550,7 @@ def test_005_check_stdin_bytes( def test_006_check_stdin_bytearray( self, popen, # type: mock.MagicMock - _ # type: mock.MagicMock + *args # type: mock.MagicMock ): # type: (...) -> None stdin = bytearray(b'this is a line') @@ -542,6 +575,7 @@ def test_006_check_stdin_bytearray( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -553,7 +587,8 @@ def test_006_check_stdin_bytearray( def test_008_check_stdin_fail_closed_win( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): # type: (...) -> None stdin = b'this is a line' @@ -581,6 +616,7 @@ def test_008_check_stdin_fail_closed_win( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -593,7 +629,7 @@ def test_008_check_stdin_fail_closed_win( def test_009_check_stdin_fail_write( self, popen, # type: mock.MagicMock - _ # type: mock.MagicMock + *args # type: mock.MagicMock ): # type: (...) -> None stdin = b'this is a line' @@ -615,7 +651,8 @@ def test_009_check_stdin_fail_write( def test_011_check_stdin_fail_close_pipe_win( self, popen, # type: mock.MagicMock - logger # type: mock.MagicMock + logger, # type: mock.MagicMock + *args ): # type: (...) -> None stdin = b'this is a line' @@ -643,6 +680,7 @@ def test_011_check_stdin_fail_close_pipe_win( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -655,7 +693,7 @@ def test_011_check_stdin_fail_close_pipe_win( def test_012_check_stdin_fail_close( self, popen, # type: mock.MagicMock - _ # type: mock.MagicMock + *args # type: mock.MagicMock ): # type: (...) -> None stdin = b'this is a line' @@ -679,7 +717,7 @@ def test_013_execute_timeout_done( self, _, # type: mock.MagicMock popen, # type: mock.MagicMock - __ # type: mock.MagicMock + *args # type: mock.MagicMock ): popen_obj, exp_result = self.prepare_close(popen, ec=exec_helpers.ExitCodes.EX_INVALID) @@ -704,6 +742,7 @@ def test_013_execute_timeout_done( stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, + **kw ), )) @@ -711,7 +750,7 @@ def test_013_execute_timeout_done( @mock.patch('exec_helpers.subprocess_runner.logger', autospec=True) @mock.patch('exec_helpers.subprocess_runner.Subprocess.execute') class TestSubprocessRunnerHelpers(unittest.TestCase): - def test_001_check_call(self, execute, logger): + def test_001_check_call(self, execute, *args): exit_code = 0 return_value = exec_helpers.ExecResult( cmd=command, @@ -745,7 +784,7 @@ def test_001_check_call(self, execute, logger): runner.check_call(command=command, verbose=verbose, timeout=None) execute.assert_called_once_with(command, verbose, None) - def test_002_check_call_expected(self, execute, logger): + def test_002_check_call_expected(self, execute, *args): exit_code = 0 return_value = exec_helpers.ExecResult( cmd=command,