Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Branch: master
Fetching contributors…

Cannot retrieve contributors at this time

executable file 4595 lines (3703 sloc) 158.385 kB
#!/usr/bin/env python
# FileMap -
# Public Domain License:
# This program was prepared by Los Alamos National Security, LLC at
# Los Alamos National Laboratory (LANL) under contract
# No. DE-AC52-06NA25396 with the U.S. Department of Energy (DOE). All
# rights in the program are reserved by the DOE and Los Alamos
# National Security, LLC. Permission is granted to the public to copy
# and use this software without charge, provided that this Notice and
# any statement of authorship are reproduced on all copies. Neither
# the U.S. Government nor LANS makes any warranty, express or implied,
# or assumes any liability or responsibility for the use of this
# software.
# Author: Mike Fisk <>
import ConfigParser
import atexit
import base64
import bisect
import copy
import cPickle
import cStringIO
import errno
import fcntl
import glob
import grp
import gzip
import itertools
import math
import optparse
import os
import pipes
import pwd
import pydoc
import random
import re
import select
import shlex
import shutil
import signal
import socket
import stat
import string
import subprocess
import sys
import tempfile
import time
import traceback
import urlparse
import zlib
# Bind ``sha`` to a SHA-1 constructor. Prefer hashlib (new in Python 2.5)
# and only fall back to the legacy ``sha`` module on older interpreters.
try:
    import hashlib  # New for python 2.5
    sha = hashlib.sha1
except ImportError:
    import sha
    sha = sha.sha
# Globals
Verbose = 0
RegressionTest = False
Locations = None

# Conversion/expansion function applied to each raw (string) config value,
# keyed by option name.
# NOTE(review): "umask" is parsed with plain int(), i.e. as decimal; a value
# such as "0022" becomes 22, not octal 022 -- confirm this is intended.
ConfigApply = {
    "ssh": os.path.expanduser,
    "syncdir": os.path.expanduser,
    "rootdir": os.path.expanduser,
    "cowdir": os.path.expanduser,
    "python": os.path.expanduser,
    "fm": os.path.expanduser,
    "share": float,
    "cpusperjob": int,
    "processes": int,
    "dynamicload": float,
    "demand": int,
    "replication": int,
    "umask": int,
    "all2all_retries": int,
}
# Hard-coded default for each config option, used when neither the node's
# stanza nor the [global] stanza sets it. Values are strings (as they would
# come from the config file) unless the option has no textual default.
ConfigDefaults = {
    "ssh": "ssh -o GSSAPIDelegateCredentials=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -Ax",
    "rsync": "rsync -tO",
    "share": "1",
    "syncdir": "/tmp/fmsync",
    "python": "python",
    "processes": "1000",
    "dynamicload": ".1",
    "inactive": False,
    "queuebyhost": False,
    "hostname": None,
    "cpusperjob": "1",
    "cowdir": "",
    "demand": "3",
    "replication": "1",
    "umask": "0002",
    "pathextras": "",
    "environ": None,
    "all2all_retries": "7",
}
def dictListAppend(map, key, value):
    """Append *value* to the list stored at ``map[key]``.

    The list is created on first use, so *map* acts as a multi-map from
    keys to lists of values. *map* is modified in place.
    """
    if key not in map:
        map[key] = [value]
    else:
        map[key].append(value)
def printableHash(s):
    """Return a compact printable digest of the string *s*.

    The SHA-1 digest of *s* is base64-encoded using ``_`` in place of ``/``
    so the result contains no path separator, and trailing padding and
    whitespace are stripped.
    """
    digest = sha(s).digest()
    encoded = base64.b64encode(digest, '+_')  # Normal b64 but / is _
    return encoded.rstrip('\n\r =')
# Unqualified name of the local host: the part of gethostname() before the
# first '.'.
Hostname = socket.gethostname().split('.')[0]
# "<hostname>_<pid>" string built from the local host name and this process id.
My_file_prefix = "_".join([Hostname, str(os.getpid())])
def isremote(hname):
    """Return True iff *hname* names a host other than the local one.

    Only the part before the first '.' is compared against the module-level
    ``Hostname``. An empty or None *hname* is treated as local.
    """
    if not hname:
        return False
    return hname.split('.')[0] != Hostname
# Matches any single character that is NOT allowed verbatim in an escaped name.
nonalphanumre = re.compile(r'[^a-zA-Z0-9_%^()~,:.\-]')


def escape(s):
    """Take a list of strings and return a name usable as a filename.

    The strings are joined with single spaces. Every character outside
    ``a-zA-Z0-9_%^()~,:.-`` is replaced by ``=XX`` (upper-case hex of its
    ordinal); an escaped space (``=20``) is then shortened to ``+``. If the
    final path component is longer than 220 characters, everything past the
    first 220 is replaced by ``printableHash`` of that remainder.
    """
    joined = " ".join(s)
    escaped = nonalphanumre.sub(lambda m: "=%02X" % ord(m.group(0)), joined)
    escaped = escaped.replace('=20', '+')
    base = os.path.basename(escaped)
    # Make sure it's not too long to be a filename
    if len(base) > 220:
        # Use a hash of the remainder
        base = base[:220] + printableHash(base[220:])
    return os.path.join(os.path.dirname(escaped), base)
# Matches one "=XX" escape sequence; group 1 is the two hex digits.
quoprire = re.compile('=([0-9a-fA-F]{2})')


def lsUnescapeChr(m):
    """Substitution callback for ``quoprire``: decode one ``=XX`` escape.

    Returns the character whose ordinal is the hex value in group 1 of the
    match *m*. A decoded '/' or backslash is returned prefixed with a
    backslash.
    """
    c = chr(int(m.group(1), 16))
    if c == '/':
        return '\\/'
    if c == '\\':
        return '\\\\'
    return c
def tabulate(lst, width, ncols=None):
    """Print the strings in *lst* as a column-major table to stdout.

    Items fill the first column top to bottom, then the second, and so on.
    The number of columns is the largest for which the column widths, with
    2 characters budgeted per column for separation, fit within *width*.
    *ncols* is the column count to try first; callers normally leave it as
    None. Always returns None; nothing is printed for an empty list.
    """
    if not lst:
        return None
    smallest = min([len(l) for l in lst])
    if ncols is None:
        # Initial case, start with each column being as wide as narrowest element
        ncols = max(1, (width + 2) // (smallest + 2))
    # See if we can build a table with this many columns
    minwidths = [smallest] * ncols
    nrows = int(math.ceil(len(lst) * 1.0 / ncols))
    for c in range(ncols):
        for r in range(nrows):
            i = c * nrows + r
            if i > (len(lst) - 1):
                break
            lel = len(lst[i])
            if lel > minwidths[c]:
                minwidths[c] = lel
                # But expanding this one may mean we can have fewer columns
                if ncols > 1 and (sum(minwidths) + 2 * len(minwidths)) > (width + 2):
                    return tabulate(lst, width, ncols - 1)
    # If we got here, then the sizes fit!
    for r in range(nrows):
        cells = []
        for c in range(ncols):
            i = c * nrows + r
            if i > (len(lst) - 1):
                # Last column may be short; nothing further on this row
                break
            cells.append("%-*s" % (minwidths[c], lst[i]))
        sys.stdout.write(' '.join(cells).rstrip() + "\n")
    return None
def rewriteList(lst, f):
    """Apply a function to each element in a list.

    If the function returns None, use the original item.
    If the function returns a Tuple, replace with _contents_ of tuple
    (so an empty tuple deletes the item).
    Else replace the element with what the function returns.
    """
    ret = []
    for item in lst:
        out = f(item)
        if out is None:
            ret.append(item)
        elif type(out) == type(()):
            ret.extend(out)
        else:
            ret.append(out)
    return ret
if RegressionTest:
    # Self-test for rewriteList(): exercises the scalar, tuple, list,
    # empty-tuple (delete) and None (keep original) cases.
    def rewriteTest(item):
        if item == 1: return 2
        if item == 2: return 'a','b'
        if item == 3: return ['a', 'b']
        if item == 4: return ()
    assert( rewriteList([1,2,3,4,5], rewriteTest) == [2, 'a', 'b', ['a', 'b'], 5])
def collapseIntervals(lst):
    """Take list of (start,end) interval tuples and return a similar list with all overlapping intervals combined.

    Intervals that merely touch (one ends where another starts) are also
    combined. The result keeps the order in which each merged interval was
    first seen; *lst* itself is not modified.
    """
    while True:
        intervals = []
        for (sstart, send) in lst:
            found = False
            for i, (start, end) in enumerate(intervals):
                if (sstart <= end and send > end) or (send >= start and sstart < start):
                    # Expand interval (note this may make this interval
                    # overlap with one of the ones already in the list)
                    intervals[i] = (min(start, sstart), max(end, send))
                    found = True
                elif send <= end and sstart >= start:
                    # Already encompassed
                    found = True
            if not found:
                # Add new interval to list
                intervals.append((sstart, send))
        # Repeat until no more merges left
        if intervals == lst:
            return intervals
        lst = intervals
if RegressionTest:
    # Self-test for collapseIntervals(): overlapping, nested and touching
    # intervals collapse into two disjoint ranges.
    assert(collapseIntervals([(1,3), (2,4), (2,3), (1,4), (0,2), (5,6), (4,4.5)]) == [(0,4.5), (5,6)])
def mkdirexist(path):
    """Create directory *path*, tolerating the case where it already exists.

    Missing parent directories are created as well. Any OSError other than
    EEXIST is re-raised.
    NOTE(review): the creating call was missing from the source this was
    restored from; os.makedirs is assumed -- confirm against upstream.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
def coallesce(outputs, labels):
    """Take a list of multi-line strings and an equally long list of labels and return a string summarizing which labels had what output.

    Each element of *outputs* is a file-like buffer (it must support
    ``getvalue()`` and line iteration) or a false value meaning "no output".
    Buffers with identical contents are reported once, each line prefixed
    with the comma-joined labels that produced it followed by ": ".

    Side effect: entries of *outputs* that duplicate an earlier entry are
    set to None.
    """
    assert(len(outputs) == len(labels))
    pieces = []
    for i in range(len(outputs)):
        if not outputs[i]:
            continue
        header = [labels[i]]
        for j in range(i + 1, len(outputs)):
            if outputs[j] and outputs[i].getvalue() == outputs[j].getvalue():
                header.append(labels[j])
                outputs[j] = None
        header = ','.join(header) + ": "
        for line in outputs[i]:
            pieces.append(header + line)
    return "".join(pieces)
def multiglob(globs):
    """Take a list of globs and return a list of all of the files that match any of those globs.

    Matches are concatenated in the order the patterns are given; a file
    matched by more than one pattern appears more than once.
    """
    assert isinstance(globs, list)
    matches = []
    for pattern in globs:
        matches.extend(glob.glob(pattern))
    return matches
# Matches a leading run of non-digits (group 2) or of digits (group 3).
num_re = re.compile(r'^(([\D]+)|(\d+))')


def numcmptokenize(x):
    """Split string *x* into alternating non-numeric and numeric tokens.

    Digit runs are converted to int so that lists of tokens compare
    numerically, e.g. "file10b" -> ["file", 10, "b"]. An empty string gives
    an empty list.
    """
    xlist = []
    while x:
        xn = num_re.match(x)
        if not xn:
            return x
        xlen = len(xn.group(0))
        token = x[:xlen]
        if xn.group(3):
            # Pure digit run: compare as a number
            token = int(token)
        xlist.append(token)
        x = x[xlen:]
    return xlist
def numcmp(x, y):
    """Determine order of x and y (like cmp()), but with special handling for
    strings that have non-numeric and/or numeric substrings. Numeric
    substrings are ordered numerically even if they have differing numbers
    of digits."""
    # Turn each input in to a tokenized list that we can cmp() natively
    return cmp(numcmptokenize(x), numcmptokenize(y))
def statOverlap(joba, jobb):
    """Return True iff the duration of joba overlaps the duration of jobb.

    Each job is a mapping with 'start' and 'finish' values. Jobs that only
    touch (one finishes exactly when the other starts) count as overlapping.
    """
    disjoint = jobb['start'] > joba['finish'] or joba['start'] > jobb['finish']
    return not disjoint
def makeVnodes(nodes):
    """Split each node's job list into non-overlapping "virtual node" timelines.

    *nodes* maps a node name to a list of job dicts (each with 'start' and
    'finish'). Returns a new dict keyed by "<nodename>;<i>" whose values are
    lists of jobs that do not overlap one another in time.

    Side effect: each list in *nodes* is sorted in place by start time.
    """
    # We may have multiple jobs at a time per node (e.g. SMP nodes), so
    # treat those as multiple nodes. Since we don't have an explicit thread id,
    # just assign them arbitrarily in a non-overlapping way
    newnodes = {}
    for nodename in nodes.keys():
        vnodes = []
        nodes[nodename].sort(key=lambda job: job['start'])
        for job in nodes[nodename]:
            for vnode in vnodes:
                if not statOverlap(job, vnode[-1]):
                    # Does not overlap with last job in this vnode, so append it here
                    vnode.append(job)
                    break
            else:
                # Overlaps every existing timeline: make new node
                vnodes.append([job])
        for i, vnode in enumerate(vnodes):
            newnodes[nodename + ";" + str(i)] = vnode
    return newnodes
def plot(inputfile, pngfile, options):
# Render a timeline chart of job statistics to an image file.
# inputfile: open file containing a pickled list of per-job stat dicts.
# pngfile: destination passed to canvas.print_figure().
# options: object with at least `group_host` and `redundant` attributes.
# NOTE(review): this body has lost its indentation and, apparently, a number
# of lines (e.g. `else:` branches, sort calls, list appends) during
# extraction; restore from upstream before relying on it.
# NOTE(review): cPickle.load must only be used on trusted stats files.
import matplotlib.figure
import matplotlib.collections
import matplotlib.backends.backend_agg
stats = cPickle.load(inputfile)
# Load all stats and keep track of min and max inputsizes along the way
nodes = {}
jobs = {}
cmds = {}
copies = {}
minsize = None
mintime = None
maxsize = 0
maxtime = 0
totalsize = 0
# Key used to group stats into timelines: per node, or per host if requested
nodename = 'nodename'
if options.group_host:
nodename = 'hostname'
stats.sort(lambda a,b: cmp(a['start'], b['start']))
for s in stats:
s['finish'] = s['timestamp']
if not nodes.get(s[nodename]):
nodes[s[nodename]] = []
if minsize == None:
minsize = s['inputsize']
minsize = min(minsize, s['inputsize'])
maxsize = max(maxsize, s['inputsize'])
if mintime == None:
mintime = s['start']
mintime = min(mintime, s['start'])
maxtime = max(maxtime, s['finish'])
totalsize += s['inputsize']
jobname = s['jobname']
cmds[jobname] = jobname[:4] + ': ' + ' '.join(s['cmd'])
# Keep track of earliest start time
if jobname in jobs:
jobs[jobname] = min(jobs[jobname], s['start'])
jobs[jobname] = s['start']
# Count duplicate processing of a file and assign each element a serial number (in order of start, not completion)
copies.setdefault(s['statusfile'], 0)
copies[s['statusfile']] += 1
if options.redundant and copies[s['statusfile']] > 1:
print s['statusfile'], 'redundant', s['start'], s['finish'], s['nodename']
s['copies'] = copies[s['statusfile']]
# Reduce to overlapping windows
intervals = [(i['start'], i['finish']) for i in stats]
intervals = collapseIntervals(intervals)
# Total time during which at least one job was running
active = sum([i[1] - i[0] for i in intervals])
gaps = []
if intervals:
prev = intervals.pop(0)
for i in intervals:
# If more than 10% of the total duration is skipped here, then edit it out
if i[0] - prev[1] > 0.1*(active):
# A noticeable gap occurs between these
gaps.append((prev[1], i[0]))
#print >>sys.stderr, "Gap between", prev[1], i[0]
prev = i
nodes = makeVnodes(nodes)
if Verbose > 1:
print >>sys.stderr, "Data spans", seconds2human(maxtime-mintime), "with", seconds2human(active), "active"
print >>sys.stderr, "Packed into", len(nodes), "timelines"
# invert the jobs->start map
start = {}
for j in jobs:
start[jobs[j]] = j
starts = start.keys()
# make jobs a list sorted in order of start time
# NOTE(review): no sort of `starts` is visible here -- presumably lost; verify.
jobs = [start[j] for j in starts]
if Verbose > 2:
print >>sys.stderr, "Assigning colors to", len(jobs), "jobs"
colormap = ['blue','red','green','black', 'magenta', 'purple', 'cyan', 'yellow']
color = {}
fakebars = []
legends = []
for j in jobs:
color[j] = jobs.index(j)
color[j] = colormap[color[j] % len(colormap)]
if Verbose > 2:
print >>sys.stderr, "Job %s %s" % (color[j],j)
# broken_bars don't generate a legend, so make bars for legend manually
fakebars.append(matplotlib.patches.Rectangle((0, 1), 1, 1, fc=color[j]))
fig = matplotlib.figure.Figure(figsize=(8,10.5))
ax = fig.add_subplot(111)
# Add the legend
fontsize = 800. / max([len(j) for j in jobs]) #Scale for line width
fontsize = min(fontsize, 30. / len(jobs)) # Scale for number of lines
fontsize = min(10, fontsize) # No bigger than 10pt
fontsize = max(1, fontsize) # No smaller than 1pt
# NOTE(review): two near-identical legend calls (with and without frameon);
# presumably a try/except fallback whose keywords were lost -- verify.
ax.legend(fakebars, legends, bbox_to_anchor=(0,1,1,1), frameon=False, loc=8, prop={'size': fontsize}, title="%s processed" % (bytecount2str(totalsize)))
ax.legend(fakebars, legends, bbox_to_anchor=(0,1,1,1), loc=8, prop={'size': fontsize}, title="%s processed" % (bytecount2str(totalsize)))
nodenames = nodes.keys()
# Build map of nodenames to y coordinates
nodes2y = {}
i = 0
for n in nodenames:
nodes2y[n] = i
i += 1
lines = []
# Convert the size bounds to log scale (0 is left as 0)
if not maxsize:
maxsize = 0
maxsize = math.log(maxsize)
if not minsize:
minsize = 0
minsize = math.log(minsize)
for node in nodenames:
y = nodes2y[node]
if Verbose > 2:
print >>sys.stderr, "Plotting line", node, "with", len(nodes[node]), "elements"
for s in nodes[node]:
finish = s['finish']
start = s['start']
# Size is a log-scale number between 0 and 1 where 0 is the minimum size seen in this run and 1 is the max size seen in this run
insize = s['inputsize']
if insize:
insize = math.log(insize)
insize = 0
# NOTE(review): divides by zero when maxsize == minsize unless a guard was lost here.
size = (insize-minsize)/(maxsize-minsize)
size = size * 0.7 + 0.3 #Makem min size be 30%
# CPU is % of wallclock time in user or system CPU time
if s['time'] == 0:
cpu = 1
cpu = (s['utime'] + s['stime']) / s['time']
# Adjust for time gaps
delta = 0
for g in gaps:
if g[1] <= start:
delta += g[1] - g[0]
lw = min(1, s['copies'] - 1) # Outline duplicates more and more
ax.broken_barh([(start-mintime-delta, finish-start)], [y-0.5*size,size], alpha=0.3+cpu, lw=lw, facecolor=color[s['jobname']])
# Minor (gap) labels
xlabels = []
xticks = []
delta = 0
for g in gaps:
xticks.append(g[0] - delta - mintime)
xlabels.append(seconds2human(g[1]-g[0]) + " idle")
delta += g[1] - g[0]
ax.set_xticks(xticks, minor=True)
ax.set_xticklabels(xlabels, minor=True, rotation=30, ha='right', size=8)
# Major (min/max) labels
# t is the plotted duration: total span minus the gaps that were edited out
t = maxtime-mintime-delta
ax.set_xticks([0, t])
ax.set_xticklabels([0, seconds2human(t)])#, rotation=30, ha='right')
ax.set_xlim(xmin=0, xmax=t)
# Y-axis labels: node names split at the first "." into major/minor parts
majorlabels = []
majorlocs = []
minorlabels = []
minorlocs = []
majors = []
for n in nodenames:
if n.endswith(";0"):
name = n[:-2]
if "." in n:
major,minor = name.split(".", 1)
major = name
minor = None
if major in majors:
if minor:
minorlabels.append("." + minor)
ax.set_yticklabels(majorlabels, size=6)
if len(majorlocs) < 50:
ax.set_yticks(minorlocs, minor=True)
ax.set_yticklabels(minorlabels, size=4, minor=True)
ax.set_ylim(ymin=-2, ymax=len(nodenames)+2)
#ax.set_title("%s processed" % (bytecount2str(totalsize)))
canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(fig)
canvas.print_figure(pngfile, antialias=False, dpi=600)
return None
def bytecount2str(n):
    """Format byte count *n* with a decimal (power-of-1000) unit suffix.

    Returns e.g. "1.5 kB". Units run from B through PB; values beyond the
    PB range are still expressed in PB.
    """
    unit = 'B'
    for bigger in ('kB', 'MB', 'GB', 'TB', 'PB'):
        if n < 1000:
            break
        n /= 1000.0
        unit = bigger
    return ("%3.1f" % n) + ' ' + unit
def seconds2human(s):
    """Format a duration of *s* seconds using the largest sensible unit.

    Under a minute -> "Ns"; under an hour -> "Nm"; under a day -> "N.Nh";
    under ten days -> "N.Nd"; otherwise weeks as "N.Nwk".
    """
    if s < 60:
        return "%.0fs" % s
    if s < 3600:
        return "%.0fm" % (s / 60.)
    if s < 86400:
        return "%.1fh" % (s / 3600.)
    if s < 864000:
        return "%.1fd" % (s / 86400.)
    return "%.1fwk" % (s / 604800.)
# NOTE(review): this class has lost its indentation and, apparently, several
# lines (e.g. `else:` branches and the per-option format appends in
# get_usage) during extraction; restore from upstream before relying on it.
class MethodOptionParser(optparse.OptionParser):
"""OptionParser to be instantiated by dispatch methods of a class. Sets %prog to be the calling function name and epilog to be the doc string for that function. Also prints options in usage string more like man conventions instead of just [options]."""
def __init__(self, *args, **kwargs):
# Unless the caller supplies 'epilog', inspect the calling frame: the
# caller's function name becomes `prog` and its docstring the epilog.
# The caller must have a local named `self`.
if not kwargs.has_key('epilog'):
caller = sys._getframe(1)
pmethod = caller.f_code.co_name
pself = caller.f_locals['self']
method = pself.__class__.__dict__[pmethod]
kwargs['prog'] = pmethod
self.Epilog = method.__doc__ or ""
# 'fixed' is our own keyword (list of positional-argument names); remove
# it before delegating to optparse.
if kwargs.has_key('fixed'):
self.fixed = kwargs['fixed']
del kwargs['fixed']
self.fixed = None
# Count required positionals: names not starting with "[".
self.numrequired = 0
if self.fixed:
for i in self.fixed:
if i[0] != "[":
self.numrequired += 1
optparse.OptionParser.__init__(self, *args, **kwargs)
def parse_args(self, args):
# Parse as usual, then complain on stderr if fewer than `numrequired`
# positional arguments remain.
(options, args) = optparse.OptionParser.parse_args(self, args)
if len(args) < self.numrequired:
print >>sys.stderr, "Required argument(s) missing"
return (options, args)
def get_usage(self):
# When `fixed` was given, build a man-style usage line: store_true short
# flags are merged into one "[-abc]" group, other options are listed in
# brackets, followed by the fixed positional names.
if self.fixed != None:
flags = []
options = ''
for o in self.option_list:
if o.action == 'store_true':
flags += o._short_opts[0].lstrip('-')
elif o.action == 'help':
opts_tmp_fmts = []
opts_tmp_vals = []
if o._short_opts:
if o._long_opts:
options += ("[" + " ".join(opts_tmp_fmts) + "]") % tuple(opts_tmp_vals)
if o.action == 'append':
options += '...'
options += ' '
flags = string.join(flags, '')
if flags:
options += "[-" + flags + "] "
self.usage = "%prog " + options + string.join(self.fixed)
return optparse.OptionParser.get_usage(self)
def print_help(self):
r = optparse.OptionParser.print_help(self)
# For python < 2.5, we have to print our own epilog
print self.Epilog
return r
class SmartDict(dict):
    """A dict whose items can also be read and written as attributes.

    ``d.key`` is equivalent to ``d['key']``. Reading a missing key as an
    attribute raises AttributeError (not KeyError), so hasattr(), copy and
    pickle behave normally.
    """

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError as e:
            raise AttributeError(e)

    def __setattr__(self, name, value):
        self[name] = value
def tryrmtree(dir):
    """Best-effort recursive removal of directory *dir*; errors are ignored.

    Registered with atexit by mkstemp()/mkdtemp() to clean up the
    per-process temporary directory.
    NOTE(review): the body was missing from the source this was restored
    from; this implementation is inferred from the name and its atexit use.
    """
    shutil.rmtree(dir, ignore_errors=True)
def mkstemp(prefix, suffix):
    """Create a temporary file inside this process's private temp directory.

    The directory is "/$TMPDIR/fm-$USER/<pid>" (TMPDIR defaults to /tmp,
    USER to "none"). It is registered for removal at interpreter exit.
    Returns the (fd, path) pair from tempfile.mkstemp().
    """
    dir = "/%s/fm-%s/%d" % (os.environ.get('TMPDIR', '/tmp'), os.environ.get('USER', 'none'), os.getpid())
    # NOTE(review): the directory-creation step was missing from the source
    # this was restored from; mkdirexist() is assumed.
    mkdirexist(dir)
    atexit.register(tryrmtree, dir)
    return tempfile.mkstemp(prefix=prefix, dir=dir, suffix=suffix)
def mkdtemp(prefix):
    """Create a temporary directory inside this process's private temp directory.

    The parent is "/$TMPDIR/fm-$USER/<pid>" (TMPDIR defaults to /tmp, USER
    to "none"). It is registered for removal at interpreter exit. Returns
    the path from tempfile.mkdtemp().
    """
    dir = "/%s/fm-%s/%d" % (os.environ.get('TMPDIR', '/tmp'), os.environ.get('USER', 'none'), os.getpid())
    # NOTE(review): the directory-creation step was missing from the source
    # this was restored from; mkdirexist() is assumed.
    mkdirexist(dir)
    atexit.register(tryrmtree, dir)
    return tempfile.mkdtemp(prefix=prefix, dir=dir)
class FmLocations:
def __init__(self, copyFrom=None, locs={}, slave=False):
# Build the set of locations, either derived from an existing FmLocations
# (copyFrom) or loaded from a config file located via $FMCONFIG,
# ./filemap.conf, ~/.filemap.conf or /etc/filemap.conf.
# NOTE(review): this body has lost its indentation, several lines
# (`else:`/`try:`/`except` branches) and some tokens (e.g. the garbled
# "self.procs = None = None") during extraction; restore from upstream.
# NOTE(review): `locs={}` is a mutable default shared between calls.
# NOTE(review): a ".pickle" config is loaded with cPickle -- only safe for
# trusted files.
self.procs = None = None
self.cleanup = None
self.cached_dstnodes = {}
if copyFrom and not slave:
self.config = copy.deepcopy(copyFrom.config)
self.config.locs = locs
self.procs = copyFrom.procs =
self.this = copyFrom.this
elif slave:
self.config = copyFrom.config
# Keep a copy of the config around so our children (like redistribute) can reference it
fd, name = mkstemp(prefix="locations", suffix=".pickle")
#print >>sys.stderr, "locations is", name
# Remember the temp file name in self.cleanup and export it via FMCONFIG
self.cleanup = name
os.environ['FMCONFIG'] = name
fh = os.fdopen(fd, 'w')
self.config = SmartDict() # like a named tuple but for older python
self.config.locs = {}
filename = os.environ.get('FMCONFIG')
#print >>sys.stderr, "Env filename is", filename
if not filename:
if os.path.isfile("filemap.conf"):
filename = "filemap.conf"
elif os.path.isfile(os.path.expanduser("~/.filemap.conf")):
filename = os.path.expanduser("~/.filemap.conf")
filename = "/etc/filemap.conf"
os.environ['FMCONFIG'] = filename
fh = open(filename)
raise Exception("Unable to locate config file. Set FMCONFIG or place filemap.conf in . or /etc")
if filename.endswith('.pickle'):
self.config = cPickle.load(fh)
# Apply per-location environment tweaks: extra PATH entries and
# whitespace-separated KEY=VALUE pairs.
if self.this.pathextras:
os.environ['PATH'] += ":" + self.this.pathextras
if self.this.environ:
for pair in self.this.environ.split():
k,v = pair.split('=')
os.environ[k] = v
if self.this.umask:
if not = self.getSynchronizer()
if not self.procs:
self.procs = Procs()
def parseConfigFile(self, fh):
"""Populate self.config from config file."""
# NOTE(review): this body has lost its indentation and several lines
# (e.g. reading `fh` into the parser, `continue`, and the per-location prep
# calls) during extraction; restore from upstream before relying on it.
# Read from file
self.configparser = ConfigParser.SafeConfigParser()
#print >>sys.stderr, "Reading config", filename, self.configparser.sections()
self.config.slurmfe = self.config_get("global", "slurmfe", None)
if self.config.slurmfe == "localhost":
self.config.slurmfe = "sh -c"
# Unless we were passed an explicit list of locations, grab all
# from the config file.
if not self.config.locs:
# NOTE(review): `self` is passed explicitly here, so it lands in the
# `alloc` parameter of _getSlurmNodes(self, alloc=True) -- likely unintended.
nl = self._getSlurmNodes(self)
for s in self.configparser.sections():
if s == "global":
# A section name may contain a range list; expand it to one location per name
r = self._expandList(s)
for (vals,name) in r:
self.parseLocation(s, name=name, listvalues=vals)
# Use global config values by default (thisis() can override later)
# Once all config files settings are imported, prep the location for use (apply defaults, etc.)
for l in self.config.locs.values():
if nl:
# Check SLURM_NODELIST to make sure we've been allocated this node
if l.hostname not in nl:
l.inactive = True
#print "Marking",, "inactive per SLURM", l.hostname, nl
# Also prep the global section
def _getSlurmNodes(self, alloc=True):
# Return the list of host names in the current SLURM allocation, or None
# when not running under SLURM (no SLURM_NODELIST and no slurmfe configured).
# If no allocation named "FM" is found and `alloc` is true, one is requested
# via salloc and the lookup is retried once with alloc=False.
# NOTE(review): this body has lost its indentation and apparently some
# lines (e.g. `continue`/`else:`) during extraction; restore from upstream.
nl = os.environ.get("SLURM_NODELIST")
if not nl and not self.config.slurmfe:
# Must not be running under SLURM
return None
if not nl:
# Ask the SLURM front end for our running jobs as "id/name/nodelist" lines
nl = subprocess.Popen(self.config.slurmfe.split() + ["squeue -t RUNNING -u $USER -o %i/%j/%N"], stdout=subprocess.PIPE, stdin=None)
for line in nl.stdout:
line = line.strip()
# Note: `nl` is rebound here from the Popen object to the node-list string
(id,name,nl) = line.split('/')
#print >> sys.stderr, "SLURM jobid", id, name, nl
if name != "FM":
nl = None
os.environ["SLURM_NODELIST"] = nl
os.environ["SLURM_JOBID"] = str(id)
#print >> sys.stderr, "SLURM jobid", id, nl
if not nl:
if alloc:
if Verbose > 0: print >>sys.stderr, "Starting new FM job in SLURM"
subprocess.Popen(self.config.slurmfe.split() + ["salloc -k -J FM --nodes 1-9999 --overcommit --share --ntasks-per-node 2 --no-shell"], stdout=sys.stdout, stderr=sys.stderr, stdin=None).wait()
return self._getSlurmNodes(alloc=False)
print >>sys.stderr, "Unable to find or start FM job found in SLURM"
# Check SLURM_NODELIST to make sure we've been allocated this node
nl = self._expandList(nl)
if Verbose > 0: print >>sys.stderr, "Active SLURM nodes: ", nl
# _expandList returns (values, name) tuples; keep only the names
nl = [i[1] for i in nl]
return nl
def __del__(self):
    """Remove the temporary file named by ``self.cleanup``, if one is set.

    NOTE(review): the body was missing from the source this was restored
    from; deleting the file recorded in ``cleanup`` is inferred from how
    __init__ assigns it (a mkstemp() file name) -- confirm against upstream.
    """
    # getattr: __init__ may have raised before `cleanup` was assigned
    path = getattr(self, 'cleanup', None)
    if path:
        try:
            os.unlink(path)
        except OSError:
            # Best effort: the file may already be gone
            pass
def _expandList(self, liststr, values=None):
    """Returns a list of tuples where each tuple contains a list of 0 or more interval values and the resulting string from those values.

    *liststr* may contain bracketed lists such as "node[01-03,7]" (either
    () or [] delimiters); each list is expanded, left to right, into one
    name per member. Zero padding of the first member is preserved. A
    string without a list yields ``[(values, liststr)]``.
    """
    if values is None:
        values = []
    #Support () or [] syntax for SLURM or ConfigParser cases
    if not hasattr(self, 'range_re'):
        self.range_re = re.compile(r"([^\(\[]*)[\(\[]([^\)\]]*)[\)\]](.*)$")
    r = self.range_re.match(liststr)
    if not r:
        return [(values, liststr)]
    prefix = r.group(1)
    lst = r.group(2)
    suffix = r.group(3)
    expanded = []
    first = None
    for interval in lst.split(","):
        if "-" in interval:
            (lo, hi) = interval.split("-")
            expanded += range(int(lo), int(hi) + 1)
            if not first: first = lo
        else:
            try:
                node = int(interval)
            except ValueError:
                node = interval
            expanded.append(node)
            if not first: first = interval
    # Recursively try expanding each result (which may have a second list in it) and flatten the resulting lists
    ret = []
    for i in expanded:
        # It may be padded with leading zeros, the following will preserve that formatting:
        # First is the first example node number string.
        ret += self._expandList(prefix + str(first)[:-len(str(i))] + str(i) + suffix, values + [i])
    return ret
def parseLocation(self, stanza, name=None, listvalues=[]):
# Create (or extend) the location object for config section `stanza` and
# copy every raw option of that section onto it. `name` overrides the
# section name as the node name; `listvalues` are the range values the name
# was expanded from.
# NOTE(review): this body has lost its indentation, some `else:` lines and
# some tokens (line ending "defaults = nodename") during extraction.
# NOTE(review): `listvalues=[]` is a mutable default that is stored on the
# location object, so it may be shared between locations.
if name:
nodename = str(name)
nodename = str(stanza)
# Reuse (add attributes to) existing Location if we already saw this nodename
l = self.config.locs.get(nodename)
if not l:
l = FmLocation()
if stanza != "global":
self.config.locs[nodename] = l
self.this = l # Assume by default that we're the head node and use global defaults = nodename
l.config = {}
l.listvalues = listvalues
elif not l.listvalues:
l.listvalues = listvalues
#elif [str(i) for i in l.listvalues] != [str(i) for i in listvalues]:
#print >>sys.stderr, "Error: conflicting range information for node", nodename, l.listvalues, listvalues
# Import all configparser settings into a dictionary
for opt,val in self.configparser.items(stanza, raw=True):
l[opt] = val
def prepLocation(self, l):
# Finalise location `l`: fill in [global] and hard-coded defaults, run the
# ConfigApply converters, split the ssh/rsync command lines and derive
# hostname, rootdir, jobdir, qname and fmcmd.
# NOTE(review): this body has lost its indentation, some lines
# (`try:`/`except`/`else:`) and some tokens (e.g. "if != ...", "l.qname =",
# ": = l.rootdir") during extraction; restore from upstream.
# Inherit all global defaults if not over-ridden
for k,v in self.configparser.items("global", raw=True):
if not l.has_key(k):
l[k] = v
# Inherit all hard-coded defaults if not over-ridden
for k,v in ConfigDefaults.items():
if not l.has_key(k):
l[k] = v
# Apply conversion/expansion functions
for k,f in ConfigApply.items():
if l.has_key(k):
l[k] = f(l[k])
# comments=True lets the configured command lines carry '#' comments
l.sshcmd = shlex.split(l.ssh, comments=True)
l.rsynccmd = shlex.split(l.rsync, comments=True)
l.rsynccmd[0] = os.path.expanduser(l.rsynccmd[0])
if != "global":
if l.values:
if l.hostname:
#if '%' not in l.hostname:
#print >>sys.stderr, "Warning %s does not reference given instance %s" % (l.hostname, str(l.values))
# hostname/rootdir are str.format templates filled from the range values
l.hostname = l.hostname.format(*l.listvalues)
print >>sys.stderr, "Error formatting", l.hostname, "with", l.listvalues
l.rootdir = l.rootdir.format(*l.listvalues)
l.jobdir = l.rootdir + "/jobs"
if l.queuebyhost:
l.qname = l.hostname
l.qname =
if 'fm' not in l: = l.rootdir + "/sys/fm"
l.fmcmd = [l.python,]
def config_get(self, section, key, defval=None, raw=False, merge=False, expanduser=False):
# Look up `key` in `section` of self.configparser, with fallbacks to the
# [global] section and `defval`; optionally expand "~" in the result.
# NOTE(review): this body has lost its indentation and apparently its
# `else:` lines during extraction, so the exact fallback order (and what
# `merge` does) cannot be confirmed from here; restore from upstream.
if self.configparser.has_option(section, key):
val = self.configparser.get(section, key, raw=raw)
elif merge:
# Use previous value
if self.configparser.has_option("global", key):
val = self.configparser.get("global", key, raw=raw)
val = defval
if expanduser and val:
val = os.path.expanduser(val)
return val
def thisis(self, nodename):
# Declare which configured location this process is: records the name in
# self.thisname and points self.this at the matching location object.
# NOTE(review): this body has lost its indentation, an `else:` line and a
# token (the last line ends "= self.getSynchronizer()" with its assignment
# target fused onto the previous statement) during extraction.
self.thisname = nodename
if nodename == "global":
self.this = self.config.locs['global']
self.this = self.config.locs[nodename] = self.getSynchronizer()
def getSynchronizer(self):
    """Return the synchronizer implementation selected by ``self.this.syncdir``.

    A syncdir beginning with "redis:" selects RedisSynchronizer; anything
    else selects FileSynchronizer.
    """
    if self.this.syncdir.startswith("redis:"):
        return RedisSynchronizer(self)
    return FileSynchronizer(self)
def pickHosts(self):
    """Return a subset of locations containing only one location per hostname

    Inactive locations are skipped. The result is a new FmLocations copied
    from this one, with the same ``this`` location.
    """
    hosts = set()
    newlocs = {}
    for l in self.config.locs.values():
        if l.inactive or l.hostname in hosts:
            continue
        hosts.add(l.hostname)
        newlocs[l.name] = l
    newlocs = FmLocations(copyFrom=self, locs=newlocs)
    newlocs.this = self.this
    return newlocs
def nodes(self, seed=None):
    """Generate a list of pseudo-random locations.

    Specify a seed to get deterministic results (e.g. for a given extension
    number, file name, path, etc). Generate a list of node names.

    This is a generator: every configured node name is yielded exactly once,
    in an order determined by the seed. With no seed a random one is used.
    """
    # This implementation uses a Chord-like structure where nodes map to points on a ring.
    # The seed determines a point of interest on the ring.
    # Yield the nodes closest to that point (but after the point, for implementation simpicity)
    # This Chord approach allows the map of seed->nodes to be fairly stable as nodes are added or deleted

    # Build (once, cached in self.config) the node points used below
    if 'chord' not in self.config:
        mapping = {}
        for node in self.config.locs.keys():
            point = sha(node)
            # For small numbers of nodes, we need multiple points per node in order to have any stability
            for _ in range(int(10 * self.config.locs[node].share)):
                mapping[point.hexdigest()] = node
                # Use a hash feedback loop to generate pseudo-random list of additional points
                point = sha(point.digest() + node)
        # bisect below requires the points in sorted order
        self.config.chord = mapping
        self.config.chord_points = sorted(mapping.keys())

    if not seed:
        seed = str(random.random())

    # Locate the seed's position on the ring
    seed_point = sha(seed).hexdigest()
    chord_points = self.config.chord_points
    point = bisect.bisect_left(chord_points, seed_point)

    # Walk the ring once, starting at the seed point and wrapping around
    indexes = itertools.chain(xrange(point, len(chord_points)), xrange(0, point))

    # Generate nodes without duplicates
    used_candidates = set()
    for i in indexes:
        node = self.config.chord[chord_points[i]]
        if node not in used_candidates:
            used_candidates.add(node)
            yield node
def pickn(self, seed=None, n=None):
"""Pick some pseudo-random locations. The number of locations
chosen is based on the replication factor in the config.
Specify a seed to get deterministic results (e.g. for a given
extension number.) The return value is the list of node names."""
# This is a generator: active node names are yielded one at a time.
# NOTE(review): this body has lost its indentation and at least the
# statement under "if foundactive > n:" (presumably a `break`) during
# extraction; the exact stop condition cannot be confirmed from here.
# Try to yield the right $n$ for this seed and if none of those are active, the next replicate after that
foundactive = False
if not n:
# Default to the configured replication factor
n = self.this.replication
nodes = self.nodes(seed=seed)
i = 0
# Note: foundactive is re-initialised here, as a counter
foundactive = 0
for l in nodes:
i += 1
if foundactive > n:
# Stop if we've examined $n$ and at least 1 was active
if not self.config.locs[l].inactive:
foundactive += 1
yield l
if i < n:
print >>sys.stderr, "Cannot pick %d locations" % (n)
def pick(self, n=None, seed=None):
    """Return a new FmLocations instance that uses a subset of the
    locations of this instance.

    The subset is whatever ``self.pickn(seed, n=n)`` yields.
    """
    useddict = {}
    for nodename in self.pickn(seed, n=n):
        useddict[nodename] = self.config.locs[nodename]
    return FmLocations(copyFrom=self, locs=useddict)
def forAllLocal(self, cmd):
    """Same as forAll(), but CMD will be run as arguments to "fm _local".

    The command and the current config are pickled, compressed and passed
    to each node on stdin. The module-level Verbose level is forwarded as
    a "-vvv"-style flag.
    """
    lcmd = ["fm"]
    if Verbose:
        lcmd.append("-" + "v" * Verbose)
    lcmd += ["_local", "-n", "%(NODE)"]
    inputs = {'cmd': cmd, 'config': self.config}
    return self.forAll(lcmd, subst=True, stdinstr=zlib.compress(cPickle.dumps(inputs)), tag=" ".join(cmd))
def forAll(self, Cmdv, src=[], dst=[], absolute=False, subst=False, trailer=None, glob=True, stdinstr=None, call=False, tag=None, quote=True):
# Queue command Cmdv (an argv list) once per active location via
# self.procs.Popen, prefixing it with the location's ssh command when the
# location is remote, and return self.procs. src/dst are file arguments
# appended to the command, made relative to each location's rootdir unless
# `absolute` is set.
# NOTE(review): this body has lost its indentation and several lines
# (`else:`, `continue`, etc.) during extraction; restore from upstream.
# NOTE(review): `src=[]`/`dst=[]` are mutable defaults, and `src` is rebound
# inside the loop when `subst` is set, which affects later iterations.
# Accept a single path as well as a list for src/dst
if src and type(src) != type([]):
src = [src]
if dst and type(dst) != type([]):
dst = [dst]
if not tag:
tag = string.join(Cmdv + src + dst)
if stdinstr:
stdin = subprocess.PIPE
for loc in self.config.locs.values():
if loc.inactive:
if subst:
cmd = loc.subst(Cmdv)
src = loc.subst(src)
cmd = list(Cmdv)
if absolute:
s = list(src)
d = list(dst)
s = [loc.rootdir + "/" + f for f in src]
d = [loc.rootdir + "/" + f for f in dst]
# A leading "fm" is replaced by the location's own fm command line
if cmd[0] == "fm":
cmd = loc.fmcmd + cmd[1:]
if isremote(loc.hostname):
if quote and "SLURM_JOBID" not in os.environ:
# The remote sshd/shell will expand globs (unless it's SLURM's srun), so quote all the individual args
s = ["'" + i + "'" for i in s]
d = ["'" + i + "'" for i in d]
filestr = ' '.join(s+d)
if "SLURM_JOBID" in os.environ and not quote and ('*' in filestr or '?' in filestr):
# Assume srun. Srun doesn't expand globs or run a shell automatically, so we have to explicitly run /bin/sh
cmd = loc.sshcmd + [loc.hostname, "sh", "-c", subprocess.list2cmdline(cmd + s + d)]
cmd = loc.sshcmd + [loc.hostname] + cmd + s + d
# Local case: expand globs ourselves
if s and glob:
#time.sleep(random.random()) #RHEL5 + thumper glob issues
s = multiglob(s)
if not s:
if d and glob:
d = multiglob(d)
if not d:
cmd += s + d
if trailer:
cmd += trailer
self.procs.Popen(cmd, call=call, stdinstr=stdinstr, queue=loc.qname, tag=tag)
return self.procs
def expanddir(self, path):
    """Return the paths of all regular files found anywhere under *path*.

    Each returned path is the walked directory joined with the file name, so
    it begins with *path* as given. Directories themselves are not listed.
    """
    items = []
    for root, dirs, files in os.walk(path):
        for f in files:
            items.append(os.path.join(root, f))
    return items
def put(self, srclist, dst, relativeroot=False, procs=None, pickn=False, hashbasename=False, serialize=False, remove=False):
# Copy the files/directories named by the globs in srclist to `dst` on the
# destination nodes chosen by pickput(), queuing the transfers on `procs`
# (default self.procs), which is returned. With `remove`, sources that did
# not need to stay on this node are handled after a successful transfer.
# NOTE(review): this body has lost its indentation and several lines
# (`else:`, list appends, the actual removal calls) during extraction;
# restore from upstream before relying on it.
# We will glob this ourselves since we implicitly expand directories to the next level and, in the case of hashbasename, have to get the final filenames rather than globs in order to know where to put them.
if not procs:
procs = self.procs
# NOTE(review): `relative` is not used again in the visible lines
relative = {}
# Pull out any directories we have to recurse.
# Imagine a list like the following:
# a (a,)
# /b/ (/b/c,b)
# (b/c/d,b/c)
# /e (/e,)
# /f/ (/f/g,f)
# (/f/h,f)
# i/ (i/j,i)
# (i/k,i
# We can only do this with fully qualified paths
# Luckily, we use rsync which allows us to use "/./" elements to show which part of the path to preserve on the other side
# When "relativeroot" is true, we're copying between nodes within the virtual filesystem and want to preserve paths
if relativeroot:
assert(dst == "/")
# An intervening /./ in the path is how we denote the leading part that shouldn't be duplicated on the destination
srcs = [self.this.rootdir + "/./" + s for s in srclist]
srcs = []
for s in srclist:
s = s.rstrip('/')
dirname = os.path.dirname(s) or '.'
s = dirname + '/./' + os.path.basename(s)
srcs = multiglob(srcs)
if not srcs:
print >>sys.stderr, "No such file:", srclist
return procs
srcfiles = []
for a in srcs:
# Convert directory arguments to lists of files so we can assign them to the proper nodes
if os.path.isdir(a):
srcfiles += self.expanddir(a)
tag = "put %s %s" % (srclist, dst)
redundants = self.pickput(srcfiles, dst, procs, tag, pickn, hashbasename, serialize, remove, relativeroot)
if remove and redundants:
# Wait for the queued transfers before touching the sources
errors, results = procs.collect()
# If no errors, then remove redundants
if results and not errors:
if Verbose > 1:
print >>sys.stderr, "Removing %d of %d: %s" % (len(redundants), len(srcs), list(redundants))
for r in redundants:
if os.path.isdir(r):
return procs
def pickput(self, srcs, dst, procs, tag, pickn, hashbasename, serialize, remove, relativeroot):
    """Assign each source file to its destination nodes and start the copies.

    For every path in *srcs* a seed is derived (the basename when
    *hashbasename* is set, otherwise the destination path the file will
    have) and passed to pickdstnodes() to choose target locations. The
    resulting per-node source lists are handed to rsyncput().

    Returns, when *remove* is true, the set of sources that were NOT
    assigned to this node (candidates for deletion by the caller);
    otherwise None. All sources must be uniformly relative (contain
    "/./") or uniformly not, unless *hashbasename* is set.
    """
    nodesrcs = {}
    relative = None
    for s in srcs:
        if hashbasename:
            seed = os.path.basename(s)
        elif "/./" in s:
            # Relative path
            seed = "/".join([dst, s.split("/./")[1]])
            while '//' in seed:
                seed = seed.replace('//', '/')
            assert(relative != False)
            relative = True
        else:
            seed = "/".join([dst, os.path.basename(s)])
            assert(not relative)
            relative = False
        for c in self.pickdstnodes(seed, pickn, hashbasename):
            dictListAppend(nodesrcs, c.name, s)
    if remove:
        redundants = set(srcs)
        if self.thisname in nodesrcs:
            # Files that belong on this node are not redundant
            redundants -= set(nodesrcs[self.thisname])
    else:
        redundants = None
    if relativeroot and self.thisname in nodesrcs:
        # Remove no-op copy to self (which could actually delete the files)
        del nodesrcs[self.thisname]
    self.rsyncput(nodesrcs, dst, serialize, procs, tag)
    return redundants
# Return the destinations that should receive `dst`.
#
# With a truthy pickn, self.pickn() picks node names from a seed and the names
# are mapped through self.config.locs.  When hashbasename is set the seed is
# the basename of dst and results are cached in
# self.cached_dstnodes[pickn][seed].  An integer pickn is passed to pickn()
# as n.  The trailing loop walks self.config.locs looking at `inactive`.
#
# NOTE(review): indentation is stripped and several `else:` lines (and the
# statement that fills retlist) appear to be missing; the code is left
# untouched.
def pickdstnodes(self, dst, pickn, hashbasename):
# We always specify a seed when calling pick() so there is stability in which nodes work is mapped to.
# It's nice if they go to nodes that might have already received the store in a previous execution.
dst = dst.replace("/./","/")
dst = dst.replace("//","/")
if pickn:
# Pick which nodes get this file
if hashbasename:
seed = os.path.basename(dst)
# If we're just using basename, we're probably in a redistribute and there will be a lot of reoccurrences of the same basename, so cache the pickn() results.
if pickn not in self.cached_dstnodes:
self.cached_dstnodes[pickn] = {}
if seed in self.cached_dstnodes[pickn]:
#print >>sys.stderr, "PICK CACHE", hashbasename, pickn, seed, [c.hostname for c in self.cached_dstnodes[pickn][seed]]
return self.cached_dstnodes[pickn][seed]
# NOTE(review): presumably the `else:` of `if hashbasename:` -- confirm.
seed = dst
if type(pickn) is int:
choices = self.pickn(seed=seed, n=pickn)
# NOTE(review): presumably an `else:` preceded the default-n call -- confirm.
# use default n
choices = self.pickn(seed=seed)
choices = [self.config.locs[c] for c in choices]
if pickn and hashbasename:
# Add to cache
self.cached_dstnodes[pickn][seed] = choices
#print >>sys.stderr, "PICK", hashbasename, pickn, seed, choices, [c.hostname for c in choices]
return choices
# Use all destinations that are active
retlist = []
for c in self.config.locs:
if not self.config.locs[c].inactive:
# NOTE(review): nothing is appended to retlist in the visible code.
return retlist
def rsyncput(self, nodesrcs, dstdir, serialize, procs, tag):
    """Start one rsync "put" per destination node.

    nodesrcs  -- dictionary indexed by node name; each value is the list of
                 fully-qualified local sources destined for that node.
    dstdir    -- destination directory, passed through to rsync().
    serialize -- false: every node gets its own queue (named after the node),
                 so all nodes may run at once.  An integer greater than 1:
                 nodes are spread round-robin over that many queues
                 ("rr0", "rr1", ...).  Any other true value: everything goes
                 through the single queue "serialize", one node at a time.
    procs     -- process collection handed to rsync().
    tag       -- used solely for verbose status reporting.
    """
    dsts = nodesrcs.keys()
    i = 0
    for dst in dsts:
        i += 1
        if not serialize:
            queue = dst
        elif serialize > 1:
            queue = "rr" + str(i % serialize)
        else:
            queue = "serialize"
        self.rsync(False, dst, nodesrcs[dst], dstdir, queue, procs, tag)
# Queue one rsync process that copies between local storage and `node`.
#
# is_get    -- true: fetch the names in inputlist from the node's rootdir
#              into dstdir, feeding include patterns on stdin
#              (--include-from=-).  False: send the local files in inputlist
#              to <node rootdir>/<dstdir>, feeding the file list on stdin
#              (-R --files-from=-).
# node      -- key into self.config.locs.
# queue/procs/tag -- passed through to procs.Popen().
#
# NOTE(review): indentation is stripped and at least one `else:` (between the
# get and put halves) plus one statement are missing; the code is left
# untouched.
def rsync(self, is_get, node, inputlist, dstdir, queue, procs, tag):
# Build rsync style "hostname:/path/to/root" specifications
loc = self.config.locs[node]
remoteroot = loc.rootdir
remotepath = remoteroot
if isremote(loc.hostname):
remotepath = loc.hostname + ":" + remotepath
if is_get:
# The remote paths may contain wildcards, so we have to use --include-from instead of --files-from
# Also the remote side might be a directory, so we use -r
srcpath = remotepath + "/"
dstpath = dstdir
flags = ["-rp", "--include-from=-", "--exclude=*"]
il = []
for name in inputlist:
# Clean up double slashes
while '//' in name:
name = name.replace('//','/')
# remote source doesn't seem to process /./ relative paths (which we rely on for local source)
name = name.replace('/./', '/')
# Any name could be a directory, so add a recursive mask to get its children
il.append(os.path.join(name, "***"))
# Any name could be a file, so add it without a /***
# NOTE(review): the statement this comment describes is not present here.
# we have to explicitly include every parent directory as well
name = os.path.dirname(name)
while name and name != "/":
il.append(name + "/")
name = os.path.dirname(name)
inputlist = il
# NOTE(review): presumably the `else:` (put) half of `if is_get:` starts here -- confirm.
# We have resolved all the wildcards for node assignment, so just generate a list of files
# NOTE(review): inputlist[0][0] assumes a non-empty list of non-empty names.
if inputlist[0][0] == "/":
srcpath = "/"
# NOTE(review): presumably an `else:` preceded the next assignment -- confirm.
srcpath = "."
dstpath = remotepath + "/" + dstdir
while '//' in dstpath:
dstpath = dstpath.replace('//','/')
flags = ["-R", "--files-from=-"]
# The name list is delivered to rsync on its stdin, one entry per line.
inputstr = '\n'.join(inputlist)
cmd = self.this.rsynccmd + ["-utLde", loc.ssh] + flags + [srcpath, dstpath]
#print >>sys.stderr, "inputlist:", inputlist
procs.Popen(cmd, queue=queue, stdinstr=inputstr, node=node, tag=tag)
# NOTE(review): indentation is stripped and several lines/tokens are missing
# in this method (truncated condition after option parsing, truncated
# arguments in the rsync() call, missing `else:`/`continue` lines and the
# statements that actually print file contents).  The code is left untouched.
def get(self, args, procs=None, outfile=sys.stdout, relative=False):
"""Copy files from the cloud to local storage.
Note that if the source is wildcard, then output will be put in a
sub-directory of the destination. The name of that subdirectory
will be the directory name preceding the first wildcard."""
p = MethodOptionParser(fixed=["src... [dst]"])
p.add_option("-c", "--cat", action="store_true", help="Cat files to stdout (do not specify a destination)")
p.add_option("-n", "--name", action="store_true", help="Show file name when catting files")
p.add_option("-e", "--errors", action="store_true", help="Handle .stderr files specially")
(options, args) = p.parse_args(args)
# NOTE(review): the condition on the next line is truncated (an operand after `not` is missing).
if (not and (len(args) < 2):
return 1
## rsync really (surprisingly) only supports relative anyway
#if relative:
# args = ["/./" + a for a in args]
# args = [os.path.dirname(a) + '/./' + os.path.basename(a) for a in args]
# NOTE(review): the two dst/srcs assignments below look like the two arms of a
# missing if/else (temporary directory vs. last argument as destination) -- confirm.
dst = mkdtemp(prefix="get") + "/"
srcs = args
dst = args[-1]
srcs = args[:-1]
# One rsync "get" is issued per active location.
for loc in self.config.locs.values():
if loc.inactive: continue
# NOTE(review): two arguments of the next call are truncated.
self.rsync(True,, srcs, dst,, self.procs, "get %s" % (string.join(args)))
empty = True
# Walk whatever has been written below dst.
for (dirpath, dirnames, filenames) in os.walk(dst):
for f in filenames:
if f == ".status":
#cat'ing .status files is never(?) the right answer and rsync tends to fetch .* along with *
fname = os.path.join(dirpath, f)
if os.path.getsize(fname):
relname = fname[len(dst):]
if f == ".stderr" and options.errors:
print >>sys.stderr, "=== %s ===" % relname
print >>sys.stderr, "======"
print >>outfile, "=== %s ===" % relname
empty = False
# NOTE(review): the condition on the next line is truncated before `and`.
if and not empty:
print >>outfile, "======"
return self.procs
def processes(self):
    """Return the sum of `cpusperjob` over every configured location."""
    return sum(loc.cpusperjob for loc in self.config.locs.values())
# A location record (SmartDict subclass) with placeholder substitution.
class FmLocation(SmartDict):
# Return a copy of the string list `args` with the placeholders %(ROOTDIR)
# and %(SYNCDIR) replaced by this location's rootdir and syncdir.
# NOTE(review): indentation is stripped and the replacement value for
# "%(NODE)" (and the closing parenthesis) is missing on the return line.
def subst(self, args):
return [n.replace("%(ROOTDIR)", self.rootdir).replace("%(SYNCDIR)", self.syncdir).replace("%(NODE)", for n in args]
# Readiness notification over file descriptors, backed by either
# select.poll() (when self.pollobj is set) or select.kqueue() (otherwise).
# poll() yields (fd, 'r') / (fd, 'w') pairs.
#
# NOTE(review): the whole class arrived with indentation stripped and with
# `try:`/`except`/`else:`/`return` lines apparently missing, as well as the
# statements that maintain self.reads / self.writes.  The code is left
# untouched; restore it from the original before use.
class Poll:
def __init__(self):
# NOTE(review): the two assignment pairs below look like the body of a
# try/except that falls back from poll() to kqueue() -- confirm.
self.pollobj = select.poll()
self.kqueue = None
self.kqueue = select.kqueue()
self.pollobj = None
# File descriptors registered for reading / writing.
self.reads = set()
self.writes = set()
# Register fd for read and/or write readiness.
def add(self, fd, for_read=False, for_write=False):
if self.pollobj:
mask = 0
if for_read:
mask |= select.POLLIN
if for_write:
mask |= select.POLLOUT
# Hang-up and error conditions are always requested.
self.pollobj.register(fd, mask|select.POLLHUP|select.POLLERR)
#print >>sys.stderr, "POLL add", fd, for_read, for_write
# NOTE(review): presumably the kqueue branch (`else:`) starts here -- confirm.
if for_read:
self.kqueue.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD, fflags=select.KQ_NOTE_LOWAT)], 0)
if for_write:
self.kqueue.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)], 0)
# Stop watching fd.
def unregister(self, fd):
if self.pollobj:
# NOTE(review): the poll-object branch has no visible statement.
#print >>sys.stderr, "POLL unregister", fd, fd in self.reads
if fd in self.reads:
self.kqueue.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
if fd in self.writes:
self.kqueue.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
# Generator: wait up to `timeout` and yield (fd, 'r') or (fd, 'w') per event.
def poll(self, timeout):
if self.pollobj:
# NOTE(review): a `try:` is missing before the next call; the EINTR test
# below has no visible body.
l = self.pollobj.poll(timeout)
except select.error, e:
if e[0] == errno.EINTR:
for fd,event in l:
if event & select.POLLIN:
yield (fd, 'r')
if event & select.POLLOUT:
yield (fd, 'w')
#print >>sys.stderr, "POLL poll", timeout
if not self.reads and not self.writes:
#print >>sys.stderr, "Skipping poll with nothing registered"
# kqueue takes None, not -1, to mean "no timeout".
if timeout == -1:
timeout = None
events = self.kqueue.control(None, 1, timeout)
for e in events:
if e.filter == select.KQ_FILTER_READ:
yield (e.ident, 'r')
elif e.filter == select.KQ_FILTER_WRITE:
yield (e.ident, 'w')
print >>sys.stderr, "unknown filter", e
class Procs:
"""This class is used to launch some number of child processes and reap them.
It provides methods to instantiate a process for each node."""
def __init__(self, retries=0):
    """Create an empty process tracker.

    retries -- default retry count; Popen() copies it into any request that
               does not carry its own 'retries' value.
    """
    self.queues = {}            # queue name -> list of requests waiting on it
    self.poll = Poll()          # readiness notification for child pipes
    self.fd2proc = {}           # file descriptor -> process object owning it
    self.pids = {}              # child pid -> process object
    self.numprocspolling = 0    # number of children that share an fd with us
    self.retries = retries
def isempty(self):
    """Return True when no child process is being tracked in self.pids."""
    return not self.pids
# Submit command vector `cmdv` for execution, honouring the optional 'queue'
# keyword: a queue that already exists in self.queues means something is
# running on it.  'tag' defaults to the first word of the command followed by
# "..." and 'retries' defaults to self.retries.
#
# NOTE(review): indentation is stripped and the statements that enqueue the
# request (and the `else:` separating the two cases) appear to be missing;
# the code is left untouched.
def Popen(self, cmdv, **kwargs):
kwargs['cmdv'] = cmdv
queue = kwargs.get('queue')
if not kwargs.get('tag'):
kwargs['tag'] = cmdv[0]+"..."
if not kwargs.get('retries'):
kwargs['retries'] = self.retries
if queue and self.queues.get(queue) != None:
# If queue exists (even if it is an empty queue), then something is running
if Verbose > 2:
print >>sys.stderr, "Queueing", kwargs['tag'], "to", queue, len(self.queues[queue])
# NOTE(review): nothing visible stores kwargs on the queue here.
# No queue, means we can run now, but create an empty queue to block subsequent tasks
self.queues[queue] = []
return self._PopenSave(**kwargs)
def _PopenSave(self, **kwargs):
# Call _PopenNow, but save a copy of the kwargs for later retries
proc = self._PopenNow(**kwargs)
proc.kwargs = kwargs
return proc
# NOTE(review): indentation is stripped and several statements are missing in
# this method: the `try:`/`except` around each subprocess.Popen call, the
# code that accumulates words into `thiscmd`, the appends to `results`, and
# the close() calls implied by the comments.  The code is left untouched.
def PopenWithPipes(self, cmdv, stdin=None, stdout=None, stderr=None, **kwargs):
"""This is a wrapper for subprocess.Popen that supports pipelines as in Popen(["ls", "|", "wc", "-l"])"""
results = []
thiscmd = []
for i in range(len(cmdv)):
if cmdv[i] == "|":
# There are more words, so make an intermediate pipe
r = subprocess.Popen(thiscmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True, **kwargs)
print >>sys.stderr, "Error", sys.exc_value, "executing", thiscmd
# Next element of the pipe gets this output
stdin = r.stdout
thiscmd = []
# This is the last command in the pipeline
if thiscmd:
r = subprocess.Popen(thiscmd, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, **kwargs)
print >>sys.stderr, "Error", sys.exc_value, "executing", thiscmd
# Here's how we handle these pipelines outside of this function. One (that last one)
# will be returned up the call stack and tracked as the unit of work. All
# will be placed in the reaping and polling datastructures and handles accordingly.
# Only when all of the parts left will the tracked one be returned up and passed to FinalizeItem.
# Now close our reference to intermediate pipes so we don't poll on them
for i in range(1, len(results)):
if results[i].stdin:
results[i].stdin = None
if len(results) > 1:
# Every element points at the last process of the pipeline via `partof`.
for i in range(len(results)):
results[i].partof = results[-1]
# We will count this down (including self) to known when this pipeline/Item is done computing
results[-1].parts_left = len(results)
# NOTE(review): presumably the single-command case (`else:`) -- confirm.
results[-1].partof = None
for i in range(0, len(results) - 1):
results[i].stdout = None
# The last process keeps references to the earlier pipeline stages.
results[-1].parts = results[:-1]
return results
def add_poll(self, fh, r, for_read=False, for_write=False):
    """Start watching file object `fh` on behalf of process object `r`.

    The descriptor is registered with self.poll for the requested
    directions, recorded in self.fd2proc so events can be mapped back to
    `r`, and switched to non-blocking mode.
    """
    fd = fh.fileno()
    self.poll.add(fd, for_read, for_write)
    self.fd2proc[fd] = r

    # make non-blocking file
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Launch `cmdv` (possibly a "|" pipeline, via PopenWithPipes) immediately and
# register every resulting process for polling and bookkeeping.  stdinstr, if
# given, is fed to the first process of the pipeline.  Returns the last
# process started.
#
# NOTE(review): indentation is stripped; an `else:` before the /dev/null
# assignment is missing and the key in `self.pids[] = r` is truncated.  The
# code is left untouched.
# NOTE(review): `cmdv=[]` is a mutable default argument; it is not mutated in
# the visible code.
def _PopenNow(self, cmdv=[], prefix=None, stderr=subprocess.PIPE, stdout=subprocess.PIPE, call=False, tag=None, queue=None, node=None, stdinstr=None, cwd=None, retries=None):
# This function should only be called by _PopenSave
# Note, we prefer to have a stdout so that poll() can tell when it closes and then reap the child
if stdinstr:
stdin = subprocess.PIPE
stdin = file('/dev/null')
first = True
for r in self.PopenWithPipes(cmdv, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd):
r.stdinstr = None # Default to nothing to finalize
# Count each process that shares at least one descriptor with us.
if r.stderr or r.stdout or (first and stdinstr):
self.numprocspolling += 1
if r.stdout:
self.add_poll(r.stdout, r, for_read=True)
r.stdoutbuf = cStringIO.StringIO()
if r.stderr:
self.add_poll(r.stderr, r, for_read=True)
r.stderrbuf = cStringIO.StringIO()
if first and stdinstr:
self.add_poll(r.stdin, r, for_write=True)
r.stdinstr = stdinstr
first = False
r.tag = tag
r.start = time.time()
r.queue = queue
# Without an explicit node, the queue name doubles as the node name.
if queue and not node:
node = queue
r.node = node
r.cmd = cmdv
self.pids[] = r
if Verbose > 2:
print >>sys.stderr,, cmdv, r.node
return r #Last one, and the one that others are "partof"
# Called for queue `qname`: take the next pending request off the queue, or
# delete the queue once it is already empty.
#
# NOTE(review): indentation is stripped; the body of `if not qname:`, the
# statement that relaunches the popped request, and the `else:` before the
# delete appear to be missing.  The code is left untouched.
def wakeup(self, qname):
if not qname:
#Wake-up anything on this queue
if self.queues[qname]:
kwargs = self.queues[qname].pop(0)
# Leave queue, even if empty, so things wait for us to finish
if Verbose > 2:
print >>sys.stderr, "Waking", kwargs
# Queues was already empty, so now delete it
del self.queues[qname]
# NOTE(review): indentation is stripped and several statements are missing in
# this method (the `break` after `if p == None:`, the append of p to
# `results`, at least one `else:`, and whatever follows the maxErrors
# message).  The code is left untouched.
#
# Visible return values: (None, results) when nothing ran, None when stopping
# early, otherwise (errors, results).
def collect(self, ignoreErrors=False, labelPrint=False, maxErrors=None, markInactive=False, stopOnError=False):
"""Execute any active or queued processes and return, a count of the number of errors and a list of the processes objects that ran."""
results = []
errors = 0
total = 0
stopping = False
while True:
# Block if not stopping (if stopping, reap everything that's done already and then stop)
p = self.waitpid(not stopping)
if p == None:
total += 1
if p.status:
errors += 1
if stopOnError:
print >>sys.stderr, "Terminating early due to an error"
stopping = True
if Verbose > 2:
ec = os.WEXITSTATUS(p.status)
print >>sys.stderr, "%s: %s status %d for %s" % (p.queue, seconds2human(p.time), ec, p.tag)
waiting = [l.queue for l in self.pids.values()]
if waiting and len(waiting) < len(results):
print >>sys.stderr, "Still waiting on", string.join(waiting)
if not results:
return None, results
tag = "for " + results[0].tag #XXX not all tags identical
if stopping or (Verbose > 1):
print >>sys.stderr, "--- Report", tag
tag = ""
# Mean and population standard deviation of the wall-clock times.
mean = 0.0
for p in results:
mean += p.time
mean /= len(results)
stddev = 0.0
for p in results:
stddev += (p.time - mean)**2
stddev /= len(results)
stddev = math.sqrt(stddev)
if stddev > 0:
for p in results:
deviations = (p.time - mean) / stddev
# Report nodes more than two standard deviations slower than the mean.
if deviations > 2:
print >>sys.stderr, "Node %s slow by %g standard deviations" % (p.node, deviations)
print >>sys.stderr, "Average time %gs, max %gs, stddev %gs %s" % (mean, max([p.time for p in results]), stddev, tag)
if not stopping:
for k,v in self.queues.items():
assert(not v)
assert(not self.pids)
assert(self.numprocspolling == 0)
self.poll = Poll() #Reinit, just in case not everything was unregister()'d
if not ignoreErrors and errors:
print >>sys.stderr, "Error, %d/%d subprocess(es) returned error %s" % (errors, total, tag)
if labelPrint:
sys.stdout.write(coallesce([f.stdout for f in results], [f.node for f in results]))
if stopping or labelPrint or Verbose > 0:
sys.stderr.write(coallesce([f.stderr for f in results], [f.node for f in results]))
if Verbose > 1:
print >>sys.stderr, "---"
if stopping:
return None
if not ignoreErrors and errors:
if maxErrors != None and errors > maxErrors:
print >>sys.stderr, "Terminating early due to %d errors" % errors
# markInactive, when given, is an object whose .locs entries get flagged.
if markInactive:
failed = [p.node for p in results if p.status]
if failed:
print >>sys.stderr, "Marking nodes inactive: %s" % ','.join(failed)
for f in failed:
markInactive.locs[f].inactive = True
return errors, results
def poll_remove(self, fh):
    """Stop tracking file object `fh`.

    The descriptor is switched back to blocking mode and its entry is
    dropped from self.fd2proc (KeyError if it was never added).
    """
    # NOTE(review): the descriptor is not unregistered from self.poll in the
    # code as received -- confirm whether that happens elsewhere.
    fd = fh.fileno()
    # Make blocking again
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags & (~os.O_NONBLOCK))
    del self.fd2proc[fd]
# Book-keeping for a child `p` whose exit status has been recorded: compute
# its wall-clock and CPU times, swap its pipe objects for the collected
# buffers, drop it from the tracking tables, and decide what to hand back.
# Returns None while other parts of a pipeline are still running or when the
# process is being retried; otherwise returns the (pipeline's) process object.
#
# NOTE(review): indentation is stripped; statements under the stdout / stderr
# / stdinstr tests, the key in `del self.pids[]`, and the requeue / wakeup
# calls appear to be missing.  The code is left untouched.
def finalizeProc(self, p):
# Record compute time
after = os.times()
p.time = time.time() - p.start
# Indexes 2 and 3 of os.times() are the children's user and system time.
p.utime = after[2] - p.starttimes[2]
p.stime = after[3] - p.starttimes[3]
del p.starttimes
# Finish building the output buffers
if p.stdout != None:
#print >>sys.stderr, "finalized",, p.stdout.fileno()
p.stdout = p.stdoutbuf
if p.stderr != None:
p.stderr = p.stderrbuf
if p.stdinstr != None:
if p.stderr or p.stdout:
self.numprocspolling -= 1
del self.pids[]
if p.partof:
p.partof.parts_left += -1
#print >>sys.stderr, "PARTOF STATUS", p, p.status, p.time
# only the accountable child should impact queueing, and doesn't really count to caller
if p.partof.parts_left:
return None
# Now that p has finished, p.partof is complete, so process the overall (which may or may not be p)
p = p.partof
# XXX: ssh returns 255 if it can't connect and rsync returns 12
if p.status and p.kwargs['retries'] and (os.WEXITSTATUS(p.status) == 255 or os.WEXITSTATUS(p.status) == 12):
# Requeue if retries enabled and child returned 255 (SSH failed to connect)
if Verbose > 0:
print >>sys.stderr, p.kwargs['retries'], "more retries for", p.node
p.kwargs['retries'] -= 1
# Exponential back-off with random jitter; the delay doubles with each retry already used.
time.sleep(2**(self.retries - p.kwargs['retries'] - 4) + 0.5 * random.random())
return None
return p
def waitpid(self, block=False):
    """Check for any completed child, remove them from the procs dictionary and return them.

    Returns the finished process object reported by checkwait(), or None
    when nothing is being tracked or, with block false, when the first
    check finds nothing.  With block true, checking repeats until a child
    finishes or none are left.
    """
    while not self.isempty():
        done = self.checkwait(block=block)
        if done:
            return done
        if not block:
            return None
    return None
# One pass of child supervision: try os.waitpid() for an exited child, then
# service any ready descriptors reported by self.poll.  Returns whatever
# finalizeProc() returns for a finished child, otherwise None.
#
# NOTE(review): indentation is stripped; the `else:` lines of the blocking
# decision, the `try:` before os.waitpid(), the right-hand sides of the two
# `s =` reads and the handling of read/write events are missing.  The code is
# left untouched.
def checkwait(self, block=False):
if block:
# First, the block argument is just a hint, we're not obligated to block.
# Second, we may have children that are blocking on I/O to us, so we have use poll().
# But we may have children that have no shared fd with us and that can only be reaped with waitpid().
# So we need to figure out which case it is to determine where we can actually block.
if self.numprocspolling == len(self.pids):
# In this case, waitpid() is superfluous, so don't block on it, but
# block on poll()
waitopts = os.WNOHANG
pollopts = -1 # Block
elif self.numprocspolling:
# In this case we have to bounce back between waitpid() and poll()
# We don't want to have a complete busy loop, so we give poll a small timeout.
# This creates some potential latency, but this is not a expected case
print >>sys.stderr, "Debug: inefficient when only some children have fds we manage"
waitopts = os.WNOHANG
pollopts = 2500 # block for 2500ms
# In this case poll() is superfluous and we should just do a blocking waitpid()
waitopts = 0 #block
pollopts = 0 #do not block
# Simple case
pollopts = 0 # poll only
waitopts = os.WNOHANG
#print >>sys.stderr, "block case", waitopts, pollopts
# Check for completed children.
# Waiting register's child's CPU times, so measure right before and after
starttimes = os.times()
(pid, status) = os.waitpid(0, waitopts)
if pid:
p = self.pids[pid]
p.status = status
p.starttimes = starttimes
return self.finalizeProc(p)
except OSError, e:
if e.errno == errno.ECHILD:
# If we're here its probably because in python 2.4, subprocess reaps other children
# whenever making a new one. So one of the children we thought was running had already
# been waitpidded.
# So, poll on each of our children to find the one to finalize:
for p in self.pids.values():
# First look for things reaped by the signal handler
if p.__dict__.has_key('error'):
return self.finalizeProc(p)
# Since the child is already reaped, we'll probably measure no CPU time for it. oh well
starttimes = os.times()
status = p.poll()
if status != None:
p.status = status
return self.finalizeProc(p)
print >>sys.stderr, "Unexpected: no child", self.pids
# Look for children that may be blocking on writes to stdout, or that have finished
# this won't catch children that we don't have share a fd with
# Make sure we get interrupted if a child exits (in case we're not connected to any of its fds)
for fd, event in self.poll.poll(pollopts):
p = self.fd2proc[fd]
if event == 'r':
if fd == p.stdout.fileno():
s =
assert(fd == p.stderr.fileno())
s =
if event == 'w':
assert(fd == p.stdin.fileno())
return None
# Write as much of p.stdinstr as the pipe accepts to the child's stdin and
# keep the unwritten remainder; once nothing is left, drop the stdin
# descriptor from self.fd2proc.
#
# NOTE(review): indentation is stripped; both `try:` lines and the statement
# that closes the pipe appear to be missing.  The code is left untouched.
def processStdin(self, p):
assert(p.stdinstr != None)
# Child could quit before we get it all out
# os.write() returns the number of bytes actually written.
bytes = os.write(p.stdin.fileno(), p.stdinstr)
#print >>sys.stderr, "%s: wrote %d of %d bytes" % (p, bytes, len(p.stdinstr))
p.stdinstr = p.stdinstr[bytes:]
except IOError:
# assume that child exit status will tell the story
# if it ignored some input and exited with 0, we'll be oblivious
p.stdinstr = None
if not p.stdinstr:
#print >>sys.stderr, "%s: closing stdin pipe, %d" % (p, p.stdin.fileno())
del self.fd2proc[p.stdin.fileno()]
# May be closed
except ValueError:
p.stdinstr = None #Tell finalize not to bother closing
# The design is as follows:
# 1. Jobs are submitted in an uncoordinated manner, so they have unique
# identifiers that aren't generated by the user, but that are identical
# across all nodes running a job.
# 2. To manage the problem of listing jobs and removing jobs, we use a
# directory structure to contain all current jobs. Deleting a file should
# (eventually) lead to a job not running further.
# 3. New jobs should be invoked synchronously by calling a job scheduler with a
# hint that points to the job. Failure to send this hint should only delay
# the job scheduler discovering the job since it should periodically poll
# for changes in the job directory.
class JobScheduler:
"""JobScheduler maintains a set of child processes working on a set of jobs."""
# Set up the scheduler from `locs`: a Procs tracker with no retries, the
# options of this node (locs.this), the free-thread budget, and -- when
# options.dynamicload is set -- the module-global CpuProfile `cpu`.
#
# NOTE(review): indentation is stripped and two statements are fused or
# truncated: the first line carries the remains of an assignment (its
# comment describes a dictionary of job objects), and the `freethreads` line
# runs into a second assignment from locs.getSynchronizer().  The code is
# left untouched.
def __init__(self, locs): = {} # A dictionary of job objects indexed by job name
self.procs = Procs(retries=0)
self.options = locs.this
self.freethreads = self.options.processes = locs.getSynchronizer()
if self.options.dynamicload:
global cpu
cpu = CpuProfile(locs, self.options.dynamicload)
# NOTE(review): indentation is stripped and this method is heavily truncated:
# the operands of both set() differences, the bodies of the `removed` loop
# and of the dot-file / foreign-UID tests, and the end of the FmJob(...)
# call with its `except` line are missing.  The code is left untouched.
def ReadJobDir(self):
"""Check for new/removed jobs"""
jobs = os.listdir(self.options.jobdir)
removed = set( - set(jobs)
recent = set(jobs) - set(
for j in removed:
#print "Job", j, "removed"
for j in recent:
if j.startswith("."):
# Ignore dot files (probably rsync dropping something in the directory)
#print "Job", j, "added"
# Only job files owned by the current user are considered.
jobuid = os.stat(self.options.jobdir + "/" + j).st_uid
if jobuid != os.getuid():
#print >>sys.stderr, "Skipping job owned by UID %d", jobuid
try:[j] = FmJob(self.options.jobdir + "/" + j, cpusPerJob=self.options.cpusperjob,
print >>sys.stderr, "Error parsing job description", j, "; skipping"
# NOTE(review): indentation is stripped and this method is truncated: the
# right-hand sides of `jobs =` and `j =`, the bodies of several `if`
# statements (thread-limit test, invalidateOnly, dynamicload restart) and
# the tail of the completion test are missing.  The code is left untouched.
def RunOnce(self, invalidateOnly=False):
"""Try to launch a work item and return True on success (None->nothing to do)"""
if self.options.dynamicload and not self.procs.isempty() and not cpu.available(len(self.procs.pids)):
# If using dynamic scheduling and something is running and system is not idle,
# then don't schedule more work.
return None
# Reserve a thread up front; it is handed back below if nothing was launched.
self.freethreads -= 1
# Look for something with work to do
jobs =
for jobname in jobs:
j =[jobname]
if (not self.options.dynamicload) and (len(j.running) >= j.threads):
proc = j.compute(self.options, self.procs, invalidateOnly)
if invalidateOnly:
if self.options.dynamicload:
# Restart our checking of idleness
#print >>sys.stderr, "Launched", proc
if proc:
return True
# No more work to do, check to see if job is complete
# Note, job cannot be complete if parent is still active
if not j.continuous and not j.running and (not j.parent or not
self.freethreads += 1
return None
def JobDone(self, jobname):
    """A job has run to completion, remove it locally.

    Deletes the job file <options.jobdir>/<jobname>; os.unlink() raises
    OSError if the file does not exist.
    """
    # NOTE(review): only the job file is removed in the code as received; no
    # in-memory job table is updated here -- confirm against the original.
    #print >>sys.stderr, "Job", jobname, "done"
    os.unlink(self.options.jobdir + "/" + jobname)
# NOTE(review): indentation is stripped and this method is truncated: the
# tail of the "no more jobs" test, several `else:` lines, the sleep call,
# the `try:` that reads the .stderr file, and the bodies of the last two
# `if` statements are missing.  The code is left untouched.
def RunUntilDone(self):
"""Run until there is no more input on it. Return True iff we did something"""
didSomething = False
# Current back-off delay in seconds; multiplied by 1.1 below, capped at 1.
sleep = 0.01
# Invalidate anything that is stale from previous runs
while True:
if self.freethreads:
if self.procs.isempty() and not len(
# Doing nothing and no more jobs to run
return didSomething
if self.RunOnce():
didSomething = True
block = False
# Nothing to do right now
# Don't return because we're still working,
# but wait before we look for more work to do.
#print >>sys.stderr, "Wait for completion (and/or more work)"
# XXX. Check syncfiles for all active procs and see if somebody
# finishes before us. If so, abort (we may be the a slow node).
# Only check here since that's when we're on our last work item (no more work to do)
# and could accelerate our wait finishing sooner by terminating.
#for p in self.procs:
# check to see if somebody beat us
# Sleeping hurts latency for detecting new inputs, but avoids busy waits.
# So we do an exponential backoff in how long we sleep with a max of 1 second
sleep *= 1.1
if sleep > 1:
sleep = 1 # Max out at 1 sec sleep
block=False # Don't block since new work could arrive before a child finishes
block = True
assert (not self.procs.isempty())
p = self.procs.waitpid(block=block)
#print >>sys.stderr, "WAITPID", block, p
if p:
# A child finished, so its thread slot becomes available again.
self.freethreads += 1
if p.status and p.job.checkexit != None:
if isinstance(p.inputs, list) and len(p.inputs) > 1:
inputstr = str("%s and %d others" % (str(p.inputs[0]), len(p.inputs)-1))
inputstr = str(p.inputs)
print >>sys.stderr, "Node terminating job %s due to exit status %d on input %s:" % (p.jobname, p.status, inputstr)
except IOError:
# .stderr file might not exist
# Do not cleanup job because the presence of jobs after the scheduler exits is how the waiter detects an error
if self.options.dynamicload:
sleep = 0.01
if not block:
# Record the outcome of finished work item `p`: gather its status and timing
# (folding in the earlier stages of a pipeline), pickle them into a .status
# file in the item's hidden working directory, drop empty output files, and
# rename the working directory to its final name.
#
# NOTE(review): indentation is stripped and this method is truncated: the
# 'nodename' value, the iterable of the `for r in` loop, the `try:` lines
# before the file()/rename() calls, the bodies of the `except` handlers and
# part of the line that unlinks empty outputs are missing.  The code is left
# untouched.
def finalizeItem(self, p):
stats = {}
stats['status'] = p.status
stats['time'] = p.time
stats['utime'] = p.utime
stats['stime'] = p.stime
stats['inputsize'] = p.inputsize
stats['jobname'] = p.jobname
stats['start'] = p.start
stats['nodename'] =
stats['hostname'] = Hostname
stats['timestamp'] = time.time()
stats['cmd'] = p.cmd
assert(not p.parts_left)
for r in
if stats['status'] == 0:
# Promote from non-error to error (like pipefail)
stats['status'] = r.status
stats['time'] += r.time
stats['utime'] += r.utime
stats['stime'] += r.stime
stats['start'] = min(stats['start'], r.start)
# Hidden per-item working directory next to the final output name.
dir = p.outdirname + "/." + My_file_prefix + "_" + p.outbasename
f = file(dir + "/.status", "w")
except IOError:
# Perhaps we were invalidated while running. Don't sweat it.
cPickle.dump(stats, f)
# Remove empty outputs to avoid unnecessary downstream processing
# NOTE(review): the first name is "stdout" while the second is ".stderr" --
# confirm whether a leading dot was lost.
for f in "stdout", ".stderr":
outf = dir + "/" + f
if os.path.getsize(outf) == 0:
os.unlink(outf), p.start)
dst = p.outdirname + "/" + p.outbasename
tmp = p.outdirname + "/.del." + My_file_prefix + "_" + p.outbasename
# By removing this dst(if present) , we're invalidating any previously derived data, but our redo-if-newer-than logic will handle the compute.
# XXX. There is still a chance for a race if a child already was in process on the old dir and in between multiple files.
os.rename(dst, tmp)
except OSError, e:
if e.errno == errno.ENOENT:
# Didn't exist (or race to delete). This is common and okay.
os.rename(dir, dst)
except OSError, e:
if e.errno == errno.EEXIST or e.errno == errno.ENOTEMPTY:
print >>sys.stderr, "Warning: unable to finalze to %s (okay if this is a shared directory):" % dst
class CpuProfile:
# Prepare CPU accounting.  `interval` is the minimum wall-clock span (compared
# against duration_wallclock in update()) between refreshes.  self.divisor
# counts the configured locations whose hostname equals this node's hostname.
#
# NOTE(review): indentation is stripped and the line `self.idle = 0 = 0`
# is two assignments fused with a target name lost.  The code is left
# untouched.
def __init__(self, locations, interval):
self.interval = interval
self.idle = 0 = 0
# Clock ticks per second and number of online CPUs, from sysconf.
self.HZ = os.sysconf('SC_CLK_TCK')
self.ncpus = os.sysconf('SC_NPROCESSORS_ONLN')
self.divisor = 0
hostname = locations.this.get('hostname')
for l in locations.config.locs.values():
if (hostname == l.get('hostname')):
self.divisor += 1
if Verbose > 2:
print >>sys.stderr, "Host", hostname, "shared with", self.divisor
if not self.divisor:
print >>sys.stderr, "Warning: could not determine number of nodes on this host"
# Fall back to 1 so later divisions by self.divisor are safe.
self.divisor = 1
# NOTE(review): indentation is stripped and this method is truncated: the
# `try:`/`except` around the /proc/stat read, the second operand of the
# `duration` subtraction, an `else:` before `self.idle_fraction = 0`, and
# the target of the final `= total` assignment are missing.  The code is
# left untouched.
def update(self, force=False):
"""Try to update CPU idle figures. Return True iff we updated the idle figures."""
# First line of /proc/stat: the label "cpu" followed by tick counters.
fields = open('/proc/stat').readline().split()
print >>sys.stderr, "Warning: scheduler cannot determine CPU idle % on this platform"
return False
assert(fields[0] == "cpu")
fields = [int(x) for x in fields[1:]]
total = sum(fields)
# Make sure our update is large enough to be meaningful
duration = float(total -
# Ticks summed over all CPUs -> seconds of wall-clock time.
duration_wallclock = duration / self.HZ / self.ncpus
if (not force) and duration_wallclock < self.interval:
if Verbose > 2:
print >>sys.stderr, time.asctime(), "DynamicLoad: Waiting to gather more stats"
# We don't sleep since we're called by RunOnce which is an event management loop
return False
# The fourth counter is used as the idle figure.
idlecpu = fields[3]
if duration:
self.idle_fraction = (idlecpu - self.idle) / duration
self.idle_fraction = 0
if (not force) and Verbose > 1:
print >>sys.stderr, time.asctime(), "DynamicLoad: CPU ", self.idle_fraction * 100, "% idle over ", duration_wallclock, "seconds"
self.idle = idlecpu = total
return True
def memory_usage(self, path="/proc/meminfo"):
    """Return the fraction of memory in use: 1 - (MemFree + Cached) / MemTotal.

    path -- meminfo-format file to read (defaults to /proc/meminfo).

    The file is scanned line by line; the result is computed as soon as the
    "Cached" line is seen, so MemTotal and MemFree must precede it, as they
    do in /proc/meminfo.  Raises AssertionError when the figures cannot be
    obtained.
    """
    #return 1 - (os.sysconf('SC_AVPHYS_PAGES') / float(os.sysconf('SC_PHYS_PAGES')))
    total = None
    free = 0.0
    f = open(path)
    try:
        for line in f:
            if line.startswith("MemTotal"):
                total = float(line.split()[1])
            if line.startswith("MemFree"):
                free += float(line.split()[1])
            if line.startswith("Cached"):
                free += float(line.split()[1])
                if total:
                    return 1 - (free / total)
                # "Cached" seen without a usable MemTotal: report a parse failure
                break
    finally:
        # try/finally instead of `with`, to stay valid on older Python 2
        f.close()
    # Raised explicitly so the check survives `python -O`
    raise AssertionError("Unable to parse /proc/meminfo")
# NOTE(review): indentation is stripped and this method is truncated: the
# `try:`/`except` around reading /proc/<pid>/stat and the statement implied
# by "Restart accounting" are missing.  The code is left untouched.
# NOTE(review): `threshold` is computed from the CPU figures and then
# overwritten by the memory threshold before its first visible use.
def available(self, nprocs):
"""Return True iff the system is neither CPU nor I/O bound."""
if not self.update():
return False
# NOTE(review): nprocs == 0 would divide by zero on the next line, although
# `if nprocs:` further down suggests zero is an expected value -- confirm.
percent_per_proc = (1.0 - self.idle_fraction) / self.divisor / nprocs
percent_per_cpu = 1.0 / self.ncpus
# Use a threshold that assumes that a job will use at least 1 CPU (percent_per_cpu) and perhaps much more (percent_per_proc).
# If the actual requirement is less than 1 CPU's worth, we'll add more in the futures.
threshold = max(percent_per_cpu, percent_per_proc)
# Check for idle CPU time
if self.idle_fraction < (1.0 / self.ncpus):
# Less than 1 CPU's worth of resources is available, so don't add more
if Verbose > 2:
print >>sys.stderr, time.asctime(), "DynamicLoad: Waiting for CPU to be more idle"
return False
mem_usage = self.memory_usage()
# Assume current memory usage is evenly distributed across our peer schedulers:
# our_usage = mem_usage / self.divisor
# Assume how much memory a new child would take:
# needed = our_usage / nprocs
# Assume available memory is evenly distributed across schedulers
# available_share = (1.0 - mem_usage) / self.divisor
# We should wait if (needed > available_share)
# = mem_usage * (1.0 / nprocs) > (1.0 - mem_usage)
# = mem_usage + mem_usage * (1.0 / nprocs) > 1.0
# = mem_usage * (1 + (1.0 / nprocs)) > 1.0
# = mem_usage > 1.0 / (1 + (1.0 / nprocs))
# = mem_usage > 1.0 / ((nprocs + 1) / nprocs)
# = mem_usage > nprocs / (nprocs + 1)
# = mem_usage > nprocs / (nprocs + 1)
if nprocs:
threshold = nprocs / (nprocs + 1.0)
if mem_usage > threshold:
# Wait for more memory
if Verbose > 1:
print >>sys.stderr, time.asctime(), "DynamicLoad: Waiting for memory usage to drop to %d%% from %d%%" % (int(100*(threshold)), int(100*mem_usage))
return False
# Account for D state:
# If we're in I/O wait (e.g. for a write), then we're not idle,
# so that's already accounted for. But if we're in D, then we are
# idle, but we don't want more D. The kernel only seems to tally D
# in the loadavg, but that's exponentially slow, we need to go tally it
# ourself
# XXX. would like to limit this to just our disk
unint = 0
for f in os.listdir('/proc'):
if unicode(f).isnumeric(): # looks like a pid
line = open('/proc/' + f + '/stat').readline().split()
# probably finished, just ignore
# The third whitespace-separated field is compared against 'D'.
if line[2] == 'D':
unint += 1
if not unint:
return True
if Verbose > 1:
print >>sys.stderr, time.asctime(), "DynamicLoad: Waiting for less D state (currently %d)" % (unint)
#Restart accounting so we don't include previous D time
return False
class FmJob:
"""Each FmJob object represents a job, as specified in a job file, and
provides methods for identifying and processing inputs for that job."""
# Load a job description from file `fname`.  Settings are read from the
# "mrjob" section of the file through the local helper config_get();
# cpusPerJob multiplies the per-CPU thread count ("procspercpu", default 1).
#
# NOTE(review): indentation is stripped and this method is truncated: the
# first line carries the remains of an assignment from `locker`, the
# right-hand side of `processed =` is missing, and the eval()/split() pair
# looks like the two arms of a missing try/except.  The code is left
# untouched.
def __init__(self, fname, cpusPerJob = 1, locker = None): = locker
config = ConfigParser.SafeConfigParser()
# Return option `field` of section "mrjob", or `default` when it is absent.
def config_get(field, default=None, raw=False):
if config.has_option("mrjob", field):
return config.get("mrjob", field, raw=raw)
return default
processed =
if not processed:
raise IOError(errno.ENOENT, "Could not open file", fname)
self.jobname = os.path.basename(fname)
self.cmd = config_get("cmd", raw=True)
# NOTE(review): eval() of text taken from the job file -- only safe if job
# files come from a trusted source.
self.cmd = eval(self.cmd)
self.cmd = self.cmd.split()
self.inputs = config_get("inputs")
self.cachedinputs = None
self.cachedinputs_deferred = []
self.previousinputs = None
# "continuous" is a flag: only its presence matters, not its value.
self.continuous = config.has_option("mrjob", "continuous")
self.reduce = config_get("reduce")
self.threads = int(config_get("procspercpu", 1))
self.threads *= cpusPerJob
self.running = []
self.parent = config_get("parent")
self.checkexit = config_get("checkexit")
if self.checkexit != None:
self.checkexit = int(self.checkexit)
# Find the next unit of work for this job and start it through
# computeItem().  The input globs are expanded under options.rootdir (and
# options.cowdir, when set) the first time through; reduce jobs group inputs
# by basename ("partition") and only handle partitions that hash to this
# node.  Returns the value of computeItem() for the item started, or None
# when there is nothing to do.
#
# NOTE(review): indentation is stripped and this method is truncated:
# several `else:` / `continue` lines, the statements that add hashed cow
# files to `glob`, the shuffle implied by "Randomize inputs", and the
# appends to `files` / `remaininginputs` are missing.  The code is left
# untouched.
def compute(self, options, procs, invalidateOnly):
# First time thru we expand the input globs to a file list
# Unless this is a continuous job, this will be the only time we enumerate the files
if not self.cachedinputs:
glob = [options.rootdir + "/" + x for x in self.inputs.split()]
glob = multiglob(glob)
if options.cowdir:
cows = multiglob([options.cowdir + "/" + x for x in self.inputs.split()])
if self.reduce:
glob += cows
# Assume that COWdir is globally shared and manage competition/duplication of processing it.
# Use demand setting to limit contention (unlike files in the rootdir)
n = Locations.this.demand
if n == 0:
# Process everything in sight
glob += cows
for i in cows:
choices = list(Locations.pickn(i[len(options.cowdir):], n=n))
if Locations.thisname in choices:
#print >>sys.stderr, i[len(options.cowdir):], "hashes to", list(choices), i
if self.previousinputs: # Updating list
self.cachedinputs = list( set(glob) - self.previousinputs )
else: # First pass
self.cachedinputs = glob
if not invalidateOnly:
self.previousinputs = set(glob)
# Randomize inputs. Use nodename as lightweight seed guaranteed to be different on each node
if self.reduce:
parts = [os.path.basename(i) for i in self.cachedinputs]
parts = list(set(parts)) # Get unique partitions
#print >>sys.stderr, "Cachedinputs", len(self.cachedinputs), "in", len(parts), "parts"
# A "-" in the reduce setting switches input delivery from argv to stdin.
putargs = "argv"
if "-" in self.reduce:
putargs = "stdin"
for part in parts:
#print >>sys.stderr, "PART", part
# Don't process .d directories as named inputs (feedback loop)
if part[-2:] == ".d":
# Check to see if this node should process this partition
choices = list(Locations.pickn(part, n=1))
if Locations.thisname not in choices:
#print >>sys.stderr, Locations.thisname, "only hashes", part, "to", list(choices)
#print >>sys.stderr, part, "processes on this node", list(choices)
#Get list of files with this partition
files = []
remaininginputs = []
for i in self.cachedinputs:
if os.path.basename(i) == part:
# Remove these from the list of inputs to process later
self.cachedinputs = remaininginputs
outfilename = "/reduce/" + self.jobname + "/" + part + ".d"
p = self.computeItem(files, outfilename, options, procs, invalidateOnly, canDefer=False, putargs=putargs)
if p:
#print >>sys.stderr, "Reduce part", part, "on node", Locations.thisname, files, "of", self.inputs
return p
return None # Nothing to do
while self.cachedinputs or self.cachedinputs_deferred:
# If we exhaust cachedinputs, fall-over to the deferred inputs
if not self.cachedinputs:
self.cachedinputs = self.cachedinputs_deferred
self.cachedinputs_deferred = None # Keep track that we can't defer anymore
i = self.cachedinputs.pop()
if i[-2:] == ".d":
# Don't process .d directories as named inputs (feedback loop)
# Strip the rootdir / cowdir prefix to get the name relative to the store.
if i.startswith(options.rootdir):
relativename = i[len(options.rootdir):]
elif options.cowdir and i.startswith(options.cowdir):
relativename = i[len(options.cowdir):]
assert(not i)
outfilename = relativename + ".d/" + escape(self.cmd)
p = self.computeItem(i, outfilename, options, procs, invalidateOnly, self.cachedinputs_deferred != None)
# `and False` disables this branch entirely.
if p == 'defer' and False:
# XXX. Limit to things we would pick()
elif p:
return p
return None # Nothing to do
def computeItem(self, inputs, outfilename, options, procs, invalidateOnly, canDefer, putargs="argv"):
"""Start (but don't wait for completion) running this job on of the next unprocessed input for this job."""
# inputs      - one path or a list of paths
# outfilename - output directory, relative to options.rootdir
# canDefer    - passed on as `exclusive` to the global synchronization call
# putargs     - "argv" to substitute inputs into the command line, "stdin" to
#               feed the newline-joined input list on standard input
# Returns the started process object, 'defer', or None when nothing is started.
# NOTE(review): this listing has lost its indentation and several lines
# (try:/else: and parts of some calls); comments cover only what is visible.
if type(inputs) != type([]):
inputs = [inputs]
obase = options.rootdir + "/" + outfilename
statfile = obase + "/" + ".status"
# We invalidate outputs if inputs are newer, so find the newest input
newestinput = 0
for i in inputs:
newestinput = max(newestinput, os.path.getmtime(i))
except OSError:
# Input no longer present, so we don't want to rerun
return None
# First check locally to see if output exists and is current
lasttime = os.path.getmtime(statfile)
lasttime = None
if lasttime:
if newestinput <= lasttime:
#print >>sys.stderr, "Local results current for", statfile, self.checkexit
# With no checkexit configured, current output is accepted regardless of exit status
if self.checkexit == None:
return None
# NOTE(review): unpickles the .status file; it is assumed to be written by this program
stats = cPickle.load(file(statfile))
print >>sys.stderr, "Invalidating previous output based on unreadable stats file", statfile
if stats['status'] != self.checkexit:
print >>sys.stderr, "Invalidating previous output based on exit status", statfile, stats['status'], "instead of", self.checkexit
return None
#Remove it now before somebody uses it for something
dir = os.path.dirname(statfile)
print >>sys.stderr, "Removing stale results", dir, lasttime, newestinput
parent = os.path.dirname(dir)
dirname = os.path.basename(dir)
# Stale output is renamed to a ".del"-prefixed sibling rather than deleted in place
tmp = parent + "/.del" + My_file_prefix + dirname
print >>sys.stderr, "Moving", dir, tmp
os.rename(dir, tmp)
except OSError, e:
if e.errno == errno.ENOENT:
# Somebody else just removed it, so let them have it
print >>sys.stderr, "Looks like somebody else just removed %s, so skipping it" % (dir)
return None
assert(not os.path.exists(statfile))
# #print >>sys.stderr, "No local results", statfile
# If we get here, then we're locally prepared to rerun
# Check global synchronization to see if another node already ran it
# NOTE(review): the call target on the next line is truncated in this listing;
# its result is compared against 'done', False and True below.
l =, newestinput, exclusive=canDefer)
if l == 'done':
# Somebody already completed
return None
elif l == False:
# Did not get (exclusive was requested and not available)
return 'defer'
assert(l == True)
# We are ready to start processing
if invalidateOnly:
# Don't actually do anything right now
return None
# Now we proceed to execute.
# XXX. our scheduler should check and kill us off if somebody finishes first
# Now insert a "." at the beginning of the basename for partial output
odirname = os.path.dirname(obase)
obasename = os.path.basename(obase)
obase = odirname + "/." + My_file_prefix + "_" + obasename
oname = obase + "/stdout"
ename = obase + "/.stderr"
if os.path.exists(obase):
# NOTE(review): the first argument of os.fdopen() is truncated on the next two lines
sout = os.fdopen(, os.O_CREAT|os.O_WRONLY|os.O_TRUNC), "w")
serr = os.fdopen(, os.O_CREAT|os.O_WRONLY|os.O_TRUNC), "w")
# Make a local copy of cmd so we can substitute this input file in it
cmd = copy.copy(self.cmd)
if putargs == "argv":
# Inputs replace any '%(input)' marker, else go before the first "|",
# else are appended to the end of the command
subed = False
for i, c in enumerate(cmd):
if '%(input)' in c:
cmd[i] = c.replace('%(input)', " ".join(inputs))
subed = True
if c == "|" and not subed:
# NOTE(review): slice stop is the literal 1, not i; looks like cmd[i:i] was intended - confirm
cmd[i:1] = inputs
subed = True
if not subed:
cmd += inputs
inputlist = "\n".join(inputs)
# Replace the word "fm" at the start of each pipeline stage with options.fmcmd
i = 0
while i < len(cmd):
if cmd[i] == "fm" and (i == 0 or cmd[i-1] == "|"):
cmd[i:i+1] = options.fmcmd
i += len(options.fmcmd)
i += 1
cmd = [os.path.expanduser(c) for c in cmd]
# The process runs with the partial-output directory as its working directory
if putargs == "argv":
p = procs.Popen(cmd, stdout=sout, stderr=serr, cwd=obase)
if putargs == "stdin":
p = procs.Popen(cmd, stdinstr=inputlist, stdout=sout, stderr=serr, cwd=obase)
#print >>serr, "Error", sys.exc_value, "executing", cmd
print >>sys.stderr, "Error", sys.exc_value, "executing", cmd
#print >>sys.stderr, "JOB RUNNING", self.jobname, p, procs.pids, inputs
# Attach bookkeeping attributes to the process object before returning it
p.inputsize = 0
p.inputs = inputs
for i in inputs:
p.inputsize += os.path.getsize(i)
p.outdirname = odirname
p.outbasename = obasename
p.outrelpath = outfilename
p.cmd = self.cmd
p.jobname = self.jobname
p.job = self
#print >>sys.stderr, "computeItem", cmd, Hostname,
return p
class CommandSetBase(object):
"""This is a base-class for defining methods which are used as
command line-based commands. You should inherit from it and define
methods. All methods will be callable. Usage and help information
will only include methods that do not begin with an underscore."""
def __method(self, mname):
return self.__class__.__dict__[mname]
def _usage(self):
"""Return a multi-line usage string based on the defined methods and
their doc strings."""
usage = 'For more help, specify one of the following commands followed by -h option:\n'
keys = self.__class__.__dict__.keys()
for cmd in keys:
if cmd[0] == "_":
method = self.__method(cmd)
#docstr = pydoc.text.indent(pydoc.text.document(method)).replace("(self, args)","")
docstr = method.__doc__
if docstr:
docstr = docstr.split("\n")[0].strip().rstrip(".")
usage += " %-8s - %s\n" % (cmd, docstr)
return usage
def __init__(self, args, optParser=None):
# Parse args, look up the method named by the first remaining argument in
# the class namespace, call it with the rest of the arguments and exit the
# process with its return value.
# NOTE(review): lines appear to be missing from this listing (no visible
# body for `if not args:` and no try/except around the lookup that the
# "Unknown command" message suggests); confirm against the intact source.
if not optParser:
optParser = MethodOptionParser()
optParser.set_usage("""%prog command args...\n""" + self._usage())
(options, args) = optParser.parse_args(args)
if not args:
method = self.__method(args[0])
print >>sys.stderr, "Unknown command", args[0]
# The function comes straight from the class __dict__, so self is passed explicitly
sys.exit(method(self, args[1:]))
def _optionHook(self, optParser):
class RedisSynchronizer:
def __init__(self, locs):
    # Connect to the redis server whose URL is stored as this node's syncdir
    # and remember the local hostname plus the location objects.
    syncurl = locs.this.syncdir
    # redis is imported here rather than at module level; presumably so the
    # package is only required when this synchronizer is actually used
    import redis
    client = redis.client.Redis.from_url(syncurl)
    self.redis = client
    self.hostname = socket.gethostname()
    self.procs, self.locs = locs.procs, locs
def _canonicalize(self, pth):
return pth.replace('//', '/').lstrip('/')
def invalidate(self, lst):
# Takes a list of paths, canonicalizes each, and treats entries containing
# '*' separately from plain names.
# NOTE(review): statements are missing from this listing: nothing visible
# invokes the registered script, appends to nonglobs, or follows
# `if nonglobs:`.  The Lua text below is also truncated; what remains
# suggests it looks up keys matching KEYS[1] and deletes them when any
# exist - confirm against the intact source.
nonglobs = []
lst = [self._canonicalize(l) for l in lst]
for i in lst:
if '*' in i:
# We could explicitly register this so we can call it multiple times a little more efficiently (maybe)
script = self.redis.register_script("local r ='keys', KEYS[1]); if table.getn(r) > 0 then'del', unpack(r)); end")
if nonglobs:
def start(self, filename, newestinput, exclusive):
"""Semantics here are complicated. Grab this (and return True) if any of the following are true:
0. Nobody has grabbed this yet
1. Our newestinput is newer than the timestamp
2. The node that claimed this before was us
3. The node that claimed this before is now inactive
4. Somebody else has started, is not us, is not inactive, is not finished, and exclusive==False
If #4 is true, except that exclusive==True, then return 'defer'.
In any other case, return False.
filename = self._canonicalize(filename)
#XXX Should this be nodename (for multiple nodes per host) insted of hostname?
#XXX Make this a Lua script so it's atomic
r = self.redis.get(filename)
#print >>sys.stderr, "REDIS GET", filename, r
if r:
r = r.split()
if r[0] != self.hostname and r[0] in self.locs.config.locs and not self.locs.config.locs[r[0]].inactive:
if r[1] >= newestinput:
if len(r) == 3 and r[2] == ".":
return 'done'
if len(r) < 3 and exclusive:
return False
self.redis.set(filename, self.locs.thisname + " " + str(newestinput))
return True
def done(self, filename, timestamp):
filename = self._canonicalize(filename)
self.redis.set(filename, self.locs.thisname + " " + str(timestamp) + " .")
class FileSynchronizer:
def __init__(self, locs):
self.locs = locs
self.syncdir = locs.this.syncdir
def invalidate(self, lst):
# Delete sync files so that computation is forced to (re-)execute
# These files are shared, so we pick a minimum number of nodes to do this on
syncfiles = [self._filename2lock(i) for i in lst]
return self.locs.pick().forAllLocal(["localrm", "-R", "--absolute"] + syncfiles)
# We _desire_ near-atomicity on the first claim of a job (since others will prefer to avoid duplication);
# race conditions should result in multiple holders rather than no holders.
# We don't worry so much about races between multiple finishers (any can win; they've already done the work),
# but we want others testing for done-ness to promptly know if anybody has finished.
# We also want to keep track of how many are in process so that we don't over-duplicate
def start(self, filename, newestinput, exclusive):
# Locally we want to (re-)create output.
# exclusive is True iff we want the first lock (not if anybody else has started)
# Return 'done' if the status is complete
# Return False if somebody else has started and exclusivity was requested
# Return True if we should start (given the status and requested exclusivity)
# NOTE(review): this listing has lost its indentation and some lines (the
# try: statements, and whatever followed "invalidate and take the lock");
# comments describe only what is visible.
# Doing an atomic operation is expensive and we're really only worried about simultaneous creation,
# so first just stat() to see if the file already exists.
syncfilename = self._filename2lock(filename)
take = False
contents = file(syncfilename).read()
except IOError, e:
if e.errno == errno.ENOENT:
# Nobody has claimed it yet, so we will
take = True
elif e.errno == errno.ESTALE:
# Looks like NFS file got deleted after opening
take = True
if not take:
# Check to see if we think we need to invalidate it.
# The lock file holds the claiming node's name, optionally followed by a
# second whitespace-separated field once the output is complete
contents = contents.split()
node = contents[0]
if ((node != self.locs.thisname)
# If we did it last time, then redo it now (start() is called iff we have newer inputs than outputs)
and (node in self.locs.config.locs)
# if the node that did it is no longer active
and (not self.locs.config.locs[node].inactive)
# Somebody else did it, but we have newer inputs, so redo it
# XXX. this doesn't remove the output elsewhere so if we shuffle both, we may get the old input
# Hopefully if our input changed, theirs will too. If a node is offline while the input updates
# and comes back later, it needs to invalidate its result. Perhaps we should only redistribute
# things we have current locks for. Hopefully rsync -u will get the newer file in the end.
and (newestinput <= os.path.getmtime(syncfilename))):
# If we get here, then we do NOT want to invalidate
if len(contents) > 1:
# This means the output is complete
return 'done'
return (not exclusive)
except OSError, e:
if e.errno == errno.ENOENT:
# Somebody else just removed it, so let them have it
return (not exclusive)
# If we get here, then we are going to invalidate and take the lock.
except OSError, e:
if e.errno == errno.ENOENT:
# We may be racing with another process that is just ahead of us.
# In that case, we may both unlink successfully and both restart the work.
# Somebody else unlinked (but hasn't re-created yet). Let them have it
return (not exclusive)
if self._atomicCreate(syncfilename):
# We were the first
return True
# This is unlikely, but somebody else must have just started
return (not exclusive)
def done(self, filename, timestamp):
# Mark filename as complete by creating its lock file with ' .' appended
# after this node's name; start() treats a second field as "output complete".
# Returns the result of _atomicCreate().
# XXX. Uses current time (as recorded by NFS server on file modification) instead of timestamp
syncfilename = self._filename2lock(filename)
# NOTE(review): the next comment seems to describe an error case of a
# statement that is not present in this listing - confirm against the intact source.
# This should only happen if somebody else just finished too
return self._atomicCreate(syncfilename, ' .')
def _atomicCreate(self, lockfilename, contents=''):
# Write this node's name (plus contents) to a privately named file next to
# lockfilename and report through the returned flag whether the lock was
# obtained.  EEXIST is the one errno singled out in the handler below.
# NOTE(review): several expressions are truncated in this listing (the
# first argument of os.fdopen(), the statement after `try:`) and the bodies
# of the errno check and the final cleanup are not visible.
myname = lockfilename + "_" + My_file_prefix
syncfile = os.fdopen(, os.O_CREAT|os.O_SYNC|os.O_WRONLY), "w")
syncfile.write(self.locs.thisname + contents)
success = False
try:, lockfilename)
success = True
except OSError, e:
if e.errno != errno.EEXIST:
# Otherwise, somebody just beat us to it -- let them have it
# Get rid of private name for it, successful or not
return success
def _filename2lock(self, filename):
# This converts an output directory path (that will contain .status, stdout, etc. files)
# to a unique filename in the syncdir. The path is mostly the same,
# except that the syncdir is a single directory shared across all nodes.
# Consider a virtual directory structure like the following
# /data/<timestamp>.d/cmd
# with numerous <timestamp> values stored across nodes.
# If we recreate that hierarchy in a centralized place, we may
# have more directory entries in /data than the system supports.
# Thus, we convert each element in the path from /foo to /XX/YY/foo
# Where XX/YY is based on a hash of foo. This should divide directory
# entries across a large number of parent directories. Using a base64
# alphabet for XX/YY, this should support clusters of up to 2^24 nodes
# with intermediate directories containing no more than 4096 entries.
# We convert that to:
# /data/XX/YY/<timestamp>.d/cmd
# to avoid running out of directory entries in /data/ in the syncdir
# Where XX and YY are derived from hash(<timestamp>.d)
# NOTE(review): in this listing the `else:` of the split below and the
# statements that append to outpath are missing; as shown, outpath never
# grows beyond [self.syncdir].
inpath = filename.replace("//", "/")
inpath = inpath.strip('/')
outpath = [self.syncdir]
# Consume the path one component at a time, left to right
while inpath:
if '/' in inpath:
parent, inpath = inpath.split('/', 1)
parent = inpath
inpath = None
if '*' in parent or '?' in parent:
# Wildcards propagate to hash
filenamehash = printableHash(parent)
syncfilename = '/'.join(outpath)
return syncfilename
def print_df_line(disk, size, available):
    """Print one row of the disk-usage table.

    When disk is false (e.g. None) only the column header is printed and
    size/available are ignored.  Otherwise the row shows the disk name, its
    size and free space formatted by bytecount2str(), and the percentage in
    use; a size of zero is reported as -1% rather than dividing by zero.
    """
    fmt = "%20s %10s %10s %5s"
    if not disk:
        print(fmt % ("Filesystem", "Size ", "Free ", "Usage"))
        # Header only: there are no numbers to format
        return
    if size:
        usage = (size-available) * 100.0 /size
    else:
        usage = -1
    print(fmt % (disk, bytecount2str(size), bytecount2str(available), "%d%%" % usage))
class FmCommands(CommandSetBase):
def __init__(self, args):
    """FileMap is a file-based map-reduce system.
    You can think of it is parallel and distributed xargs or make.
    It features replicated storage and intermediate result caching.
    """
    global Verbose
    # Locations are created on first use by _Locations()
    self._locs = None
    p = MethodOptionParser()
    p.set_usage("""fm command args...\n""" + self._usage())
    p.add_option("-v", "--verbose", action="count", help="Increase verbosity (can be used multiple times)")
    (options, args) = p.parse_args(args)
    if options.verbose:
        # Each -v raises the module-wide verbosity level by one
        Verbose += options.verbose
    # The base class dispatches to the command named by the remaining arguments
    CommandSetBase.__init__(self, args, p)
def _Locations(self):
if not self._locs:
self._locs = FmLocations()
return self._locs
def partition(self, args):
"""Partition an input file. Split an inputfile into n pieces.
Specify a whitespace-delimited field with -f. All lines with
the same key will be placed in the same output file. The -r
option can be used to specify a Perl-compatible regular
expression that matches the key. If the regex contains a group,
then what matches the group is the key; otherwise, the whole
match is the key.
If nways is greater than 0, output files are numbered 1 to n.
If nways is 0, then files will be named for the values of the keys.
If nways is set and no field or regex is specified, then lines
are partitioned round-robin. All output files are gzip
compressed and created in the current directory. Input files
ending in .gz will automatically be decompressed.
# NOTE(review): this listing has lost its indentation, the closing quotes of
# the docstring above, several `else:` lines, the right-hand side of two
# `key =` assignments and the statement that writes each line to its output
# file; the comments below describe only what is visible.
p = MethodOptionParser()
p.add_option("-n", "--nways", help="Number of output files to use (0 means each value will get its own file)")
p.add_option("-r", "--regex", help="Regex that matches the key portion of input lines")
p.add_option("-f", "--field", help="Use field # or range (numbered starting with 1).")
p.add_option("-Z", "--no-compress", action="store_true", help="Do NOT compress output files")
(options, args) = p.parse_args(args)
infiles = args
if options.nways == None:
p.error("-n option required")
options.nways = int(options.nways)
if options.regex:
options.regex = re.compile(options.regex)
if options.field:
# Convert the 1-based field (or "a-b" range) into a 0-based [start, stop) slice
if "-" in options.field:
options.field = options.field.split("-")
options.field = [int(i)-1 for i in options.field]
options.field[1] += 1
options.field = [int(options.field) - 1, int(options.field)]
# Open output files, keyed by partition number or by key string
files = {}
i = 0
# With no input files named, read standard input
if not infiles:
infiles = ["-"]
for infile in infiles:
if infile == "-":
f = sys.stdin
elif infile.endswith("gz"):
f = gzip.GzipFile(infile)
f = file(infile)
for line in f:
if options.regex:
key =
if key:
g = key.groups()
if len(g):
key = g[0]
key =
print >>sys.stderr, "Key not found in line:", line.rstrip()
elif options.field:
words= line.split()
#print words[options.field[0]:options.field[1]]
key = words[options.field[0]:options.field[1]]
fname = None
if options.nways:
if options.regex or options.field:
# Keyed: the SHA digest of the key picks the partition
i = int(sha(str(key)).hexdigest(), 16) % options.nways
# Advance to the next partition in turn
i = (i + 1) % options.nways
if not files.has_key(i):
# Output files are numbered from 1
fname = str(i+1)
i = ','.join(key)
if not files.has_key(i):
fname = escape([i])
# fname is only set when this partition has no open file yet
if fname:
if not options.no_compress:
files[i] = gzip.GzipFile(fname + ".gz", "w")
files[i] = file(fname, "w")
for f in files.values(): f.close()
def kill(self, args):
"""Kill a job."""
p = MethodOptionParser()
p.add_option('-s', '--sched', action='store_true', help="Kill all remote schedulers and their children", default=False)
(options, args) = p.parse_args(args)
# Each job name maps to a file under /jobs/; those files are removed on all nodes
args = ["/jobs/" + a for a in args]
if args:
self._Locations().forAll(["rm", "-f"], args).collect()
if options.sched:
# NOTE(review): the statement(s) that act on --sched are not present in this listing.
def mv(self, args):
    """Rename files in the cloud"""
    # The last positional argument is the destination; all earlier ones are sources.
    parser = MethodOptionParser(fixed=["src...", "dst"])
    (_, positional) = parser.parse_args(args)
    target = positional[-1]
    sources = positional[:-1]
    if len(sources) > 1:
        # With several sources the destination gets a trailing slash
        target = target + "/"
    mover = self._Locations().forAll(["mv"], sources, target, quote=False)
    mover.collect()
def mkdir(self, args, absolutes=[], async=False):
"""Make a directory (or directories) on all nodes.
Has unix "mkdir -p" semantics."""
# absolutes - extra arguments appended directly to the mkdir command line
# async     - tested at the end of the method (see note below)
(options, args) = MethodOptionParser(fixed=["dir..."]).parse_args(args)
# Make sure destination exists
# Directories are created with mode 2775
p = self._Locations().forAll(["mkdir", "-p", "-m", "2775"] + absolutes, args, subst=True, glob=False)
if not async:
# NOTE(review): the body of this `if` (and any return of p) is not present in this listing.
def jobs(self, args):
"""Show all of the jobs still active."""
# NOTE(review): this listing has lost its indentation and some `else:`
# lines; comments describe only what is visible.
(options, args) = MethodOptionParser(fixed=[]).parse_args(args)
# pkls is keyed by node; each value maps job name -> job object
pkls = self._local_pickles(["localjobs"])
jobs = {}
for mx in pkls:
for jname,j in pkls[mx].iteritems():
if not jobs.get(jname):
jobs[jname] = j
j.nodes = [mx]
# Second pass to get stats for all jobs (even on nodes that didn't have the job running)
for j in jobs.values():
# One .status glob per input glob, under <input>.d/<escaped command>
statglobs = [i + ".d/" + escape(j.cmd) + "/.status" for i in j.inputs.split()]
j.stats = list(itertools.chain(*self._local_pickles(["localstats"] + statglobs).values()))
first = True
for jname,j in jobs.iteritems():
inputlist = j.inputs.split()
flags = ""
if j.reduce:
flags = j.reduce + " "
if not first:
print jname + "\t" + j.uid + "\t" + pipes.quote(flags + ' '.join(j.cmd))
# Rebuild the option flags the job was submitted with
flags = ""
if j.continuous:
flags += "-c "
if j.checkexit != 0:
flags += "-e "
print "\t%s-i '%s'" % (flags, "' -i '".join(inputlist))
# Node names are only listed when there are fewer than 10 of them
nodestr = ''
if len(j.nodes) < 10:
nodestr = ': ' + (','.join(j.nodes))
print "\t%d nodes still processing%s" % (len(j.nodes), nodestr)
self._tallyStats(j.stats, inputlist, indent='\t')
first = False
def df(self, args):
    """Show free space on each node."""
    # Prints a header, one row per reported disk (most free space first), a
    # second header and a totals row.
    (options, args) = MethodOptionParser(fixed=[]).parse_args(args)
    dfs = self._local_pickles(["df"])
    disks = []
    for mx in dfs:
        for d in dfs[mx]:
            size, available = dfs[mx][d]
            # Really only 1 disk per node, so just use node name
            disks.append((mx, size, available))
    # Sort by available, reversed
    disks.sort(key=lambda entry: entry[2], reverse=True)
    print_df_line(None, None, None) #Defaults to header
    for d in disks:
        # NOTE(review): the loop body was absent in the listing; each tuple is
        # (name, size, available), which matches print_df_line's parameters
        print_df_line(*d)
    print_df_line(None, None, None) #Defaults to header
    print_df_line("Total (%d disks)" % len(disks),
                  sum([i[1] for i in disks]),
                  sum([i[2] for i in disks]))
def loadavg(self, args):
"""Show load average."""
(options, args) = MethodOptionParser(fixed=[]).parse_args(args)
pkls = self._local_pickles(["localload"], unique_hosts=True)
loadtotal = [0,0,0]
cpus = 0
for mx in pkls:
loadtotal[0] += float(pkls[mx]['loadavg'][0])
loadtotal[1] += float(pkls[mx]['loadavg'][1])
loadtotal[2] += float(pkls[mx]['loadavg'][2])
cpus += pkls[mx]['cpus']
#print pkls[mx]
print "Total load average: %.2f (%d%%) %.2f %.2f" % (loadtotal[0], 100.0 * loadtotal[0] / cpus, loadtotal[1], loadtotal[2])
def chmod(self, args, absolutes=[], ignoreErrors=False):
    """Change permissions on files in the cloud."""
    # First positional argument is the mode; the rest are the files to change.
    parser = MethodOptionParser(fixed=["mode", "files..."])
    (_, positional) = parser.parse_args(args)
    command = ["chmod", positional[0]] + absolutes
    targets = positional[1:]
    runner = self._Locations().forAll(command, targets, subst=True, quote=False)
    runner.collect(ignoreErrors=ignoreErrors)
def chgrp(self, args, absolutes=[]):
    """Change GID on files in the cloud."""
    # First positional argument is the group; the rest are the files to change.
    parser = MethodOptionParser(fixed=["perm", "files..."])
    (_, positional) = parser.parse_args(args)
    command = ["chgrp", positional[0]] + absolutes
    targets = positional[1:]
    runner = self._Locations().forAll(command, targets, subst=True, quote=False)
    runner.collect()
def store(self, args):
"""Store one or more files into the cloud."""
"""src... dst
Copy the specified file(s) into the virtual store.
# NOTE(review): this listing has lost its indentation, the closing quotes of
# the string above, and the statements that end the retry loop (nothing
# visible follows `if not errors:` except the decrement).
p = MethodOptionParser(fixed=["files...", "dst"])
p.add_option('-s', '--serialize', action='store', help="Number of nodes to store to concurrently", default=False, type=int)
(options, args) = p.parse_args(args)
# The last positional argument is the destination; earlier ones are local globs
dst = args[-1]
args = args[:-1]
args = multiglob(args)
if len(args) > 1:
dst += "/"
replargs = [dst + os.path.basename(i) for i in args]
# Up to three attempts; collect() is told to mark failing nodes inactive
failover = 3
while failover:
errors, results = self._Locations().put(args, dst, pickn=1, serialize=options.serialize).collect(markInactive=self._Locations().config)
if not errors:
failover -= 1
def _putJobs(self, tmpdir, maxErrors):
self._Locations().put([tmpdir + "*"], "/jobs/").collect(maxErrors=maxErrors, markInactive=self._Locations().config)
def map(self, args):
"""Launch a computation on a set of input files in the cloud.
Run the specified command on each input file (in the virtual store)
described by the inputglob. Multiple inputglob arguments can be
given. The -c option says that the commond should continue to run on
new inputs as they arrive.
Multiple commands can be chained together with |. Each output file of
the first command becomes an input for the next command in the
The -f option says that any previously cached output should be ignored
and the program re-run.
# NOTE(review): this listing has lost its indentation, the closing quotes of
# the docstring above and a number of lines (try:/except:/else:, loop
# bodies, parts of some assignments); the comments below describe only what
# is visible.
p = MethodOptionParser(fixed=["cmd [| cmd...]"])
p.add_option("-i", "--inputglob", action="append", help="Glob of input files (in root)")
p.add_option("-c", "--continuous", action="store_true", help="Continue to look for new input files to arrive")
p.add_option("-f", "--fresh", action="store_true", help="Do not use any cached output")
p.add_option("-q", "--quiet", action="count", help="Do not fetch and display stdout (or stderr if -qq)")
p.add_option("-o", "--optimistic", action="store_true", help="Try to run even if there are many failures submitting job")
p.add_option("-p", "--procspercpu", action="store", help="Number of processes to run per CPU (default=1)", default=1)
p.add_option("-s", "--statusonly", action="store_true", help="Don't wait for completion; fetch any available results now")
p.add_option("-e", "--checkexit", action="store_false", default=True, help="Do NOT stop on error or remove output from any previous run that had a non-zero exit status")
p.add_option("-m", "--mail", action="store_true", help="E-mail results")
p.add_option("-x", "--mx", action="store", help="SMTP server")
(options, cmd) = p.parse_args(args)
if not options.inputglob:
p.error("-i or --inputglob must be specified")
# Duplicate globs are dropped; when "-" is among them, further globs are read from stdin, one per line
options.inputglob = set(options.inputglob)
if "-" in options.inputglob:
options.inputglob.update([l.strip() for l in sys.stdin])
options.inputglob = list(options.inputglob)
fresh = options.fresh # options.fresh gets rewritten later
# Re-join the remaining arguments and split them again with shell quoting rules
cmdwords = shlex.split(string.join(cmd))
cmd = ' '.join(cmdwords)
cmds = []
# parse/replace pipe and reduce operators
# < redistribute
# > reduce (inputs on argv, presumed composable)
# >- reduce (inputs on stdin)
# Can be composed such as ">.<>." which reduces locally, redistributes, then reduces again
# The scheduler looks for reduce operations with the substrings:
# "stdin" which signifies to pass inputs on stdin rather than argv and
# "part" which signifies to only reduce inputs who are in a partition owned by this node (which is what you do after a redistribute).
# First split ><... and <>... into two tokens
while i < len(cmdwords):
if cmdwords[i].startswith("><"):
cmdwords.insert(i+1, cmdwords[i][1:])
cmdwords[i] = cmdwords[i][0]
if cmdwords[i].startswith("<>"):
cmdwords.insert(i+1, cmdwords[i][1:])
cmdwords[i] = cmdwords[i][0]
i += 1
#print >>sys.stderr, "cmdwords:", cmdwords
# Then split-up into pipeline elements (separated by a word that starts with [|<>.]+ word)
pipeline = [[]]
while cmdwords:
w = cmdwords.pop(0)
if w in ["||", ">", "<", ">-"]:
# Make a new pipeline element
if w != "||":
if pipeline[0] == []:
#print >>sys.stderr, "Pipeline:", pipeline
# Duplicate cmd when there are multiple reduces
i = 0
while i < len(pipeline):
if pipeline[i][0].startswith(">") and (len(pipeline[i]) == 1):
# Copy reduce command and modifier from next pipeline element
if i+2 >= len(pipeline) or pipeline[i+1][0] != "<":
print >>sys.stderr, "Cannot parse command ending in >"
return False
pipeline[i][1:] = pipeline[i+2][1:]
if pipeline[i+2][0].endswith("-"):
pipeline[i][0] += "-"
# The element following a redistribute is tagged "part"
if pipeline[i][0] == "<" and len(pipeline) > i+1:
pipeline[i+1][0] += "part"
i += 1
#print >>sys.stderr, "Pipeline2:", pipeline
if options.mail:
# Mail support is imported only when -m is given
import smtplib
import socket
import email.mime.multipart
import email.mime.image
import email.mime.text
import email.mime.base
import email.encoders
# Sender is user@<this host>; recipient is user@<host name minus its first label>
user = pwd.getpwuid(os.getuid()).pw_name
host = socket.getfqdn()
domain = '.'.join(host.split('.')[1:])
mx =
import dns.resolver
mx = dns.resolver.query(domain, 'MX')[0]
mx = str(
mx = "mail"
# If we couldn't get a good FQDN from the default interface,
# try again on the interface used to talk to the MX
if not domain or domain == "localdomain":
# Try on an actual socket to the mx
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((mx, 25))
print >>sys.stderr, "Could not connect to mail exchanger '%s'" % mx
return 1
host = socket.gethostbyaddr(s.getsockname()[0])[0]
domain = '.'.join(host.split('.')[1:])
fromaddr = '@'.join([user,host])
toaddr = '@'.join([user,domain])
print "Results will be in mail from %s to %s via %s" % (fromaddr, toaddr, mx)
msg = email.mime.multipart.MIMEMultipart()
msg['Subject'] = "Filemap results for '%s'" % ("' '".join(args))
msg['From'] = fromaddr
msg['To'] = toaddr
# Capture everything written to stdout and stderr in memory while the job runs
stdout = cStringIO.StringIO()
realstderr = sys.stderr
sys.stdout = stdout
sys.stderr = stdout
finished = self._domap(options, pipeline)
if options.mail:
output = ""
for line in stdout.getvalue().split("\n"):
#if len(line) > 500:
#line = line[:500] + "..."
output += line + "\n"
msg.attach(email.mime.text.MIMEText('<pre>' + output + '</pre>', 'html'))
# NOTE(review): only the original stderr was saved above, and stdout is
# pointed at it here; the original sys.stdout is not restored in the visible code
sys.stderr = realstderr
sys.stdout = realstderr
if finished:
imgfile = cStringIO.StringIO()
obj = email.mime.base.MIMEBase('application','octet-stream')
obj.add_header('Content-Disposition', 'attachment', filename="fm.stats.gz")
self.plot([".fm.stats.gz", imgfile])
img = email.mime.image.MIMEImage(imgfile.getvalue())
img.add_header('Content-Disposition', 'inline', filename="fmstats.png")
print >>sys.stderr, "Unable to generate plot:"
# Send the email via our own SMTP server.
s = smtplib.SMTP(mx)
s.sendmail(fromaddr, toaddr, msg.as_string())
if options.mail:
sys.stderr = realstderr
print >>realstderr, stdout.getvalue()
return None
def _domap(self, options, pipeline):
if options.statusonly:
tmpdir = None
tmpdir = mkdtemp(prefix="map") + "/"
inglobs = options.inputglob
allinglobs = set(inglobs)
outdirglobs = []
statglobs = []
assert (type(inglobs) == type([]))
starttime = None
jobname = None
joblist = []
if options.optimistic:
maxErrors = None
maxErrors = 0
# If replicate uses pickn=True, then use self._Locations().this.replication-1 here
# since pickn defaults to false, to default to no errors allowed
for cmd in pipeline:
reduce = False
# Global barrier
if cmd[0] == "<" or cmd[0][0] == ">":
if jobname and not options.statusonly:
# Install the jobs on each node
self._putJobs(tmpdir, None)
if not options.statusonly:
tmpdir = mkdtemp(prefix="map") + "/"
if not starttime:
starttime = time.time()
if not options.statusonly:
if jobname:
args = ["wait", jobname]
jobname = None # This won't be a parent of the first thing after the barrier
args = []
if cmd[0] == "<":
args += ["redistribute"] + inglobs
if args:
# Barrier; Wait for that job (and its parents) to finish:
if Verbose > -1:
print >>sys.stderr, "Waiting for %s" % ' '.join(args)