Skip to content
Browse files

Refactor coordinator logic into base class

  • Loading branch information...
1 parent 41c4d6f commit 9d725ab51ebc7fda5e4be20ab426e2181cc69bd3 @rfk committed Aug 8, 2012
Showing with 198 additions and 137 deletions.
  1. +0 −2 TODO.txt
  2. +39 −25 playitagainsam/__init__.py
  3. +111 −0 playitagainsam/coordinator.py
  4. +27 −29 playitagainsam/player.py
  5. +21 −81 playitagainsam/recorder.py
View
2 TODO.txt
@@ -1,6 +1,4 @@
- * factor out a common "controller" base class, along with proxy_to_controller
- helper function
* editing out ECHO of backspace, so I can stuff up during recording
and have it magically fixed upon playback
* child process watching, cleanup etc.
View
64 playitagainsam/__init__.py
@@ -70,40 +70,54 @@
def main(argv):
parser = argparse.ArgumentParser()
+ parser.add_argument("--join", action="store_true",
+ help="join an existing record/replay session")
subparsers = parser.add_subparsers(dest="subcommand")
+
# The "record" command.
parser_record = subparsers.add_parser("record")
parser_record.add_argument("datafile")
parser_record.add_argument("--shell",
+ help="the shell to execute",
default=os.environ.get("SHELL", "/bin/sh"))
- parser_record.add_argument("--join")
# The "replay" command.
parser_replay = subparsers.add_parser("replay")
parser_replay.add_argument("datafile")
- parser_replay.add_argument("--term")
+ parser_replay.add_argument("--terminal",
+ help="the terminal program to execute",
+ default="/usr/bin/gnome-terminal")
args = parser.parse_args(argv[1:])
- if args.subcommand == "record":
- if args.join is None:
- events = playitagainsam.util.EventLog()
- recorder = playitagainsam.recorder.Recorder(events)
- t = threading.Thread(target=recorder.run)
- t.setDaemon(True)
- t.start()
- addr = ("localhost", 12345)
- else:
- addr = args.join.split(":")
- addr = (addr[0], int(addr[1]))
- playitagainsam.recorder.spawn_in_recorder(addr, args.shell)
- if args.join is None:
- t.join()
- with open(args.datafile, "w") as datafile:
- data = {"events": events.events}
- output = json.dumps(data, indent=2, sort_keys=True)
- datafile.write(output)
- elif args.subcommand == "replay":
- with open(args.datafile, "r") as datafile:
- events = json.loads(datafile.read())["events"]
- player = playitagainsam.player.Replayer(events)
- player.run()
+ sock_path = args.datafile + ".sock"
+ if os.path.exists(sock_path) and not args.join:
+ raise RuntimeError("session already in progress")
+
+ try:
+ if args.subcommand == "record":
+ recorder = None
+ if not args.join:
+ events = playitagainsam.util.EventLog()
+ recorder = playitagainsam.recorder.Recorder(events, sock_path)
+ recorder.start()
+ playitagainsam.recorder.spawn_in_recorder(sock_path, args.shell)
+ if recorder is not None:
+ recorder.join()
+ with open(args.datafile, "w") as datafile:
+ data = {"events": events.events}
+ output = json.dumps(data, indent=2, sort_keys=True)
+ datafile.write(output)
+
+ elif args.subcommand == "replay":
+ if not args.join:
+ with open(args.datafile, "r") as datafile:
+ events = json.loads(datafile.read())["events"]
+ player = playitagainsam.player.Player(events, args.terminal, sock_path)
+ player.start()
+ playitagainsam.player.proxy_to_player(sock_path)
+ if player is not None:
+ player.join()
+
+ finally:
+ if os.path.exists(sock_path) and not args.join:
+ os.unlink(sock_path)
View
111 playitagainsam/coordinator.py
@@ -0,0 +1,111 @@
+# Copyright (c) 2012, Ryan Kelly.
+# All rights reserved; available under the terms of the MIT License.
+"""
+
+playitagainsam.coordinator: object for coordinating simulated terminals
+========================================================================
+
+This module provides a base class that can be used to coordinate input/output
+for one or more simulated terminals. Each terminal is associated with a
+"view" process that handles input and output.
+
+"""
+
+import os
+import sys
+import time
+import select
+import socket
+import threading
+
+from playitagainsam.util import get_fd, no_echo
+
+
+class StopCoordinator(Exception):
+ """Exception raised to stop execution of the coordinator."""
+ pass
+
+
+class SocketCoordinator(object):
+ """Object for coordinating activity between views and data processes."""
+
+ def __init__(self, sock_path):
+ self.__running = False
+ self.__run_thread = None
+ self.__ping_pipe_r, self.__ping_pipe_w = os.pipe()
+ self.sock_path = sock_path
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.sock.bind(sock_path)
+ self.sock.listen(1)
+
+ def __del__(self):
+ self.__cleanup_pipes()
+
+ def __cleanup_pipes(self, os=os):
+ if self.__ping_pipe_r is not None:
+ os.close(self.__ping_pipe_r)
+ self.__ping_pipe_r = None
+ if self.__ping_pipe_w is not None:
+ os.close(self.__ping_pipe_w)
+ self.__ping_pipe_w = None
+
+ def start(self):
+ assert self.__run_thread is None
+ self.__running = True
+ def runit():
+ try:
+ self.run()
+ except StopCoordinator:
+ pass
+ finally:
+ self.cleanup()
+ self.__run_thread = threading.Thread(target=runit)
+ self.__run_thread.start()
+
+ def stop(self):
+ assert self.__run_thread is not None
+ self.__running = False
+ os.write(self.__ping_pipe_w, "X")
+
+ def join(self):
+ self.__run_thread.join()
+
+ def run(self):
+ raise NotImplementedError
+
+ def cleanup(self):
+ pass
+
+ def wait_for_data(self, fds, timeout=None):
+ fds = [self.__ping_pipe_r] + list(fds)
+ try:
+ ready, _, _ = select.select(fds, [], fds, timeout)
+ if not self.__running:
+ raise StopCoordinator
+ return ready
+ except OSError:
+ return []
+
+
+def proxy_to_coordinator(socket_path, header=None, stdin=None, stdout=None):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(socket_path)
+ try:
+ stdin_fd = get_fd(stdin, sys.stdin)
+ stdout_fd = get_fd(stdout, sys.stdout)
+ with no_echo(stdin_fd):
+ if header is not None:
+ sock.sendall(header)
+ while True:
+ ready, _, _ = select.select([stdin_fd, sock], [], [])
+ if stdin_fd in ready:
+ c = os.read(stdin_fd, 1)
+ if c:
+ sock.send(c)
+ if sock in ready:
+ c = sock.recv(1024)
+ if not c:
+ break
+ os.write(stdout_fd, c)
+ finally:
+ sock.close()
View
56 playitagainsam/player.py
@@ -13,41 +13,22 @@
import socket
from playitagainsam.util import no_echo, get_fd, forkexec
+from playitagainsam.coordinator import SocketCoordinator, proxy_to_coordinator
-class Replayer(object):
+class Player(SocketCoordinator):
- def __init__(self, events):
+ def __init__(self, events, terminal, sock_path):
self.events = list(events)
- self._ping_pipe_r, self._ping_pipe_w = os.pipe()
- self.running = False
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sock.bind(("localhost", 12345))
- self.sock.listen(1)
+ self.terminal = terminal
+ super(Player, self).__init__(sock_path)
self.terminals = {}
self.view_fds = {}
- def __del__(self):
- self._cleanup_pipes()
-
- def _cleanup_pipes(self, os=os):
- if getattr(self, "_ping_pipe_r", None) is not None:
- os.close(self._ping_pipe_r)
- self._ping_pipe_r = None
- if getattr(self, "_ping_pipe_w", None) is not None:
- os.close(self._ping_pipe_w)
- self._ping_pipe_w = None
-
- def stop(self):
- self.running = False
- os.write(self._ping_pipe_w, "X")
-
def run(self):
- self.running = True
event_stream = self._iter_events()
try:
- while self.running:
+ while True:
event = event_stream.next()
if event["act"] == "OPEN":
self._do_open_terminal(event["term"])
@@ -61,8 +42,12 @@ def run(self):
self._do_write(event["term"], event["data"])
except StopIteration:
pass
+
+ def cleanup(self):
for term in self.terminals:
- self._do_close_terminal(term)
+ view_sock, = self.terminals[term]
+ view_sock.close()
+ super(Player, self).cleanup()
def _iter_events(self):
for event in self.events:
@@ -77,12 +62,21 @@ def _iter_events(self):
yield event
def _do_open_terminal(self, term):
- child_pid = forkexec("/usr/bin/gnome-terminal", "-x", "/bin/bash", "-c", sys.executable + " -c \"from playitagainsam.recorder import proxy_to_recorder_addr; proxy_to_recorder_addr(('localhost', 12345))\" ; sleep 10")
+ ready = self.wait_for_data([self.sock], 0.1)
+ if self.sock not in ready:
+ # XXX TODO: wait for a keypress from some existing terminal
+ # to trigger the appearance of the terminal.
+ join_cmd = list(sys.argv)
+ join_cmd.insert(1, "--join")
+ forkexec(self.terminal, "-x", *join_cmd)
view_sock, _ = self.sock.accept()
- self.terminals[term] = (view_sock, child_pid)
+ self.terminals[term] = (view_sock,)
def _do_close_terminal(self, term):
- view_sock, client_pid = self.terminals[term]
+ view_sock, = self.terminals[term]
+ c = view_sock.recv(1)
+ while c not in ("\n", "\r"):
+ c = view_sock.recv(1)
view_sock.close()
def _do_read(self, term, wanted):
@@ -95,3 +89,7 @@ def _do_read(self, term, wanted):
def _do_write(self, term, data):
view_sock = self.terminals[term][0]
view_sock.sendall(data)
+
+
+def proxy_to_player(sock_path, **kwds):
+ return proxy_to_coordinator(sock_path, **kwds)
View
102 playitagainsam/recorder.py
@@ -6,8 +6,6 @@
==============================================================
This module provides the ability to record interactive terminal sessions.
-They are recorded by means of a single "recording master" process along
-with one "recording slave" per terminal used during the session.
"""
@@ -20,48 +18,33 @@
import uuid
from playitagainsam.util import forkexec_pty, get_fd, no_echo
+from playitagainsam.coordinator import SocketCoordinator, proxy_to_coordinator
-class Recorder(object):
+class Recorder(SocketCoordinator):
"""Object for recording activity in a session."""
- def __init__(self, eventlog):
+ def __init__(self, eventlog, sock_path):
self.eventlog = eventlog
- self._ping_pipe_r, self._ping_pipe_w = os.pipe()
- self.running = False
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.sock.bind(("localhost", 12345))
- self.sock.listen(1)
+ super(Recorder, self).__init__(sock_path)
self.terminals = {}
self.view_fds = {}
self.proc_fds = {}
- def __del__(self):
- self._cleanup_pipes()
-
- def _cleanup_pipes(self, os=os):
- if getattr(self, "_ping_pipe_r", None) is not None:
- os.close(self._ping_pipe_r)
- self._ping_pipe_r = None
- if getattr(self, "_ping_pipe_w", None) is not None:
- os.close(self._ping_pipe_w)
- self._ping_pipe_w = None
-
def run(self):
- self.running = True
# Loop waiting for the first terminal to be opened.
- while self.running and not self.terminals:
- ready = self._wait_for_activity([self.sock])
+ while not self.terminals:
+ ready = self.wait_for_data([self.sock])
if self.sock in ready:
client_sock, _ = self.sock.accept()
self._handle_open_terminal(client_sock)
# Loop waiting for activity to occur, or all terminals to close.
- while self.running and self.terminals:
+ while self.terminals:
# Time how long it takes, in case we need to trigger output
# via a pause in the event stream.
t1 = time.time()
- ready = self._wait_for_activity()
+ fds = [self.sock] + list(self.view_fds) + list(self.proc_fds)
+ ready = self.wait_for_data(fds)
t2 = time.time()
if not ready:
continue
@@ -81,25 +64,13 @@ def run(self):
# Now process any output that has been triggered.
# This will loop and consume as much output as is available.
self._handle_output()
- # Clean up any terminals that are open when we're asked to stop.
- for term in self.terminals:
- self._handle_close_terminal(term)
- def stop(self):
- self.running = False
- os.write(self._ping_pipe_w, "X")
-
- def _wait_for_activity(self, fds=None, timeout=None):
- if fds is not None:
- fds = [self._ping_pipe_r] + list(fds)
- else:
- fds = [self._ping_pipe_r, self.sock] + \
- list(self.view_fds) + list(self.proc_fds)
- try:
- ready, _, _ = select.select(fds, [], fds, timeout)
- return ready
- except OSError:
- return []
+ def cleanup(self):
+ for term in self.terminals:
+ client_sock, proc_fd, proc_pid = self.terminals[term]
+ client_sock.close()
+ os.close(proc_fd)
+ super(Recorder, self).cleanup()
def _handle_input(self, view_fd):
try:
@@ -120,15 +91,15 @@ def _handle_input(self, view_fd):
os.write(proc_fd, c)
def _handle_output(self):
- ready = self._wait_for_activity(self.proc_fds)
+ ready = self.wait_for_data(self.proc_fds)
# Process output from each ready process in turn.
for proc_fd in ready:
term = self.proc_fds[proc_fd]
view_fd = self.terminals[term][0].fileno()
# Loop through one character at a time, consuming as
# much output from the process as is available.
proc_ready = [proc_fd]
- while proc_ready and self.running:
+ while proc_ready:
try:
c = os.read(proc_fd, 1)
if not c:
@@ -145,7 +116,7 @@ def _handle_output(self):
})
# Forward it to the corresponding terminal view.
os.write(view_fd, c)
- proc_ready = self._wait_for_activity([proc_fd], 0)
+ proc_ready = self.wait_for_data([proc_fd], 0)
def _handle_open_terminal(self, client_sock):
# Read the program to start, in form "SIZE JSON-DATA\n"
@@ -194,38 +165,7 @@ def _handle_pause(self, duration):
})
-def proxy_to_recorder(sock, stdin=None, stdout=None):
- stdin_fd = get_fd(stdin, sys.stdin)
- stdout_fd = get_fd(stdout, sys.stdout)
- with no_echo(stdin_fd):
- while True:
- ready, _, _ = select.select([stdin_fd, sock], [], [])
- if stdin_fd in ready:
- c = os.read(stdin_fd, 1)
- if c:
- sock.send(c)
- if sock in ready:
- c = sock.recv(1024)
- if not c:
- break
- os.write(stdout_fd, c)
-
-
-def spawn_in_recorder(server_addr, shell, stdin=None, stdout=None):
- sock = socket.socket()
- sock.connect(server_addr)
+def spawn_in_recorder(sock_path, shell, **kwds):
data = json.dumps(shell)
- sock.sendall("%d %s\n" % (len(data), data))
- try:
- proxy_to_recorder(sock, stdin=stdin, stdout=stdout)
- finally:
- sock.close()
-
-
-def proxy_to_recorder_addr(addr, stdin=None, stdout=None):
- sock = socket.socket()
- sock.connect(addr)
- try:
- proxy_to_recorder(sock, stdin=stdin, stdout=stdout)
- finally:
- sock.close()
+ header = "%d %s\n" % (len(data), data)
+ return proxy_to_coordinator(sock_path, header, **kwds)

0 comments on commit 9d725ab

Please sign in to comment.
Something went wrong with that request. Please try again.