Skip to content

Commit

Permalink
Extract ephemeral TCP server support from executor.ssh.server.SSHServer
Browse files Browse the repository at this point in the history
  • Loading branch information
xolox committed Nov 8, 2015
1 parent a682efe commit c784551
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 73 deletions.
2 changes: 1 addition & 1 deletion executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
unicode = str

# Semi-standard module versioning.
__version__ = '7.3'
__version__ = '7.5'

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand Down
208 changes: 141 additions & 67 deletions executor/ssh/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: October 18, 2015
# Last Change: November 8, 2015
# URL: https://executor.readthedocs.org

"""
Expand All @@ -14,6 +14,7 @@
"""

# Standard library modules.
import itertools
import logging
import os
import random
Expand All @@ -22,10 +23,11 @@
import tempfile

# Modules included in our package.
from executor import execute, ExternalCommand, ExternalCommandFailed, which
from executor import ExternalCommand, execute, which

# External dependencies.
from humanfriendly import format_timespan, Spinner, Timer
from humanfriendly import Spinner, Timer, format_timespan, pluralize
from property_manager import lazy_property, mutable_property

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand All @@ -34,7 +36,124 @@
"""The name of the SSH server executable (a string)."""


class SSHServer(ExternalCommand):
class EphemeralTCPServer(ExternalCommand):

"""
Make it easy to launch ephemeral TCP servers.
The :class:`EphemeralTCPServer` class makes it easy to allocate an
`ephemeral port number`_ that is not (yet) in use.
.. _ephemeral port number: \
http://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic.2C_private_or_ephemeral_ports
"""

@property
def async(self):
"""Ephemeral TCP servers always set :attr:`.ExternalCommand.async` to :data:`True`."""
return True

@mutable_property
def scheme(self):
"""A URL scheme that indicates the purpose of the ephemeral port (a string, defaults to 'tcp')."""
return 'tcp'

@mutable_property
def hostname(self):
"""The host name or IP address to connect to (a string, defaults to ``localhost``)."""
return 'localhost'

@lazy_property
def port_number(self):
"""A dynamically selected port number that was not in use at the moment it was selected (an integer)."""
logger.debug("Looking for a free ephemeral port (for %s traffic) ..", self.scheme.upper())
for i in itertools.count(1):
port_number = random.randint(49152, 65535)
if not self.is_connected(port_number):
logger.debug("Took %s to select free ephemeral port (%s).",
pluralize(i, "attempt"), self.render_location(port_number))
return port_number

@mutable_property
def connect_timeout(self):
"""The timeout in seconds for connection attempts (a number, defaults to 2)."""
return 2

@mutable_property
def wait_timeout(self):
"""The timeout in seconds for :func:`wait_until_connected()` (a number, defaults to 30)."""
return 30

def start(self, **options):
"""
Start the TCP server and wait for it to start accepting connections.
:param options: Any keyword arguments are passed to :func:`~.ExternalCommand.start()`.
:raises: Any exceptions raised by :func:`wait_until_connected()`
and :func:`~.ExternalCommand.start()`.
If the TCP server doesn't start accepting connections within the
configured timeout (see :attr:`wait_timeout`) the process will be
terminated and the timeout exception will be propagated.
"""
if not self.was_started:
logger.debug("Preparing to start %s server ..", self.scheme.upper())
super(EphemeralTCPServer, self).start(**options)
try:
self.wait_until_connected()
except TimeoutError:
self.terminate()
raise

def wait_until_connected(self, port_number=None):
"""
Wait until the TCP server starts accepting connections.
:param port_number: The port number to check (an integer, defaults to
the computed value of :attr:`port_number`).
:raises: :exc:`TimeoutError` when the SSH server isn't fast enough to
initialize.
"""
timer = Timer()
if port_number is None:
port_number = self.port_number
location = self.render_location(port_number)
with Spinner(timer=timer) as spinner:
while not self.is_connected(port_number):
if timer.elapsed_time > self.wait_timeout:
msg = "Server didn't start accepting connections within timeout of %s!"
raise TimeoutError(msg % format_timespan(self.wait_timeout))
spinner.step(label="Waiting for server to accept connections (%s)" % location)
spinner.sleep()
logger.debug("Waited %s for server to accept connections (%s).", timer, location)

def is_connected(self, port_number=None):
"""
Check whether the TCP server is accepting connections.
:param port_number: The port number to check (an integer, defaults to
the computed value of :attr:`port_number`).
:returns: :data:`True` if the TCP server is accepting connections,
:data:`False` otherwise.
"""
if port_number is None:
port_number = self.port_number
location = self.render_location(port_number)
logger.debug("Checking whether %s is accepting connections ..", location)
try:
socket.create_connection((self.hostname, port_number), self.connect_timeout)
logger.debug("Yes %s is accepting connections.", location)
return True
except Exception:
logger.debug("No %s isn't accepting connections.", location)
return False

def render_location(self, port_number):
"""Render a human friendly representation of an :class:`EphemeralPort` object."""
return "%s://%s:%i" % (self.scheme, self.hostname, port_number)


class SSHServer(EphemeralTCPServer):

"""
Subclass of :class:`.ExternalCommand` that manages a temporary SSH server.
Expand All @@ -61,32 +180,18 @@ def __init__(self, **options):
self.config_file = os.path.join(self.temporary_directory, 'config')
"""The pathname of the generated OpenSSH server configuration file (a string)."""
self.host_key_file = os.path.join(self.temporary_directory, 'host-key')
"""The pathname of the generated OpenSSH host key file (a string)."""
# http://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic.2C_private_or_ephemeral_ports
self.port = random.randint(49152, 65535)
"""The random port number on which the SSH server will listen (an integer)."""
# Initialize the superclass.
command = [self.sshd_path, '-D', '-f', self.config_file]
super(SSHServer, self).__init__(*command, logger=logger, **options)

@property
def is_accepting_connections(self):
""":data:`True` if the SSH server is running and accepting connections, :data:`False` otherwise."""
if self.is_running:
try:
address = ('localhost', self.port)
socket.create_connection(address, 2)
return True
except Exception:
pass
return False
super(SSHServer, self).__init__(
self.sshd_path, '-D', '-f', self.config_file,
scheme='ssh', logger=logger, **options
)

@property
def sshd_path(self):
"""The absolute pathname of :data:`SSHD_PROGRAM_NAME` (a string or :data:`None`)."""
matches = which(SSHD_PROGRAM_NAME)
if matches:
return matches[0]
"""The absolute pathname of :data:`SSHD_PROGRAM_NAME` (a string)."""
executables = which(SSHD_PROGRAM_NAME)
return executables[0] if executables else SSHD_PROGRAM_NAME

@property
def client_options(self):
Expand All @@ -99,55 +204,24 @@ def client_options(self):
"""
return dict(identity_file=self.client_key_file,
ignore_known_hosts=True,
port=self.port)
port=self.port_number)

def start(self, **options):
"""
Start the OpenSSH server.
:param options: Any keyword arguments are passed to
:func:`wait_until_accepting_connections()`.
:raises: :exc:`TimeoutError` when the SSH server isn't fast enough to
initialize.
Start the SSH server and wait for it to start accepting connections.
The :func:`start()` method automatically calls the following methods:
:param options: Any keyword arguments are passed to :func:`~.ExternalCommand.start()`.
:raises: Any exceptions raised by :func:`~.ExternalCommand.start()`.
1. :func:`generate_key_file()`
2. :func:`generate_config()`
3. :func:`executor.ExternalCommand.start()`.
4. :func:`wait_until_accepting_connections()`
The :func:`start()` method automatically calls the
:func:`generate_key_file()` and :func:`generate_config()` methods.
"""
if not self.was_started:
logger.debug("Preparing to start SSH server ..")
for key_file in (self.host_key_file, self.client_key_file):
self.generate_key_file(key_file)
self.generate_config()
super(SSHServer, self).start()
try:
self.wait_until_accepting_connections(**options)
except TimeoutError:
self.terminate()
raise

def wait_until_accepting_connections(self, timeout=30):
"""
Wait until the SSH server starts accepting connections.
:param timeout: If the SSH server doesn't start accepting connections
within the given timeout (number of seconds) the
attempt is aborted.
:raises: :exc:`TimeoutError` when the SSH server isn't fast enough to
initialize.
"""
timer = Timer()
with Spinner(timer=timer) as spinner:
while not self.is_accepting_connections:
if timer.elapsed_time > timeout:
msg = "SSH server didn't start accepting connections within timeout of %s!"
raise TimeoutError(command=self, error_message=msg % format_timespan(timeout))
spinner.step(label="Waiting for SSH server to accept connections")
spinner.sleep()
logger.debug("Waited %s after startup for SSH server to accept connections.", timer)

def generate_key_file(self, filename):
"""
Expand Down Expand Up @@ -179,7 +253,7 @@ def generate_config(self):
handle.write("LogLevel QUIET\n")
handle.write("PasswordAuthentication no\n")
handle.write("PidFile %s/sshd.pid\n" % self.temporary_directory)
handle.write("Port %i\n" % self.port)
handle.write("Port %i\n" % self.port_number)
handle.write("StrictModes no\n")
handle.write("UsePAM no\n")
handle.write("UsePrivilegeSeparation no\n")
Expand All @@ -194,11 +268,11 @@ def cleanup(self):
super(SSHServer, self).cleanup()


class TimeoutError(ExternalCommandFailed):
class TimeoutError(Exception):

"""
Raised when the OpenSSH server doesn't initialize quickly enough.
Raised when a TCP server doesn't start accepting connections quickly enough.
This exception is raised by :func:`~SSHServer.wait_until_accepting_connections()`
when the SSH server doesn't start accepting connections within a reasonable time.
This exception is raised by :func:`~EphemeralPort.wait_until_connected()`
when the TCP server doesn't start accepting connections within a reasonable time.
"""
10 changes: 5 additions & 5 deletions executor/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ def test_ssh_unreachable(self):

def test_remote_working_directory(self):
"""Make sure remote working directories can be set."""
with SSHServer(async=True) as server:
with SSHServer() as server:
some_random_directory = tempfile.mkdtemp()
try:
cmd = RemoteCommand('127.0.0.1',
Expand All @@ -523,13 +523,13 @@ def test_remote_working_directory(self):

def test_remote_error_handling(self):
"""Make sure remote commands preserve exit codes."""
with SSHServer(async=True) as server:
with SSHServer() as server:
cmd = RemoteCommand('127.0.0.1', 'exit 42', **server.client_options)
self.assertRaises(RemoteCommandFailed, cmd.start)

def test_foreach(self):
"""Make sure remote command pools work."""
with SSHServer(async=True) as server:
with SSHServer() as server:
ssh_aliases = ['127.0.0.%i' % i for i in (1, 2, 3, 4, 5, 6, 7, 8)]
results = foreach(ssh_aliases, 'echo $SSH_CONNECTION',
concurrency=3, capture=True,
Expand All @@ -542,7 +542,7 @@ def test_foreach_with_logging(self):
directory = tempfile.mkdtemp()
try:
ssh_aliases = ['127.0.0.%i' % i for i in (1, 2, 3, 4, 5, 6, 7, 8)]
with SSHServer(async=True) as server:
with SSHServer() as server:
foreach(ssh_aliases, 'echo $SSH_CONNECTION',
concurrency=3, logs_directory=directory,
capture=True, **server.client_options)
Expand All @@ -558,7 +558,7 @@ def test_local_context(self):

def test_remote_context(self):
"""Test a remote command context."""
with SSHServer(async=True) as server:
with SSHServer() as server:
self.check_context(RemoteContext('127.0.0.1', **server.client_options))

def check_context(self, context):
Expand Down

0 comments on commit c784551

Please sign in to comment.