diff --git a/.travis.yml b/.travis.yml index 9c8a2e4..c11c126 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,8 +6,8 @@ python: - 3.4 - 3.5 - &mainstream_python 3.6 -- pypy2.7-5.8.0 -- pypy3.5-5.8.0 +- pypy +- pypy3.5 install: - &upgrade_python_toolset pip install --upgrade pip setuptools wheel - pip install tox-travis diff --git a/README.rst b/README.rst index 2af10a1..65aac1d 100644 --- a/README.rst +++ b/README.rst @@ -153,6 +153,8 @@ This methods are almost the same for `SSHCleint` and `Subprocess`, except specif raise_on_err=True, # type: bool ) +If no STDOUT or STDERR required, it is possible to disable this FIFO pipes via `**kwargs` with flags `open_stdout=False` and `open_stderr=False`. + The next command level uses lower level and kwargs are forwarded, so expected exit codes are forwarded from `check_stderr`. Implementation specific flags are always set via kwargs. diff --git a/doc/source/ExecResult.rst b/doc/source/ExecResult.rst index bdc517e..d0bad19 100644 --- a/doc/source/ExecResult.rst +++ b/doc/source/ExecResult.rst @@ -92,24 +92,28 @@ API: ExecResult ``typing.Any`` YAML from stdout. - .. py:method:: read_stdout(src, log=None, verbose=False) + .. py:method:: read_stdout(src=None, log=None, verbose=False) Read stdout file-like object to stdout. :param src: source - :type src: ``typing.Iterable`` + :type src: ``typing.Optional[typing.Iterable]`` :param log: logger :type log: ``typing.Optional[logging.Logger]`` :param verbose: use log.info instead of log.debug :type verbose: ``bool`` - .. py:method:: read_stderr(src, log=None, verbose=False) + .. versionchanged:: 1.2.0 - src can be None + + .. py:method:: read_stderr(src=None, log=None, verbose=False) Read stderr file-like object to stderr. :param src: source - :type src: ``typing.Iterable`` + :type src: ``typing.Optional[typing.Iterable]`` :param log: logger :type log: ``typing.Optional[logging.Logger]`` :param verbose: use log.info instead of log.debug :type verbose: ``bool`` + + .. versionchanged:: 1.2.0 - src can be None diff --git a/doc/source/SSHClient.rst b/doc/source/SSHClient.rst index 9681da0..9f35c9b 100644 --- a/doc/source/SSHClient.rst +++ b/doc/source/SSHClient.rst @@ -95,7 +95,7 @@ API: SSHClient and SSHAuth. :param enforce: Enforce sudo enabled or disabled. By default: None :type enforce: ``typing.Optional[bool]`` - .. py:method:: execute_async(command, get_pty=False, **kwargs) + .. py:method:: execute_async(command, get_pty=False, open_stdout=True, open_stderr=True, **kwargs) Execute command in async mode and return channel with IO objects. @@ -103,8 +103,14 @@ API: SSHClient and SSHAuth. :type command: ``str`` :param get_pty: open PTY on remote machine :type get_pty: ``bool`` + :param open_stdout: open STDOUT stream for read + :type open_stdout: bool + :param open_stderr: open STDERR stream for read + :type open_stderr: bool :rtype: ``typing.Tuple[paramiko.Channel, paramiko.ChannelFile, paramiko.ChannelFile, paramiko.ChannelFile]`` + .. versionchanged:: 1.2.0 - open_stdout and open_stderr flags + .. py:method:: execute(command, verbose=False, timeout=None, **kwargs) Execute command and wait for return code. diff --git a/doc/source/Subprocess.rst b/doc/source/Subprocess.rst index d2ef881..cf09b98 100644 --- a/doc/source/Subprocess.rst +++ b/doc/source/Subprocess.rst @@ -38,6 +38,7 @@ API: Subprocess :raises: ExecHelperTimeoutError .. versionchanged:: 1.1.0 - make method + .. versionchanged:: 1.2.0 - open_stdout and open_stderr flags .. py:method:: check_call(command, verbose=False, timeout=None, error_info=None, expected=None, raise_on_err=True, **kwargs) diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index 32aff61..da7178b 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -507,6 +507,8 @@ def execute_async( self, command, # type: str get_pty=False, # type: bool + open_stdout=True, # type: bool + open_stderr=True, # type: bool **kwargs ): # type: (...) -> _type_execute_async """Execute command in async mode and return channel with IO objects. @@ -515,12 +517,18 @@ def execute_async( :type command: str :param get_pty: open PTY on remote machine :type get_pty: bool + :param open_stdout: open STDOUT stream for read + :type open_stdout: bool + :param open_stderr: open STDERR stream for read + :type open_stderr: bool :rtype: typing.Tuple[ paramiko.Channel, paramiko.ChannelFile, - paramiko.ChannelFile, - paramiko.ChannelFile, + typing.Optional[paramiko.ChannelFile], + typing.Optional[paramiko.ChannelFile], ] + + .. versionchanged:: 1.2.0 - open_stdout and open_stderr flags """ message = _log_templates.CMD_EXEC.format(cmd=command.rstrip()) self.logger.debug(message) @@ -536,8 +544,8 @@ def execute_async( ) stdin = chan.makefile('wb') - stdout = chan.makefile('rb') - stderr = chan.makefile_stderr('rb') + stdout = chan.makefile('rb') if open_stdout else None + stderr = chan.makefile_stderr('rb') if open_stderr else None cmd = "{command}\n".format(command=command) if self.sudo_mode: encoded_cmd = base64.b64encode(cmd.encode('utf-8')).decode('utf-8') @@ -581,13 +589,13 @@ def poll_streams( stderr, # type: paramiko.channel.ChannelFile ): """Poll FIFO buffers if data available.""" - if channel.recv_ready(): + if stdout and channel.recv_ready(): result.read_stdout( src=stdout, log=self.logger, verbose=verbose ) - if channel.recv_stderr_ready(): + if stderr and channel.recv_stderr_ready(): result.read_stderr( src=stderr, log=self.logger, @@ -612,12 +620,13 @@ def poll_pipes( """ while not stop.isSet(): time.sleep(0.1) - poll_streams( - result=result, - channel=channel, - stdout=stdout, - stderr=stderr, - ) + if stdout or stderr: + poll_streams( + result=result, + channel=channel, + stdout=stdout, + stderr=stderr, + ) if channel.status_event.is_set(): result.read_stdout( diff --git a/exec_helpers/exec_result.py b/exec_helpers/exec_result.py index 6e308ad..4ba7f0c 100644 --- a/exec_helpers/exec_result.py +++ b/exec_helpers/exec_result.py @@ -180,28 +180,32 @@ def __poll_stream( def read_stdout( self, - src, # type: typing.Iterable + src=None, # type: typing.Optional[typing.Iterable] log=None, # type: typing.Optional[logging.Logger] verbose=False # type: bool ): """Read stdout file-like object to stdout. :param src: source - :type src: typing.Iterable + :type src: typing.Optional[typing.Iterable] :param log: logger :type log: typing.Optional[logging.Logger] :param verbose: use log.info instead of log.debug :type verbose: bool + + .. versionchanged:: 1.2.0 - src can be None """ if self.timestamp: raise RuntimeError('Final exit code received.') + if not src: + return with self.lock: self.__stdout_str = self.__stdout_brief = None self.__stdout += tuple(self.__poll_stream(src, log, verbose)) def read_stderr( self, - src, # type: typing.Iterable + src=None, # type: typing.Optional[typing.Iterable] log=None, # type: typing.Optional[logging.Logger] verbose=False # type: bool ): @@ -213,9 +217,13 @@ def read_stderr( :type log: typing.Optional[logging.Logger] :param verbose: use log.info instead of log.debug :type verbose: bool + + .. versionchanged:: 1.2.0 - src can be None """ if self.timestamp: raise RuntimeError('Final exit code received.') + if not src: + return with self.lock: self.__stderr_str = self.__stderr_brief = None self.__stderr += tuple(self.__poll_stream(src, log, verbose)) diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index f472246..15e21f5 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -20,6 +20,7 @@ from __future__ import unicode_literals import logging +import os import select import sys import subprocess # nosec # Expected usage @@ -36,6 +37,7 @@ from exec_helpers import proc_enums logger = logging.getLogger(__name__) +devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+ _win = sys.platform == "win32" _type_exit_codes = typing.Union[int, proc_enums.ExitCodes] @@ -122,7 +124,9 @@ def __exec_command( cwd=None, # type: typing.Optional[str] env=None, # type: typing.Optional[typing.Dict[str, typing.Any]] timeout=None, # type: typing.Optional[int] - verbose=False # type: bool + verbose=False, # type: bool + open_stdout=True, # type: bool + open_stderr=True, # type: bool ): """Command executor helper. @@ -130,7 +134,13 @@ def __exec_command( :type cwd: str :type env: dict :type timeout: int + :param open_stdout: open STDOUT stream for read + :type open_stdout: bool + :param open_stderr: open STDERR stream for read + :type open_stderr: bool :rtype: ExecResult + + .. versionchanged:: 1.2.0 - open_stdout and open_stderr flags """ def poll_streams( result, # type: exec_result.ExecResult @@ -142,9 +152,9 @@ def poll_streams( # select.select is not supported on windows result.read_stdout(src=stdout, log=logger, verbose=verbose) result.read_stderr(src=stderr, log=logger, verbose=verbose) - else: + else: # pragma: no cover rlist, _, _ = select.select( - [stdout, stderr], + [item for item in (stdout, stderr) if item is not None], [], []) if rlist: @@ -173,11 +183,12 @@ def poll_pipes( """ while not stop.isSet(): time.sleep(0.1) - poll_streams( - result=result, - stdout=self.__process.stdout, - stderr=self.__process.stderr, - ) + if open_stdout or open_stderr: + poll_streams( + result=result, + stdout=self.__process.stdout, + stderr=self.__process.stderr, + ) self.__process.poll() @@ -209,8 +220,8 @@ def poll_pipes( self.__process = subprocess.Popen( args=[command], stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=subprocess.PIPE if open_stdout else devnull, + stderr=subprocess.PIPE if open_stderr else devnull, shell=True, cwd=cwd, env=env, universal_newlines=False, ) @@ -234,6 +245,8 @@ def poll_pipes( try: self.__process.kill() # kill -9 stop_event.wait(5) + # Force stop cycle if no exit code after kill + stop_event.set() poll_thread.join(5) except OSError: # Nothing to kill diff --git a/test/test_ssh_client.py b/test/test_ssh_client.py index 0372621..57bc104 100644 --- a/test/test_ssh_client.py +++ b/test/test_ssh_client.py @@ -155,6 +155,57 @@ def test_execute_async_pty(self, client, policy, logger): log.mock_calls ) + def test_execute_async_no_stdout_stderr(self, client, policy, logger): + chan = mock.Mock() + open_session = mock.Mock(return_value=chan) + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async( + command=command, + open_stdout=False + ) + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{}\n'.format(command)) + )) + + chan.reset_mock() + result = ssh.execute_async( + command=command, + open_stderr=False + ) + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.exec_command('{}\n'.format(command)) + )) + + chan.reset_mock() + result = ssh.execute_async( + command=command, + open_stdout=False, + open_stderr=False + ) + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.exec_command('{}\n'.format(command)) + )) + def test_execute_async_sudo(self, client, policy, logger): chan = mock.Mock() open_session = mock.Mock(return_value=chan) @@ -334,7 +385,12 @@ def test_execute_async_sudo_password( ) @staticmethod - def get_patched_execute_async_retval(ec=0, stderr_val=None): + def get_patched_execute_async_retval( + ec=0, + stderr_val=None, + open_stdout=True, + open_stderr=True, + ): """get patched execute_async retval :rtype: @@ -345,20 +401,23 @@ def get_patched_execute_async_retval(ec=0, stderr_val=None): FakeStream, FakeStream) """ - out = stdout_list - err = stderr_list if stderr_val is None else [] - - stdout = FakeStream(*out) - stderr = FakeStream(*err) + if open_stdout: + out = stdout_list + stdout = FakeStream(*out) + else: + stdout = out = None + if open_stderr: + err = stderr_list if stderr_val is None else [] + stderr = FakeStream(*err) + else: + stderr = err = None exit_code = ec chan = mock.Mock() - recv_exit_status = mock.Mock(return_value=exit_code) - chan.attach_mock(recv_exit_status, 'recv_exit_status') + chan.attach_mock(mock.Mock(return_value=exit_code), 'recv_exit_status') - wait = mock.Mock() status_event = mock.Mock() - status_event.attach_mock(wait, 'wait') + status_event.attach_mock(mock.Mock(), 'wait') chan.attach_mock(status_event, 'status_event') chan.configure_mock(exit_status=exit_code) @@ -372,12 +431,12 @@ def get_patched_execute_async_retval(ec=0, stderr_val=None): return chan, '', exp_result, stderr, stdout - @mock.patch( - 'exec_helpers.ssh_client.SSHClient.execute_async') + @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') def test_execute( - self, - execute_async, - client, policy, logger): + self, + execute_async, + client, policy, logger + ): ( chan, _stdin, exp_result, stderr, stdout ) = self.get_patched_execute_async_retval() @@ -468,6 +527,143 @@ def test_execute_verbose( ] ) + @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') + def test_execute_no_stdout( + self, + execute_async, + client, policy, logger + ): + ( + chan, _stdin, exp_result, stderr, stdout + ) = self.get_patched_execute_async_retval(open_stdout=False) + chan.status_event.attach_mock(mock.Mock(return_value=True), 'is_set') + + execute_async.return_value = chan, _stdin, stderr, stdout + + ssh = self.get_ssh() + + logger.reset_mock() + + # noinspection PyTypeChecker + result = ssh.execute( + command=command, + verbose=False, + open_stdout=False + ) + + self.assertEqual( + result, + exp_result + ) + execute_async.assert_called_once_with(command, open_stdout=False) + message = self.gen_cmd_result_log_message(result) + log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log + log.assert_has_calls( + [ + mock.call(level=logging.DEBUG, msg=command_log), + ] + [ + mock.call( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stderr_list + ] + [ + mock.call(level=logging.DEBUG, msg=message), + ] + ) + + @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') + def test_execute_no_stderr( + self, + execute_async, + client, policy, logger + ): + ( + chan, _stdin, exp_result, stderr, stdout + ) = self.get_patched_execute_async_retval(open_stderr=False) + chan.status_event.attach_mock(mock.Mock(return_value=True), 'is_set') + + execute_async.return_value = chan, _stdin, stderr, stdout + + ssh = self.get_ssh() + + logger.reset_mock() + + # noinspection PyTypeChecker + result = ssh.execute( + command=command, + verbose=False, + open_stderr=False + ) + + self.assertEqual( + result, + exp_result + ) + execute_async.assert_called_once_with(command, open_stderr=False) + message = self.gen_cmd_result_log_message(result) + log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log + log.assert_has_calls( + [ + mock.call(level=logging.DEBUG, msg=command_log), + ] + [ + mock.call( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stdout_list + ] + [ + mock.call(level=logging.DEBUG, msg=message), + ] + ) + + @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') + def test_execute_no_stdout_stderr( + self, + execute_async, + client, policy, logger + ): + ( + chan, _stdin, exp_result, stderr, stdout + ) = self.get_patched_execute_async_retval( + open_stdout=False, + open_stderr=False + ) + chan.status_event.attach_mock(mock.Mock(return_value=True), 'is_set') + + execute_async.return_value = chan, _stdin, stderr, stdout + + ssh = self.get_ssh() + + logger.reset_mock() + + # noinspection PyTypeChecker + result = ssh.execute( + command=command, + verbose=False, + open_stdout=False, + open_stderr=False, + ) + + self.assertEqual( + result, + exp_result + ) + execute_async.assert_called_once_with( + command, + open_stdout=False, + open_stderr=False + ) + message = self.gen_cmd_result_log_message(result) + log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log + log.assert_has_calls( + [ + mock.call(level=logging.DEBUG, msg=command_log), + ] + [ + mock.call(level=logging.DEBUG, msg=message), + ] + ) + @mock.patch('time.sleep', autospec=True) @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') def test_execute_timeout( @@ -502,10 +698,11 @@ def test_execute_timeout( log.mock_calls ) + @mock.patch('time.sleep', autospec=True) @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') def test_execute_timeout_fail( self, - execute_async, + execute_async, sleep, client, policy, logger): ( chan, _stdin, _, stderr, stdout diff --git a/test/test_subprocess_runner.py b/test/test_subprocess_runner.py index a4e6bb3..3308a22 100644 --- a/test/test_subprocess_runner.py +++ b/test/test_subprocess_runner.py @@ -54,16 +54,33 @@ def fileno(self): @mock.patch('subprocess.Popen', autospec=True, name='subprocess.Popen') class TestSubprocessRunner(unittest.TestCase): @staticmethod - def prepare_close(popen, stderr_val=None, ec=0): - stdout_lines = stdout_list - stderr_lines = stderr_list if stderr_val is None else [] - - stdout = FakeFileStream(*stdout_lines) - stderr = FakeFileStream(*stderr_lines) + def prepare_close( + popen, + stderr_val=None, + ec=0, + open_stdout=True, + open_stderr=True, + ): + if open_stdout: + stdout_lines = stdout_list + stdout = FakeFileStream(*stdout_lines) + else: + stdout = stdout_lines = None + if open_stderr: + stderr_lines = stderr_list if stderr_val is None else [] + stderr = FakeFileStream(*stderr_lines) + else: + stderr = stderr_lines = None popen_obj = mock.Mock() - popen_obj.attach_mock(stdout, 'stdout') - popen_obj.attach_mock(stderr, 'stderr') + if stdout: + popen_obj.attach_mock(stdout, 'stdout') + else: + popen_obj.configure_mock(stdout=None) + if stderr: + popen_obj.attach_mock(stderr, 'stderr') + else: + popen_obj.configure_mock(stderr=None) popen_obj.configure_mock(returncode=ec) popen.return_value = popen_obj @@ -179,6 +196,151 @@ def test_context_manager(self, popen, select, logger): subprocess_runner.SingletonMeta._instances.clear() + @mock.patch('time.sleep', autospec=True) + def test_execute_timeout_fail( + self, + sleep, + popen, select, logger + ): + popen_obj, exp_result = self.prepare_close(popen) + popen_obj.configure_mock(returncode=None) + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + + with self.assertRaises(exec_helpers.ExecHelperTimeoutError): + # noinspection PyTypeChecker + runner.execute(command, timeout=1) + + popen.assert_has_calls(( + mock.call( + args=[command], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + def test_execute_no_stdout(self, popen, select, logger): + popen_obj, exp_result = self.prepare_close(popen, open_stdout=False) + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(command, open_stdout=False) + self.assertEqual(result, exp_result) + popen.assert_has_calls(( + mock.call( + args=[command], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess_runner.devnull, + universal_newlines=False, + ), + )) + logger.assert_has_calls( + [ + mock.call.debug(command_log), + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8'))) + for x in stderr_list + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=self.gen_cmd_result_log_message(result)), + ]) + self.assertIn( + mock.call.poll(), popen_obj.mock_calls + ) + + def test_execute_no_stderr(self, popen, select, logger): + popen_obj, exp_result = self.prepare_close(popen, open_stderr=False) + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(command, open_stderr=False) + self.assertEqual(result, exp_result) + popen.assert_has_calls(( + mock.call( + args=[command], + cwd=None, + env=None, + shell=True, + stderr=subprocess_runner.devnull, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + logger.assert_has_calls( + [ + mock.call.debug(command_log), + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stdout_list + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=self.gen_cmd_result_log_message(result)), + ]) + self.assertIn( + mock.call.poll(), popen_obj.mock_calls + ) + + def test_execute_no_stdout_stderr(self, popen, select, logger): + popen_obj, exp_result = self.prepare_close( + popen, + open_stdout=False, + open_stderr=False + ) + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(command, open_stdout=False, open_stderr=False) + self.assertEqual(result, exp_result) + popen.assert_has_calls(( + mock.call( + args=[command], + cwd=None, + env=None, + shell=True, + stderr=subprocess_runner.devnull, + stdin=subprocess.PIPE, + stdout=subprocess_runner.devnull, + universal_newlines=False, + ), + )) + logger.assert_has_calls( + [ + mock.call.debug(command_log), + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=self.gen_cmd_result_log_message(result)), + ]) + self.assertIn( + mock.call.poll(), popen_obj.mock_calls + ) + @mock.patch('exec_helpers.subprocess_runner.logger', autospec=True) class TestSubprocessRunnerHelpers(unittest.TestCase): diff --git a/tox.ini b/tox.ini index ccaac71..235ecd3 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ deps = commands = py.test -vv --junitxml=unit_result.xml --html=report.html --cov-config .coveragerc --cov-report html --cov=exec_helpers {posargs:test} - coverage report --fail-under 89 + coverage report --fail-under 95 [testenv:py27-nocov] usedevelop = False