Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Tree: 8f04935da0
Fetching contributors…

Cannot retrieve contributors at this time

1374 lines (1237 sloc) 61.2 KB
# -*- coding: utf-8 -*-
# Copyright 2011 Liftoff Software Corporation (
# NOTE: Commercial licenses for this software are available!
# TODO: See if we can spin off into its own little program that sits between Gate One and That way we can take advantage of multiple cores/processors (for terminal-to-HTML processing). There's no reason why we can't write something that does what dtach does. Just need to redirect the fd of self.cmd to a unix domain socket and os.setsid() somewhere after forking (twice maybe?).
# TODO: Make the environment variables used before launching self.cmd configurable
# Meta
__version__ = '1.0'
__license__ = "AGPLv3 or Proprietary (see LICENSE.txt)"
__version_info__ = (1, 0)
__author__ = 'Dan McDougall <>'
__doc__ = """\
About termio
This module provides a Multiplex class that can perform the following:
* Fork a child process that opens a given terminal program.
* Read and write data to and from the child process (asynchronously).
* Examine the output of the child process in real-time and perform actions (also asynchronously!) based on what is "expected" (aka non-blocking, pexpect-like functionality).
* Log the output of the child process to a file and/or syslog.
The Multiplex class is meant to be used in conjunction with a running
:class:`tornado.ioloop.IOLoop` instance. It can be instantiated from within
your Tornado application like so::
multiplexer = termio.Multiplex(
.. note:: Support for event loops other than Tornado is in the works!
Then *multiplexer* can create and launch a new controlling terminal (tty)
running the given command (e.g. 'nethack')::
env = {
'PATH': os.environ['PATH'],
'MYVAR': 'foo'
fd = multiplexer.spawn(80, 24, env=env)
# The fd is returned from spawn() in case you want more low-level control.
Input and output from the controlled program is asynchronous and gets handled
via IOLoop. It will automatically write all output from the terminal program to
an instance of self.terminal_emulator (which defaults to Gate One's
`terminal.Terminal`). So if you want to perform an action whenever the running
terminal application has output (like, say, sending a message to a client)
you'll need to attach a callback::
def screen_update():
'Called when new output is ready to send to the client'
output = multiplexer.dump_html()
multiplexer.callbacks[multiplexer.CALLBACK_UPDATE] = screen_update
In this example, `screen_update()` will `write()` the output of
`multiplexer.dump_html()` to *socket_or_something* whenever the terminal program
has some sort of output. You can also make calls directly to the terminal
emulator (if you're using a custom one)::
def screen_update():
output = multiplexer.term.my_custom_func()
Writing characters to the controlled terminal application is pretty
multiplexer.write(u'some text')
Typically you'd pass in keystrokes or commands from your application to the
underlying program this way and the screen/terminal emulator would get updated
automatically. If using Gate One's `terminal.Terminal()` you can also attach
callbacks to perform further actions when more specific situations are
encountered (e.g. when the window title is set via its respective escape
def set_title():
'Hypothetical title-setting function'
print("Window title was just set to: %s" % multiplexer.term.title)
multiplexer.term.callbacks[multiplexer.CALLBACK_TITLE] = set_title
Module Functions and Classes
# Stdlib imports
import signal, threading, os, sys, time, struct, io, gzip, re, logging
from copy import copy
from datetime import timedelta, datetime
from functools import partial
from itertools import izip
from multiprocessing import Process
# Import our own stuff
from utils import get_translation, human_readable_bytes, noop, which
from utils import get_or_update_metadata, json_encode, shell_command
from utils import timeout_func
_ = get_translation()
# Globals
SEPARATOR = u"\U000f0f0f" # The character used to separate frames in the log
# NOTE: That unicode character was carefully selected from only the finest
# of the PUA. I hereby dub thee, "U+F0F0F0, The Separator."
CALLBACK_THREAD = None # Used by add_callback()
POSIX = 'posix' in sys.builtin_module_names
MACOS = os.uname()[0] == 'Darwin'
# Helper functions
def debug_expect(m_instance, match):
This method is used by :meth:`BaseMultiplex.expect` if :attr:`self.debug` is
True. It facilitates easy debugging of regular expressions. It will print
out precisely what was matched and where.
.. note:: This function only works with post-process patterns.
print("%s was matched..." % repr(match))
out = ""
for line in m_instance.dump():
regex = re.escape(match)
match_obj = re.match(regex, line)
if match_obj:
out += "--->%s\n" % repr(line)
out += " %s\n" % repr(line)
# Exceptions
class Timeout(Exception):
Used by :meth:`BaseMultiplex.expect` and :meth:`BaseMultiplex.await`;
called when a timeout is reached.
# Classes
class Pattern(object):
Used by :meth:`BaseMultiplex.expect`, an object to store patterns
(regular expressions) and their associated properties.
.. note:: The variable *m_instance* is used below to mean the current instance of BaseMultiplex (or a subclass thereof).
:pattern: A regular expression or iterable of regular expressions that will be checked against the output stream.
:callback: A function that will be called when the pattern is matched. Callbacks are called like so:
>>> callback(m_instance, matched_string)
:optional: Indicates that this pattern is optional. Meaning that it isn't required to match before the next pattern in :attr:`BaseMultiplex._patterns` is checked.
:sticky: Indicates that the pattern will not time out and won't be automatically removed from self._patterns when it is matched.
:errorback: A function to call in the event of a timeout or if an exception is encountered. Errorback functions are called like so:
>>> errorback(m_instance)
:preprocess: Indicates that this pattern is to be checked against the incoming stream before it is processed by the terminal emulator. Useful if you need to match non-printable characters like control codes and escape sequences.
:timeout: A :obj:`datetime.timedelta` object indicating how long we should wait before calling :meth:`errorback`.
:created: A :obj:`datetime.datetime` object that gets set when the Pattern is instantiated by :meth:`BaseMultiplex.expect`. It is used to determine if and when a timeout has been reached.
def __init__(self, pattern, callback,
self.pattern = pattern
self.callback = callback
self.errorback = errorback
self.optional = optional
self.sticky = sticky
self.preprocess = preprocess
self.timeout = timeout
self.created =
class BaseMultiplex(object):
A base class that all Multiplex types will inherit from.
:cmd: *string* - The command to execute when calling :meth:`spawn`.
:terminal_emulator: *terminal.Terminal or similar* - The terminal emulator to write to when capturing the incoming output stream from *cmd*.
:log_path: *string* - The absolute path to the log file where the output from *cmd* will be saved.
:term_id: *string* - The terminal identifier to associated with this instance (only used in the logs to identify terminals).
:syslog: *boolean* - Whether or not the session should be logged using the local syslog daemon.
:syslog_host: *string* - An optional syslog host to send session log information to (this is independent of the *syslog* option above--it does not require a syslog daemon be present on the host running Gate One).
:syslog_facility: *integer* - The syslog facility to use when logging messages. All possible facilities can be found in `utils.FACILITIES` (if you need a reference other than the syslog module).
:debug: *boolean* - Used by the `expect` methods... If set, extra debugging information will be output whenever a regular expression is matched.
CALLBACK_UPDATE = 1 # Screen update
CALLBACK_EXIT = 2 # When the underlying program exits
def __init__(self,
terminal_emulator=None, # Defaults to Gate One's terminal.Terminal
user=None, # Only used by log output (to differentiate who's who)
term_id=None, # Also only for syslog output for the same reason
self.debug = debug
self.lock = threading.Lock()
self.cmd = cmd
if not terminal_emulator:
# Why do this? So you could use/write your own specialty emulator.
# Whatever you use it just has to accept 'rows' and 'cols' as
# keyword arguments in __init__()
from terminal import Terminal # Dynamic import to cut down on waste
self.terminal_emulator = Terminal
self.terminal_emulator = terminal_emulator
self.log_path = log_path # Logs of the terminal output wind up here
self.syslog = syslog # See "if self.syslog:" below
self._alive = False
self.ratelimiter_engaged = False
self.rows = 24
self.cols = 80 = -1 # Means "no pid yet"
self.started = "Never"
self._patterns = []
self.timeout_thread = None
# Setup our callbacks
self.callbacks = { # Defaults do nothing which saves some conditionals
# Configure syslog logging
self.user = user
self.term_id = term_id
self.syslog_buffer = ''
if self.syslog and not self.syslog_host:
import syslog
except ImportError:
"The syslog module is required to log terminal sessions to "
"syslog if no syslog_host is set. The syslog module is not"
" required if you want to send syslog messages to a remote "
"syslog server but for this to work you must set the "
"syslog_host variable either via the command-line switch or"
" in your server.conf."))
if not syslog_facility:
syslog_facility = syslog.LOG_DAEMON
syslog_facility = syslog_facility
# Sets up syslog messages to show up like this:
# Sep 28 19:45:02 <hostname> gateone: <log message>
syslog.openlog('gateone', 0, syslog_facility)
def __repr__(self):
Returns self.__str__()
return "<%s>" % self.__str__()
def __str__(self):
Returns a string representation of this Multiplex instance and the
current state of things.
started = self.started
if started != "Never":
started = self.started.isoformat()
out = (
"%s.%s: "
"term_id: %s, "
"alive: %s, "
"command: %s, "
"started: %s"
% (
return out
def add_callback(self, event, callback, identifier=None):
Attaches the given *callback* to the given *event*. If given,
*identifier* can be used to reference this callback leter (e.g. when you
want to remove it). Otherwise an identifier will be generated
automatically. If the given *identifier* is already attached to a
callback at the given event, that callback will be replaced with
*event* - The numeric ID of the event you're attaching *callback* to (e.g. Multiplex.CALLBACK_UPDATE).
*callback* - The function you're attaching to the *event*.
*identifier* - A string or number to be used as a reference point should you wish to remove or update this callback later.
Returns the identifier of the callback. to Example:
>>> m = Multiplex()
>>> def somefunc(): pass
>>> id = "myref"
>>> ref = m.add_callback(m.CALLBACK_UPDATE, somefunc, id)
.. note:: This allows the controlling program to have multiple callbacks for the same event.
if not identifier:
identifier = callback.__hash__()
self.callbacks[event][identifier] = callback
return identifier
def remove_callback(self, event, identifier):
Removes the callback referenced by *identifier* that is attached to the
given *event*. Example:
>>> m.remove_callback(m.CALLBACK_BELL, "myref")
del self.callbacks[event][identifier]
except KeyError:
pass # Doesn't exist anymore--nothing to do
def remove_all_callbacks(self, identifier):
Removes all callbacks associated with *identifier*.
for event, identifiers in self.callbacks.items():
del self.callbacks[event][identifier]
except KeyError:
pass # Doesn't exist--nothing to worry about
def _call_callback(self, callback):
This method is here in the event that subclasses of `BaseMultiplex` need
to call callbacks in an implementation-specific way. It just calls
def spawn(self, rows=24, cols=80, env=None, em_dimensions=None):
This method must be overridden by suclasses of `BaseMultiplex`. It is
expected to execute a child process in a way that allows non-blocking
reads to be performed.
raise NotImplementedError(_(
"spawn() *must* be overridden by subclasses."))
def isalive(self):
This method must be overridden by suclasses of `BaseMultiplex`. It is
expected to return True if the child process is still alive and False
raise NotImplementedError(_(
"isalive() *must* be overridden by subclasses."))
def term_write(self, stream):
Writes :obj:`stream` to `BaseMultiplex.term` and also takes care of
logging to :attr:`log_path` (if set) and/or syslog (if
:attr:`syslog` is `True`). When complete, will call any
callbacks registered in :obj:`CALLBACK_UPDATE`.
:stream: A string or bytes containing the incoming output stream from the underlying terminal program.
.. note:: This kind of logging doesn't capture user keystrokes. This is intentional as we don't want passwords winding up in the logs.
# Write to the log (if configured)
if self.log_path:
now = int(round(time.time() * 1000))
if not os.path.exists(self.log_path):
# Write the first frame as metadata
metadata = {
'version': '1.0', # Log format version
'rows': self.rows,
'cols': self.cols,
'start_date': now
# NOTE: end_date should be added later when the is read for
# the first time by either the logviewer or the logging
# plugin.
# The hope is that we can use the first-frame-metadata paradigm
# to store all sorts of useful information about a log.
metadata_frame = str(json_encode(metadata))
metadata_frame = "%s:%s\xf3\xb0\xbc\x8f" % (now, metadata_frame)
log =, mode='a')
# NOTE: I'm using an obscure unicode symbol in order to avoid
# conflicts. We need to dpo our best to ensure that we can
# differentiate between terminal output and our log format...
# This should do the trick because it is highly unlikely that
# someone would be displaying this obscure unicode symbol on an
# actual terminal unless they were using Gate One to view a
# Gate One log file in vim or something =)
# \U000f0f0f == U+F0F0F (Private Use Symbol)
#output = unicode(stream.decode('utf-8', "ignore"))
#output = u"%s:%s\U000f0f0f" % (now, output)
output = "%s:%s\xf3\xb0\xbc\x8f" % (now, stream)
log =, mode='a')
# NOTE: Gate One's log format is special in that it can be used for both
# playing back recorded sessions *or* generating syslog-like output.
if self.syslog:
# Try and keep it as line-line as possible so we don't end up with
# a log line per character.
if '\n' in stream:
for line in stream.splitlines():
if self.syslog_buffer:
line = self.syslog_buffer + line
self.syslog_buffer = ''
# Sylog really doesn't like any fancy encodings
line = line.encode('ascii', 'xmlcharrefreplace')
syslog.syslog("%s %s: %s" % (
self.user, self.term_id, line))
self.syslog_buffer += stream
# Handle preprocess patterns (for expect())
if self._patterns:
# Handle post-process patterns (for expect())
if self._patterns:
if self.CALLBACK_UPDATE in self.callbacks:
for callback in self.callbacks[self.CALLBACK_UPDATE].values():
def preprocess(self, stream):
Handles preprocess patterns registered by :meth:`expect`. That
is, those patterns which have been marked with `preprocess = True`.
Patterns marked in this way get handled *before* the terminal emulator
processes the :obj:`stream`.
:stream: A string or bytes containing the incoming output stream from the underlying terminal program.
preprocess_patterns = (a for a in self._patterns if a.preprocess)
finished_non_sticky = False
# If there aren't any preprocess patterns this won't do anything:
for pattern_obj in preprocess_patterns:
if finished_non_sticky and not pattern_obj.sticky:
# We only want sticky patterns if we've already matched once
match =
if match:
callback = partial(pattern_obj.callback, self,
if not pattern_obj.sticky:
self.unexpect(hash(pattern_obj)) # Remove it
if not pattern_obj.optional:
# We only match the first non-optional pattern
finished_non_sticky = True
def postprocess(self):
Handles post-process patterns registered by :meth:`expect`.
# Check the terminal emulator screen for any matching patterns.
post_patterns = (a for a in self._patterns if not a.preprocess)
finished_non_sticky = False
for pattern_obj in post_patterns:
# For post-processing matches we search the terminal emulator's
# screen as a single string. This allows for full-screen screen
# scraping in addition to typical 'expect-like' functionality.
# The big difference being that with traditional expect (and
# pexpect) you don't get to examine the program's output as it
# would be rendered in an actual terminal.
# By using post-processing of the text after it has been handled
# by a terminal emulator we don't have to worry about hidden
# characters and escape sequences that we may not be aware of or
# could make our regular expressions much more complicated than
# they should be.
if finished_non_sticky and not pattern_obj.sticky:
continue # We only want sticky patterns at this point
# For convenience, trailing whitespace is removed from the lines
# output from the terminal emulator. This is so we don't have to
# put '\w*' before every '$' to match the end of a line.
term_lines = "\n".join(
[a.rstrip() for a in self.term.dump()]).rstrip()
if isinstance(pattern_obj.pattern, (list, tuple)):
for pat in pattern_obj.pattern:
match =
if match:
self._handle_match(pattern_obj, match)
match =
if match:
self._handle_match(pattern_obj, match)
def _handle_match(self, pattern_obj, match):
Handles a matched regex detected by :meth:`postprocess`. It calls :obj:`Pattern.callback` and takes care of removing it from :attr:`_patterns` (if it isn't sticky).
callback = partial(pattern_obj.callback, self,
if self.debug:
debug_callback = partial(
debug_expect, self,
if not pattern_obj.sticky:
self.unexpect(hash(pattern_obj)) # Remove it
if not pattern_obj.optional and not pattern_obj.sticky:
# We only match the first non-optional pattern
finished_non_sticky = True
def writeline(self, line=''):
Just like :meth:`write` but it writes a newline after writing *line*.
If no *line* is given a newline will be written.
self.write(line + u'\n')
def writelines(self, lines):
Writes *lines* (a list of strings) to the underlying program, appending
a newline after each line.
if getattr(lines, '__iter__', False):
for line in lines:
self.write(line + u'\n')
raise TypeError(_(
"%s is not iterable (strings don't count :)" % type(lines)))
def dump_html(self, full=False, client_id='0'):
Returns the difference of terminal lines (a list of lines, to be
specific) and its scrollback buffer (which is also a list of lines) as a
(scrollback, screen)
If a line hasn't changed since the last dump said line will be replaced
with an empty string in the output.
If *full*, will return the entire screen (not just the diff).
if *client_id* is given (string), this will be used as a unique client
identifier for keeping track of screen differences (so you can have
multiple clients getting their own unique diff output for the same
Multiplex instance).
if client_id not in self.prev_output:
self.prev_output[client_id] = [None for a in xrange(self.rows-1)]
scrollback, html = ([], [])
if self.term:
result = self.term.dump_html()
if result:
scrollback, html = result
# Make a copy so we can save it to prev_output later
preserved_html = html[:]
except IOError as e:
logging.debug(_("IOError attempting self.term.dump_html()"))
logging.debug("%s" % e)
if html:
if not full:
count = 0
for line1, line2 in izip(self.prev_output[client_id], html):
if line1 != line2:
html[count] = line2 # I love updates-in-place
html[count] = ''
count += 1
# Otherwise a full dump will take place
self.prev_output.update({client_id: preserved_html})
return (scrollback, html)
except ValueError as e:
# This would be special...
logging.error(_("ValueError in dumplines(): %s" % e))
return ([], [])
except (IOError, TypeError) as e:
logging.error(_("Unhandled exception in dumplines(): %s" % e))
if self.ratelimiter_engaged:
# Caused by the program being out of control
return([], [
_("<b>Program output too noisy. Sending Ctrl-c...</b>")])
import traceback
return ([], [])
def dump(self):
Dumps whatever is currently on the screen of the terminal emulator as
a list of plain strings (so they'll be escaped and look nice in an
interactive Python interpreter).
return self.term.dump()
def timeout_check(self, timeout_now=False):
Iterates over :attr:`BaseMultiplex._patterns` checking each to
determine if it has timed out. If a timeout has occurred for a
`Pattern` and said Pattern has an *errorback* function that function
will be called.
Returns True if there are still non-sticky patterns remaining. False
If *timeout_now* is True, will force the first errorback to be called
and will empty out self._patterns.
remaining_patterns = False
for pattern_obj in self._patterns:
if timeout_now:
if pattern_obj.errorback:
errorback = partial(pattern_obj.errorback, self)
return False
elapsed = - pattern_obj.created
if elapsed > pattern_obj.timeout:
if not pattern_obj.sticky:
if pattern_obj.errorback:
errorback = partial(pattern_obj.errorback, self)
elif not pattern_obj.sticky:
remaining_patterns = True
return remaining_patterns
def expect(self, patterns, callback,
Watches the stream of output coming from the underlying terminal program
for *patterns* and if there's a match *callback* will be called::
callback(multiplex_instance, matched_string)
*patterns* can be a string, an :class:`re.RegexObject` (as created by
:func:`re.compile`), or a iterator of either/or. Returns a reference
object that can be used to remove the registered pattern/callback at any
time using the :meth:`unexpect` method (see below).
.. note:: This function is non-blocking!
.. warning:: The *timeout* value gets compared against the time :meth:`expect` was called to create it. So don't wait too long if you're planning on using :meth:`await`!
Here's a simple example that changes a user's password::
>>> def write_password(m_instance, matched):
... print("Sending Password... %s patterns remaining." % len(m_instance._patterns))
... m_instance.writeline('somepassword')
>>> m = Multiplex('passwd someuser') # Assumes running as root :)
>>> m.expect('(?i)password:', write_password) # Step 1
>>> m.expect('(?i)password:', write_password) # Step 2
>>> print(len(m._patterns)) # To show that there's two in the queue
>>> m.spawn() # Execute the command
>>> m.await(10) # This will block for up to 10 seconds waiting for self._patterns to be empty (not counting optional patterns)
Sending Password... 1 patterns remaining.
Sending Password... 0 patterns remaining.
>>> m.isalive()
>>> # All done!
.. tip:: The :meth:`await` method will automatically call :meth:`spawn` if not :meth:`isalive`.
This would result in the password of 'someuser' being changed to 'somepassword'. How is the order determined? Every time :meth:`expect` is called it creates a new :class:`Pattern` using the given parameters and appends it to `self._patterns` (which is a list). As each :class:`Pattern` is matched its *callback* gets called and the :class:`Pattern` is removed from `self._patterns` (unless *sticky* is `True`). So even though the patterns and callbacks listed above were identical they will get executed and removed in the order they were created as each respective :class:`Pattern` is matched.
.. note:: Only the first pattern, or patterns marked as *sticky* are checked against the incoming stream. If the first non-sticky pattern is marked *optional* then the proceeding pattern will be checked (and so on). All other patterns will sit in `self._patterns` until their predecessors are matched/removed.
Patterns can be removed from `self._patterns` as needed by calling `unexpect(<reference>)`. Here's an example::
>>> def handle_accepting_ssh_key(m_instance, matched):
... m_instance.writeline(u'yes')
>>> m = Multiplex('ssh someuser@somehost')
>>> ref1 = m.expect('(?i)Are you sure.*\(yes/no\)\?', handle_accepting_ssh_key, optional=True)
>>> def send_password(m_instance, matched):
... m_instance.unexpect(ref1)
... self.writeline('somepassword')
>>> ref2 = m.expect('(?i)password:', send_password)
>>> # spawn() and/or await() and do stuff...
The example above would send 'yes' if asked by the SSH program to accept
the host's public key (which would result in it being automatically
removed from `self._patterns`). However, if this condition isn't met
before send_password() is called, send_password() will use the reference
object to remove it directly. This ensures that the pattern won't be
accidentally matched later on in the program's execution.
.. note:: Even if we didn't match the "Are you sure..." pattern it would still get auto-removed after its timeout was reached.
**About pattern ordering:** The position at which the given pattern will
be inserted in `self._patterns` can be specified via the
*position* argument. The default is to simply append which should be
appropriate in most cases.
**About Timeouts:** The *timeout* value passed to expect() will be used
to determine how long to wait before the pattern is removed from
self._patterns. When this occurs, *errorback* will be called with
current Multiplex instance as the only argument. If *errorback* is None
(the default) the pattern will simply be discarded with no action taken.
.. note:: If *sticky* is True the *timeout* value will be ignored.
**Notes about the length of what will be matched:** The entire terminal
'screen' will be searched every time new output is read from the
incoming stream. This means that the number of rows and columns of the
terminal determines the size of the search. So if your pattern needs to
look for something inside of 50 lines of text you need to make sure that
when you call `spawn` you specify at least `rows = 50`. Example::
>>> def handle_long_search(m_instance, matched)
... do_stuff(matched)
>>> m = Multiplex('')
>>> # 'begin', at least one non-newline char, 50 newlines, at least one char, then 'end':
>>> my_regex = re.compile('begin.+[\\n]{50}.+end', re.MULTILINE)
>>> ref = m.expect(my_regex, handle_accepting_ssh_key)
>>> m.spawn(rows=51, cols=150)
>>> # Call, m.spawn() or just let an event loop (e.g. Tornado's IOLoop) take care of things...
**About non-printable characters:** If the *postprocess* argument is
True (the default), patterns will be checked against the current screen as
output by the terminal emulator. This means that things like control
codes and escape sequences will be handled and discarded by the terminal
emulator and as such won't be available for patterns to be checked
against. To get around this limitation you can set *preprocess* to True
and the pattern will be checked against the incoming stream before it is
processed by the terminal emulator. Example::
>>> def handle_xterm_title(m_instance, matched)
... print("Caught title: %s" % matched)
>>> m = Multiplex('echo -e "\\033]0;Some Title\\007"')
>>> title_seq_regex = re.compile(r'\\x1b\\][0-2]\;(.*?)(\\x07|\\x1b\\\\)')
>>> m.expect(title_seq_regex, handle_xterm_title, preprocess=True) # <-- 'preprocess=True'
>>> m.await()
Caught title: Some Title
**Notes about debugging:** Instead of using `await` to wait for all of your patterns to be matched at once you can make individual calls to `read` to determine if your patterns are being matched in the way that you want. For example::
>>> def do_stuff(m_instance, matched):
... print("Debug: do_stuff() got %s" % repr(matched))
... # Do stuff here
>>> m = Multiplex('')
>>> m.expect('some pattern', do_stuff)
>>> m.expect('some other pattern', do_stuff)
>>> m.spawn()
>>> # Instead of calling await() just call one read() at a time...
>>> print(repr(
>>> print(repr( # Oops, called read() too soon. Try again:
'some other pattern'
>>> # Doh! Looks like 'some other pattern' comes first. Let's start over...
>>> m.unexpect() # Called with no arguments, it empties m._patterns
>>> m.terminate() # Tip: This will call unexpect() too so the line above really isn't necessary
>>> m.expect('some other pattern', do_stuff) # This time this one will be first
>>> m.expect('some pattern', do_stuff)
>>> m.spawn()
>>> print(repr( # This time I waited a moment :)
'Debug: do_stuff() got "some other pattern"'
'some other pattern'
>>> # Huzzah! Now let's see if 'some pattern' matches...
>>> print(repr(
'Debug: do_stuff() got "some pattern"'
'some pattern'
>>> # As you can see, calling read() at-will in an interactive interpreter can be very handy.
**About asynchronous use:** This mechanism is non-blocking (with the exception of `await`) and is meant to be used asynchronously. This means that if the running program has no output, `read` won't result in any patterns being matched. So you must be careful about timing *or* you need to ensure that `read` gets called either automatically when there's data to be read (IOLoop, EPoll, select, etc) or at regular intervals via a loop. Also, if you're not calling `read` at an interval (i.e. you're using a mechanism to detect when there's output to be read before calling it e.g. IOLoop) you need to ensure that `timeout_check` is called regularly anyway or timeouts won't get detected if there's no output from the underlying program. See the `` override for an example of what this means and how to do it.
# Create the Pattern object before we do anything else
if isinstance(patterns, (str, unicode)):
# Convert to a compiled regex (assume MULTILINE for the sanity of
# the ignorant)
patterns = re.compile(patterns, re.MULTILINE)
if isinstance(patterns, (tuple, list)):
# Ensure that all patterns are RegexObjects
pattern_list = []
for pattern in patterns:
if isinstance(pattern, str):
pattern = re.compile(pattern)
patterns = tuple(pattern_list) # No reason to keep it as a list
# Convert timeout to a timedelta if necessary
if isinstance(timeout, (str, int, float)):
timeout = timedelta(seconds=float(timeout))
elif not isinstance(timeout, timedelta):
raise TypeError(_(
"The timeout value must be a string, integer, float, or a "
"timedelta object"))
pattern_obj = Pattern(patterns, callback,
if not position:
self._patterns.insert(position, pattern_obj)
return hash(pattern_obj)
def unexpect(self, ref=None):
Removes *ref* from self._patterns so it will no longer be checked
against the incoming stream. If *ref* is None (the default),
`self._patterns` will be emptied.
if not ref:
self._patterns = [] # Reset
for i, item in enumerate(self._patterns):
if hash(item) == ref:
def await(self, timeout=15, rows=24, cols=80, env=None, em_dimensions=None):
Blocks until all non-optional patterns inside self._patterns have been
removed *or* if the given *timeout* is reached. *timeout* may be an
integer (in seconds) or a `datetime.timedelta` object.
Returns True if all non-optional, non-sticky patterns were handled
.. warning:: The timeouts attached to Patterns are set when they are created. Not when when you call :meth:`await`!
As a convenience, if :meth:`isalive` resolves to False,
:meth:`spawn` will be called automatically with *rows*, *cols*,
and *env* given as arguments.
To wait with expectation.
if not self.isalive():
rows=rows, cols=cols, env=env, em_dimensions=em_dimensions)
start =
# Convert timeout to a timedelta if necessary
if isinstance(timeout, (str, int, float)):
timeout = timedelta(seconds=float(timeout))
elif not isinstance(timeout, timedelta):
raise TypeError(_(
"The timeout value must be a string, integer, float, or a "
"timedelta object"))
remaining_patterns = True
while remaining_patterns:
# First we need to discount optional patterns
remaining_patterns = False
for pattern in self._patterns:
if not pattern.optional and not pattern.sticky:
remaining_patterns = True
# Now check if we've timed out
if ( - start) > timeout:
raise Timeout("Lingered longer than %s" % timeout.seconds)
# Lastly we perform a read() to ensure the output is processed # Remember: read() is non-blocking
time.sleep(0.1) # So we don't eat up all the CPU
return True
def terminate(self):
This method must be overridden by suclasses of `BaseMultiplex`. It is
expected to terminate/kill the child process.
raise NotImplementedError(_(
"terminate() *must* be overridden by subclasses."))
def _read(self, bytes=-1):
This method must be overridden by subclasses of `BaseMultiplex`. It is
expected that this method read the output from the running terminal
program in a non-blocking way, pass the result into `term_write`, and
then return the result.
raise NotImplementedError(_(
"_read() *must* be overridden by subclasses."))
def read(self, bytes=-1):
Calls `_read` and checks if any timeouts have been reached in
`self._patterns`. Returns the result of `_read`.
result = self._read(bytes)
# Perform checks for timeouts in self._patterns (used by self.expect())
return result
def write(self):
raise NotImplementedError(_(
"write() *must* be overridden by subclasses."))
class MultiplexPOSIXIOLoop(BaseMultiplex):
The MultiplexPOSIXIOLoop class takes care of executing a child process on
POSIX (aka Unix) systems and keeping track of its state via a terminal
emulator (`terminal.Terminal` by default). If there's a started instance
of :class:`tornado.ioloop.IOLoop`, handlers will be added to it that
automatically keep the terminal emulator synchronized with the output of the
child process.
If there's no IOLoop (or it just isn't started), terminal applications can
be interacted with by calling `` (to write any
pending output to the terminal emulator) and `MultiplexPOSIXIOLoop.write`
(which writes directly to stdin of the child).
.. note:: `` is non-blocking.
def __init__(self, *args, **kwargs):
super(MultiplexPOSIXIOLoop, self).__init__(*args, **kwargs)
from tornado import ioloop
self.terminating = False
self.sent_sigint = False
self.env = {}
self.io_loop = ioloop.IOLoop.instance() # Monitors child for activity
#self.io_loop.set_blocking_signal_threshold(2, self._blocked_io_handler)
signal.signal(signal.SIGALRM, self._blocked_io_handler)
self.reenable_timeout = None
interval = 100 # A 0.1 second interval should be fast enough
self.scheduler = ioloop.PeriodicCallback(self._timeout_checker, interval)
def __del__(self):
Makes sure that the underlying terminal program is terminated so we
don't leave things hanging around.
def _call_callback(self, callback):
If the IOLoop is started, adds the callback via
:meth:`IOLoop.add_callback` to ensure it gets called at the next IOLoop
iteration (which is thread safe). If the IOLoop isn't started
*callback* will get called immediately and directly.
if self.io_loop.running():
del callback
def _reenable_output(self):
Restarts capturing output from the underlying terminal program by
disengaging the rate limiter.
self.ratelimiter_engaged = False
def __reset_sent_sigint(self):
self.sent_sigint = False
def _blocked_io_handler(self, signum=None, frame=None):
Handles the situation where a terminal is blocking IO (usually because
of too much output). This method would typically get called
automatically by IOLoop's signal threshold mechanism
if not self.isalive():
# This can happen if terminate() gets called too fast from another
# thread... Strange stuff, mixing threading, signals, and
# multiprocessing!
return # Nothing to do
"Noisy process (%s) kicked off rate limiter." %
self.ratelimiter_engaged = True
for callback in self.callbacks[self.CALLBACK_UPDATE].values():
self.reenable_timeout = self.io_loop.add_timeout(
timedelta(seconds=10), self._reenable_output)
def spawn(self, rows=24, cols=80, env=None, em_dimensions=None):
Creates a new virtual terminal (tty) and executes self.cmd within it.
Also attaches :meth:`self._ioloop_read_handler` to the IOLoop so that
the terminal emulator will automatically stay in sync with the output of
the child process.
:cols: The number of columns to emulate on the virtual terminal (width)
:rows: The number of rows to emulate (height).
:env: Optional - A dictionary of environment variables to set when executing self.cmd.
:em_dimensions: Optional - The dimensions of a single character within the terminal (only used when calculating the number of rows/cols images take up).
self.started =
signal.signal(signal.SIGCHLD, signal.SIG_IGN) # No zombies allowed
"spawn(rows=%s, cols=%s, env=%s, em_dimensions=%s)" % (
rows, cols, repr(env), repr(em_dimensions)))
rows = min(200, rows) # Max 300 to limit memory utilization
cols = min(500, cols) # Max 500 for the same reason
import pty
pid, fd = pty.fork()
if pid == 0: # We're inside the child process
# Close all file descriptors other than stdin, stdout, and stderr (0, 1, 2)
# This ensures that the child doesn't get the parent's FDs
os.closerange(3, 256)
except OSError:
if not env:
env = {}
env["COLUMNS"] = str(cols)
env["LINES"] = str(rows)
env["TERM"] = "xterm-256color" # TODO: Needs to be configurable
env["PATH"] = os.environ['PATH']
env["LANG"] = os.environ.get('LANG', 'en_US.UTF-8')
env["PYTHONIOENCODING"] = "utf_8"
# Setup stdout to be more Gate One friendly
import termios
# Fix missing termios.IUTF8
if 'IUTF8' not in termios.__dict__:
termios.IUTF8 = 16384 # Hopefully not platform independent
stdin = 0
stdout = 1
stderr = 2
attrs = termios.tcgetattr(stdout)
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = attrs
# Enable flow control and UTF-8 input (probably not needed)
iflag |= (termios.IXON | termios.IXOFF | termios.IUTF8)
# OPOST: Enable post-processing of chars (not sure if this matters)
# INLCR: We're disabling this so we don't get \r\r\n anywhere
oflag |= (termios.OPOST | termios.ONLCR | termios.INLCR)
attrs = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
termios.tcsetattr(stdout, termios.TCSANOW, attrs)
# Now do the same for stdin
attrs = termios.tcgetattr(stdin)
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = attrs
iflag |= (termios.IXON | termios.IXOFF | termios.IUTF8)
oflag |= (termios.OPOST | termios.ONLCR | termios.INLCR)
attrs = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
termios.tcsetattr(stdin, termios.TCSANOW, attrs)
# The sleep statement below ensures we capture all output from the
# fd before it is closed... It turns out that IOLoop's response to
# changes in the fd is so fast that it can result in the fd being
# closed the very moment the Python interpreter is reading from it.
cmd = ['/bin/sh', '-c', self.cmd + '; sleep .1']
os.dup2(2, 1) # Copy stderr to stdout (equivalent to 2>&1)
os.execvpe(cmd[0], cmd, env)
else: # We're inside this Python script
logging.debug("spawn() pid: %s" % pid)
self._alive = True
self.fd = fd
self.env = env
self.em_dimensions = em_dimensions = pid
self.time = time.time()
self.term = self.terminal_emulator(
# Tell our IOLoop instance to start watching the child
fd, self._ioloop_read_handler, self.io_loop.READ)
self.prev_output = {}
# Set non-blocking so we don't wait forever for a read()
import fcntl
fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# Set the size of the terminal
self.resize(rows, cols, ctrl_l=False)
return fd
def isalive(self):
Checks the underlying process to see if it is alive and sets self._alive
if self._alive: # Re-check it
for f in os.listdir('/proc'):
pid_dir = os.path.join('/proc', f)
if os.path.isdir(pid_dir):
pid = int(f)
except ValueError:
continue # Not a PID
if pid ==
self._alive = True
return True
self._alive = False
return False
def resize(self, rows, cols, em_dimensions=None, ctrl_l=True):
Resizes the child process's terminal window to *rows* and *cols* by
first sending it a TIOCSWINSZ event and then sending ctrl-l.
If *em_dimensions* are provided they will be updated along with the
rows and cols.
The sending of ctrl-l can be disabled by setting *ctrl_l* to False.
logging.debug("Resizing term %s to rows: %s, cols: %s" % (
self.term_id, rows, cols))
if rows < 2:
rows = 24
if cols < 2:
cols = 80
self.rows = rows
self.cols = cols
self.term.resize(rows, cols, em_dimensions)
# Sometimes the resize doesn't actually apply (for whatever reason)
# so to get around this we have to send a different value than the
# actual value we want then send our actual value. It's a bug outside
# of Gate One that I have no idea how to isolate but this has proven to
# be an effective workaround.
import fcntl, termios
s = struct.pack("HHHH", rows, cols, 0, 0)
fcntl.ioctl(self.fd, termios.TIOCSWINSZ, s)
if ctrl_l:
self.write(u'\x0c') # ctrl-l
# SIGWINCH has been disabled since it can screw things up
#os.kill(, signal.SIGWINCH) # Send the resize signal
def terminate(self):
Kill the child process associated with `self.fd`.
.. note:: If dtach is being used this only kills the dtach process.
if not self.terminating:
self.terminating = True
return # Something else already called it
logging.debug("terminate() %s" %
if self.reenable_timeout:
# Unset our blocked IO handler so there's no references to self hanging
# around preventing us from freeing up memory
self.io_loop.set_blocking_signal_threshold(None, None)
except ValueError:
pass # Can happen if this instance winds up in a thread
for callback in self.callbacks[self.CALLBACK_EXIT].values():
if self._patterns:
# NOTE: Without this 'del' we end up with a memory leak every time
# a new instance of Multiplex is created. Apparently the references
# inside of PeriodicCallback pointing to self prevent proper garbage
# collection.
del self.scheduler
except (KeyError, IOError, OSError):
# This can happen when the fd is removed by the underlying process
# before the next cycle of the IOLoop. Not really a problem.
# TODO: Make this walk the series from SIGINT to SIGKILL
#os.kill(, signal.SIGINT)
os.kill(, signal.SIGTERM)
#os.kill(, signal.SIGKILL)
os.waitpid(-1, os.WNOHANG)
except OSError:
# The process is already dead--great.
# Reset all callbacks so there's nothing to prevent GC
self.callbacks = {
# Kick off a process that finalizes the log (updates metadata and
# recompresses everything to save disk space)
if not self.log_path:
return # No log to finalize so we're done.
PROC = Process(target=get_or_update_metadata, args=(self.log_path, self.user), kwargs={'force_update': True})
def _ioloop_read_handler(self, fd, event):
Read in the output of the process associated with *fd* and write it to
:fd: The file descriptor of the child process.
:event: An IOLoop event (e.g. IOLoop.READ).
.. note:: This method is not meant to be called directly... The IOLoop should be the one calling it when it detects any given event on the fd.
if event == self.io_loop.READ:
else: # Child died
"Apparently fd %s just died (event: %s)" % (self.fd, event)))
def _read(self, bytes=-1):
Reads at most *bytes* from the incoming stream, writes the result to
the terminal emulator using `term_write`, and returns what was read.
If *bytes* is -1 (default) it will read `self.fd` until there's no more
Returns the result of all that reading.
.. note:: Non-blocking.
result = ""
with self.lock:
with, 'rb', closefd=False) as reader:
if bytes == -1:
while True:
updated =
if not updated:
if self.ratelimiter_engaged:
# Truncate the output to the last 1024 chars
# NOTE: If we didn't truncate the output we'd
# eventually have to process it all which would
# take forever.
result += updated
elif bytes:
result =
except IOError as e:
# IOErrors can happen when self.fd is closed before we finish
# reading from it. Not a big deal.
except OSError as e:
logging.error("Got exception in read: %s" % `e`)
except Exception as e:
import traceback
"Got unhandled exception in read (???): %s" % `e`)
if self.isalive():
return result
def _timeout_checker(self):
Runs `timeout_check` and if there are no more non-sticky
patterns in :attr:`self._patterns`, stops :attr:`scheduler`.
remaining_patterns = self.timeout_check()
if not remaining_patterns:
# No reason to keep the PeriodicCallback going
logging.debug("Stopping self.scheduler (no remaining patterns)")
except AttributeError:
# Now this is a neat trick: The way IOLoop works with its
# stack_context thingamabob the scheduler doesn't actualy end up
# inside the MultiplexPOSIXIOLoop instance inside of this
# instance of _timeout_checker() *except* inside the main
# thread. It is absolutely wacky but it works and works well :)
def read(self, bytes=-1):
.. note:: This is an override of `` in order to take advantage of the IOLoop for ensuring `BaseMultiplex.expect` patterns timeout properly.
Calls `_read` and checks if any timeouts have been reached
in :attr:`self._patterns`. Returns the result of :meth:`_read`. This
is an override of `` that will create a
:class:`tornado.ioloop.PeriodicCallback` (as `self.scheduler`) that
executes :attr:`timeout_check` at a regular interval. The
`PeriodicCallback` will automatically cancel itself if there are no more
non-sticky patterns in :attr:`self._patterns`.
result = self._read(bytes)
remaining_patterns = self.timeout_check()
if remaining_patterns and not self.scheduler._running:
# Start 'er up in case we don't get any more output
logging.debug("Starting self.scheduler to check for timeouts")
return result
def _write(self, chars):
Writes *chars* to `self.fd` (pretty straightforward). If IOError or
OSError exceptions are encountered, will run `terminate`. All other
exceptions are logged but no action will be taken.
) as writer:
self.ratelimiter_engaged = False
except (IOError, OSError) as e:
if self.isalive():
except Exception as e:
logging.error("write() exception: %s" % e)
def write(self, chars):
Calls `_write(*chars*)` via `_call_callback` to ensure thread safety.
write = partial(self._write, chars)
class MultiplexMacOSIOLoop(MultiplexPOSIXIOLoop):
This class is a subclass of `MultiplexPOSIXIOLoop` that overrides the
`isalive` function since Mac OS X doesn't have /proc.
def isalive(self):
A Mac OS X-specific version of `MultiplexPOSIXIOLoop.isalive` that
checks the underlying process to see if it is alive and sets
`self._alive` appropriately.
# sub-subprocesses are inefficient (and blocking) but what can you do?
exitstatus, output = shell_command(
"ps -ef | awk '{print $2}' | grep %s" %
if exitstatus != 0:
self._alive = True
self._alive = False
return self._alive
def spawn(cmd, rows=24, cols=80, env=None, em_dimensions=None, *args, **kwargs):
A shortcut to::
>>> m = Multiplex(cmd, *args, **kwargs)
>>> m.spawn(rows, cols, env)
>>> return m
m = Multiplex(cmd, *args, **kwargs)
m.spawn(rows, cols, env, em_dimensions=em_dimensions)
return m
Multiplex = MultiplexMacOSIOLoop
Multiplex = MultiplexPOSIXIOLoop
raise NotImplementedError(_(
"termio currently only works on Unix platforms."))
Jump to Line
Something went wrong with that request. Please try again.