From a8050b0e855b6771d07c8e6c1f5d0974e69ecec6 Mon Sep 17 00:00:00 2001 From: Alexey Stepanov Date: Fri, 6 Apr 2018 17:24:07 +0200 Subject: [PATCH 1/3] Fix docstrings and :raises: constructions. Divorce DeserializeValueError from ExecHelperError --- doc/source/ExecResult.rst | 8 ++++++-- doc/source/SSHClient.rst | 16 +++++++++------- doc/source/Subprocess.rst | 8 +++++--- doc/source/exceptions.rst | 4 ++++ exec_helpers/_ssh_client_base.py | 22 +++++++++++++--------- exec_helpers/exceptions.py | 6 ++++++ exec_helpers/exec_result.py | 11 ++++++----- exec_helpers/ssh_auth.py | 2 +- exec_helpers/subprocess_runner.py | 12 +++++++++--- 9 files changed, 59 insertions(+), 30 deletions(-) diff --git a/doc/source/ExecResult.rst b/doc/source/ExecResult.rst index d0bad19..6330ae3 100644 --- a/doc/source/ExecResult.rst +++ b/doc/source/ExecResult.rst @@ -84,14 +84,18 @@ API: ExecResult .. py:attribute:: stdout_json - ``typing.Any`` JSON from stdout. + :rtype: ``typing.Any`` + :raises DeserializeValueError: STDOUT can not be deserialized as JSON + .. py:attribute:: stdout_yaml - ``typing.Any`` YAML from stdout. + :rtype: ``typing.Any`` + :raises DeserializeValueError: STDOUT can not be deserialized as YAML + .. py:method:: read_stdout(src=None, log=None, verbose=False) Read stdout file-like object to stdout. diff --git a/doc/source/SSHClient.rst b/doc/source/SSHClient.rst index c840d49..ccf423c 100644 --- a/doc/source/SSHClient.rst +++ b/doc/source/SSHClient.rst @@ -122,7 +122,7 @@ API: SSHClient and SSHAuth. :param timeout: Timeout for command execution. :type timeout: ``typing.Optional[int]`` :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour @@ -143,7 +143,8 @@ API: SSHClient and SSHAuth. :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: ``bool`` :rtype: ExecResult - :raises: CalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code .. versionchanged:: 1.2.0 default timeout 1 hour @@ -162,7 +163,8 @@ API: SSHClient and SSHAuth. :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: ``bool`` :rtype: ExecResult - :raises: CalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code or stderr presents .. note:: expected return codes can be overridden via kwargs. .. versionchanged:: 1.2.0 default timeout 1 hour @@ -186,7 +188,7 @@ API: SSHClient and SSHAuth. :param get_pty: open PTY on target machine :type get_pty: ``bool`` :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour @@ -206,8 +208,8 @@ API: SSHClient and SSHAuth. :type raise_on_err: ``bool`` :return: dictionary {(hostname, port): result} :rtype: typing.Dict[typing.Tuple[str, int], ExecResult] - :raises: ParallelCallProcessError - :raises: ParallelCallExceptions + :raises ParallelCallProcessError: Unexpected any code at lest on one target + :raises ParallelCallExceptions: At lest one exception raised during execution (including timeout) .. versionchanged:: 1.2.0 default timeout 1 hour @@ -351,4 +353,4 @@ API: SSHClient and SSHAuth. :type port: ``int`` :param log: Log on generic connection failure :type log: ``bool`` - :raises: paramiko.AuthenticationException + :raises paramiko.AuthenticationException: Authentication failed. diff --git a/doc/source/Subprocess.rst b/doc/source/Subprocess.rst index de86282..26f204e 100644 --- a/doc/source/Subprocess.rst +++ b/doc/source/Subprocess.rst @@ -35,7 +35,7 @@ API: Subprocess :param timeout: Timeout for command execution. :type timeout: ``typing.Optional[int]`` :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.1.0 make method .. versionchanged:: 1.2.0 @@ -60,7 +60,8 @@ API: Subprocess :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: ``bool`` :rtype: ExecResult - :raises: CalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code .. versionchanged:: 1.1.0 make method .. versionchanged:: 1.2.0 default timeout 1 hour @@ -80,7 +81,8 @@ API: Subprocess :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: ``bool`` :rtype: ExecResult - :raises: CalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code or stderr presents .. note:: expected return codes can be overridden via kwargs. diff --git a/doc/source/exceptions.rst b/doc/source/exceptions.rst index 789ccff..d558cae 100644 --- a/doc/source/exceptions.rst +++ b/doc/source/exceptions.rst @@ -10,6 +10,10 @@ API: exceptions Base class for all exceptions raised inside. +.. py:exception:: DeserializeValueError(ExecHelperError, ValueError) + + Deserialize impossible. + .. py:exception:: ExecHelperTimeoutError(ExecHelperError) Execution timeout. diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index 2d5f9ef..9a7b931 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -412,7 +412,7 @@ def _sftp(self): # type: () -> paramiko.sftp_client.SFTPClient """SFTP channel access for inheritance. :rtype: paramiko.sftp_client.SFTPClient - :raises: paramiko.SSHException + :raises paramiko.SSHException: SFTP connection failed """ if self.__sftp is not None: return self.__sftp @@ -445,7 +445,7 @@ def close(self): @close.class_method def close(cls): # pylint: disable=no-self-argument """Close all memorized SSH and SFTP sessions.""" - # noinspection PyDeprecation + # noinspection PyUnresolvedReferences cls.__class__.close_connections() @classmethod @@ -589,7 +589,7 @@ def __exec_command( :type timeout: int :type verbose: bool :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded """ def poll_streams( result, # type: exec_result.ExecResult @@ -705,7 +705,7 @@ def execute( :param timeout: Timeout for command execution. :type timeout: typing.Optional[int] :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour """ @@ -752,7 +752,8 @@ def check_call( :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: bool :rtype: ExecResult - :raises: CalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code .. versionchanged:: 1.2.0 default timeout 1 hour """ @@ -795,7 +796,8 @@ def check_stderr( :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: bool :rtype: ExecResult - :raises: CalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code or stderr presents .. note:: expected return codes can be overridden via kwargs. .. versionchanged:: 1.2.0 default timeout 1 hour @@ -845,7 +847,7 @@ def execute_through_host( :param get_pty: open PTY on target machine :type get_pty: bool :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour """ @@ -909,8 +911,10 @@ def execute_together( :type raise_on_err: bool :return: dictionary {(hostname, port): result} :rtype: typing.Dict[typing.Tuple[str, int], exec_result.ExecResult] - :raises: ParallelCallProcessError - :raises: ParallelCallExceptions + :raises ParallelCallProcessError: + Unexpected any code at lest on one target + :raises ParallelCallExceptions: + At lest one exception raised during execution (including timeout) .. versionchanged:: 1.2.0 default timeout 1 hour """ diff --git a/exec_helpers/exceptions.py b/exec_helpers/exceptions.py index 457b847..ac714a2 100644 --- a/exec_helpers/exceptions.py +++ b/exec_helpers/exceptions.py @@ -41,6 +41,12 @@ class ExecHelperError(Exception): __slots__ = () +class DeserializeValueError(ExecHelperError, ValueError): + """Deserialize impossible.""" + + __slots__ = () + + class ExecHelperTimeoutError(ExecHelperError): """Execution timeout.""" diff --git a/exec_helpers/exec_result.py b/exec_helpers/exec_result.py index 214e751..a82c8fa 100644 --- a/exec_helpers/exec_result.py +++ b/exec_helpers/exec_result.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -"""Execution restult.""" +"""Execution result.""" from __future__ import absolute_import from __future__ import division @@ -322,7 +322,8 @@ def __deserialize(self, fmt): # type: (str) -> typing.Any :type fmt: str :rtype: object - :raises: DevopsError + :raises NotImplementedError: fmt deserialization not implemented + :raises DeserializeValueError: Not valid source format """ try: if fmt == 'json': @@ -335,7 +336,7 @@ def __deserialize(self, fmt): # type: (str) -> typing.Any '{{stdout!r}}\n'.format( fmt=fmt)) logger.exception(self.cmd + tmpl.format(stdout=self.stdout_str)) - raise exceptions.ExecHelperError( + raise exceptions.DeserializeValueError( self.cmd + tmpl.format(stdout=self.stdout_brief) ) msg = '{fmt} deserialize target is not implemented'.format(fmt=fmt) @@ -408,7 +409,7 @@ def __str__(self): ) def __eq__(self, other): - """Comparsion.""" + """Comparision.""" return all( ( getattr(self, val) == getattr(other, val) @@ -417,7 +418,7 @@ def __eq__(self, other): ) def __ne__(self, other): - """Comparsion.""" + """Comparision.""" return not self.__eq__(other) def __hash__(self): diff --git a/exec_helpers/ssh_auth.py b/exec_helpers/ssh_auth.py index 0c2e285..6d128d9 100644 --- a/exec_helpers/ssh_auth.py +++ b/exec_helpers/ssh_auth.py @@ -159,7 +159,7 @@ def connect( :type port: int :param log: Log on generic connection failure :type log: bool - :raises: paramiko.AuthenticationException + :raises paramiko.AuthenticationException: Authentication failed. """ kwargs = { 'username': self.username, diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index 4858bf1..84095bd 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -39,6 +39,7 @@ from exec_helpers import _log_templates logger = logging.getLogger(__name__) +# noinspection PyUnresolvedReferences devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+ _win = sys.platform == "win32" @@ -50,6 +51,7 @@ import fcntl # pylint: disable=import-error elif _win: # pragma: no cover + # noinspection PyUnresolvedReferences import msvcrt # pylint: disable=import-error import ctypes from ctypes import wintypes # pylint: disable=import-error @@ -105,6 +107,7 @@ def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None fcntl.fcntl(descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK) elif _win: # pragma: no cover + # noinspection PyPep8Naming SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState SetNamedPipeHandleState.argtypes = [ wintypes.HANDLE, @@ -113,6 +116,7 @@ def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None wintypes.LPDWORD ] SetNamedPipeHandleState.restype = wintypes.BOOL + # noinspection PyPep8Naming PIPE_NOWAIT = wintypes.DWORD(0x00000001) handle = msvcrt.get_osfhandle(descriptor) @@ -327,7 +331,7 @@ def execute( :type verbose: bool :type timeout: typing.Optional[int] :rtype: ExecResult - :raises: ExecHelperTimeoutError + :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour """ @@ -362,7 +366,8 @@ def check_call( :type expected: typing.Optional[typing.Iterable[_type_exit_codes]] :type raise_on_err: bool :rtype: ExecResult - :raises: DevopsCalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code .. versionchanged:: 1.2.0 default timeout 1 hour """ @@ -402,7 +407,8 @@ def check_stderr( :type error_info: typing.Optional[str] :type raise_on_err: bool :rtype: ExecResult - :raises: DevopsCalledProcessError + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code or stderr presents .. versionchanged:: 1.2.0 default timeout 1 hour """ From 8627a6b8be330fcc18d135768f45d4c7534188c8 Mon Sep 17 00:00:00 2001 From: Alexey Stepanov Date: Thu, 12 Apr 2018 14:50:07 +0200 Subject: [PATCH 2/3] Allow masking part of command in logs. Fix #13 --- README.rst | 14 ++ doc/source/SSHClient.rst | 6 + doc/source/Subprocess.rst | 15 ++ exec_helpers/_api.py | 246 ++++++++++++++++++++++++++++++ exec_helpers/_ssh_client_base.py | 227 +++++++++------------------ exec_helpers/subprocess_runner.py | 172 ++++++--------------- setup.py | 1 + test/test_ssh_client.py | 160 ++++++++++++++++--- test/test_subprocess_runner.py | 128 +++++++++++++++- 9 files changed, 664 insertions(+), 305 deletions(-) create mode 100644 exec_helpers/_api.py diff --git a/README.rst b/README.rst index c7469ab..3651ff0 100644 --- a/README.rst +++ b/README.rst @@ -160,6 +160,20 @@ If no STDOUT or STDERR required, it is possible to disable this FIFO pipes via ` The next command level uses lower level and kwargs are forwarded, so expected exit codes are forwarded from `check_stderr`. Implementation specific flags are always set via kwargs. +If required to mask part of command from logging, `log_mask_re` attribute can be set global over instance or providden with command. +All regex matched groups will be replaced by `'<*masked*>'`. + +.. code-block:: python + + result = helper.execute( + command="AUTH='top_secret_key'; run command", # type: str + verbose=False, # type: bool + timeout=1 * 60 * 60, # type: typing.Optional[int] + log_mask_re=r"AUTH\s*=\s*'(\w+)'" # type: typing.Optional[str] + ) + +`result.cmd` will be equal to `AUTH='<*masked*>'; run command` + ExecResult ---------- diff --git a/doc/source/SSHClient.rst b/doc/source/SSHClient.rst index ccf423c..73d3bee 100644 --- a/doc/source/SSHClient.rst +++ b/doc/source/SSHClient.rst @@ -27,6 +27,12 @@ API: SSHClient and SSHAuth. .. note:: auth has priority over username/password/private_keys + .. py:attribute:: log_mask_re + + ``typing.Optional[str]`` + + regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' + .. py:attribute:: lock ``threading.RLock`` diff --git a/doc/source/Subprocess.rst b/doc/source/Subprocess.rst index 26f204e..f883e06 100644 --- a/doc/source/Subprocess.rst +++ b/doc/source/Subprocess.rst @@ -8,6 +8,21 @@ API: Subprocess .. py:class:: Subprocess() + .. py:method:: __init__(logger, log_mask_re=None) + + ExecHelper global API. + + :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd + + .. py:attribute:: log_mask_re + + ``typing.Optional[str]`` + + regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' + .. py:attribute:: lock ``threading.RLock`` diff --git a/exec_helpers/_api.py b/exec_helpers/_api.py new file mode 100644 index 0000000..e5a1f1d --- /dev/null +++ b/exec_helpers/_api.py @@ -0,0 +1,246 @@ +# Copyright 2018 Alexey Stepanov aka penguinolog. + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""ExecHelpers global API. + +.. versionadded:: 1.2.0 +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import re +import threading + +from exec_helpers import constants +from exec_helpers import exceptions +from exec_helpers import proc_enums +from exec_helpers import _log_templates + + +class ExecHelper(object): + """ExecHelper global API.""" + + __slots__ = ( + '__lock', + '__logger', + 'log_mask_re' + ) + + def __init__( + self, + logger, # type: logging.Logger + log_mask_re=None, # type: typing.Optional[str] + ): + """ExecHelper global API. + + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd + """ + self.__lock = threading.RLock() + self.__logger = logger + self.log_mask_re = log_mask_re + + @property + def logger(self): # type: () -> logging.Logger + """Instance logger access.""" + return self.__logger + + @property + def lock(self): # type: () -> threading.RLock + """Lock. + + :rtype: threading.RLock + """ + return self.__lock + + def __enter__(self): + """Get context manager. + + .. versionchanged:: 1.1.0 lock on enter + """ + self.lock.acquire() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager usage.""" + raise NotImplementedError() # pragma: no cover + + def _mask_command( + self, + cmd, # type: str + log_mask_re=None, # type: typing.Optional[str] + ): # type: (...) -> str + """Log command with masking and return parsed cmd. + + :type cmd: str + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] + + .. versionadded:: 1.2.0 + """ + def mask(text, rules): # type: (str, str) -> str + """Mask part of text using rules.""" + indexes = [0] # Start of the line + + # places to exclude + for match in re.finditer(rules, text): + for idx, _ in enumerate(match.groups()): + indexes.extend(match.span(idx + 1)) + indexes.append(len(text)) # End + + masked = "" + + # Replace inserts + for idx in range(0, len(indexes) - 2, 2): + start = indexes[idx] + end = indexes[idx + 1] + masked += text[start: end] + '<*masked*>' + + masked += text[indexes[-2]: indexes[-1]] # final part + return masked + + cmd = cmd.rstrip() + + if self.log_mask_re: + cmd = mask(cmd, self.log_mask_re) + if log_mask_re: + cmd = mask(cmd, log_mask_re) + + return cmd + + def execute( + self, + command, # type: str + verbose=False, # type: bool + timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] + **kwargs + ): # type: (...) -> exec_result.ExecResult + """Execute command and wait for return code. + + Timeout limitation: read tick is 100 ms. + + :param command: Command for execution + :type command: str + :param verbose: Produce log.info records for command call and output + :type verbose: bool + :param timeout: Timeout for command execution. + :type timeout: typing.Optional[int] + :rtype: ExecResult + :raises ExecHelperTimeoutError: Timeout exceeded + + .. versionchanged:: 1.2.0 default timeout 1 hour + """ + raise NotImplementedError() # pragma: no cover + + def check_call( + self, + command, # type: str + verbose=False, # type: bool + timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] + error_info=None, # type: typing.Optional[str] + expected=None, # type: _type_expected + raise_on_err=True, # type: bool + **kwargs + ): # type: (...) -> exec_result.ExecResult + """Execute command and check for return code. + + Timeout limitation: read tick is 100 ms. + + :param command: Command for execution + :type command: str + :param verbose: Produce log.info records for command call and output + :type verbose: bool + :param timeout: Timeout for command execution. + :type timeout: typing.Optional[int] + :param error_info: Text for error details, if fail happens + :type error_info: typing.Optional[str] + :param expected: expected return codes (0 by default) + :type expected: typing.Optional[typing.Iterable[int]] + :param raise_on_err: Raise exception on unexpected return code + :type raise_on_err: bool + :rtype: ExecResult + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code + + .. versionchanged:: 1.2.0 default timeout 1 hour + """ + expected = proc_enums.exit_codes_to_enums(expected) + ret = self.execute(command, verbose, timeout, **kwargs) + if ret['exit_code'] not in expected: + message = ( + _log_templates.CMD_UNEXPECTED_EXIT_CODE.format( + append=error_info + '\n' if error_info else '', + result=ret, + expected=expected + )) + self.logger.error(message) + if raise_on_err: + raise exceptions.CalledProcessError( + result=ret, + expected=expected, + ) + return ret + + def check_stderr( + self, + command, # type: str + verbose=False, # type: bool + timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] + error_info=None, # type: typing.Optional[str] + raise_on_err=True, # type: bool + **kwargs + ): # type: (...) -> exec_result.ExecResult + """Execute command expecting return code 0 and empty STDERR. + + Timeout limitation: read tick is 100 ms. + + :param command: Command for execution + :type command: str + :param verbose: Produce log.info records for command call and output + :type verbose: bool + :param timeout: Timeout for command execution. + :type timeout: typing.Optional[int] + :param error_info: Text for error details, if fail happens + :type error_info: typing.Optional[str] + :param raise_on_err: Raise exception on unexpected return code + :type raise_on_err: bool + :rtype: ExecResult + :raises ExecHelperTimeoutError: Timeout exceeded + :raises CalledProcessError: Unexpected exit code or stderr presents + + .. versionchanged:: 1.2.0 default timeout 1 hour + """ + ret = self.check_call( + command, verbose, timeout=timeout, + error_info=error_info, raise_on_err=raise_on_err, **kwargs) + if ret['stderr']: + message = ( + _log_templates.CMD_UNEXPECTED_STDERR.format( + append=error_info + '\n' if error_info else '', + result=ret, + )) + self.logger.error(message) + if raise_on_err: + raise exceptions.CalledProcessError( + result=ret, + expected=kwargs.get('expected'), + ) + return ret diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index 9a7b931..bbf28fa 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -21,6 +21,7 @@ from __future__ import unicode_literals import base64 +import collections # noinspection PyCompatibility import concurrent.futures import copy @@ -39,6 +40,7 @@ import threaded import six +from exec_helpers import _api from exec_helpers import constants from exec_helpers import exec_result from exec_helpers import exceptions @@ -101,6 +103,19 @@ class _MemorizedSSH(type): __cache = {} + @classmethod + def __prepare__( + mcs, + name, + bases, + **kwargs + ): # pylint: disable=unused-argument + """Metaclass magic for object storage. + + .. versionadded:: 1.2.0 + """ + return collections.OrderedDict() + def __call__( cls, host, # type: str @@ -196,30 +211,11 @@ def close_connections( mcs.__cache[key].close() -def _py2_str(src): # pragma: no cover - """Convert text to correct python type.""" - if not six.PY3 and isinstance(src, six.text_type): - return src.encode( - encoding='utf-8', - errors='strict', - ) - return src - - -BaseSSHClient = type.__new__( # noqa - _MemorizedSSH, - _py2_str('BaseSSHClient'), - (object, ), - {'__slots__': ()} -) - - -class SSHClientBase(BaseSSHClient): +class SSHClientBase(six.with_metaclass(_MemorizedSSH, _api.ExecHelper)): """SSH Client helper.""" __slots__ = ( '__hostname', '__port', '__auth', '__ssh', '__sftp', 'sudo_mode', - '__lock', '__logger' ) class __get_sudo(object): @@ -281,7 +277,11 @@ def __init__( .. note:: auth has priority over username/password/private_keys """ - self.__lock = threading.RLock() + super(SSHClientBase, self).__init__( + logger=logger.getChild( + '{host}:{port}'.format(host=host, port=port) + ), + ) self.__hostname = host self.__port = port @@ -301,25 +301,6 @@ def __init__( self.__auth = copy.copy(auth) self.__connect() - self.__logger = logger.getChild( - '{host}:{port}'.format(host=host, port=port) - ) - - @property - def lock(self): # type: () -> threading.RLock - """Connection lock. - - :rtype: threading.RLock - """ - return self.__lock - - @property - def logger(self): # type: () -> logging.Logger - """Internal logger. - - :rtype: logging.Logger - """ - return self.__logger @property def auth(self): # type: () -> ssh_auth.SSHAuth @@ -474,14 +455,6 @@ def __del__(self): ) self.__sftp = None - def __enter__(self): - """Get context manager. - - .. versionchanged:: 1.1.0 lock on enter - """ - self.lock.acquire() - return self - def __exit__(self, exc_type, exc_val, exc_tb): """Exit context manager. @@ -539,8 +512,15 @@ def execute_async( .. versionchanged:: 1.2.0 open_stdout and open_stderr flags """ - message = _log_templates.CMD_EXEC.format(cmd=command.rstrip()) - self.logger.debug(message) + cmd_for_log = self._mask_command( + cmd=command, + log_mask_re=kwargs.get('log_mask_re', None) + ) + + self.logger.log( + level=logging.INFO if kwargs.get('verbose') else logging.DEBUG, + msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) + ) chan = self._ssh.get_transport().open_session() @@ -578,7 +558,8 @@ def __exec_command( stdout, # type: paramiko.channel.ChannelFile stderr, # type: paramiko.channel.ChannelFile timeout, # type: int - verbose=False # type: bool + verbose=False, # type: bool + log_mask_re=None, # type: typing.Optional[str] ): # type: (...) -> exec_result.ExecResult """Get exit status from channel with timeout. @@ -588,8 +569,13 @@ def __exec_command( :type stderr: paramiko.channel.ChannelFile :type timeout: int :type verbose: bool + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] :rtype: ExecResult :raises ExecHelperTimeoutError: Timeout exceeded + + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd """ def poll_streams( result, # type: exec_result.ExecResult @@ -652,14 +638,16 @@ def poll_pipes( stop.set() # channel.status_event.wait(timeout) - result = exec_result.ExecResult(cmd=command) - stop_event = threading.Event() - message = _log_templates.CMD_EXEC.format(cmd=command.rstrip()) - self.logger.log( - level=logging.INFO if verbose else logging.DEBUG, - msg=message + cmd_for_log = self._mask_command( + cmd=command, + log_mask_re=log_mask_re ) + # Store command with hidden data + result = exec_result.ExecResult(cmd=cmd_for_log) + + stop_event = threading.Event() + # pylint: disable=assignment-from-no-return future = poll_pipes( stdout=stdout, @@ -714,11 +702,16 @@ def execute( _, stderr, # type: paramiko.channel.ChannelFile stdout, # type: paramiko.channel.ChannelFile - ) = self.execute_async(command, **kwargs) + ) = self.execute_async( + command, + verbose=verbose, + **kwargs + ) result = self.__exec_command( command, chan, stdout, stderr, timeout, - verbose=verbose + verbose=verbose, + log_mask_re=kwargs.get('log_mask_re', None), ) message = _log_templates.CMD_RESULT.format(result=result) self.logger.log( @@ -727,98 +720,6 @@ def execute( ) return result - def check_call( - self, - command, # type: str - verbose=False, # type: bool - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] - error_info=None, # type: typing.Optional[str] - expected=None, # type: typing.Optional[typing.Iterable[]] - raise_on_err=True, # type: bool - **kwargs - ): # type: (...) -> exec_result.ExecResult - """Execute command and check for return code. - - :param command: Command for execution - :type command: str - :param verbose: Produce log.info records for command call and output - :type verbose: bool - :param timeout: Timeout for command execution. - :type timeout: typing.Optional[int] - :param error_info: Text for error details, if fail happens - :type error_info: typing.Optional[str] - :param expected: expected return codes (0 by default) - :type expected: typing.Optional[typing.Iterable[int]] - :param raise_on_err: Raise exception on unexpected return code - :type raise_on_err: bool - :rtype: ExecResult - :raises ExecHelperTimeoutError: Timeout exceeded - :raises CalledProcessError: Unexpected exit code - - .. versionchanged:: 1.2.0 default timeout 1 hour - """ - expected = proc_enums.exit_codes_to_enums(expected) - ret = self.execute(command, verbose, timeout, **kwargs) - if ret.exit_code not in expected: - message = ( - _log_templates.CMD_UNEXPECTED_EXIT_CODE.format( - append=error_info + '\n' if error_info else '', - result=ret, - expected=expected, - )) - self.logger.error(message) - if raise_on_err: - raise exceptions.CalledProcessError( - result=ret, - expected=expected - ) - return ret - - def check_stderr( - self, - command, # type: str - verbose=False, # type: bool - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] - error_info=None, # type: typing.Optional[str] - raise_on_err=True, # type: bool - **kwargs - ): # type: (...) -> exec_result.ExecResult - """Execute command expecting return code 0 and empty STDERR. - - :param command: Command for execution - :type command: str - :param verbose: Produce log.info records for command call and output - :type verbose: bool - :param timeout: Timeout for command execution. - :type timeout: typing.Optional[int] - :param error_info: Text for error details, if fail happens - :type error_info: typing.Optional[str] - :param raise_on_err: Raise exception on unexpected return code - :type raise_on_err: bool - :rtype: ExecResult - :raises ExecHelperTimeoutError: Timeout exceeded - :raises CalledProcessError: Unexpected exit code or stderr presents - - .. note:: expected return codes can be overridden via kwargs. - .. versionchanged:: 1.2.0 default timeout 1 hour - """ - ret = self.check_call( - command, verbose, timeout=timeout, - error_info=error_info, raise_on_err=raise_on_err, **kwargs) - if ret.stderr: - message = ( - _log_templates.CMD_UNEXPECTED_STDERR.format( - append=error_info + '\n' if error_info else '', - result=ret, - )) - self.logger.error(message) - if raise_on_err: - raise exceptions.CalledProcessError( - result=ret, - expected=kwargs.get('expected'), - ) - return ret - def execute_through_host( self, hostname, # type: str @@ -850,7 +751,17 @@ def execute_through_host( :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd """ + cmd_for_log = self._mask_command( + cmd=command, + log_mask_re=kwargs.get('log_mask_re', None) + ) + logger.log( + level=logging.INFO if verbose else logging.DEBUG, + msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) + ) + if auth is None: auth = self.auth @@ -881,7 +792,9 @@ def execute_through_host( # noinspection PyDictCreation result = self.__exec_command( - command, channel, stdout, stderr, timeout, verbose=verbose) + command, channel, stdout, stderr, timeout, verbose=verbose, + log_mask_re=kwargs.get('log_mask_re', None), + ) intermediate_channel.close() @@ -917,6 +830,7 @@ def execute_together( At lest one exception raised during execution (including timeout) .. versionchanged:: 1.2.0 default timeout 1 hour + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd """ @threaded.threadpooled def get_result( @@ -933,7 +847,14 @@ def get_result( chan.status_event.wait(timeout) exit_code = chan.recv_exit_status() - result = exec_result.ExecResult(cmd=command) + # pylint: disable=protected-access + cmd_for_log = remote._mask_command( + cmd=command, + log_mask_re=kwargs.get('log_mask_re', None) + ) + # pylint: enable=protected-access + + result = exec_result.ExecResult(cmd=cmd_for_log) result.read_stdout(src=stdout) result.read_stderr(src=stderr) result.exit_code = exit_code diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index 84095bd..b4b5123 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -20,6 +20,7 @@ from __future__ import division from __future__ import unicode_literals +import collections import logging import os import select @@ -32,6 +33,7 @@ import six import threaded +from exec_helpers import _api from exec_helpers import constants from exec_helpers import exec_result from exec_helpers import exceptions @@ -76,23 +78,18 @@ def __call__(cls, *args, **kwargs): ).__call__(*args, **kwargs) return cls._instances[cls] + @classmethod + def __prepare__( + mcs, + name, + bases, + **kwargs + ): # pylint: disable=unused-argument + """Metaclass magic for object storage. -def _py2_str(src): # pragma: no cover - """Convert text to correct python type.""" - if not six.PY3 and isinstance(src, six.text_type): - return src.encode( - encoding='utf-8', - errors='strict', - ) - return src - - -BaseSingleton = type.__new__( # noqa - SingletonMeta, - _py2_str('BaseSingleton'), - (object, ), - {'__slots__': ()} -) + .. versionadded:: 1.2.0 + """ + return collections.OrderedDict() def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None @@ -126,34 +123,32 @@ def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None ) -class Subprocess(BaseSingleton): +class Subprocess(six.with_metaclass(SingletonMeta, _api.ExecHelper)): """Subprocess helper with timeouts and lock-free FIFO.""" __slots__ = ( - '__lock', '__process', ) - def __init__(self): + def __init__( + self, + log_mask_re=None, # type: typing.Optional[str] + ): """Subprocess helper with timeouts and lock-free FIFO. For excluding race-conditions we allow to run 1 command simultaneously - """ - self.__lock = threading.RLock() - self.__process = None - @property - def lock(self): # type: () -> threading.RLock - """Lock. + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] - :rtype: threading.RLock + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd """ - return self.__lock - - def __enter__(self): - """Context manager usage.""" - self.lock.acquire() - return self + super(Subprocess, self).__init__( + logger=logger, + log_mask_re=log_mask_re + ) + self.__process = None def __exit__(self, exc_type, exc_val, exc_tb): """Context manager usage.""" @@ -173,6 +168,7 @@ def __exec_command( env=None, # type: typing.Optional[typing.Dict[str, typing.Any]] timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] verbose=False, # type: bool + log_mask_re=None, # type: typing.Optional[str] open_stdout=True, # type: bool open_stderr=True, # type: bool ): @@ -182,6 +178,11 @@ def __exec_command( :type cwd: str :type env: dict :type timeout: int + :param verbose: use INFO log level instead of DEBUG + :type verbose: str + :param log_mask_re: regex lookup rule to mask command for logger. + all MATCHED groups will be replaced by '<*masked*>' + :type log_mask_re: typing.Optional[str] :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param open_stderr: open STDERR stream for read @@ -190,6 +191,7 @@ def __exec_command( .. versionchanged:: 1.2.0 open_stdout and open_stderr flags .. versionchanged:: 1.2.0 default timeout 1 hour + .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd """ def poll_streams( result, # type: exec_result.ExecResult @@ -258,13 +260,20 @@ def poll_pipes( # 1 Command per run with self.lock: - result = exec_result.ExecResult(cmd=command) + cmd_for_log = self._mask_command( + cmd=command, + log_mask_re=log_mask_re + ) + + # Store command with hidden data + result = exec_result.ExecResult(cmd=cmd_for_log) stop_event = threading.Event() - message = _log_templates.CMD_EXEC.format(cmd=command.rstrip()) - if verbose: - logger.info(message) - else: - logger.debug(message) + + logger.log( + level=logging.INFO if verbose else logging.DEBUG, + msg=_log_templates.CMD_EXEC.format(cmd=cmd_for_log) + ) + # Run self.__process = subprocess.Popen( args=[command], @@ -327,8 +336,11 @@ def execute( Timeout limitation: read tick is 100 ms. + :param command: Command for execution :type command: str + :param verbose: Produce log.info records for command call and output :type verbose: bool + :param timeout: Timeout for command execution. :type timeout: typing.Optional[int] :rtype: ExecResult :raises ExecHelperTimeoutError: Timeout exceeded @@ -344,87 +356,3 @@ def execute( ) return result - - def check_call( - self, - command, # type: str - verbose=False, # type: bool - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] - error_info=None, # type: typing.Optional[str] - expected=None, # type: _type_expected - raise_on_err=True, # type: bool - **kwargs - ): # type: (...) -> exec_result.ExecResult - """Execute command and check for return code. - - Timeout limitation: read tick is 100 ms. - - :type command: str - :type verbose: bool - :type timeout: typing.Optional[int] - :type error_info: typing.Optional[str] - :type expected: typing.Optional[typing.Iterable[_type_exit_codes]] - :type raise_on_err: bool - :rtype: ExecResult - :raises ExecHelperTimeoutError: Timeout exceeded - :raises CalledProcessError: Unexpected exit code - - .. versionchanged:: 1.2.0 default timeout 1 hour - """ - expected = proc_enums.exit_codes_to_enums(expected) - ret = self.execute(command, verbose, timeout, **kwargs) - if ret['exit_code'] not in expected: - message = ( - _log_templates.CMD_UNEXPECTED_EXIT_CODE.format( - append=error_info + '\n' if error_info else '', - result=ret, - expected=expected - )) - logger.error(message) - if raise_on_err: - raise exceptions.CalledProcessError( - result=ret, - expected=expected, - ) - return ret - - def check_stderr( - self, - command, # type: str - verbose=False, # type: bool - timeout=constants.DEFAULT_TIMEOUT, # type: typing.Optional[int] - error_info=None, # type: typing.Optional[str] - raise_on_err=True, # type: bool - **kwargs - ): # type: (...) -> exec_result.ExecResult - """Execute command expecting return code 0 and empty STDERR. - - Timeout limitation: read tick is 100 ms. - - :type command: str - :type verbose: bool - :type timeout: typing.Optional[int] - :type error_info: typing.Optional[str] - :type raise_on_err: bool - :rtype: ExecResult - :raises ExecHelperTimeoutError: Timeout exceeded - :raises CalledProcessError: Unexpected exit code or stderr presents - - .. versionchanged:: 1.2.0 default timeout 1 hour - """ - ret = self.check_call( - command, verbose, timeout=timeout, - error_info=error_info, raise_on_err=raise_on_err, **kwargs) - if ret['stderr']: - message = ( - _log_templates.CMD_UNEXPECTED_STDERR.format( - append=error_info + '\n' if error_info else '', - result=ret, - )) - logger.error(message) - if raise_on_err: - raise exceptions.CalledProcessError( - result=ret, - expected=kwargs.get('expected'), - ) - return ret diff --git a/setup.py b/setup.py index 3550094..e145e15 100644 --- a/setup.py +++ b/setup.py @@ -63,6 +63,7 @@ def _extension(modpath): requires_optimization = [ + _extension('exec_helpers._api'), _extension('exec_helpers.constants'), _extension('exec_helpers._log_templates'), _extension('exec_helpers.exceptions'), diff --git a/test/test_ssh_client.py b/test/test_ssh_client.py index a724820..010b83d 100644 --- a/test/test_ssh_client.py +++ b/test/test_ssh_client.py @@ -118,7 +118,7 @@ def test_execute_async(self, client, policy, logger): )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), log.mock_calls ) @@ -153,7 +153,7 @@ def test_execute_async_pty(self, client, policy, logger): )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), log.mock_calls ) @@ -237,7 +237,7 @@ def test_execute_async_sudo(self, client, policy, logger): )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), log.mock_calls ) @@ -273,7 +273,7 @@ def test_execute_async_with_sudo_enforce(self, client, policy, logger): )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), log.mock_calls ) @@ -305,7 +305,7 @@ def test_execute_async_with_no_sudo_enforce(self, client, policy, logger): )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), log.mock_calls ) @@ -337,7 +337,7 @@ def test_execute_async_with_none_enforce(self, client, policy, logger): )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), log.mock_calls ) @@ -382,7 +382,72 @@ def test_execute_async_sudo_password( )) log = logger.getChild('{host}:{port}'.format(host=host, port=port)) self.assertIn( - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), + log.mock_calls + ) + + def test_execute_async_verbose(self, client, policy, logger): + chan = mock.Mock() + open_session = mock.Mock(return_value=chan) + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async(command=command, verbose=True) + get_transport.assert_called_once() + open_session.assert_called_once() + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{}\n'.format(command)) + )) + log = logger.getChild('{host}:{port}'.format(host=host, port=port)) + self.assertIn( + mock.call.log(level=logging.INFO, msg=command_log), + log.mock_calls + ) + + def test_execute_async_mask_command(self, client, policy, logger): + cmd = "USE='secret=secret_pass' do task" + log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" + masked_cmd = "USE='secret=<*masked*>' do task" + cmd_log = u"Executing command:\n{!s}\n".format(masked_cmd) + + chan = mock.Mock() + open_session = mock.Mock(return_value=chan) + transport = mock.Mock() + transport.attach_mock(open_session, 'open_session') + get_transport = mock.Mock(return_value=transport) + _ssh = mock.Mock() + _ssh.attach_mock(get_transport, 'get_transport') + client.return_value = _ssh + + ssh = self.get_ssh() + + # noinspection PyTypeChecker + result = ssh.execute_async(command=cmd, log_mask_re=log_mask_re) + get_transport.assert_called_once() + open_session.assert_called_once() + + self.assertIn(chan, result) + chan.assert_has_calls(( + mock.call.makefile('wb'), + mock.call.makefile('rb'), + mock.call.makefile_stderr('rb'), + mock.call.exec_command('{}\n'.format(cmd)) + )) + log = logger.getChild('{host}:{port}'.format(host=host, port=port)) + self.assertIn( + mock.call.log(level=logging.DEBUG, msg=cmd_log), log.mock_calls ) @@ -392,6 +457,7 @@ def get_patched_execute_async_retval( stderr_val=None, open_stdout=True, open_stderr=True, + cmd_log=None ): """get patched execute_async retval @@ -425,7 +491,7 @@ def get_patched_execute_async_retval( # noinspection PyTypeChecker exp_result = exec_result.ExecResult( - cmd=command, + cmd=cmd_log if cmd_log is not None else command, stderr=err, stdout=out, exit_code=ec @@ -458,14 +524,12 @@ def test_execute( result, exp_result ) - execute_async.assert_called_once_with(command) + execute_async.assert_called_once_with(command, verbose=False) chan.assert_has_calls((mock.call.status_event.is_set(), )) message = self.gen_cmd_result_log_message(result) log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log log.assert_has_calls( [ - mock.call(level=logging.DEBUG, msg=command_log), - ] + [ mock.call( level=logging.DEBUG, msg=str(x.rstrip().decode('utf-8')) @@ -506,15 +570,13 @@ def test_execute_verbose( result, exp_result ) - execute_async.assert_called_once_with(command) + execute_async.assert_called_once_with(command, verbose=True) chan.assert_has_calls((mock.call.status_event.is_set(), )) message = self.gen_cmd_result_log_message(result) log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log log.assert_has_calls( [ - mock.call(level=logging.INFO, msg=command_log), - ] + [ mock.call( level=logging.INFO, msg=str(x.rstrip().decode('utf-8')) ) @@ -557,13 +619,12 @@ def test_execute_no_stdout( result, exp_result ) - execute_async.assert_called_once_with(command, open_stdout=False) + execute_async.assert_called_once_with( + command, verbose=False, open_stdout=False) message = self.gen_cmd_result_log_message(result) log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log log.assert_has_calls( [ - mock.call(level=logging.DEBUG, msg=command_log), - ] + [ mock.call( level=logging.DEBUG, msg=str(x.rstrip().decode('utf-8')) @@ -602,13 +663,12 @@ def test_execute_no_stderr( result, exp_result ) - execute_async.assert_called_once_with(command, open_stderr=False) + execute_async.assert_called_once_with( + command, verbose=False, open_stderr=False) message = self.gen_cmd_result_log_message(result) log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log log.assert_has_calls( [ - mock.call(level=logging.DEBUG, msg=command_log), - ] + [ mock.call( level=logging.DEBUG, msg=str(x.rstrip().decode('utf-8')) @@ -653,6 +713,7 @@ def test_execute_no_stdout_stderr( ) execute_async.assert_called_once_with( command, + verbose=False, open_stdout=False, open_stderr=False ) @@ -660,8 +721,6 @@ def test_execute_no_stdout_stderr( log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log log.assert_has_calls( [ - mock.call(level=logging.DEBUG, msg=command_log), - ] + [ mock.call(level=logging.DEBUG, msg=message), ] ) @@ -691,7 +750,7 @@ def test_execute_timeout( result, exp_result ) - execute_async.assert_called_once_with(command) + execute_async.assert_called_once_with(command, verbose=False) chan.assert_has_calls((mock.call.status_event.is_set(), )) message = self.gen_cmd_result_log_message(result) log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log @@ -723,9 +782,62 @@ def test_execute_timeout_fail( # noinspection PyTypeChecker ssh.execute(command=command, verbose=False, timeout=1) - execute_async.assert_called_once_with(command) + execute_async.assert_called_once_with(command, verbose=False) chan.assert_has_calls((mock.call.status_event.is_set(), )) + @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') + def test_execute_mask_command( + self, + execute_async, + client, policy, logger + ): + cmd = "USE='secret=secret_pass' do task" + log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" + masked_cmd = "USE='secret=<*masked*>' do task" + + ( + chan, _stdin, exp_result, stderr, stdout + ) = self.get_patched_execute_async_retval(cmd_log=masked_cmd) + is_set = mock.Mock(return_value=True) + chan.status_event.attach_mock(is_set, 'is_set') + + execute_async.return_value = chan, _stdin, stderr, stdout + + ssh = self.get_ssh() + + logger.reset_mock() + + # noinspection PyTypeChecker + result = ssh.execute( + command=cmd, verbose=False, log_mask_re=log_mask_re) + + self.assertEqual( + result, + exp_result + ) + execute_async.assert_called_once_with( + cmd, log_mask_re=log_mask_re, verbose=False) + chan.assert_has_calls((mock.call.status_event.is_set(),)) + message = self.gen_cmd_result_log_message(result) + log = logger.getChild('{host}:{port}'.format(host=host, port=port)).log + log.assert_has_calls( + [ + mock.call( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stdout_list + ] + [ + mock.call( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stderr_list + ] + [ + mock.call(level=logging.DEBUG, msg=message), + ] + ) + @mock.patch('exec_helpers.ssh_client.SSHClient.execute_async') def test_execute_together(self, execute_async, client, policy, logger): ( diff --git a/test/test_subprocess_runner.py b/test/test_subprocess_runner.py index a642d76..11c3d85 100644 --- a/test/test_subprocess_runner.py +++ b/test/test_subprocess_runner.py @@ -60,10 +60,12 @@ class TestSubprocessRunner(unittest.TestCase): @staticmethod def prepare_close( popen, + cmd=command, stderr_val=None, ec=0, open_stdout=True, open_stderr=True, + cmd_in_result=None, ): if open_stdout: stdout_lines = stdout_list @@ -91,7 +93,7 @@ def prepare_close( # noinspection PyTypeChecker exp_result = exec_helpers.ExecResult( - cmd=command, + cmd=cmd_in_result if cmd_in_result is not None else cmd, stderr=stderr_lines, stdout=stdout_lines, exit_code=ec @@ -130,7 +132,7 @@ def test_call(self, popen, _, select, logger): )) logger.assert_has_calls( [ - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), ] + [ mock.call.log( level=logging.DEBUG, @@ -162,7 +164,7 @@ def test_call_verbose(self, popen, _, select, logger): logger.assert_has_calls( [ - mock.call.info(command_log), + mock.call.log(level=logging.INFO, msg=command_log), ] + [ mock.call.log( level=logging.INFO, @@ -254,7 +256,7 @@ def test_execute_no_stdout(self, popen, _, select, logger): )) logger.assert_has_calls( [ - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), ] + [ mock.call.log( level=logging.DEBUG, @@ -292,7 +294,7 @@ def test_execute_no_stderr(self, popen, _, select, logger): )) logger.assert_has_calls( [ - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), ] + [ mock.call.log( level=logging.DEBUG, @@ -335,7 +337,121 @@ def test_execute_no_stdout_stderr(self, popen, _, select, logger): )) logger.assert_has_calls( [ - mock.call.debug(command_log), + mock.call.log(level=logging.DEBUG, msg=command_log), + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=self.gen_cmd_result_log_message(result)), + ]) + self.assertIn( + mock.call.poll(), popen_obj.mock_calls + ) + + def test_execute_mask_global(self, popen, _, select, logger): + cmd = "USE='secret=secret_pass' do task" + log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" + masked_cmd = "USE='secret=<*masked*>' do task" + cmd_log = u"Executing command:\n{!s}\n".format(masked_cmd) + + popen_obj, exp_result = self.prepare_close( + popen, + cmd=cmd, + cmd_in_result=masked_cmd + ) + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess( + log_mask_re=log_mask_re + ) + + # noinspection PyTypeChecker + result = runner.execute(cmd) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[cmd], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + logger.assert_has_calls( + [ + mock.call.log(level=logging.DEBUG, msg=cmd_log), + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stdout_list + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8'))) + for x in stderr_list + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=self.gen_cmd_result_log_message(result)), + ]) + self.assertIn( + mock.call.poll(), popen_obj.mock_calls + ) + + def test_execute_mask_local(self, popen, _, select, logger): + cmd = "USE='secret=secret_pass' do task" + log_mask_re = r"secret\s*=\s*([A-Z-a-z0-9_\-]+)" + masked_cmd = "USE='secret=<*masked*>' do task" + cmd_log = u"Executing command:\n{!s}\n".format(masked_cmd) + + popen_obj, exp_result = self.prepare_close( + popen, + cmd=cmd, + cmd_in_result=masked_cmd + ) + select.return_value = [popen_obj.stdout, popen_obj.stderr], [], [] + + runner = exec_helpers.Subprocess() + + # noinspection PyTypeChecker + result = runner.execute(cmd, log_mask_re=log_mask_re) + self.assertEqual( + result, exp_result + + ) + popen.assert_has_calls(( + mock.call( + args=[cmd], + cwd=None, + env=None, + shell=True, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=False, + ), + )) + logger.assert_has_calls( + [ + mock.call.log(level=logging.DEBUG, msg=cmd_log), + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8')) + ) + for x in stdout_list + ] + [ + mock.call.log( + level=logging.DEBUG, + msg=str(x.rstrip().decode('utf-8'))) + for x in stderr_list ] + [ mock.call.log( level=logging.DEBUG, From 14e86848b21b9486790a67b1c8fee6b4b42f26ee Mon Sep 17 00:00:00 2001 From: Alexey Stepanov Date: Thu, 12 Apr 2018 15:15:27 +0200 Subject: [PATCH 3/3] Move lock release to api --- exec_helpers/_api.py | 4 ++-- exec_helpers/_ssh_client_base.py | 2 +- exec_helpers/subprocess_runner.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/exec_helpers/_api.py b/exec_helpers/_api.py index e5a1f1d..35aaf48 100644 --- a/exec_helpers/_api.py +++ b/exec_helpers/_api.py @@ -78,9 +78,9 @@ def __enter__(self): self.lock.acquire() return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb): # pragma: no cover """Context manager usage.""" - raise NotImplementedError() # pragma: no cover + self.lock.release() def _mask_command( self, diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index bbf28fa..8015cba 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -462,7 +462,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): .. versionchanged:: 1.1.0 release lock on exit """ self.close() - self.lock.release() + super(SSHClientBase, self).__exit__(exc_type, exc_val, exc_tb) def reconnect(self): # type: () -> None """Reconnect SSH session.""" diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index b4b5123..4730fab 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -154,7 +154,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): """Context manager usage.""" if self.__process: self.__process.kill() - self.lock.release() + super(Subprocess, self).__exit__(exc_type, exc_val, exc_tb) def __del__(self): """Destructor. Kill running subprocess, if it running."""