From 0c48746eb00f42cb22bb5bc07718a8fd732d9f07 Mon Sep 17 00:00:00 2001 From: Alexey Stepanov Date: Fri, 30 Mar 2018 13:26:39 +0200 Subject: [PATCH] Return nonblocking mode for subprocess pipe polling Use windows specific calls on windows to unblock pipe Checked: linux & Windows 10x64 --- exec_helpers/_ssh_client_base.py | 10 ++++-- exec_helpers/subprocess_runner.py | 53 +++++++++++++++++++++++++++++-- test/test_subprocess_runner.py | 17 ++++++---- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index da7178b..a8c22d4 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -134,7 +134,10 @@ def __call__( logger.debug('Reconnect {}'.format(ssh)) ssh.reconnect() return ssh - if CPYTHON and sys.getrefcount(cls.__cache[key]) == 2: + if ( + CPYTHON and + sys.getrefcount(cls.__cache[key]) == 2 + ): # pragma: no cover # If we have only cache reference and temporary getrefcount # reference: close connection before deletion logger.debug('Closing {} as unused'.format(cls.__cache[key])) @@ -161,7 +164,10 @@ def clear_cache(mcs): # type: () -> None # PY3: cache, ssh, temporary # PY4: cache, values mapping, ssh, temporary for ssh in mcs.__cache.values(): - if CPYTHON and sys.getrefcount(ssh) == n_count: + if ( + CPYTHON and + sys.getrefcount(ssh) == n_count + ): # pragma: no cover logger.debug('Closing {} as unused'.format(ssh)) ssh.close() mcs.__cache = {} diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index 15e21f5..0d32860 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -40,9 +40,19 @@ devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+ _win = sys.platform == "win32" +_posix = 'posix' in sys.builtin_module_names _type_exit_codes = typing.Union[int, proc_enums.ExitCodes] _type_expected = typing.Optional[typing.Iterable[_type_exit_codes]] +if _posix: # pragma: no cover + import fcntl # pylint: disable=import-error + +elif _win: # pragma: no cover + import msvcrt # pylint: disable=import-error + import ctypes + from ctypes import wintypes # pylint: disable=import-error + from ctypes import windll # pylint: disable=import-error + class SingletonMeta(type): """Metaclass for Singleton. @@ -51,12 +61,15 @@ class SingletonMeta(type): """ _instances = {} + _lock = threading.RLock() def __call__(cls, *args, **kwargs): """Singleton.""" - if cls not in cls._instances: - cls._instances[cls] = super( - SingletonMeta, cls).__call__(*args, **kwargs) + with cls._lock: + if cls not in cls._instances: + cls._instances[cls] = super( + SingletonMeta, cls + ).__call__(*args, **kwargs) return cls._instances[cls] @@ -78,6 +91,35 @@ def _py2_str(src): # pragma: no cover ) +def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None + """Set PIPE unblocked to allow polling of all pipes in parallel.""" + descriptor = pipe.fileno() + + if _posix: # pragma: no cover + # Get flags + flags = fcntl.fcntl(descriptor, fcntl.F_GETFL) + + # Set nonblock mode + fcntl.fcntl(descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + elif _win: # pragma: no cover + SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState + SetNamedPipeHandleState.argtypes = [ + wintypes.HANDLE, + wintypes.LPDWORD, + wintypes.LPDWORD, + wintypes.LPDWORD + ] + SetNamedPipeHandleState.restype = wintypes.BOOL + PIPE_NOWAIT = wintypes.DWORD(0x00000001) + handle = msvcrt.get_osfhandle(descriptor) + + windll.kernel32.SetNamedPipeHandleState( + handle, + ctypes.byref(PIPE_NOWAIT), None, None + ) + + class Subprocess(BaseSingleton): """Subprocess helper with timeouts and lock-free FIFO.""" @@ -227,6 +269,11 @@ def poll_pipes( ) # Poll output + + if open_stdout: + set_nonblocking_pipe(self.__process.stdout) + if open_stderr: + set_nonblocking_pipe(self.__process.stderr) # pylint: disable=assignment-from-no-return poll_thread = poll_pipes( result, diff --git a/test/test_subprocess_runner.py b/test/test_subprocess_runner.py index 3308a22..483f9fe 100644 --- a/test/test_subprocess_runner.py +++ b/test/test_subprocess_runner.py @@ -51,6 +51,9 @@ def fileno(self): @mock.patch('exec_helpers.subprocess_runner.logger', autospec=True) @mock.patch('select.select', autospec=True) +@mock.patch( + 'exec_helpers.subprocess_runner.set_nonblocking_pipe', autospec=True +) @mock.patch('subprocess.Popen', autospec=True, name='subprocess.Popen') class TestSubprocessRunner(unittest.TestCase): @staticmethod @@ -100,7 +103,7 @@ def gen_cmd_result_log_message(result): return ("Command exit code '{code!s}':\n{cmd!s}\n" .format(cmd=result.cmd.rstrip(), code=result.exit_code)) - def test_call(self, popen, select, logger): + def test_call(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close(popen) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -147,7 +150,7 @@ def test_call(self, popen, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_call_verbose(self, popen, select, logger): + def test_call_verbose(self, popen, _, select, logger): popen_obj, _ = self.prepare_close(popen) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -175,7 +178,7 @@ def test_call_verbose(self, popen, select, logger): msg=self.gen_cmd_result_log_message(result)), ]) - def test_context_manager(self, popen, select, logger): + def test_context_manager(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close(popen) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -200,7 +203,7 @@ def test_context_manager(self, popen, select, logger): def test_execute_timeout_fail( self, sleep, - popen, select, logger + popen, _, select, logger ): popen_obj, exp_result = self.prepare_close(popen) popen_obj.configure_mock(returncode=None) @@ -227,7 +230,7 @@ def test_execute_timeout_fail( ), )) - def test_execute_no_stdout(self, popen, select, logger): + def test_execute_no_stdout(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close(popen, open_stdout=False) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -265,7 +268,7 @@ def test_execute_no_stdout(self, popen, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_execute_no_stderr(self, popen, select, logger): + def test_execute_no_stderr(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close(popen, open_stderr=False) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -304,7 +307,7 @@ def test_execute_no_stderr(self, popen, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_execute_no_stdout_stderr(self, popen, select, logger): + def test_execute_no_stdout_stderr(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close( popen, open_stdout=False,