Permalink
Browse files

This is a script for getting info about running tmuxes

Still under heavy development, and occasionally copied from my svn
repo into this git one.
  • Loading branch information...
1 parent e17368c commit 591e00babf3aeef4c9e1db8bdfd1a16f9c2c9140 @richo richo committed May 17, 2011
Showing with 307 additions and 0 deletions.
  1. +307 −0 tmux/tinfo
View
@@ -0,0 +1,307 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import subprocess
+import argparse
+sp = subprocess
+
+parser = argparse.ArgumentParser(description="Parse and display tmux info data")
+# Make this the default args I Think
+parser.add_argument('search', metavar='search terms', action='store',
+ default=None, nargs='*',
+ help="Search for <string> in window names")
+parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
+ default=False,
+ help="Enable verbose reporting (show full data)")
+parser.add_argument('-a', '--attach', dest='attach', action='store_true',
+ default=False,
+ help="Attach to the session, if your search results only return one")
+parser.add_argument('-p', '--pty', dest='searchpty', action='store',
+ default=None,
+ help="Search for a given {p,t}ty amongst those owned by windows")
+
+class TMuxInfo(object):
+ """This is the master class that will replace our broken toplevel"""
+ def get_session_id_somehow(self):
+ return "59"
+ def __init__(self, ph=None):
+ self.handlers = { "Clients": self.handle_clients,
+ "Sessions": self.handle_sessions }
+ self.output = sys.stdout
+ self.verbose = False
+ self.dump_func = repr
+ if not ph:
+ p = sp.Popen(["tmux", "info"], bufsize=512,
+ stdin=None, stdout=sp.PIPE, close_fds=True)
+ self.ph = Watcher(p.stdout)
+ else:
+ self.ph = Watcher(ph)
+ def out(self, msg):
+ print >>self.output, msg
+
+ @staticmethod
+ def parse_client_ind1(line): # {{{
+ dat = Data()
+ parts = line.split(" ")
+ dat.idx = parts[0]
+ dat.idx = dat.idx.replace(":", "")
+ dat.pty = parts[1]
+ dat.sizex, dat.sizey = map(int,
+ (parts[2][1:-1], parts[3][:-2]))
+ dat.sessno = parts[4]
+ dat.size = parts[5][1:]
+ dat.term = parts[6][:-1]
+ dat.extra = " ".join((parts[7], parts[8]))
+ return dat
+ #}}}
+
+ def build(self):
+ while self.ph:
+ if self.ph.line == os.linesep:
+ try:
+ self.handlers[self.ph.next().split(":")[0]](self.ph)
+ except IndexError:
+ # We're borked
+ raise
+ pass
+ except KeyError:
+ # It's a heading we don't care about
+ pass
+ else:
+ # This should never really be reached, as each handler
+ # is done.. Wait it hink they'll consume a line finding
+ # the "\n" ????
+ self.ph.next()
+
+ def handle_clients(self, h):
+ data = {}
+ while h.next() != os.linesep:
+ ind, line = get_indent(h.line)
+ if ind == 1:
+ client = self.parse_client_ind1(line)
+ try:
+ data[client.sessno].append(Client(client))
+ except KeyError:
+ data[client.sessno] = [Client(client)]
+ self.clients = data
+
+ def parse_session_header(self, line): # {{{
+ # TODO port to new api
+ dat = Data()
+ dat.sessno, dat.sessno, dat.nowins, dat.dump = line.split(" ", 3)
+ dat.sessno = dat.sessno.replace(":", "")
+ return dat
+ #}}}
+ def parse_session_ind3(self, line): # {{{
+ dat = Data()
+ parts = line.split(" ")
+ dat.winno = parts[0].replace(":", "")
+ ind = 1
+ while not _looks_like_size(parts[ind]):
+ ind += 1
+ dat.name = ' '.join(parts[1:ind])
+ dat.sizex, dat.sizey = map(int,
+ parts[ind][1:-1].split("x"))
+ dat.extra = " ".join(parts[ind+1:])
+ return dat
+ #}}}
+ def parse_session_ind5(self, line): # {{{
+ dat = Data()
+ parts = line.split(" ")
+ dat.paneno = parts[0].replace(":", "")
+ dat.pty = parts[1]
+ dat.extra = ' '.join(parts[2:])
+ return dat
+ #}}}
+
+ def handle_sessions(self, h):
+ data = {}
+ while h.next() != os.linesep:
+ # try:
+ try:
+ ind, line = get_indent(h.line)
+ if ind <= 1:
+ cur = Session(self.parse_session_header(line))
+ data[cur.sessno] = cur
+ elif ind == 3:
+ winref = cur.add_win(self.parse_session_ind3(line))
+ elif ind == 5:
+ # Record the TTY's against that window
+ winref.add_pane(self.parse_session_ind5(line))
+ except:
+ raise
+ self.sessions = data
+ def count_sessions(self):
+ return len(self.sessions)
+
+ def dump(self):
+ s = self.sessions
+ c = self.clients
+ for i, ival in self.sessions.iteritems():
+ if ival.sessno in c:
+ # TODO Work out verbose mode.. multiline?
+ self.out("Session %s: %s" % (i,
+ " ".join([self.dump_func(n) for n in c[ival.sessno]])))
+ else:
+ self.out("Session %s: [Detached]" % (i))
+ keys = ival.wins.keys()
+ keys.sort()
+ for j in keys:
+ self.out("%s: %s" % (j, self.dump_func(ival.wins[j])))
+ def search(self, searchTerm, field='name'):
+ s = self.sessions
+ c = self.clients
+ valid_sessions = {}
+ for i, ival in self.sessions.iteritems():
+ wins = []
+ keys = ival.wins.keys()
+ keys.sort()
+ # Try the same search on the parent
+ # TODO Search clients too
+ # In fact, list of everything we want to search, iterate
+ # over it.
+ try:
+ if searchTerm in ival.__getattribute__(field):
+ continue
+ except AttributeError:
+ # We're primarily interested in the children
+ pass
+ # Check children
+ for j in keys:
+ if searchTerm not in ival.wins[j].__getattribute__(field):
+ ival.del_win(j)
+ #wins.append(ival.wins[j])
+ if not ival.is_empty():
+ valid_sessions[i] = ival
+ self.sessions = valid_sessions
+
+def _looks_like_size(l):
+ return l[0] == "[" and l[-1] == "]"
+def dump(obj):
+ return obj.__dump__()
+def dump_verbose(obj):
+ return obj.__dump_verbose__()
+
+def get_indent(line):
+ """Get the indent of the line, then strip it and return both"""
+ c = 0
+ while line[c] == " ":
+ c += 1
+ return c, line[c:]
+
+class Data(object):
+ def iteritems(self):
+ return self.__dict__.iteritems()
+ def __repr__(self):
+ return repr(self.__dict__)
+
+class IndexData(object):
+ def __init__(self, data):
+ for key, value in data.iteritems():
+ self.__setattr__(key, value)
+ def __dump__(self):
+ return repr(self)
+class Session(IndexData):
+ def __init__(self, *args):
+ self.wins = {}
+ IndexData.__init__(self, *args)
+ def add_win(self, dat):
+ self.wins[dat.winno] = Win(dat)
+ return self.wins[dat.winno]
+ def del_win(self, idx):
+ del self.wins[idx]
+ def is_empty(self):
+ if len(self.wins):
+ return False
+ return True
+ def __repr__(self):
+ """Primarily debugging"""
+ return "%s: %s" % (self.sessno, repr(self.wins))
+
+class Win(IndexData):
+ def __init__(self, *args):
+ self.panes = []
+ self.pty = ''
+ IndexData.__init__(self, *args)
+ def __repr__(self):
+ """Primarily debugging"""
+ info = [self.name]
+ if self.panes > 1:
+ info.append("(%i panes) %s" % (len(self.panes), self.pty))
+ return " ".join(info)
+ def add_pane(self, dat):
+ self.panes.append(Pane(dat))
+ self.pty = ' '.join([i.pty for i in self.panes])
+ def __dump_verbose__(self):
+ return "%s [%ix%i] %s (%i panes) containing %s" % (
+ self.name, self.sizex, self.sizey,
+ self.extra, len(self.panes), self.pty)
+
+class Client(IndexData):
+ def __repr__(self):
+ """Primarily debugging"""
+ return "%s" % (self.pty)
+ def __dump_verbose__(self):
+ return "%s%s-> %s [%ix%i] [%s] %s" % ( os.linesep,
+ self.pty, self.idx, self.sizex, self.sizey,
+ self.term, self.extra.replace(os.linesep, ""))
+
+class Pane(IndexData):
+ pass
+
+
+class Watcher(object):
+ def __init__(self, ph):
+ # Ensure iterator is implemented
+ self.ph = ph
+ self.next()
+ def next(self):
+ if self.ph:
+ self.line = self.ph.readline()
+ if not self.line:
+ self.ph.close()
+ self.ph = None
+ return self.line
+ else:
+ return None
+ def __bool__(self):
+ return bool(self.line)
+ def __nonzero__(self):
+ return bool(self.line)
+
+def throw_error(msg):
+ print >>sys.stderr, msg
+ exit()
+
+def check_args(args):
+ if args.attach and not args.search:
+ throw_error("You cannot attach without searching")
+
+def main():
+ args = parser.parse_args()
+ check_args(args)
+ tinfo = TMuxInfo()
+ if args.verbose:
+ tinfo.dump_func = dump_verbose
+ else:
+ tinfo.dump_func = dump
+
+ tinfo.build()
+ if args.search:
+ tinfo.search(' '.join(args.search))
+ if args.searchpty:
+ tinfo.search(args.searchpty, field='pty')
+ if args.attach:
+ if tinfo.count_sessions() != 1:
+ throw_error("You must have exactly one result to attach (%i found)"
+ % (tinfo.count_sessions()))
+ else:
+ os.execvpe('tmux', ('tmux', 'a', '-t',
+ tinfo.get_session_id_somehow()), os.environ)
+ else:
+ tinfo.dump()
+
+
+# FINAL
+if __name__ == "__main__": main()

0 comments on commit 591e00b

Please sign in to comment.