Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions exec_helpers/_ssh_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ def __call__(
logger.debug('Reconnect {}'.format(ssh))
ssh.reconnect()
return ssh
if CPYTHON and sys.getrefcount(cls.__cache[key]) == 2:
if (
CPYTHON and
sys.getrefcount(cls.__cache[key]) == 2
): # pragma: no cover
# If we have only cache reference and temporary getrefcount
# reference: close connection before deletion
logger.debug('Closing {} as unused'.format(cls.__cache[key]))
Expand All @@ -161,7 +164,10 @@ def clear_cache(mcs): # type: () -> None
# PY3: cache, ssh, temporary
# PY4: cache, values mapping, ssh, temporary
for ssh in mcs.__cache.values():
if CPYTHON and sys.getrefcount(ssh) == n_count:
if (
CPYTHON and
sys.getrefcount(ssh) == n_count
): # pragma: no cover
logger.debug('Closing {} as unused'.format(ssh))
ssh.close()
mcs.__cache = {}
Expand Down
53 changes: 50 additions & 3 deletions exec_helpers/subprocess_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,19 @@
devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+

_win = sys.platform == "win32"
_posix = 'posix' in sys.builtin_module_names
_type_exit_codes = typing.Union[int, proc_enums.ExitCodes]
_type_expected = typing.Optional[typing.Iterable[_type_exit_codes]]

if _posix: # pragma: no cover
import fcntl # pylint: disable=import-error

elif _win: # pragma: no cover
import msvcrt # pylint: disable=import-error
import ctypes
from ctypes import wintypes # pylint: disable=import-error
from ctypes import windll # pylint: disable=import-error


class SingletonMeta(type):
"""Metaclass for Singleton.
Expand All @@ -51,12 +61,15 @@ class SingletonMeta(type):
"""

_instances = {}
_lock = threading.RLock()

def __call__(cls, *args, **kwargs):
"""Singleton."""
if cls not in cls._instances:
cls._instances[cls] = super(
SingletonMeta, cls).__call__(*args, **kwargs)
with cls._lock:
if cls not in cls._instances:
cls._instances[cls] = super(
SingletonMeta, cls
).__call__(*args, **kwargs)
return cls._instances[cls]


Expand All @@ -78,6 +91,35 @@ def _py2_str(src): # pragma: no cover
)


def set_nonblocking_pipe(pipe): # type: (os.pipe) -> None
"""Set PIPE unblocked to allow polling of all pipes in parallel."""
descriptor = pipe.fileno()

if _posix: # pragma: no cover
# Get flags
flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)

# Set nonblock mode
fcntl.fcntl(descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)

elif _win: # pragma: no cover
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [
wintypes.HANDLE,
wintypes.LPDWORD,
wintypes.LPDWORD,
wintypes.LPDWORD
]
SetNamedPipeHandleState.restype = wintypes.BOOL
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
handle = msvcrt.get_osfhandle(descriptor)

windll.kernel32.SetNamedPipeHandleState(
handle,
ctypes.byref(PIPE_NOWAIT), None, None
)


class Subprocess(BaseSingleton):
"""Subprocess helper with timeouts and lock-free FIFO."""

Expand Down Expand Up @@ -227,6 +269,11 @@ def poll_pipes(
)

# Poll output

if open_stdout:
set_nonblocking_pipe(self.__process.stdout)
if open_stderr:
set_nonblocking_pipe(self.__process.stderr)
# pylint: disable=assignment-from-no-return
poll_thread = poll_pipes(
result,
Expand Down
17 changes: 10 additions & 7 deletions test/test_subprocess_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def fileno(self):

@mock.patch('exec_helpers.subprocess_runner.logger', autospec=True)
@mock.patch('select.select', autospec=True)
@mock.patch(
'exec_helpers.subprocess_runner.set_nonblocking_pipe', autospec=True
)
@mock.patch('subprocess.Popen', autospec=True, name='subprocess.Popen')
class TestSubprocessRunner(unittest.TestCase):
@staticmethod
Expand Down Expand Up @@ -100,7 +103,7 @@ def gen_cmd_result_log_message(result):
return ("Command exit code '{code!s}':\n{cmd!s}\n"
.format(cmd=result.cmd.rstrip(), code=result.exit_code))

def test_call(self, popen, select, logger):
def test_call(self, popen, _, select, logger):
popen_obj, exp_result = self.prepare_close(popen)
select.return_value = [popen_obj.stdout, popen_obj.stderr], [], []

Expand Down Expand Up @@ -147,7 +150,7 @@ def test_call(self, popen, select, logger):
mock.call.poll(), popen_obj.mock_calls
)

def test_call_verbose(self, popen, select, logger):
def test_call_verbose(self, popen, _, select, logger):
popen_obj, _ = self.prepare_close(popen)
select.return_value = [popen_obj.stdout, popen_obj.stderr], [], []

Expand Down Expand Up @@ -175,7 +178,7 @@ def test_call_verbose(self, popen, select, logger):
msg=self.gen_cmd_result_log_message(result)),
])

def test_context_manager(self, popen, select, logger):
def test_context_manager(self, popen, _, select, logger):
popen_obj, exp_result = self.prepare_close(popen)
select.return_value = [popen_obj.stdout, popen_obj.stderr], [], []

Expand All @@ -200,7 +203,7 @@ def test_context_manager(self, popen, select, logger):
def test_execute_timeout_fail(
self,
sleep,
popen, select, logger
popen, _, select, logger
):
popen_obj, exp_result = self.prepare_close(popen)
popen_obj.configure_mock(returncode=None)
Expand All @@ -227,7 +230,7 @@ def test_execute_timeout_fail(
),
))

def test_execute_no_stdout(self, popen, select, logger):
def test_execute_no_stdout(self, popen, _, select, logger):
popen_obj, exp_result = self.prepare_close(popen, open_stdout=False)
select.return_value = [popen_obj.stdout, popen_obj.stderr], [], []

Expand Down Expand Up @@ -265,7 +268,7 @@ def test_execute_no_stdout(self, popen, select, logger):
mock.call.poll(), popen_obj.mock_calls
)

def test_execute_no_stderr(self, popen, select, logger):
def test_execute_no_stderr(self, popen, _, select, logger):
popen_obj, exp_result = self.prepare_close(popen, open_stderr=False)
select.return_value = [popen_obj.stdout, popen_obj.stderr], [], []

Expand Down Expand Up @@ -304,7 +307,7 @@ def test_execute_no_stderr(self, popen, select, logger):
mock.call.poll(), popen_obj.mock_calls
)

def test_execute_no_stdout_stderr(self, popen, select, logger):
def test_execute_no_stdout_stderr(self, popen, _, select, logger):
popen_obj, exp_result = self.prepare_close(
popen,
open_stdout=False,
Expand Down