Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ python:
- 3.4
- 3.5
- &mainstream_python 3.6
- pypy2.7-5.8.0
- pypy3.5-5.8.0
- pypy
- pypy3.5
install:
- &upgrade_python_toolset pip install --upgrade pip setuptools wheel
- pip install tox-travis
Expand Down
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ This methods are almost the same for `SSHCleint` and `Subprocess`, except specif
raise_on_err=True, # type: bool
)

If no STDOUT or STDERR required, it is possible to disable this FIFO pipes via `**kwargs` with flags `open_stdout=False` and `open_stderr=False`.

The next command level uses lower level and kwargs are forwarded, so expected exit codes are forwarded from `check_stderr`.
Implementation specific flags are always set via kwargs.

Expand Down
12 changes: 8 additions & 4 deletions doc/source/ExecResult.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,28 @@ API: ExecResult
``typing.Any``
YAML from stdout.

.. py:method:: read_stdout(src, log=None, verbose=False)
.. py:method:: read_stdout(src=None, log=None, verbose=False)

Read stdout file-like object to stdout.

:param src: source
:type src: ``typing.Iterable``
:type src: ``typing.Optional[typing.Iterable]``
:param log: logger
:type log: ``typing.Optional[logging.Logger]``
:param verbose: use log.info instead of log.debug
:type verbose: ``bool``

.. py:method:: read_stderr(src, log=None, verbose=False)
.. versionchanged:: 1.2.0 - src can be None

.. py:method:: read_stderr(src=None, log=None, verbose=False)

Read stderr file-like object to stderr.

:param src: source
:type src: ``typing.Iterable``
:type src: ``typing.Optional[typing.Iterable]``
:param log: logger
:type log: ``typing.Optional[logging.Logger]``
:param verbose: use log.info instead of log.debug
:type verbose: ``bool``

.. versionchanged:: 1.2.0 - src can be None
8 changes: 7 additions & 1 deletion doc/source/SSHClient.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,22 @@ API: SSHClient and SSHAuth.
:param enforce: Enforce sudo enabled or disabled. By default: None
:type enforce: ``typing.Optional[bool]``

.. py:method:: execute_async(command, get_pty=False, **kwargs)
.. py:method:: execute_async(command, get_pty=False, open_stdout=True, open_stderr=True, **kwargs)

Execute command in async mode and return channel with IO objects.

:param command: Command for execution
:type command: ``str``
:param get_pty: open PTY on remote machine
:type get_pty: ``bool``
:param open_stdout: open STDOUT stream for read
:type open_stdout: bool
:param open_stderr: open STDERR stream for read
:type open_stderr: bool
:rtype: ``typing.Tuple[paramiko.Channel, paramiko.ChannelFile, paramiko.ChannelFile, paramiko.ChannelFile]``

.. versionchanged:: 1.2.0 - open_stdout and open_stderr flags

.. py:method:: execute(command, verbose=False, timeout=None, **kwargs)

Execute command and wait for return code.
Expand Down
1 change: 1 addition & 0 deletions doc/source/Subprocess.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ API: Subprocess
:raises: ExecHelperTimeoutError

.. versionchanged:: 1.1.0 - make method
.. versionchanged:: 1.2.0 - open_stdout and open_stderr flags

.. py:method:: check_call(command, verbose=False, timeout=None, error_info=None, expected=None, raise_on_err=True, **kwargs)

Expand Down
33 changes: 21 additions & 12 deletions exec_helpers/_ssh_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ def execute_async(
self,
command, # type: str
get_pty=False, # type: bool
open_stdout=True, # type: bool
open_stderr=True, # type: bool
**kwargs
): # type: (...) -> _type_execute_async
"""Execute command in async mode and return channel with IO objects.
Expand All @@ -515,12 +517,18 @@ def execute_async(
:type command: str
:param get_pty: open PTY on remote machine
:type get_pty: bool
:param open_stdout: open STDOUT stream for read
:type open_stdout: bool
:param open_stderr: open STDERR stream for read
:type open_stderr: bool
:rtype: typing.Tuple[
paramiko.Channel,
paramiko.ChannelFile,
paramiko.ChannelFile,
paramiko.ChannelFile,
typing.Optional[paramiko.ChannelFile],
typing.Optional[paramiko.ChannelFile],
]

.. versionchanged:: 1.2.0 - open_stdout and open_stderr flags
"""
message = _log_templates.CMD_EXEC.format(cmd=command.rstrip())
self.logger.debug(message)
Expand All @@ -536,8 +544,8 @@ def execute_async(
)

stdin = chan.makefile('wb')
stdout = chan.makefile('rb')
stderr = chan.makefile_stderr('rb')
stdout = chan.makefile('rb') if open_stdout else None
stderr = chan.makefile_stderr('rb') if open_stderr else None
cmd = "{command}\n".format(command=command)
if self.sudo_mode:
encoded_cmd = base64.b64encode(cmd.encode('utf-8')).decode('utf-8')
Expand Down Expand Up @@ -581,13 +589,13 @@ def poll_streams(
stderr, # type: paramiko.channel.ChannelFile
):
"""Poll FIFO buffers if data available."""
if channel.recv_ready():
if stdout and channel.recv_ready():
result.read_stdout(
src=stdout,
log=self.logger,
verbose=verbose
)
if channel.recv_stderr_ready():
if stderr and channel.recv_stderr_ready():
result.read_stderr(
src=stderr,
log=self.logger,
Expand All @@ -612,12 +620,13 @@ def poll_pipes(
"""
while not stop.isSet():
time.sleep(0.1)
poll_streams(
result=result,
channel=channel,
stdout=stdout,
stderr=stderr,
)
if stdout or stderr:
poll_streams(
result=result,
channel=channel,
stdout=stdout,
stderr=stderr,
)

if channel.status_event.is_set():
result.read_stdout(
Expand Down
14 changes: 11 additions & 3 deletions exec_helpers/exec_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,28 +180,32 @@ def __poll_stream(

def read_stdout(
self,
src, # type: typing.Iterable
src=None, # type: typing.Optional[typing.Iterable]
log=None, # type: typing.Optional[logging.Logger]
verbose=False # type: bool
):
"""Read stdout file-like object to stdout.

:param src: source
:type src: typing.Iterable
:type src: typing.Optional[typing.Iterable]
:param log: logger
:type log: typing.Optional[logging.Logger]
:param verbose: use log.info instead of log.debug
:type verbose: bool

.. versionchanged:: 1.2.0 - src can be None
"""
if self.timestamp:
raise RuntimeError('Final exit code received.')
if not src:
return
with self.lock:
self.__stdout_str = self.__stdout_brief = None
self.__stdout += tuple(self.__poll_stream(src, log, verbose))

def read_stderr(
self,
src, # type: typing.Iterable
src=None, # type: typing.Optional[typing.Iterable]
log=None, # type: typing.Optional[logging.Logger]
verbose=False # type: bool
):
Expand All @@ -213,9 +217,13 @@ def read_stderr(
:type log: typing.Optional[logging.Logger]
:param verbose: use log.info instead of log.debug
:type verbose: bool

.. versionchanged:: 1.2.0 - src can be None
"""
if self.timestamp:
raise RuntimeError('Final exit code received.')
if not src:
return
with self.lock:
self.__stderr_str = self.__stderr_brief = None
self.__stderr += tuple(self.__poll_stream(src, log, verbose))
Expand Down
33 changes: 23 additions & 10 deletions exec_helpers/subprocess_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import unicode_literals

import logging
import os
import select
import sys
import subprocess # nosec # Expected usage
Expand All @@ -36,6 +37,7 @@
from exec_helpers import proc_enums

logger = logging.getLogger(__name__)
devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+

_win = sys.platform == "win32"
_type_exit_codes = typing.Union[int, proc_enums.ExitCodes]
Expand Down Expand Up @@ -122,15 +124,23 @@ def __exec_command(
cwd=None, # type: typing.Optional[str]
env=None, # type: typing.Optional[typing.Dict[str, typing.Any]]
timeout=None, # type: typing.Optional[int]
verbose=False # type: bool
verbose=False, # type: bool
open_stdout=True, # type: bool
open_stderr=True, # type: bool
):
"""Command executor helper.

:type command: str
:type cwd: str
:type env: dict
:type timeout: int
:param open_stdout: open STDOUT stream for read
:type open_stdout: bool
:param open_stderr: open STDERR stream for read
:type open_stderr: bool
:rtype: ExecResult

.. versionchanged:: 1.2.0 - open_stdout and open_stderr flags
"""
def poll_streams(
result, # type: exec_result.ExecResult
Expand All @@ -142,9 +152,9 @@ def poll_streams(
# select.select is not supported on windows
result.read_stdout(src=stdout, log=logger, verbose=verbose)
result.read_stderr(src=stderr, log=logger, verbose=verbose)
else:
else: # pragma: no cover
rlist, _, _ = select.select(
[stdout, stderr],
[item for item in (stdout, stderr) if item is not None],
[],
[])
if rlist:
Expand Down Expand Up @@ -173,11 +183,12 @@ def poll_pipes(
"""
while not stop.isSet():
time.sleep(0.1)
poll_streams(
result=result,
stdout=self.__process.stdout,
stderr=self.__process.stderr,
)
if open_stdout or open_stderr:
poll_streams(
result=result,
stdout=self.__process.stdout,
stderr=self.__process.stderr,
)

self.__process.poll()

Expand Down Expand Up @@ -209,8 +220,8 @@ def poll_pipes(
self.__process = subprocess.Popen(
args=[command],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE if open_stdout else devnull,
stderr=subprocess.PIPE if open_stderr else devnull,
shell=True, cwd=cwd, env=env,
universal_newlines=False,
)
Expand All @@ -234,6 +245,8 @@ def poll_pipes(
try:
self.__process.kill() # kill -9
stop_event.wait(5)
# Force stop cycle if no exit code after kill
stop_event.set()
poll_thread.join(5)
except OSError:
# Nothing to kill
Expand Down
Loading