From 78eaf38c26abc40dc3798e3c2340dbd31cdd71a5 Mon Sep 17 00:00:00 2001 From: Aleksei Stepanov Date: Mon, 18 Feb 2019 14:39:42 +0100 Subject: [PATCH] Backport from master Signed-off-by: Aleksei Stepanov --- exec_helpers/_ssh_client_base.py | 117 ++++++++++-------- exec_helpers/api.py | 105 +++++++++++++++- exec_helpers/exceptions.py | 2 +- exec_helpers/subprocess_runner.py | 13 +- requirements.txt | 6 +- test/test_ssh_client_execute_async_special.py | 50 ++++++++ tox.ini | 10 +- 7 files changed, 238 insertions(+), 65 deletions(-) diff --git a/exec_helpers/_ssh_client_base.py b/exec_helpers/_ssh_client_base.py index 3a675ff..25dba0d 100644 --- a/exec_helpers/_ssh_client_base.py +++ b/exec_helpers/_ssh_client_base.py @@ -120,6 +120,7 @@ def __call__( # type: ignore private_keys=None, # type: typing.Optional[typing.Iterable[paramiko.RSAKey]] auth=None, # type: typing.Optional[ssh_auth.SSHAuth] verbose=True, # type: bool + chroot_path=None, # type: typing.Optional[typing.Union[str, typing.Text]] ): # type: (...) -> SSHClientBase """Main memorize method: check for cached instance and return it. @@ -137,10 +138,12 @@ def __call__( # type: ignore :type auth: typing.Optional[ssh_auth.SSHAuth] :param verbose: show additional error/warning messages :type verbose: bool + :param chroot_path: chroot path (use chroot if set) + :type chroot_path: typing.Optional[typing.Union[str, typing.Text]] :return: SSH client instance :rtype: SSHClientBase """ - if (host, port) in cls.__cache: + if (host, port) in cls.__cache and not chroot_path: # chrooted connections are not memorized key = host, port if auth is None: auth = ssh_auth.SSHAuth(username=username, password=password, keys=private_keys) @@ -168,8 +171,10 @@ def __call__( # type: ignore private_keys=private_keys, auth=auth, verbose=verbose, + chroot_path=chroot_path, ) - cls.__cache[(ssh.hostname, ssh.port)] = ssh + if not chroot_path: + cls.__cache[(ssh.hostname, ssh.port)] = ssh return ssh @classmethod @@ -179,8 +184,7 @@ def clear_cache(mcs): # type: (typing.Type[_MemorizedSSH]) -> None getrefcount is used to check for usage, so connections closed on CPYTHON only. """ n_count = 4 - # PY3: cache, ssh, temporary - # PY4: cache, values mapping, ssh, temporary + # PY2: cache, values mapping, ssh, temporary for ssh in mcs.__cache.values(): if CPYTHON and sys.getrefcount(ssh) == n_count: # pragma: no cover ssh.logger.debug("Closing as unused") @@ -195,63 +199,65 @@ def close_connections(mcs): # type: (typing.Type[_MemorizedSSH]) -> None ssh.close() # type: ignore -class SSHClientBase(six.with_metaclass(_MemorizedSSH, api.ExecHelper)): - """SSH Client helper.""" +class _SudoContext(object): + """Context manager for call commands with sudo.""" - __slots__ = ("__hostname", "__port", "__auth", "__ssh", "__sftp", "__sudo_mode", "__keepalive_mode", "__verbose") + __slots__ = ("__ssh", "__sudo_status", "__enforce") + + def __init__(self, ssh, enforce=None): # type: (SSHClientBase, typing.Optional[bool]) -> None + """Context manager for call commands with sudo. + + :param ssh: connection instance + :type ssh: SSHClientBase + :param enforce: sudo mode for context manager + :type enforce: typing.Optional[bool] + """ + self.__ssh = ssh + self.__sudo_status = ssh.sudo_mode + self.__enforce = enforce - class __get_sudo(object): - """Context manager for call commands with sudo.""" + def __enter__(self): # type: () -> None + self.__sudo_status = self.__ssh.sudo_mode + if self.__enforce is not None: + self.__ssh.sudo_mode = self.__enforce - __slots__ = ("__ssh", "__sudo_status", "__enforce") + def __exit__(self, exc_type, exc_val, exc_tb): # type: (typing.Any, typing.Any, typing.Any) -> None + self.__ssh.sudo_mode = self.__sudo_status - def __init__(self, ssh, enforce=None): # type: (SSHClientBase, typing.Optional[bool]) -> None - """Context manager for call commands with sudo. - :param ssh: connection instance - :type ssh: SSHClientBase - :param enforce: sudo mode for context manager - :type enforce: typing.Optional[bool] - """ - self.__ssh = ssh - self.__sudo_status = ssh.sudo_mode - self.__enforce = enforce +class _KeepAliveContext(object): + """Context manager for keepalive management.""" - def __enter__(self): # type: () -> None - self.__sudo_status = self.__ssh.sudo_mode - if self.__enforce is not None: - self.__ssh.sudo_mode = self.__enforce + __slots__ = ("__ssh", "__keepalive_status", "__enforce") - def __exit__(self, exc_type, exc_val, exc_tb): # type: (typing.Any, typing.Any, typing.Any) -> None - self.__ssh.sudo_mode = self.__sudo_status + def __init__(self, ssh, enforce=True): # type: (SSHClientBase, bool) -> None + """Context manager for keepalive management. - class __get_keepalive(object): - """Context manager for keepalive management.""" + :param ssh: connection instance + :type ssh: SSHClientBase + :param enforce: keepalive mode for context manager + :type enforce: bool + :param enforce: Keep connection alive after context manager exit + """ + self.__ssh = ssh + self.__keepalive_status = ssh.keepalive_mode + self.__enforce = enforce - __slots__ = ("__ssh", "__keepalive_status", "__enforce") + def __enter__(self): # type: () -> None + self.__keepalive_status = self.__ssh.keepalive_mode + if self.__enforce is not None: + self.__ssh.keepalive_mode = self.__enforce + self.__ssh.__enter__() - def __init__(self, ssh, enforce=True): # type: (SSHClientBase, bool) -> None - """Context manager for keepalive management. + def __exit__(self, exc_type, exc_val, exc_tb): # type: (typing.Any, typing.Any, typing.Any) -> None + self.__ssh.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) # type: ignore + self.__ssh.keepalive_mode = self.__keepalive_status - :param ssh: connection instance - :type ssh: SSHClientBase - :param enforce: keepalive mode for context manager - :type enforce: bool - :param enforce: Keep connection alive after context manager exit - """ - self.__ssh = ssh - self.__keepalive_status = ssh.keepalive_mode - self.__enforce = enforce - def __enter__(self): # type: () -> None - self.__keepalive_status = self.__ssh.keepalive_mode - if self.__enforce is not None: - self.__ssh.keepalive_mode = self.__enforce - self.__ssh.__enter__() +class SSHClientBase(six.with_metaclass(_MemorizedSSH, api.ExecHelper)): + """SSH Client helper.""" - def __exit__(self, exc_type, exc_val, exc_tb): # type: (typing.Any, typing.Any, typing.Any) -> None - self.__ssh.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) # type: ignore - self.__ssh.keepalive_mode = self.__keepalive_status + __slots__ = ("__hostname", "__port", "__auth", "__ssh", "__sftp", "__sudo_mode", "__keepalive_mode", "__verbose") def __hash__(self): # type: () -> int """Hash for usage as dict keys.""" @@ -266,6 +272,7 @@ def __init__( private_keys=None, # type: typing.Optional[typing.Iterable[paramiko.RSAKey]] auth=None, # type: typing.Optional[ssh_auth.SSHAuth] verbose=True, # type: bool + chroot_path=None, # type: typing.Optional[typing.Union[str, typing.Text]] ): # type: (...) -> None """Main SSH Client helper. @@ -283,11 +290,14 @@ def __init__( :type auth: typing.Optional[ssh_auth.SSHAuth] :param verbose: show additional error/warning messages :type verbose: bool + :param chroot_path: chroot path (use chroot if set) + :type chroot_path: typing.Optional[typing.Union[str, typing.Text]] .. note:: auth has priority over username/password/private_keys """ super(SSHClientBase, self).__init__( - logger=logging.getLogger(self.__class__.__name__).getChild("{host}:{port}".format(host=host, port=port)) + logger=logging.getLogger(self.__class__.__name__).getChild("{host}:{port}".format(host=host, port=port)), + chroot_path=chroot_path ) self.__hostname = host @@ -357,7 +367,7 @@ def __unicode__(self): # type: () -> typing.Text # pragma: no cover cls=self.__class__.__name__, self=self, username=self.auth.username ) - def __str__(self): # type: () -> str # pragma: no cover + def __str__(self): # type: () -> bytes # pragma: no cover """Representation for debug purposes.""" return self.__unicode__().encode("utf-8") @@ -510,7 +520,7 @@ def sudo(self, enforce=None): # type: (typing.Optional[bool]) -> typing.Context :return: context manager with selected sudo state inside :rtype: typing.ContextManager """ - return self.__get_sudo(ssh=self, enforce=enforce) + return _SudoContext(ssh=self, enforce=enforce) def keepalive(self, enforce=True): # type: (bool) -> typing.ContextManager[None] """Call contextmanager with keepalive mode change. @@ -523,7 +533,7 @@ def keepalive(self, enforce=True): # type: (bool) -> typing.ContextManager[None .. Note:: Enter and exit ssh context manager is produced as well. .. versionadded:: 1.2.1 """ - return self.__get_keepalive(ssh=self, enforce=enforce) + return _KeepAliveContext(ssh=self, enforce=enforce) # noinspection PyMethodOverriding def execute_async( @@ -569,6 +579,7 @@ def execute_async( .. versionchanged:: 1.2.0 stdin data .. versionchanged:: 1.2.0 get_pty moved to `**kwargs` .. versionchanged:: 1.4.0 Use typed NamedTuple as result + .. versionchanged:: 1.12.0 support chroot """ cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re) @@ -592,7 +603,7 @@ def execute_async( stdout = chan.makefile("rb") # type: paramiko.ChannelFile stderr = chan.makefile_stderr("rb") if open_stderr else None - cmd = "{command}\n".format(command=command) + cmd = "{cmd}\n".format(cmd=self._prepare_command(cmd=command, chroot_path=kwargs.get("chroot_path", None))) started = datetime.datetime.utcnow() if self.sudo_mode: encoded_cmd = base64.b64encode(cmd.encode("utf-8")).decode("utf-8") diff --git a/exec_helpers/api.py b/exec_helpers/api.py index 892d569..c28aafa 100644 --- a/exec_helpers/api.py +++ b/exec_helpers/api.py @@ -50,32 +50,115 @@ ) +class _ChRootContext(object): + """Context manager for call commands with chroot. + + .. versionadded:: 1.12.0 + """ + + __slots__ = ("__conn", "__chroot_status", "__path") + + def __init__( + self, + conn, # type: ExecHelper + path=None, # type: typing.Optional[typing.Union[str, typing.Text]] + ): # type: (...) -> None + """Context manager for call commands with sudo. + + :param conn: connection instance + :type conn: ExecHelper + :param path: chroot path or None for no chroot + :type path: typing.Optional[str] + """ + self.__conn = conn # type: ExecHelper + self.__chroot_status = conn.chroot_path # type: typing.Optional[typing.Union[str, typing.Text]] + self.__path = path # type: typing.Optional[typing.Union[str, typing.Text]] + + def __enter__(self): # type: () -> None + self.__conn.__enter__() + self.__chroot_status = self.__conn.chroot_path + self.__conn.chroot_path = self.__path + + def __exit__(self, exc_type, exc_val, exc_tb): # type: (typing.Any, typing.Any, typing.Any) -> None + self.__conn.chroot_path = self.__chroot_status + self.__conn.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) # type: ignore + + class ExecHelper(six.with_metaclass(abc.ABCMeta, object)): """ExecHelper global API.""" - __slots__ = ("__lock", "__logger", "log_mask_re") + __slots__ = ("__lock", "__logger", "log_mask_re", "__chroot_path") - def __init__(self, logger, log_mask_re=None): # type: (logging.Logger, typing.Optional[typing.Text]) -> None + def __init__( + self, + logger, # type: logging.Logger + log_mask_re=None, # type: typing.Optional[typing.Text] + chroot_path=None # type: typing.Optional[typing.Union[str, typing.Text]] + ): # type: (...) -> None """Global ExecHelper API. :param logger: logger instance to use :type logger: logging.Logger :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' + :param chroot_path: chroot path (use chroot if set) + :type chroot_path: typing.Optional[str] :type log_mask_re: typing.Optional[str] .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd .. versionchanged:: 1.3.5 make API public to use as interface + .. versionchanged:: 1.12.0 support chroot """ self.__lock = threading.RLock() self.__logger = logger - self.log_mask_re = log_mask_re + self.log_mask_re = log_mask_re # type: typing.Optional[typing.Text] + self.__chroot_path = chroot_path # type: typing.Optional[typing.Union[str, typing.Text]] @property def logger(self): # type: () -> logging.Logger """Instance logger access.""" return self.__logger + @property + def chroot_path(self): # type: () -> typing.Optional[typing.Union[str, typing.Text]] + """Path for chroot if set. + + :rtype: typing.Optional[typing.Text] + .. versionadded:: 1.12.0 + """ + return self.__chroot_path + + @chroot_path.setter + def chroot_path(self, new_state): # type: (typing.Optional[typing.Union[str, typing.Text]]) -> None + """Path for chroot if set. + + :param new_state: new path + :type new_state: typing.Optional[typing.Text] + .. versionadded:: 1.12.0 + """ + self.__chroot_path = new_state + + @chroot_path.deleter + def chroot_path(self): # type: () -> None + """Remove Path for chroot. + + .. versionadded:: 1.12.0 + """ + self.__chroot_path = None + + def chroot(self, path): # type: (typing.Union[str, typing.Text, None]) -> typing.ContextManager[None] + """Context manager for changing chroot rules. + + :param path: chroot path or none for working without chroot. + :type path: typing.Optional[typing.Text] + :return: context manager with selected chroot state inside + :rtype: typing.ContextManager + + .. Note:: Enter and exit main context manager is produced as well. + .. versionadded:: 1.12.0 + """ + return _ChRootContext(conn=self, path=path) + @property def lock(self): # type: () -> threading.RLock """Lock. @@ -142,6 +225,19 @@ def mask(text, rules): # type: (str, typing.Text) -> str return result + def _prepare_command( + self, + cmd, # type: typing.Union[str, typing.Text] + chroot_path=None # type: typing.Optional[typing.Union[str, typing.Text]] + ): # type: (...) -> typing.Text + """Prepare command: cower chroot and other cases.""" + if any((chroot_path, self.chroot_path)): + return "chroot {chroot_path} {cmd}".format( + chroot_path=chroot_path if chroot_path else self.chroot_path, + cmd=cmd + ) + return cmd + @abc.abstractmethod def execute_async( self, @@ -185,6 +281,7 @@ def execute_async( .. versionchanged:: 1.2.0 open_stdout and open_stderr flags .. versionchanged:: 1.2.0 stdin data .. versionchanged:: 1.4.0 Use typed NamedTuple as result + .. versionchanged:: 1.12.0 support chroot """ raise NotImplementedError # pragma: no cover @@ -259,7 +356,7 @@ def __call__( command, # type: typing.Union[str, typing.Text] verbose=False, # type: bool timeout=constants.DEFAULT_TIMEOUT, # type: typing.Union[int, float, None] - **kwargs + **kwargs # type: typing.Any ): # type: (...) -> exec_result.ExecResult """Execute command and wait for return code. diff --git a/exec_helpers/exceptions.py b/exec_helpers/exceptions.py index e50a11d..2a0cfed 100644 --- a/exec_helpers/exceptions.py +++ b/exec_helpers/exceptions.py @@ -246,7 +246,7 @@ def __init__( exceptions, # type: typing.Dict[typing.Tuple[typing.Union[str, typing.Text], int], Exception] errors, # type: typing.Dict[typing.Tuple[typing.Union[str, typing.Text], int], exec_result.ExecResult] results, # type: typing.Dict[typing.Tuple[typing.Union[str, typing.Text], int], exec_result.ExecResult] - expected=(proc_enums.EXPECTED,), # type: typing.List[typing.Union[int, proc_enums.ExitCodes]] + expected=(proc_enums.EXPECTED,), # type: typing.Tuple[typing.Union[int, proc_enums.ExitCodes], ...] **kwargs # type: typing.Any ): # type: (...) -> None """Exception during parallel execution. diff --git a/exec_helpers/subprocess_runner.py b/exec_helpers/subprocess_runner.py index d87e7ac..4943433 100644 --- a/exec_helpers/subprocess_runner.py +++ b/exec_helpers/subprocess_runner.py @@ -57,7 +57,11 @@ def interface(self): # type: () -> subprocess.Popen class Subprocess(six.with_metaclass(metaclasses.SingleLock, api.ExecHelper)): """Subprocess helper with timeouts and lock-free FIFO.""" - def __init__(self, log_mask_re=None): # type: (typing.Optional[typing.Text]) -> None + def __init__( + self, + log_mask_re=None, # type: typing.Optional[typing.Text] + chroot_path=None, # type: typing.Optional[typing.Union[str, typing.Text]] + ): # type: (...) -> None """Subprocess helper with timeouts and lock-free FIFO. For excluding race-conditions we allow to run 1 command simultaneously @@ -65,6 +69,8 @@ def __init__(self, log_mask_re=None): # type: (typing.Optional[typing.Text]) -> :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: typing.Optional[str] + :param chroot_path: chroot path (use chroot if set) + :type chroot_path: typing.Optional[str] .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd .. versionchanged:: 1.9.0 Not singleton anymore. Only lock is shared between all instances. @@ -164,7 +170,7 @@ def close_streams(): # type: () -> None wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout) self.logger.debug(wait_err_msg) - raise exceptions.ExecHelperTimeoutError(result=result, timeout=timeout) # type: ignore + raise exceptions.ExecHelperTimeoutError(result=result, timeout=timeout) # noinspection PyMethodOverriding def execute_async( @@ -209,6 +215,7 @@ def execute_async( .. versionadded:: 1.2.0 .. versionchanged:: 1.4.0 Use typed NamedTuple as result + .. versionchanged:: 1.12.0 support chroot """ cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re) @@ -219,7 +226,7 @@ def execute_async( started = datetime.datetime.utcnow() process = subprocess.Popen( - args=[command], + args=[self._prepare_command(cmd=command, chroot_path=kwargs.get("chroot_path", None))], stdout=subprocess.PIPE if open_stdout else devnull, stderr=subprocess.PIPE if open_stderr else devnull, stdin=subprocess.PIPE, diff --git a/requirements.txt b/requirements.txt index a84ec07..edccdb5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ paramiko>=2.4 # LGPLv2.1+ tenacity>=4.4.0 # Apache-2.0 six>=1.10.0 # MIT -threaded>=1.0,<2.0 # Apache-2.0 +threaded>=1.0 # Apache-2.0 PyYAML>=3.12 # MIT -advanced-descriptors>=1.0,<2.0 # Apache-2.0 -typing >= 3.6 # PSF +advanced-descriptors>=1.0 # Apache-2.0 +typing >= 3.6; python_version < "3.7" # PSF futures>=3.1; python_version == "2.7" enum34>=1.1; python_version == "2.7" psutil >= 5.0 diff --git a/test/test_ssh_client_execute_async_special.py b/test/test_ssh_client_execute_async_special.py index 68a265f..1f8884f 100644 --- a/test/test_ssh_client_execute_async_special.py +++ b/test/test_ssh_client_execute_async_special.py @@ -258,3 +258,53 @@ def test_010_check_stdin_closed(paramiko_ssh_client, chan_makefile, auto_add_pol log = get_logger(ssh.__class__.__name__).getChild("{host}:{port}".format(host=host, port=port)) log.warning.assert_called_once_with("STDIN Send failed: closed channel") + + +def test_011_execute_async_chroot(ssh, ssh_transport_channel): + """Global chroot path.""" + ssh.chroot_path = "/" + + ssh.execute_async(command) + ssh_transport_channel.assert_has_calls( + ( + mock.call.makefile_stderr("rb"), + mock.call.exec_command('chroot {ssh.chroot_path} {command}\n'.format(ssh=ssh, command=command)), + ) + ) + + +def test_012_execute_async_chroot_cmd(ssh, ssh_transport_channel): + """Command-only chroot path.""" + ssh.execute_async(command, chroot_path='/') + ssh_transport_channel.assert_has_calls( + ( + mock.call.makefile_stderr("rb"), + mock.call.exec_command('chroot / {command}\n'.format(command=command)), + ) + ) + + +def test_013_execute_async_chroot_context(ssh, ssh_transport_channel): + """Context-managed chroot path.""" + with ssh.chroot('/'): + ssh.execute_async(command) + ssh_transport_channel.assert_has_calls( + ( + mock.call.makefile_stderr("rb"), + mock.call.exec_command('chroot / {command}\n'.format(command=command)), + ) + ) + + +def test_014_execute_async_no_chroot_context(ssh, ssh_transport_channel): + """Context-managed chroot path override.""" + ssh.chroot_path = "/" + + with ssh.chroot(None): + ssh.execute_async(command) + ssh_transport_channel.assert_has_calls( + ( + mock.call.makefile_stderr("rb"), + mock.call.exec_command('{command}\n'.format(command=command)), + ) + ) diff --git a/tox.ini b/tox.ini index f241a83..72d01be 100644 --- a/tox.ini +++ b/tox.ini @@ -104,6 +104,14 @@ ignore = FI17 # line break before binary operator W503 + # First line should be in imperative mood; try rephrasing + D401 + # No blank lines allowed after function docstring + D202 + # 1 blank line required before class docstring + D203 + # Multi-line docstring summary should start at the second line + D213 show-pep8 = True show-source = True count = True @@ -140,7 +148,7 @@ commands = pipdeptree [testenv:mypy] usedevelop = False deps = - mypy>=0.660 + mypy>=0.670 lxml -r{toxinidir}/CI_REQUIREMENTS.txt commands = mypy --strict --xslt-html-report mypy_report -p exec_helpers