Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/source/SSHClient.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ API: SSHClient and SSHAuth.
``bool``
Use sudo for all calls, except wrapped in connection.sudo context manager.

.. py:attribute:: keepalive_mode

``bool``
Use keepalive mode for context manager. If `False` - close connection on exit from context manager.

.. py:method:: close()

Close connection
Expand All @@ -93,6 +98,7 @@ API: SSHClient and SSHAuth.

.. versionchanged:: 1.0.0 disconnect enforced on close
.. versionchanged:: 1.1.0 release lock on exit
.. versionchanged:: 1.2.1 disconnect enforced on close only not in keepalive mode

.. py:method:: sudo(enforce=None)

Expand All @@ -101,6 +107,16 @@ API: SSHClient and SSHAuth.
:param enforce: Enforce sudo enabled or disabled. By default: None
:type enforce: ``typing.Optional[bool]``

.. py:method:: keepalive(enforce=None)

Context manager getter for keepalive operation.

:param enforce: Enforce keepalive enabled or disabled. By default: True
:type enforce: ``typing.bool``

.. Note:: Enter and exit ssh context manager is produced as well.
.. versionadded:: 1.2.1

.. py:method:: execute_async(command, stdin=None, open_stdout=True, open_stderr=True, verbose=False, log_mask_re=None, **kwargs)

Execute command in async mode and return channel with IO objects.
Expand Down
100 changes: 96 additions & 4 deletions exec_helpers/_ssh_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,19 @@ class SSHClientBase(six.with_metaclass(_MemorizedSSH, _api.ExecHelper)):
"""SSH Client helper."""

__slots__ = (
'__hostname', '__port', '__auth', '__ssh', '__sftp', 'sudo_mode',
'__hostname', '__port', '__auth', '__ssh', '__sftp',
'__sudo_mode', '__keepalive_mode',
)

class __get_sudo(object):
"""Context manager for call commands with sudo."""

__slots__ = (
'__ssh',
'__sudo_status',
'__enforce',
)

def __init__(
self,
ssh, # type: SSHClientBase
Expand All @@ -243,6 +250,40 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.__ssh.sudo_mode = self.__sudo_status

class __get_keepalive(object):
"""Context manager for keepalive management."""

__slots__ = (
'__ssh',
'__keepalive_status',
'__enforce',
)

def __init__(
self,
ssh, # type: SSHClientBase
enforce=True # type: bool
): # type: (...) -> None
"""Context manager for keepalive management.

:type ssh: SSHClient
:type enforce: bool
:param enforce: Keep connection alive after context manager exit
"""
self.__ssh = ssh
self.__keepalive_status = ssh.keepalive_mode
self.__enforce = enforce

def __enter__(self):
self.__keepalive_status = self.__ssh.keepalive_mode
if self.__enforce is not None:
self.__ssh.keepalive_mode = self.__enforce
self.__ssh.__enter__()

def __exit__(self, exc_type, exc_val, exc_tb):
self.__ssh.__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
self.__ssh.keepalive_mode = self.__keepalive_status

def __hash__(self):
"""Hash for usage as dict keys."""
return hash((
Expand Down Expand Up @@ -286,7 +327,9 @@ def __init__(
self.__hostname = host
self.__port = port

self.sudo_mode = False
self.__sudo_mode = False
self.__keepalive_mode = True

self.__ssh = paramiko.SSHClient()
self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__sftp = None
Expand Down Expand Up @@ -460,10 +503,44 @@ def __exit__(self, exc_type, exc_val, exc_tb):

.. versionchanged:: 1.0.0 disconnect enforced on close
.. versionchanged:: 1.1.0 release lock on exit
.. versionchanged:: 1.2.1 disconnect enforced on close only not in keepalive mode
"""
self.close()
if not self.__keepalive_mode:
self.close()
super(SSHClientBase, self).__exit__(exc_type, exc_val, exc_tb)

@property
def sudo_mode(self): # type: () -> bool
"""Persistent sudo mode for connection object.

:rtype: bool
"""
return self.__sudo_mode

@sudo_mode.setter
def sudo_mode(self, mode): # type: (bool) -> None
"""Persistent sudo mode change for connection object.

:type mode: bool
"""
self.__sudo_mode = bool(mode)

@property
def keepalive_mode(self): # type: () -> bool
"""Persistent keepalive mode for connection object.

:rtype: bool
"""
return self.__keepalive_mode

@keepalive_mode.setter
def keepalive_mode(self, mode): # type: (bool) -> None
"""Persistent keepalive mode change for connection object.

:type mode: bool
"""
self.__keepalive_mode = bool(mode)

def reconnect(self): # type: () -> None
"""Reconnect SSH session."""
with self.lock:
Expand All @@ -485,6 +562,20 @@ def sudo(
"""
return self.__get_sudo(ssh=self, enforce=enforce)

def keepalive(
self,
enforce=True # type: bool
):
"""Call contextmanager with keepalive mode change.

:param enforce: Enforce keepalive enabled or disabled.
:type enforce: bool

.. Note:: Enter and exit ssh context manager is produced as well.
.. versionadded:: 1.2.1
"""
return self.__get_keepalive(ssh=self, enforce=enforce)

def execute_async(
self,
command, # type: str
Expand Down Expand Up @@ -839,7 +930,8 @@ def get_result(
list(futures.values()),
timeout=timeout
) # type: typing.Set[concurrent.futures.Future], typing.Set[concurrent.futures.Future]
for future in not_done:

for future in not_done: # pragma: no cover
future.cancel()

for (
Expand Down
14 changes: 8 additions & 6 deletions exec_helpers/subprocess_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from __future__ import unicode_literals

import collections
# noinspection PyCompatibility
import concurrent.futures
import errno
import logging
import os
Expand Down Expand Up @@ -203,7 +205,7 @@ def poll_streams(
verbose=verbose
)

@threaded.threaded(started=True)
@threaded.threadpooled()
def poll_pipes(
result, # type: exec_result.ExecResult
stop, # type: threading.Event
Expand Down Expand Up @@ -245,17 +247,17 @@ def poll_pipes(
stop_event = threading.Event()

# pylint: disable=assignment-from-no-return
poll_thread = poll_pipes(
future = poll_pipes(
result,
stop_event
) # type: threading.Thread
) # type: concurrent.futures.Future
# pylint: enable=assignment-from-no-return
# wait for process close
stop_event.wait(timeout)

concurrent.futures.wait([future], timeout)

# Process closed?
if stop_event.is_set():
poll_thread.join(0.1)
stop_event.clear()
return result
# Kill not ended process and wait for close
Expand All @@ -264,7 +266,7 @@ def poll_pipes(
stop_event.wait(5)
# Force stop cycle if no exit code after kill
stop_event.set()
poll_thread.join(5)
future.cancel()
except OSError:
# Nothing to kill
logger.warning(
Expand Down
Loading