diff --git a/.editorconfig b/.editorconfig index afdae29..c13093d 100644 --- a/.editorconfig +++ b/.editorconfig @@ -9,7 +9,7 @@ insert_final_newline = true trim_trailing_whitespace = true [*.{py,ini}] -max_line_length = 79 +max_line_length = 120 [*.{yml,rst}] indent_size = 2 diff --git a/.pylintrc b/.pylintrc index 8bf1f73..9e6d02f 100644 --- a/.pylintrc +++ b/.pylintrc @@ -273,7 +273,7 @@ logging-modules=logging [FORMAT] # Maximum number of characters on a single line. -max-line-length=80 +max-line-length=120 # Regexp for a line that is allowed to be longer than the limit. ignore-long-lines=^\s*(# )??$ diff --git a/doc/source/SSHClient.rst b/doc/source/SSHClient.rst index 28f030d..61d8834 100644 --- a/doc/source/SSHClient.rst +++ b/doc/source/SSHClient.rst @@ -101,24 +101,28 @@ API: SSHClient and SSHAuth. :param enforce: Enforce sudo enabled or disabled. By default: None :type enforce: ``typing.Optional[bool]`` - .. py:method:: execute_async(command, get_pty=False, open_stdout=True, open_stderr=True, stdin=None, **kwargs) + .. py:method:: execute_async(command, stdin=None, open_stdout=True, open_stderr=True, verbose=False, log_mask_re=None, **kwargs) Execute command in async mode and return channel with IO objects. :param command: Command for execution :type command: ``str`` - :param get_pty: open PTY on remote machine - :type get_pty: ``bool`` :param stdin: pass STDIN text to the process - :type stdin: ``typing.Union[six.text_type, six.binary_type, None]`` + :type stdin: ``typing.Union[six.text_type, six.binary_type, bytearray, None]`` :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool + :param verbose: produce verbose log record on command call + :type verbose: bool + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] :rtype: ``typing.Tuple[paramiko.Channel, paramiko.ChannelFile, paramiko.ChannelFile, paramiko.ChannelFile]`` .. versionchanged:: 1.2.0 open_stdout and open_stderr flags .. versionchanged:: 1.2.0 stdin data + .. versionchanged:: 1.2.0 get_pty moved to `**kwargs` .. py:method:: execute(command, verbose=False, timeout=1*60*60, **kwargs) diff --git a/doc/source/Subprocess.rst b/doc/source/Subprocess.rst index 9aed138..eb0116b 100644 --- a/doc/source/Subprocess.rst +++ b/doc/source/Subprocess.rst @@ -39,6 +39,27 @@ API: Subprocess .. versionchanged:: 1.1.0 release lock on exit + .. py:method:: execute_async(command, stdin=None, open_stdout=True, open_stderr=True, verbose=False, log_mask_re=None, **kwargs) + + Execute command in async mode and return Popen with IO objects. + + :param command: Command for execution + :type command: str + :param stdin: pass STDIN text to the process + :type stdin: typing.Union[six.text_type, six.binary_type, bytearray, None] + :param open_stdout: open STDOUT stream for read + :type open_stdout: bool + :param open_stderr: open STDERR stream for read + :type open_stderr: bool + :param verbose: produce verbose log record on command call + :type verbose: bool + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + :rtype: ``typing.Tuple[subprocess.Popen, None, typing.Optional[io.TextIOWrapper], typing.Optional[io.TextIOWrapper], ]`` + + .. versionadded:: 1.2.0 + .. py:method:: execute(command, verbose=False, timeout=1*60*60, **kwargs) Execute command and wait for return code. diff --git a/exec_helpers/_api.py b/exec_helpers/_api.py index 35aaf48..77cb01f 100644 --- a/exec_helpers/_api.py +++ b/exec_helpers/_api.py @@ -22,11 +22,16 @@ from __future__ import division from __future__ import unicode_literals +import logging import re import threading +import typing # noqa # pylint: disable=unused-import + +import six # noqa # pylint: disable=unused-import from exec_helpers import constants from exec_helpers import exceptions +from exec_helpers import exec_result # noqa # pylint: disable=unused-import from exec_helpers import proc_enums from exec_helpers import _log_templates @@ -44,7 +49,7 @@ def __init__( self, logger, # type: logging.Logger log_mask_re=None, # type: typing.Optional[str] - ): + ): # type: (...) -> None """ExecHelper global API. :param log_mask_re: regex lookup rule to mask command for logger. @@ -126,6 +131,78 @@ def mask(text, rules): # type: (str, str) -> str return cmd + def execute_async( + self, + command, # type: str + stdin=None, # type: typing.Union[six.text_type, six.binary_type, bytearray, None] + open_stdout=True, # type: bool + open_stderr=True, # type: bool + verbose=False, # type: bool + log_mask_re=None, # type: typing.Optional[str] + **kwargs + ): + """Execute command in async mode and return remote interface with IO objects. + + :param command: Command for execution + :type command: str + :param stdin: pass STDIN text to the process + :type stdin: typing.Union[six.text_type, six.binary_type, bytearray, None] + :param open_stdout: open STDOUT stream for read + :type open_stdout: bool + :param open_stderr: open STDERR stream for read + :type open_stderr: bool + :param verbose: produce verbose log record on command call + :type verbose: bool + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + :rtype: typing.Tuple[ + typing.Any, + typing.Any, + typing.Any, + typing.Any, + ] + + .. versionchanged:: 1.2.0 open_stdout and open_stderr flags + .. versionchanged:: 1.2.0 stdin data + """ + raise NotImplementedError # pragma: no cover + + def _exec_command( + self, + command, # type: str + interface, # type: typing.Any, + stdout, # type: typing.Any, + stderr, # type: typing.Any, + timeout, # type: int + verbose=False, # type: bool + log_mask_re=None, # type: typing.Optional[str] + **kwargs + ): # type: (...) -> exec_result.ExecResult + """Get exit status from channel with timeout. + + :param command: Command for execution + :type command: str + :param interface: Control interface + :type interface: typing.Any + :param stdout: STDOUT pipe or file-like object + :type stdout: typing.Any + :param stderr: STDERR pipe or file-like object + :type stderr: typing.Any + :param timeout: Timeout for command execution + :type timeout: int + :param verbose: produce verbose log record on command call + :type verbose: bool + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + :rtype: ExecResult + :raises ExecHelperTimeoutError: Timeout exceeded + + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd + """ + raise NotImplementedError # pragma: no cover + def execute( self, command, # type: str @@ -148,7 +225,33 @@ def execute( .. versionchanged:: 1.2.0 default timeout 1 hour """ - raise NotImplementedError() # pragma: no cover + with self.lock: + ( + iface, # type: typing.Any + _, + stderr, # type: typing.Any + stdout, # type: typing.Any + ) = self.execute_async( + command, + verbose=verbose, + **kwargs + ) + + result = self._exec_command( + command=command, + interface=iface, + stdout=stdout, + stderr=stderr, + timeout=timeout, + verbose=verbose, + **kwargs + ) + message = _log_templates.CMD_RESULT.format(result=result) + self.logger.log( + level=logging.INFO if verbose else logging.DEBUG, + msg=message + ) + return result def check_call( self, diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index 074d4ae..4e83dd2 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -114,7 +114,7 @@ def __prepare__( .. versionadded:: 1.2.0 """ - return collections.OrderedDict() + return collections.OrderedDict() # pragma: no cover def __call__( cls, @@ -225,7 +225,7 @@ def __init__( self, ssh, # type: SSHClientBase enforce=None # type: typing.Optional[bool] - ): + ): # type: (...) -> None """Context manager for call commands with sudo. :type ssh: SSHClient @@ -259,7 +259,7 @@ def __init__( password=None, # type: typing.Optional[str] private_keys=None, # type: typing.Optional[_type_RSAKeys] auth=None, # type: typing.Optional[ssh_auth.SSHAuth] - ): + ): # type: (...) -> None """SSHClient helper. :param host: remote hostname @@ -488,24 +488,28 @@ def sudo( def execute_async( self, command, # type: str - get_pty=False, # type: bool - stdin=None, # type: typing.Union[six.text_type, six.binary_type, None] + stdin=None, # type: typing.Union[six.text_type, six.binary_type, bytearray, None] open_stdout=True, # type: bool open_stderr=True, # type: bool + verbose=False, # type: bool + log_mask_re=None, # type: typing.Optional[str] **kwargs ): # type: (...) -> _type_execute_async """Execute command in async mode and return channel with IO objects. :param command: Command for execution :type command: str - :param get_pty: open PTY on remote machine - :type get_pty: bool :param stdin: pass STDIN text to the process - :type stdin: typing.Union[six.text_type, six.binary_type, None] + :type stdin: typing.Union[six.text_type, six.binary_type, bytearray, None] :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool + :param verbose: produce verbose log record on command call + :type verbose: bool + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] :rtype: typing.Tuple[ paramiko.Channel, paramiko.ChannelFile, @@ -515,20 +519,21 @@ def execute_async( .. versionchanged:: 1.2.0 open_stdout and open_stderr flags .. versionchanged:: 1.2.0 stdin data + .. versionchanged:: 1.2.0 get_pty moved to `**kwargs` """ cmd_for_log = self._mask_command( cmd=command, - log_mask_re=kwargs.get('log_mask_re', None) + log_mask_re=log_mask_re ) self.logger.log( - level=logging.INFO if kwargs.get('verbose') else logging.DEBUG, + level=logging.INFO if verbose else logging.DEBUG, msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) ) chan = self._ssh.get_transport().open_session() - if get_pty: + if kwargs.get('get_pty', False): # Open PTY chan.get_pty( term='vt100', @@ -536,45 +541,45 @@ def execute_async( width_pixels=0, height_pixels=0 ) - _stdin = chan.makefile('wb') + _stdin = chan.makefile('wb') # type: paramiko.ChannelFile stdout = chan.makefile('rb') if open_stdout else None stderr = chan.makefile_stderr('rb') if open_stderr else None + cmd = "{command}\n".format(command=command) if self.sudo_mode: encoded_cmd = base64.b64encode(cmd.encode('utf-8')).decode('utf-8') - cmd = ( - "sudo -S bash -c 'eval \"$(base64 -d <(echo \"{0}\"))\"'" - ).format( - encoded_cmd - ) + cmd = "sudo -S bash -c 'eval \"$(base64 -d <(echo \"{0}\"))\"'".format(encoded_cmd) chan.exec_command(cmd) # nosec # Sanitize on caller side if stdout.channel.closed is False: self.auth.enter_password(_stdin) _stdin.flush() else: chan.exec_command(cmd) # nosec # Sanitize on caller side + if stdin is not None: - if not isinstance(stdin, six.binary_type): - stdin = stdin.encode(encoding='utf-8') - _stdin.write('{}\n'.format(stdin)) - _stdin.flush() + if not _stdin.channel.closed: + _stdin.write('{stdin}\n'.format(stdin=stdin)) + _stdin.flush() + else: + self.logger.warning('STDIN Send failed: closed channel') return chan, _stdin, stderr, stdout - def __exec_command( + def _exec_command( self, command, # type: str - channel, # type: paramiko.channel.Channel + interface, # type: paramiko.channel.Channel stdout, # type: paramiko.channel.ChannelFile stderr, # type: paramiko.channel.ChannelFile timeout, # type: int verbose=False, # type: bool log_mask_re=None, # type: typing.Optional[str] + **kwargs ): # type: (...) -> exec_result.ExecResult """Get exit status from channel with timeout. :type command: str - :type channel: paramiko.channel.Channel + :type interface: paramiko.channel.Channel :type stdout: paramiko.channel.ChannelFile :type stderr: paramiko.channel.ChannelFile :type timeout: int @@ -589,18 +594,15 @@ def __exec_command( """ def poll_streams( result, # type: exec_result.ExecResult - channel, # type: paramiko.channel.Channel - stdout, # type: paramiko.channel.ChannelFile - stderr, # type: paramiko.channel.ChannelFile ): """Poll FIFO buffers if data available.""" - if stdout and channel.recv_ready(): + if stdout and interface.recv_ready(): result.read_stdout( src=stdout, log=self.logger, verbose=verbose ) - if stderr and channel.recv_stderr_ready(): + if stderr and interface.recv_stderr_ready(): result.read_stderr( src=stderr, log=self.logger, @@ -609,11 +611,8 @@ def poll_streams( @threaded.threadpooled def poll_pipes( - stdout, # type: paramiko.channel.ChannelFile - stderr, # type: paramiko.channel.ChannelFile result, # type: exec_result.ExecResult stop, # type: threading.Event - channel # type: paramiko.channel.Channel ): """Polling task for FIFO buffers. @@ -626,14 +625,9 @@ def poll_pipes( while not stop.isSet(): time.sleep(0.1) if stdout or stderr: - poll_streams( - result=result, - channel=channel, - stdout=stdout, - stderr=stderr, - ) + poll_streams(result=result) - if channel.status_event.is_set(): + if interface.status_event.is_set(): result.read_stdout( src=stdout, log=self.logger, @@ -643,7 +637,7 @@ def poll_pipes( log=self.logger, verbose=verbose ) - result.exit_code = channel.exit_status + result.exit_code = interface.exit_status stop.set() @@ -660,11 +654,8 @@ def poll_pipes( # pylint: disable=assignment-from-no-return future = poll_pipes( - stdout=stdout, - stderr=stderr, result=result, stop=stop_event, - channel=channel ) # type: concurrent.futures.Future # pylint: enable=assignment-from-no-return @@ -673,11 +664,11 @@ def poll_pipes( # Process closed? if stop_event.isSet(): stop_event.clear() - channel.close() + interface.close() return result stop_event.set() - channel.close() + interface.close() future.cancel() wait_err_msg = _log_templates.CMD_WAIT_ERROR.format( @@ -687,49 +678,6 @@ def poll_pipes( self.logger.debug(wait_err_msg) raise exceptions.ExecHelperTimeoutError(wait_err_msg) - def execute( - self, - command, # type: str - verbose=False, # type: bool - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] - **kwargs - ): # type: (...) -> exec_result.ExecResult - """Execute command and wait for return code. - - :param command: Command for execution - :type command: str - :param verbose: Produce log.info records for command call and output - :type verbose: bool - :param timeout: Timeout for command execution. - :type timeout: typing.Optional[int] - :rtype: ExecResult - :raises ExecHelperTimeoutError: Timeout exceeded - - .. versionchanged:: 1.2.0 default timeout 1 hour - """ - ( - chan, # type: paramiko.channel.Channel - _, - stderr, # type: paramiko.channel.ChannelFile - stdout, # type: paramiko.channel.ChannelFile - ) = self.execute_async( - command, - verbose=verbose, - **kwargs - ) - - result = self.__exec_command( - command, chan, stdout, stderr, timeout, - verbose=verbose, - log_mask_re=kwargs.get('log_mask_re', None), - ) - message = _log_templates.CMD_RESULT.format(result=result) - self.logger.log( - level=logging.INFO if verbose else logging.DEBUG, - msg=message - ) - return result - def execute_through_host( self, hostname, # type: str @@ -767,7 +715,7 @@ def execute_through_host( cmd=command, log_mask_re=kwargs.get('log_mask_re', None) ) - logger.log( + self.logger.log( level=logging.INFO if verbose else logging.DEBUG, msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) ) @@ -801,7 +749,7 @@ def execute_through_host( channel.exec_command(command) # nosec # Sanitize on caller side # noinspection PyDictCreation - result = self.__exec_command( + result = self._exec_command( command, channel, stdout, stderr, timeout, verbose=verbose, log_mask_re=kwargs.get('log_mask_re', None), ) diff --git a/exec_helpers/exec_result.py b/exec_helpers/exec_result.py index 924497d..a14499f 100644 --- a/exec_helpers/exec_result.py +++ b/exec_helpers/exec_result.py @@ -51,7 +51,7 @@ class ExecResult(object): def __init__( self, cmd, # type: str - stdin=None, # type: typing.Union[six.text_type, six.binary_type, None] + stdin=None, # type: typing.Union[six.text_type, six.binary_type, bytearray, None] stdout=None, # type: typing.Optional[typing.Iterable[bytes]] stderr=None, # type: typing.Optional[typing.Iterable[bytes]] exit_code=proc_enums.ExitCodes.EX_INVALID # type: _type_exit_codes @@ -61,7 +61,7 @@ def __init__( :param cmd: command :type cmd: str :param stdin: string STDIN - :type stdin: typing.Union[six.text_type, six.binary_type, None] + :type stdin: typing.Union[six.text_type, six.binary_type, bytearray, None] :param stdout: binary STDOUT :type stdout: typing.Optional[typing.Iterable[bytes]] :param stderr: binary STDERR @@ -72,7 +72,9 @@ def __init__( self.__lock = threading.RLock() self.__cmd = cmd - if stdin is not None and not isinstance(stdin, six.text_type): + if isinstance(stdin, six.binary_type): + stdin = self._get_str_from_bin(bytearray(stdin)) + elif isinstance(stdin, bytearray): stdin = self._get_str_from_bin(stdin) self.__stdin = stdin self.__stdout = tuple(stdout) if stdout is not None else () @@ -148,7 +150,7 @@ def cmd(self): # type: () -> str return self.__cmd @property - def stdin(self): # type: () -> str + def stdin(self): # type: () -> typing.Optional[str] """Stdin input as string. :rtype: str diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index 49492f2..4de232d 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -21,6 +21,8 @@ from __future__ import unicode_literals import collections +import errno +import io import logging import os import select @@ -34,20 +36,23 @@ import threaded from exec_helpers import _api -from exec_helpers import constants from exec_helpers import exec_result from exec_helpers import exceptions -from exec_helpers import proc_enums from exec_helpers import _log_templates logger = logging.getLogger(__name__) # noinspection PyUnresolvedReferences devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+ +_type_execute_async = typing.Tuple[ + subprocess.Popen, + None, + typing.Optional[io.TextIOWrapper], + typing.Optional[io.TextIOWrapper], +] + _win = sys.platform == "win32" _posix = 'posix' in sys.builtin_module_names -_type_exit_codes = typing.Union[int, proc_enums.ExitCodes] -_type_expected = typing.Optional[typing.Iterable[_type_exit_codes]] if _posix: # pragma: no cover import fcntl # pylint: disable=import-error @@ -89,12 +94,12 @@ def __prepare__( .. versionadded:: 1.2.0 """ - return collections.OrderedDict() + return collections.OrderedDict() # pragma: no cover def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None """Set PIPE unblocked to allow polling of all pipes in parallel.""" - descriptor = pipe.fileno() + descriptor = pipe.fileno() # pragma: no cover if _posix: # pragma: no cover # Get flags @@ -126,14 +131,10 @@ def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None class Subprocess(six.with_metaclass(SingletonMeta, _api.ExecHelper)): """Subprocess helper with timeouts and lock-free FIFO.""" - __slots__ = ( - '__process', - ) - def __init__( self, log_mask_re=None, # type: typing.Optional[str] - ): + ): # type: (...) -> None """Subprocess helper with timeouts and lock-free FIFO. For excluding race-conditions we allow to run 1 command simultaneously @@ -150,55 +151,41 @@ def __init__( ) self.__process = None - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager usage.""" - if self.__process: - self.__process.kill() - super(Subprocess, self).__exit__(exc_type, exc_val, exc_tb) - - def __del__(self): - """Destructor. Kill running subprocess, if it running.""" - if self.__process: - self.__process.kill() - - def __exec_command( + def _exec_command( self, command, # type: str - cwd=None, # type: typing.Optional[str] - env=None, # type: typing.Optional[typing.Dict[str, typing.Any]] - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] + interface, # type: subprocess.Popen, + stdout, # type: typing.Optional[io.TextIOWrapper], + stderr, # type: typing.Optional[io.TextIOWrapper], + timeout, # type: int verbose=False, # type: bool log_mask_re=None, # type: typing.Optional[str] - stdin=None, # type: typing.Union[six.text_type, six.binary_type, None] - open_stdout=True, # type: bool - open_stderr=True, # type: bool - ): - """Command executor helper. + **kwargs + ): # type: (...) -> exec_result.ExecResult + """Get exit status from channel with timeout. + :param command: Command for execution :type command: str - :type cwd: str - :type env: dict + :param interface: Control interface + :type interface: subprocess.Popen + :param stdout: STDOUT pipe or file-like object + :type stdout: typing.Any + :param stderr: STDERR pipe or file-like object + :type stderr: typing.Any + :param timeout: Timeout for command execution :type timeout: int - :param verbose: use INFO log level instead of DEBUG - :type verbose: str + :param verbose: produce verbose log record on command call + :type verbose: bool :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: typing.Optional[str] - :type stdin: typing.Union[six.text_type, six.binary_type, None] - :param open_stdout: open STDOUT stream for read - :type open_stdout: bool - :param open_stderr: open STDERR stream for read - :type open_stderr: bool :rtype: ExecResult + :raises ExecHelperTimeoutError: Timeout exceeded - .. versionchanged:: 1.2.0 open_stdout and open_stderr flags - .. versionchanged:: 1.2.0 default timeout 1 hour - .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd + .. versionadded:: 1.2.0 """ def poll_streams( result, # type: exec_result.ExecResult - stdout, # type: io.TextIOWrapper - stderr, # type: io.TextIOWrapper ): """Poll streams to the result object.""" if _win: # pragma: no cover @@ -227,141 +214,163 @@ def poll_streams( @threaded.threaded(started=True, daemon=True) def poll_pipes( result, # type: exec_result.ExecResult - stop # type: threading.Event + stop, # type: threading.Event ): """Polling task for FIFO buffers. - :type result: exec_result.ExecResult - :type stop: threading.Event + :type result: ExecResult + :type stop: Event """ while not stop.isSet(): time.sleep(0.1) - if open_stdout or open_stderr: - poll_streams( - result=result, - stdout=self.__process.stdout, - stderr=self.__process.stderr, - ) + if stdout or stderr: + poll_streams(result=result) - self.__process.poll() + interface.poll() - if self.__process.returncode is not None: + if interface.returncode is not None: result.read_stdout( - src=self.__process.stdout, + src=stdout, log=logger, verbose=verbose ) result.read_stderr( - src=self.__process.stderr, + src=stderr, log=logger, verbose=verbose ) - result.exit_code = self.__process.returncode + result.exit_code = interface.returncode stop.set() - # 1 Command per run - with self.lock: - cmd_for_log = self._mask_command( - cmd=command, - log_mask_re=log_mask_re - ) - - # Store command with hidden data - result = exec_result.ExecResult(cmd=cmd_for_log) - stop_event = threading.Event() - - logger.log( - level=logging.INFO if verbose else logging.DEBUG, - msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) - ) - - # Run - self.__process = subprocess.Popen( - args=[command], - stdout=subprocess.PIPE if open_stdout else devnull, - stderr=subprocess.PIPE if open_stderr else devnull, - stdin=subprocess.PIPE, - shell=True, - cwd=cwd, - env=env, - universal_newlines=False, - ) - if stdin is not None: - if not isinstance(stdin, six.binary_type): - stdin = stdin.encode(encoding='utf-8') - self.__process.stdin.write(stdin) - self.__process.stdin.close() - - # Poll output - - if open_stdout: - set_nonblocking_pipe(self.__process.stdout) - if open_stderr: - set_nonblocking_pipe(self.__process.stderr) - # pylint: disable=assignment-from-no-return - poll_thread = poll_pipes( - result, - stop_event - ) # type: threading.Thread - # pylint: enable=assignment-from-no-return - # wait for process close - stop_event.wait(timeout) - - # Process closed? - if stop_event.isSet(): - stop_event.clear() - self.__process = None - return result - # Kill not ended process and wait for close - try: - self.__process.kill() # kill -9 - stop_event.wait(5) - # Force stop cycle if no exit code after kill - stop_event.set() - poll_thread.join(5) - except OSError: - # Nothing to kill - logger.warning( - u"{!s} has been completed just after timeout: " - "please validate timeout.".format(command)) - self.__process = None - - wait_err_msg = _log_templates.CMD_WAIT_ERROR.format( - result=result, - timeout=timeout - ) - logger.debug(wait_err_msg) - raise exceptions.ExecHelperTimeoutError(wait_err_msg) - - def execute( + # Store command with hidden data + cmd_for_log = self._mask_command( + cmd=command, + log_mask_re=log_mask_re + ) + + result = exec_result.ExecResult(cmd=cmd_for_log) + stop_event = threading.Event() + + # pylint: disable=assignment-from-no-return + poll_thread = poll_pipes( + result, + stop_event + ) # type: threading.Thread + # pylint: enable=assignment-from-no-return + # wait for process close + stop_event.wait(timeout) + + # Process closed? + if stop_event.isSet(): + stop_event.clear() + return result + # Kill not ended process and wait for close + try: + interface.kill() # kill -9 + stop_event.wait(5) + # Force stop cycle if no exit code after kill + stop_event.set() + poll_thread.join(5) + except OSError: + # Nothing to kill + logger.warning( + u"{!s} has been completed just after timeout: " + "please validate timeout.".format(command)) + return result + + wait_err_msg = _log_templates.CMD_WAIT_ERROR.format( + result=result, + timeout=timeout + ) + logger.debug(wait_err_msg) + raise exceptions.ExecHelperTimeoutError(wait_err_msg) + + def execute_async( self, command, # type: str + stdin=None, # type: typing.Union[six.text_type, six.binary_type, bytearray, None] + open_stdout=True, # type: bool + open_stderr=True, # type: bool verbose=False, # type: bool - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] + log_mask_re=None, # type: typing.Optional[str] **kwargs - ): # type: (...) -> exec_result.ExecResult - """Execute command and wait for return code. - - Timeout limitation: read tick is 100 ms. + ): # type: (...) -> _type_execute_async + """Execute command in async mode and return Popen with IO objects. :param command: Command for execution :type command: str - :param verbose: Produce log.info records for command call and output + :param stdin: pass STDIN text to the process + :type stdin: typing.Union[six.text_type, six.binary_type, bytearray, None] + :param open_stdout: open STDOUT stream for read + :type open_stdout: bool + :param open_stderr: open STDERR stream for read + :type open_stderr: bool + :param verbose: produce verbose log record on command call :type verbose: bool - :param timeout: Timeout for command execution. - :type timeout: typing.Optional[int] - :rtype: ExecResult - :raises ExecHelperTimeoutError: Timeout exceeded + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + :rtype: typing.Tuple[ + subprocess.Popen, + None, + typing.Optional[io.TextIOWrapper], + typing.Optional[io.TextIOWrapper], + ] - .. versionchanged:: 1.2.0 default timeout 1 hour + .. versionadded:: 1.2.0 """ - result = self.__exec_command(command=command, timeout=timeout, - verbose=verbose, **kwargs) - message = _log_templates.CMD_RESULT.format(result=result) - logger.log( + cmd_for_log = self._mask_command( + cmd=command, + log_mask_re=log_mask_re + ) + + self.logger.log( level=logging.INFO if verbose else logging.DEBUG, - msg=message + msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) + ) + + process = subprocess.Popen( + args=[command], + stdout=subprocess.PIPE if open_stdout else devnull, + stderr=subprocess.PIPE if open_stderr else devnull, + stdin=subprocess.PIPE, + shell=True, + cwd=kwargs.get('cwd', None), + env=kwargs.get('env', None), + universal_newlines=False, ) - return result + if stdin is not None: + if isinstance(stdin, six.text_type): + stdin = stdin.encode(encoding='utf-8') + elif isinstance(stdin, bytearray): + stdin = bytes(stdin) + try: + process.stdin.write(stdin) + except OSError as exc: + if exc.errno == errno.EINVAL: + # bpo-19612, bpo-30418: On Windows, stdin.write() fails + # with EINVAL if the child process exited or if the child + # process is still running but closed the pipe. + self.logger.warning('STDIN Send failed: closed PIPE') + elif exc.errno in (errno.EPIPE, errno.ESHUTDOWN): # pragma: no cover + self.logger.warning('STDIN Send failed: broken PIPE') + else: + process.kill() + raise + try: + process.stdin.close() + except OSError as exc: + if exc.errno in (errno.EINVAL, errno.EPIPE, errno.ESHUTDOWN): + pass + else: + process.kill() + raise + + if open_stdout: + set_nonblocking_pipe(process.stdout) + if open_stderr: + set_nonblocking_pipe(process.stderr) + + return process, None, process.stderr, process.stdout diff --git a/test/test_exec_result.py b/test/test_exec_result.py index a5fbef8..ea2c9b1 100644 --- a/test/test_exec_result.py +++ b/test/test_exec_result.py @@ -238,3 +238,19 @@ def test_finalize(self): with self.assertRaises(RuntimeError): result.read_stderr([b'err']) + + def test_stdin_none(self): + result = exec_helpers.ExecResult(cmd, exit_code=0) + self.assertIsNone(result.stdin) + + def test_stdin_utf(self): + result = exec_helpers.ExecResult(cmd, stdin=u'STDIN', exit_code=0) + self.assertEqual(result.stdin, u'STDIN') + + def test_stdin_bytes(self): + result = exec_helpers.ExecResult(cmd, stdin=b'STDIN', exit_code=0) + self.assertEqual(result.stdin, u'STDIN') + + def test_stdin_bytearray(self): + result = exec_helpers.ExecResult(cmd, stdin=bytearray(b'STDIN'), exit_code=0) + self.assertEqual(result.stdin, u'STDIN') diff --git a/test/test_ssh_client.py b/test/test_ssh_client.py index 1b2372a..95c338e 100644 --- a/test/test_ssh_client.py +++ b/test/test_ssh_client.py @@ -65,8 +65,7 @@ def __iter__(self): @mock.patch('exec_helpers._ssh_client_base.logger', autospec=True) -@mock.patch( - 'paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy') +@mock.patch('paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy') @mock.patch('paramiko.SSHClient', autospec=True) class TestExecute(unittest.TestCase): def tearDown(self): @@ -452,6 +451,186 @@ def test_execute_async_mask_command(self, client, policy, logger): log.mock_calls ) + def test_check_stdin_str(self, client, policy, logger): + stdin_val = u'this is a line' + + stdin = mock.Mock(name='stdin') + stdin_channel = mock.Mock() + stdin_channel.configure_mock(closed=False) + stdin.attach_mock(stdin_channel, 'channel') + + stdout = mock.Mock(name='stdout') + stdout_channel = mock.Mock() + stdout_channel.configure_mock(closed=False) + stdout.attach_mock(stdout_channel, 'channel') + + chan = mock.Mock() + chan.attach_mock(mock.Mock(side_effect=[stdin, stdout]), 'makefile') + + open_session = mock.Mock(return_value=chan) + + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async(command=print_stdin, stdin=stdin_val) + + get_transport.assert_called_once() + open_session.assert_called_once() + stdin.assert_has_calls([ + mock.call.write('{val}\n'.format(val=stdin_val)), + mock.call.flush() + ]) + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{val}\n'.format(val=print_stdin)) + )) + + def test_check_stdin_bytes(self, client, policy, logger): + stdin_val = b'this is a line' + + stdin = mock.Mock(name='stdin') + stdin_channel = mock.Mock() + stdin_channel.configure_mock(closed=False) + stdin.attach_mock(stdin_channel, 'channel') + + stdout = mock.Mock(name='stdout') + stdout_channel = mock.Mock() + stdout_channel.configure_mock(closed=False) + stdout.attach_mock(stdout_channel, 'channel') + + chan = mock.Mock() + chan.attach_mock(mock.Mock(side_effect=[stdin, stdout]), 'makefile') + + open_session = mock.Mock(return_value=chan) + + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async(command=print_stdin, stdin=stdin_val) + + get_transport.assert_called_once() + open_session.assert_called_once() + stdin.assert_has_calls([ + mock.call.write('{val}\n'.format(val=stdin_val)), + mock.call.flush() + ]) + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{val}\n'.format(val=print_stdin)) + )) + + def test_check_stdin_bytearray(self, client, policy, logger): + stdin_val = bytearray(b'this is a line') + + stdin = mock.Mock(name='stdin') + stdin_channel = mock.Mock() + stdin_channel.configure_mock(closed=False) + stdin.attach_mock(stdin_channel, 'channel') + + stdout = mock.Mock(name='stdout') + stdout_channel = mock.Mock() + stdout_channel.configure_mock(closed=False) + stdout.attach_mock(stdout_channel, 'channel') + + chan = mock.Mock() + chan.attach_mock(mock.Mock(side_effect=[stdin, stdout]), 'makefile') + + open_session = mock.Mock(return_value=chan) + + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async(command=print_stdin, stdin=stdin_val) + + get_transport.assert_called_once() + open_session.assert_called_once() + stdin.assert_has_calls([ + mock.call.write('{val}\n'.format(val=stdin_val)), + mock.call.flush() + ]) + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{val}\n'.format(val=print_stdin)) + )) + + def test_check_stdin_closed(self, client, policy, logger): + stdin_val = 'this is a line' + + stdin = mock.Mock(name='stdin') + stdin_channel = mock.Mock() + stdin_channel.configure_mock(closed=True) + stdin.attach_mock(stdin_channel, 'channel') + + stdout = mock.Mock(name='stdout') + stdout_channel = mock.Mock() + stdout_channel.configure_mock(closed=False) + stdout.attach_mock(stdout_channel, 'channel') + + chan = mock.Mock() + chan.attach_mock(mock.Mock(side_effect=[stdin, stdout]), 'makefile') + + open_session = mock.Mock(return_value=chan) + + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async(command=print_stdin, stdin=stdin_val) + + get_transport.assert_called_once() + open_session.assert_called_once() + stdin.assert_not_called() + + log = logger.getChild('{host}:{port}'.format(host=host, port=port)) + log.warning.assert_called_once_with('STDIN Send failed: closed channel') + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{val}\n'.format(val=print_stdin)) + )) + @staticmethod def get_patched_execute_async_retval( ec=0, @@ -1047,103 +1226,9 @@ def test_check_stderr(self, check_call, client, policy, logger): command, verbose, timeout=None, error_info=None, raise_on_err=raise_on_err) - @mock.patch('exec_helpers.ssh_client.SSHClient.check_call') - def test_check_stdin_str(self, check_call, client, policy, logger): - stdin = u'this is a line' - - return_value = exec_result.ExecResult( - cmd=print_stdin, - stdin=stdin, - stdout=[stdin], - stderr=[], - exit_code=0 - ) - check_call.return_value = return_value - - verbose = False - raise_on_err = True - - # noinspection PyTypeChecker - result = self.get_ssh().check_call( - command=print_stdin, - stdin=stdin, - verbose=verbose, - timeout=None, - raise_on_err=raise_on_err) - check_call.assert_called_once_with( - command=print_stdin, - stdin=stdin, - verbose=verbose, - timeout=None, - raise_on_err=raise_on_err) - self.assertEqual(result, return_value) - - @mock.patch('exec_helpers.ssh_client.SSHClient.check_call') - def test_check_stdin_bytes(self, check_call, client, policy, logger): - stdin = b'this is a line' - - return_value = exec_result.ExecResult( - cmd=print_stdin, - stdin=stdin, - stdout=[stdin], - stderr=[], - exit_code=0 - ) - check_call.return_value = return_value - - verbose = False - raise_on_err = True - - # noinspection PyTypeChecker - result = self.get_ssh().check_call( - command=print_stdin, - stdin=stdin, - verbose=verbose, - timeout=None, - raise_on_err=raise_on_err) - check_call.assert_called_once_with( - command=print_stdin, - stdin=stdin, - verbose=verbose, - timeout=None, - raise_on_err=raise_on_err) - self.assertEqual(result, return_value) - - @mock.patch('exec_helpers.ssh_client.SSHClient.check_call') - def test_check_stdin_bytearray(self, check_call, client, policy, logger): - stdin = bytearray(b'this is a line') - - return_value = exec_result.ExecResult( - cmd=print_stdin, - stdin=stdin, - stdout=[stdin], - stderr=[], - exit_code=0 - ) - check_call.return_value = return_value - - verbose = False - raise_on_err = True - - # noinspection PyTypeChecker - result = self.get_ssh().check_call( - command=print_stdin, - stdin=stdin, - verbose=verbose, - timeout=None, - raise_on_err=raise_on_err) - check_call.assert_called_once_with( - command=print_stdin, - stdin=stdin, - verbose=verbose, - timeout=None, - raise_on_err=raise_on_err) - self.assertEqual(result, return_value) - @mock.patch('exec_helpers._ssh_client_base.logger', autospec=True) -@mock.patch( - 'paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy') +@mock.patch('paramiko.AutoAddPolicy', autospec=True, return_value='AutoAddPolicy') @mock.patch('paramiko.SSHClient', autospec=True) @mock.patch('paramiko.Transport', autospec=True) class TestExecuteThrowHost(unittest.TestCase): diff --git a/test/test_subprocess_runner.py b/test/test_subprocess_runner.py index 28f0b6d..997d2fc 100644 --- a/test/test_subprocess_runner.py +++ b/test/test_subprocess_runner.py @@ -20,11 +20,13 @@ from __future__ import division from __future__ import unicode_literals +import errno import logging import subprocess import unittest import mock +import six import exec_helpers from exec_helpers import subprocess_runner @@ -53,11 +55,12 @@ def fileno(self): @mock.patch('exec_helpers.subprocess_runner.logger', autospec=True) @mock.patch('select.select', autospec=True) -@mock.patch( - 'exec_helpers.subprocess_runner.set_nonblocking_pipe', autospec=True -) +@mock.patch('exec_helpers.subprocess_runner.set_nonblocking_pipe', autospec=True) @mock.patch('subprocess.Popen', autospec=True, name='subprocess.Popen') class TestSubprocessRunner(unittest.TestCase): + def setUp(self): + subprocess_runner.SingletonMeta._instances.clear() + @staticmethod def prepare_close( popen, @@ -65,11 +68,12 @@ def prepare_close( stderr_val=None, ec=0, open_stdout=True, + stdout_override=None, open_stderr=True, cmd_in_result=None, ): if open_stdout: - stdout_lines = stdout_list + stdout_lines = stdout_list if stdout_override is None else stdout_override stdout = FakeFileStream(*stdout_lines) else: stdout = stdout_lines = None @@ -107,7 +111,13 @@ def gen_cmd_result_log_message(result): return ("Command exit code '{code!s}':\n{cmd!s}\n" .format(cmd=result.cmd.rstrip(), code=result.exit_code)) - def test_call(self, popen, _, select, logger): + def test_001_call( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None popen_obj, exp_result = self.prepare_close(popen) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -154,7 +164,13 @@ def test_call(self, popen, _, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_call_verbose(self, popen, _, select, logger): + def test_002_call_verbose( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None popen_obj, _ = self.prepare_close(popen) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -182,7 +198,13 @@ def test_call_verbose(self, popen, _, select, logger): msg=self.gen_cmd_result_log_message(result)), ]) - def test_context_manager(self, popen, _, select, logger): + def test_003_context_manager( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None popen_obj, exp_result = self.prepare_close(popen) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -204,7 +226,7 @@ def test_context_manager(self, popen, _, select, logger): subprocess_runner.SingletonMeta._instances.clear() @mock.patch('time.sleep', autospec=True) - def test_execute_timeout_fail( + def test_004_execute_timeout_fail( self, sleep, popen, _, select, logger @@ -234,7 +256,7 @@ def test_execute_timeout_fail( ), )) - def test_execute_no_stdout(self, popen, _, select, logger): + def test_005_execute_no_stdout(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close(popen, open_stdout=False) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -272,7 +294,7 @@ def test_execute_no_stdout(self, popen, _, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_execute_no_stderr(self, popen, _, select, logger): + def test_006_execute_no_stderr(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close(popen, open_stderr=False) select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] @@ -311,7 +333,7 @@ def test_execute_no_stderr(self, popen, _, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_execute_no_stdout_stderr(self, popen, _, select, logger): + def test_007_execute_no_stdout_stderr(self, popen, _, select, logger): popen_obj, exp_result = self.prepare_close( popen, open_stdout=False, @@ -348,7 +370,7 @@ def test_execute_no_stdout_stderr(self, popen, _, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_execute_mask_global(self, popen, _, select, logger): + def test_008_execute_mask_global(self, popen, _, select, logger): cmd = "USE='secret=secret_pass' do task" log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" masked_cmd = "USE='secret=<*masked*>' do task" @@ -406,7 +428,7 @@ def test_execute_mask_global(self, popen, _, select, logger): mock.call.poll(), popen_obj.mock_calls ) - def test_execute_mask_local(self, popen, _, select, logger): + def test_009_execute_mask_local(self, popen, _, select, logger): cmd = "USE='secret=secret_pass' do task" log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" masked_cmd = "USE='secret=<*masked*>' do task" @@ -462,11 +484,402 @@ def test_execute_mask_local(self, popen, _, select, logger): mock.call.poll(), popen_obj.mock_calls ) + def test_004_check_stdin_str( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = u'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin.encode('utf-8')]) + + stdin_mock = mock.Mock() + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin.encode('utf-8')), + mock.call.close() + ]) + + def test_005_check_stdin_bytes( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + stdin_mock = mock.Mock() + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin), + mock.call.close() + ]) + + def test_006_check_stdin_bytearray( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = bytearray(b'this is a line') + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + stdin_mock = mock.Mock() + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin), + mock.call.close() + ]) + + @unittest.skipIf(six.PY2, 'Not implemented exception') + def test_007_check_stdin_fail_broken_pipe( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + pipe_err = BrokenPipeError() + pipe_err.errno = errno.EPIPE + + stdin_mock = mock.Mock() + stdin_mock.attach_mock(mock.Mock(side_effect=pipe_err), 'write') + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin), + mock.call.close() + ]) + logger.warning.assert_called_once_with('STDIN Send failed: broken PIPE') + + def test_008_check_stdin_fail_closed_win( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + pipe_error = OSError() + pipe_error.errno = errno.EINVAL + + stdin_mock = mock.Mock() + stdin_mock.attach_mock(mock.Mock(side_effect=pipe_error), 'write') + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin), + mock.call.close() + ]) + logger.warning.assert_called_once_with('STDIN Send failed: closed PIPE') + + def test_009_check_stdin_fail_write( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + pipe_error = OSError() + + stdin_mock = mock.Mock() + stdin_mock.attach_mock(mock.Mock(side_effect=pipe_error), 'write') + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + with self.assertRaises(OSError): + # noinspection PyTypeChecker + runner.execute(print_stdin, stdin=stdin) + popen_obj.kill.assert_called_once() + + @unittest.skipIf(six.PY2, 'Not implemented exception') + def test_010_check_stdin_fail_close_pipe( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + pipe_err = BrokenPipeError() + pipe_err.errno = errno.EPIPE + + stdin_mock = mock.Mock() + stdin_mock.attach_mock(mock.Mock(side_effect=pipe_err), 'close') + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin), + mock.call.close() + ]) + logger.warning.assert_not_called() + + def test_011_check_stdin_fail_close_pipe_win( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + pipe_error = OSError() + pipe_error.errno = errno.EINVAL + + stdin_mock = mock.Mock() + stdin_mock.attach_mock(mock.Mock(side_effect=pipe_error), 'close') + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(print_stdin, stdin=stdin) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[print_stdin], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + + stdin_mock.assert_has_calls([ + mock.call.write(stdin), + mock.call.close() + ]) + logger.warning.assert_not_called() + + def test_012_check_stdin_fail_close( + self, + popen, # type: mock.MagicMock + _, # type: mock.MagicMock + select, # type: mock.MagicMock + logger # type: mock.MagicMock + ): # type: (...) -> None + stdin = b'this is a line' + + popen_obj, exp_result = self.prepare_close(popen, cmd=print_stdin, stdout_override=[stdin]) + + pipe_error = OSError() + + stdin_mock = mock.Mock() + stdin_mock.attach_mock(mock.Mock(side_effect=pipe_error), 'close') + popen_obj.attach_mock(stdin_mock, 'stdin') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + with self.assertRaises(OSError): + # noinspection PyTypeChecker + runner.execute(print_stdin, stdin=stdin) + popen_obj.kill.assert_called_once() + + @mock.patch('time.sleep', autospec=True) + def test_013_execute_timeout_done( + self, + sleep, + popen, _, select, logger + ): + popen_obj, exp_result = self.prepare_close(popen, ec=exec_helpers.ExitCodes.EX_INVALID) + popen_obj.configure_mock(returncode=None) + popen_obj.attach_mock(mock.Mock(side_effect=OSError), 'kill') + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + + res = runner.execute(command, timeout=1) + + self.assertEqual(res, exp_result) + + popen.assert_has_calls(( + mock.call( + args=[command], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + @mock.patch('exec_helpers.subprocess_runner.logger', autospec=True) class TestSubprocessRunnerHelpers(unittest.TestCase): @mock.patch('exec_helpers.subprocess_runner.Subprocess.execute') - def test_check_call(self, execute, logger): + def test_001_check_call(self, execute, logger): exit_code = 0 return_value = exec_helpers.ExecResult( cmd=command, @@ -501,7 +914,7 @@ def test_check_call(self, execute, logger): execute.assert_called_once_with(command, verbose, None) @mock.patch('exec_helpers.subprocess_runner.Subprocess.execute') - def test_check_call_expected(self, execute, logger): + def test_002_check_call_expected(self, execute, logger): exit_code = 0 return_value = exec_helpers.ExecResult( cmd=command, @@ -539,7 +952,7 @@ def test_check_call_expected(self, execute, logger): execute.assert_called_once_with(command, verbose, None) @mock.patch('exec_helpers.subprocess_runner.Subprocess.check_call') - def test_check_stderr(self, check_call, logger): + def test_003_check_stderr(self, check_call, logger): return_value = exec_helpers.ExecResult( cmd=command, stdout=stdout_list, @@ -578,96 +991,3 @@ def test_check_stderr(self, check_call, logger): check_call.assert_called_once_with( command, verbose, timeout=None, error_info=None, raise_on_err=raise_on_err) - - @mock.patch('exec_helpers.subprocess_runner.Subprocess.check_call') - def test_check_stdin_str(self, check_call, logger): - stdin = u'this is a line' - - expected_result = exec_helpers.ExecResult( - cmd=print_stdin, - stdin=stdin, - stdout=[stdin], - stderr=[b''], - exit_code=0, - ) - check_call.return_value = expected_result - - verbose = False - - runner = exec_helpers.Subprocess() - - # noinspection PyTypeChecker - result = runner.check_call( - command=print_stdin, - verbose=verbose, - timeout=None, - stdin=stdin) - check_call.assert_called_once_with( - command=print_stdin, - verbose=verbose, - timeout=None, - stdin=stdin) - self.assertEqual(result, expected_result) - assert result == expected_result - - @mock.patch('exec_helpers.subprocess_runner.Subprocess.check_call') - def test_check_stdin_bytes(self, check_call, logger): - stdin = b'this is a line' - - expected_result = exec_helpers.ExecResult( - cmd=print_stdin, - stdin=stdin, - stdout=[stdin], - stderr=[b''], - exit_code=0, - ) - check_call.return_value = expected_result - - verbose = False - - runner = exec_helpers.Subprocess() - - # noinspection PyTypeChecker - result = runner.check_call( - command=print_stdin, - verbose=verbose, - timeout=None, - stdin=stdin) - check_call.assert_called_once_with( - command=print_stdin, - verbose=verbose, - timeout=None, - stdin=stdin) - self.assertEqual(result, expected_result) - assert result == expected_result - - @mock.patch('exec_helpers.subprocess_runner.Subprocess.check_call') - def test_check_stdin_bytearray(self, check_call, logger): - stdin = bytearray(b'this is a line') - - expected_result = exec_helpers.ExecResult( - cmd=print_stdin, - stdin=stdin, - stdout=[stdin], - stderr=[b''], - exit_code=0, - ) - check_call.return_value = expected_result - - verbose = False - - runner = exec_helpers.Subprocess() - - # noinspection PyTypeChecker - result = runner.check_call( - command=print_stdin, - verbose=verbose, - timeout=None, - stdin=stdin) - check_call.assert_called_once_with( - command=print_stdin, - verbose=verbose, - timeout=None, - stdin=stdin) - self.assertEqual(result, expected_result) - assert result == expected_result diff --git a/tox.ini b/tox.ini index c1ac392..bee2e03 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ deps = commands = py.test -vv --junitxml=unit_result.xml --html=report.html --cov-config .coveragerc --cov-report html --cov=exec_helpers {posargs:test} - coverage report --fail-under 95 + coverage report --fail-under 97 [testenv:py27-nocov] usedevelop = False @@ -132,6 +132,7 @@ ignore = show-pep8 = True show-source = True count = True +max-line-length = 120 [testenv:docs] deps =