diff --git a/pyppl/__init__.py b/pyppl/__init__.py index 209fcb59..4d72757d 100644 --- a/pyppl/__init__.py +++ b/pyppl/__init__.py @@ -1,5 +1,6 @@ from pyppl import pyppl from pyppl import proc +from pyppl import job from pyppl import aggr from pyppl import channel from pyppl import utils diff --git a/pyppl/helpers/__init__.py b/pyppl/helpers/__init__.py index fddf7303..8e188783 100644 --- a/pyppl/helpers/__init__.py +++ b/pyppl/helpers/__init__.py @@ -1,4 +1,5 @@ from channel import channel from proc import proc from aggr import aggr +from job import job import utils \ No newline at end of file diff --git a/pyppl/helpers/channel.py b/pyppl/helpers/channel.py index 5cf624f1..a2452b48 100644 --- a/pyppl/helpers/channel.py +++ b/pyppl/helpers/channel.py @@ -1,4 +1,5 @@ from copy import copy as pycopy +import utils class channel (list): @@ -13,8 +14,6 @@ def create (l = []): @staticmethod def fromChannels (*args): ret = channel.create() - if not args: - return ret ret.merge (*args) return ret @@ -43,33 +42,30 @@ def fromPairs (pattern): return c @staticmethod - def fromArgv (width = 1): + def fromArgv (): from sys import argv + ret = channel.create() args = argv[1:] alen = len (args) - if alen == 0: return channel() - if width == None: width = alen - if alen % width != 0: - raise Exception('Length (%s) of argv[1:] must be exactly divided by width (%s)' % (alen, width)) + if alen == 0: return ret - ret = channel() - for i in xrange(0, alen, width): - tmp = () - for j in range(width): - tmp += (args[i+j], ) - ret.append (tmp) + width = None + for arg in args: + items = channel._tuplize(utils.split(arg, ',')) + if width is not None and width != len(items): + raise ValueError('Width %s (%s) is not consistent with previous %s' % (len(items), arg, width)) + width = len(items) + ret.append (items) return ret @staticmethod def _tuplize (tu): - if isinstance(tu, str) or isinstance(tu, unicode): + if isinstance(tu, (str, unicode)): tu = (tu, ) else: - try: - iter(tu) - except: - tu = (tu, ) - return tu + try: iter(tu) + except: tu = (tu, ) + return tuple(tu) # expand the channel according to the files in , other cols will keep the same # [(dir1/dir2, 1)].expand (0, "*") will expand to @@ -106,11 +102,9 @@ def copy (self): return pycopy(self) def width (self): - if not self: - return 0 + if not self: return 0 ele = self[0] - if not isinstance(ele, tuple): - return 1 + if not isinstance(ele, tuple): return 1 return len(ele) def length (self): @@ -124,69 +118,76 @@ def filter (self, func): def reduce (self, func): return channel.create(reduce(func, self)) - - def merge (self, *args): - if not args: return - maxlen = max(map(len, args)) - minlen = min(map(len, args)) - if maxlen != minlen: - raise Exception('Cannot merge channels with different length (%s, %s).' % (maxlen, minlen)) - clen = len (self) - if clen != 0 and clen != maxlen: - raise Exception('Cannot merge channels with different length (%s, %s).' % (maxlen, clen)) - - for i in range(maxlen): - tu = () if clen==0 else channel._tuplize(self[i]) - for arg in args: - tu += channel._tuplize (arg[i]) - if clen == 0: - self.append(tu) - else: - self[i] = tu + + def rbind (self, row): + row = channel._tuplize(row) + if self.length() != 0 and self.width() != len(row): + raise ValueError ('Cannot bind row (len: %s) to channel (width: %s): width is different.' % (len(row), self.width())) + self.append (row) return self - def insert (self, idx, val): - idx = self.width() if idx is None else idx - if not isinstance(val, list): - val = [val] - if len (val) == 1: - val = val * len (self) - elif len(val) != len(self): - raise Exception('Cannot merge channels with different length (%s, %s).' % (len(self), len(val))) - for i in range(len(self)): - ele = list (self[i]) - newele = ele[:idx] + list(channel._tuplize(val[i])) + ele[idx:] - self[i] = tuple (newele) + def rbindMany (self, *rows): + for row in rows: self.rbind(row) return self - def split (self): - ret = [] - for i, tu in enumerate(self): - if isinstance(tu, str): - tu = (tu, ) - try: - iter(tu) - except: - tu = (tu, ) - if i == 0: - for t in tu: ret.append(channel()) - - for j, t in enumerate(tu): - ret[j].append(channel._tuplize(t)) + def cbind (self, col): + if not isinstance(col, list): col = [col] + if len (col) == 1: col = col * max(1, self.length()) + if self.length() == 0 : + for ele in col: self.append (channel._tuplize(ele)) + elif self.length() == len (col): + for i in range (self.length()): + self[i] += channel._tuplize(col[i]) + else: + raise ValueError ('Cannot bind column (len: %s) to channel (length: %s): length is different.' % (len(col), self.length())) + return self + + def cbindMany (self, *cols): + for col in cols: + self.cbind(col) + return self + + def merge (self, *chans): + for chan in chans: + if not isinstance(chan, channel): chan = channel.create(chan) + cols = [x.toList() for x in chan.split()] + self.cbindMany (*cols) + return self + + def colAt (self, index): + return self.slice (index, 1) + + def slice (self, start, length = None): + if start is None: start = self.width() + if length is None: length = self.width() + if start < 0: start = start + self.width() + if start >= self.width(): return channel.create() + ret = channel.create() + if length == 0: return ret + for ele in self: + row = tuple (ele[start:start+length]) + ret.rbind (row) return ret + def insert (self, index, col): + if not isinstance(col, list): col = [col] + if len (col) == 1: col = col * max(1, self.length()) + part1 = self.slice (0, index) + part2 = self.slice (index) + del self[:] + self.merge (part1) + self.cbind (col) + self.merge (part2) + return self + + def split (self): + return [ self.colAt(i) for i in range(self.width()) ] + def toList (self): # [(a,), (b,)] to [a, b], only applicable when width =1 if self.width() != 1: - raise Exception ('Width = %s, but expect width = 1.' % self.width()) + raise ValueError ('Width = %s, but expect width = 1.' % self.width()) - ret = [] - for e in self: - if isinstance(e, list): - v = e # support list elements - else: - (v, ) = e - ret.append(v) - return ret + return [ e[0] if isinstance(e, tuple) else e for e in self ] diff --git a/pyppl/helpers/job.py b/pyppl/helpers/job.py new file mode 100644 index 00000000..be4a4043 --- /dev/null +++ b/pyppl/helpers/job.py @@ -0,0 +1,111 @@ +from hashlib import md5 +from glob import glob +from os import path, remove, symlink, makedirs +from shutil import rmtree, copytree, copyfile, move +import gzip +import utils + + +class job (object): + + def __init__(self, index, workdir, input = None, output = None): + + self.script = path.join (workdir, "scripts", "script.%s" % index) + self.rcfile = path.join (workdir, "scripts", "script.%s.rc" % index) + self.outfile = path.join (workdir, "scripts", "script.%s.stdout" % index) + self.errfile = path.join (workdir, "scripts", "script.%s.stderr" % index) + self.input = {'var':[], 'file':[], 'files':[]} if input is None else input + self.output = {'var':[], 'file':[]} if output is None else input + self.index = index + + def signature (self): + sSig = utils.fileSig(self.script) + iSig = '' + oSig = '' + for val in self.input['var']: + val = str(val) + iSig += "|var:" + md5(val).hexdigest() + for val in self.input['file']: + iSig += "|file:" + utils.fileSig(val) + iSig += "|files" + for val in self.input['files']: + for v in val: iSig += ':' + utils.fileSig(v) + + for val in self.output['var']: + val = str(val) + oSig += "|var:" + md5(val).hexdigest() + for val in self.output['file']: + oSig += "|file:" + utils.fileSig(val) + return md5(sSig + ';' + iSig + ';' + oSig).hexdigest() + + def rc (self): + if not path.exists (self.rcfile): return -99 + rccodestr = open (self.rcfile).read().strip() + return int(rccodestr) if rccodestr else -99 + + # whether output files are generated + def outfileGenerated (self): + for outfile in self.output['file']: + if not path.exists (outfile): + raise Exception ('[Job#%s]: Output file not generated: %s' % (self.index, outfile)) + + # use export as cache + def exportCached (self, exdir, how, log): + if how == 'symlink': + raise ValueError ('Unable to use export cache when you export using symlink.') + if not exdir: + raise ValueError ('Output files not exported, cannot use them for caching.') + + exfiles = [path.join (exdir, path.basename(outfile)) for outfile in self.output['file']] + for i, exfile in enumerate(exfiles): + outfile = self.output['file'][i] + if how == 'gzip': + gzfile = exfile + '.gz' + tgzfile = exfile + '.tgz' + if not path.exists(gzfile) and not path.exists(tgzfile): return False + if path.exists(outfile): + log ('Overwrite file/dir to use export for caching: %s' % outfile, 'warning') + if path.isdir (outfile): rmtree (outfile) + else: remove (outfile) + if path.exists(gzfile): + utils.ungz (gzfile, outfile) + elif path.exists(tgzfile): + makedirs(outfile) + utils.untargz (tgzfile, outfile) + else: + if not path.exists (exfile): return False + if path.exists(outfile): + log ('Overwrite file/dir to use export for caching: %s' % outfile, 'warning') + if path.isdir (outfile): rmtree (outfile) + else: remove (outfile) + symlink (exfile, outfile) + return True + + def export (self, exdir, how, ow, log): + for outfile in self.output['file']: + bn = path.basename (outfile) + target = path.join (exdir, bn) + + if path.exists (target): + if ow: + if path.isdir (target):rmtree (target) + else: remove (target) + else: + log('%s (target exists, skipped)' % target, 'warning') + if not path.exists (target): + log ('%s (%s)' % (target, how), 'info', 'EXPORT') + if how == 'copy': + if path.isdir (outfile): copytree (outfile, target) + else: copyfile (outfile, target) + elif how == 'move': + move (outfile, target) + symlink(target, outfile) # make sure dependent proc can run + elif how == 'symlink': + symlink (outfile, target) + elif how == 'gzip': + + if path.isdir (outfile): + utils.targz (target + '.tgz', outfile) + else: + utils.gz (target + '.gz', outfile) + \ No newline at end of file diff --git a/pyppl/helpers/proc.py b/pyppl/helpers/proc.py index 91d207fe..018b8e06 100644 --- a/pyppl/helpers/proc.py +++ b/pyppl/helpers/proc.py @@ -1,14 +1,11 @@ -import logging, os, pickle, shlex, shutil, threading, sys +import os, pickle, shlex, shutil, threading, sys import copy as pycopy -from random import randint -from glob import glob from time import sleep -from traceback import extract_stack +from random import randint from channel import channel from aggr import aggr +from job import job as pjob import utils -from md5 import md5 -from re import split from subprocess import Popen, PIPE from multiprocessing import cpu_count from Queue import Queue @@ -22,8 +19,18 @@ class proc (object): runners = {} ids = {} + alias = { + 'exdir': 'exportdir', + 'exhow': 'exporthow', + 'exow': 'exportow', + 'errhow': 'errorhow', + 'errntry': 'errorntry', + 'lang': 'defaultSh', + 'rc': 'retcodes', + } def __init__ (self, tag = 'notag'): + # computed props self.__dict__['props'] = {} # configs @@ -36,7 +43,7 @@ def __init__ (self, tag = 'notag'): # where cache file and wdir located self.config['tmpdir'] = os.path.abspath("./workdir") self.config['forks'] = 1 - self.config['cache'] = True + self.config['cache'] = True # False or 'export' or 'export+' (do True if failed do export) self.config['retcodes'] = [0] self.config['echo'] = False self.config['runner'] = 'local' @@ -53,16 +60,13 @@ def __init__ (self, tag = 'notag'): self.config['afterCmd'] = "" self.config['workdir'] = '' self.config['args'] = {} - self.config['channel'] = channel() - self.config['callback'] = None - self.config['callfront'] = None + self.config['channel'] = channel.create() self.config['aggr'] = None + self.config['callback'] = None # init props # id of the process, actually it's the variable name of the process self.props['id'] = pid - # whether the process is cached or not - #self.props['cached'] = True # the tag self.props['tag'] = tag @@ -72,10 +76,6 @@ def __init__ (self, tag = 'notag'): self.props['depends'] = [] # the script self.props['script'] = "" - # the script prepend to the script - self.props['preScript'] = "" - # the script append to the script - self.props['postScript'] = "" self.props['input'] = {} self.props['output'] = {} @@ -95,37 +95,36 @@ def __init__ (self, tag = 'notag'): self.props['errorhow'] = self.config['errorhow'] self.props['errorntry'] = self.config['errorntry'] self.props['jobs'] = [] + self.props['ncjobids'] = [] # non-cached job ids self.props['defaultSh'] = self.config['defaultSh'] - self.props['isValid'] = True - self.props['channel'] = channel() + self.props['channel'] = channel.create() self.props['length'] = 0 self.props['sets'] = [] - self.props['infiletime'] = 0 - self.props['outfiles'] = [] - self.props['infiles'] = [] self.props['procvars'] = {} self.props['workdir'] = '' - self.props['logger'] = logging.getLogger(__name__) + self.props['logger'] = utils.getLogger('debug', self.__class__.__name__ + utils.randstr()) self.props['args'] = self.config['args'] - self.props['callback'] = self.config['callback'] - self.props['callfront'] = self.config['callfront'] self.props['indir'] = '' self.props['outdir'] = '' - self.props['cached'] = False self.props['aggr'] = self.config['aggr'] + self.props['callback'] = self.config['callback'] def __getattr__ (self, name): - if not self.props.has_key(name) and not name.endswith ('Runner'): - raise AttributeError('Property %s not found in ' % name) + if not self.props.has_key(name) and not proc.alias.has_key(name) and not name.endswith ('Runner'): + raise ValueError('Property %s not found in pyppl.proc' % name) + if proc.alias.has_key(name): name = proc.alias[name] return self.props[name] def __setattr__ (self, name, value): - if not self.config.has_key(name) and not name.endswith ('Runner'): - raise AttributeError('Cannot set %s for ' % name) + if not self.config.has_key(name) and not proc.alias.has_key(name) and not name.endswith ('Runner'): + raise ValueError('Cannot set property "%s" for ' % name) + if proc.alias.has_key(name): name = proc.alias[name] + self.config[name] = value self.props [name] = value - self.props['sets'].append(name) + self.sets.append(name) + if (name == 'output' or name == 'input') and isinstance(value, list) and isinstance(value[0], tuple): self.config[name] = OrderedDict(value) self.props [name] = OrderedDict(value) @@ -135,33 +134,34 @@ def __setattr__ (self, name, value): self.props['depends'] = [self.depends] elif isinstance(self.depends, aggr): self.props['depends'] = self.depends.ends - for depend in self.props['depends']: - depend.props['nexts'].append (self) + for depend in self.depends: + depend.nexts.append (self) def setLogger (self, logger): self.props['logger'] = logger def log (self, msg, level="info", flag=None): - flag = level.upper().rjust(7) if flag is None else flag + if flag is None: flag = level + flag = flag.upper().rjust(7) flag = "[%s]" % flag title = "%s%s.%s:" % (("%s -> " % self.aggr if self.aggr else ""), self.id, self.tag) func = getattr(self.logger, level) func ("%s %s %s" % (flag, title, msg)) def copy (self, tag=None, newid=None): - newproc = pycopy.deepcopy (self) - if tag is not None: - newproc.tag = tag - + newproc = pycopy.copy (self) + if tag is not None: newproc.tag = tag pid = utils.varname('\w+\.' + self.copy.__name__, 3) newproc.props['pid'] = pid if newid is None else newid return newproc def _suffix (self): - config = pycopy.copy(self.config) - if config.has_key('workdir'): - del config['workdir'] - + config = { key:val for key, val in self.config.iteritems() if key not in ['workdir'] } + + if config.has_key ('callback'): + config['callback'] = utils.funcSig(config['callback']) + + # proc is not picklable if config.has_key('depends'): depends = config['depends'] pickable_depends = [] @@ -172,116 +172,119 @@ def _suffix (self): for depend in depends: pickable_depends.append(depend.id + '.' + depend.tag) config['depends'] = pickable_depends - - if config.has_key('nexts'): - nexts = config['nexts'] - pickable_nexts = [] - if isinstance(nexts, proc): - nexts = [nexts] - for n in nexts: - pickable_nexts.append(ndepend.id + '.' + n.tag) - config['nexts'] = pickable_nexts - - if config.has_key ('callback'): - config['callback'] = utils.funcSig(config['callback']) - - if config.has_key ('callfront'): - config['callfront'] = utils.funcSig(config['callfront']) + # lambda not pickable if config.has_key ('input') and isinstance(config['input'], dict): config['input'] = pycopy.deepcopy(config['input']) for key, val in config['input'].iteritems(): config['input'][key] = utils.funcSig(val) if callable(val) else val - - signature = pickle.dumps(config) + '@' + pickle.dumps(sorted(sys.argv)) + + signature = pickle.dumps(config) return utils.uid(signature) def _tidyBeforeRun (self): self._buildProps () - self.log (self.workdir, 'info', 'RUNNING') self._buildInput () self._buildOutput () self._buildScript () def _tidyAfterRun (self): - if self._checkStatus (): - self._export () + sucJids = self._checkStatus () + if sucJids == False: return + self._doCache (sucJids) + if len (sucJids) != self.length: + log ('Export and callback will not be performed until all jobs run successfully.', 'warning') + else: + self._export () if callable (self.callback): - self.log ('Calling callback ...', 'debug') - #self.logger.info ('[ DEBUG] %s%s.%s: Calling callback ...' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag)) - self.callback (self) - self._doCache () + self.log('Calling callback ...', 'debug') + self.callback (self) - def _init (self, config): + def run (self, config = {}): + self.logger.info ('[ START] ' + utils.padBoth(' ' + self.id + '.' + self.tag + ' ', 70, '>', '<')) self._readConfig (config) - self.props['cached'] = self._isCached() - if self.cached: return False - if callable (self.callfront): - self.log ('Calling callfront ...', 'debug') - #self.logger.info ('[ DEBUG] %s%s.%s: Calling callfront ...' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag)) - self.callfront (self) - self.props['infiles'] = [] - self.props['outfiles'] = [] - self.props['jobs'] = [] - ''' - for n in self.nexts: # if i am not cached, then none of depends - self.logger.debug ('[ DEBUG] %s%s.%s: I`m not cached, so my dependent %s have to rerun.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, n.id)) - n.cache = False - ''' self._tidyBeforeRun () - return True - - def run (self, config = {}): - if not self._init(config): - self.log ('Calling callfront ...', 'debug') - #self.logger.info ('[ CACHED] %s%s.%s: %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, self.workdir)) - #self._tidyAfterRun () - return - if self._runCmd('beforeCmd') != 0: - raise Exception ('Failed to run ') - self._runJobs() - self._runCmd('afterCmd') + raise Exception ('Failed to run beforeCmd: %s' % self.beforeCmd) + if not self._isCached(): + self.log (self.workdir, 'info', 'RUNNING') + self._runJobs() if self._runCmd('afterCmd') != 0: - raise Exception ('Failed to run ') + raise Exception ('Failed to run afterCmd: %s' % self.afterCmd) self._tidyAfterRun () - def _checkStatus (self): # whether return values allowed or outfiles generated - for i in range(self.length): - rcfile = os.path.join (self.workdir, 'scripts', 'script.%s.rc' % i) - rc = 0 - with open (rcfile, 'r') as f: - rc = f.read().strip() - rc = -1 if rc == '' else int(rc) - if rc not in self.retcodes: - if not self.echo: - errfile = os.path.join (self.workdir, 'scripts', 'script.%s.stderr' % i) - errmsgs = ['[ ERROR] ! ' + line.strip() for line in open(errfile)] - if not errmsgs: errmsgs = ['[ ERROR] ! '] - self.log('See STDERR below', 'error') - #self.logger.info ('[ ERROR] %s%s.%s: See STDERR below.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag)) - for errmsg in errmsgs: - self.logger.error (errmsg) - raise Exception ('[#%s]: Return code is %s, but %s expected.' % (i, rc, self.retcodes)) - - for of in self.outfiles: - if not os.path.exists (of): - raise Exception ('Output file %s not generated.' % (of)) - return True - + # False: totally failed + # [1,2,3,...]: successful, successful job indices + def _checkStatus (self): + ret = [] + for i in self.input['#']: + job = self.jobs[i] + job.outfileGenerated() + rc = job.rc () + if rc in self.retcodes: + ret.append (i) + continue + + if not self.echo: # if echo is on, errors are already printed + self.log('See STDERR below for job#%s' % i, 'error') + errmsgs = [] + if os.path.exists (job.errfile): + errmsgs = ['[ ERROR] ! ' + line.strip() for line in open(job.errfile)] + if not errmsgs: errmsgs = ['[ ERROR] ! '] + for errmsg in errmsgs: self.logger.error(errmsg) + raise Exception ('[Job#%s]: Return code is %s, but %s expected.' % (i, rc, self.retcodes)) + + if not ret: return False + return ret + + # create link in indir and set input + def _prepInfile (self, infile, key, index, multi=False): + if not self.input.has_key(key): self.input[key] = [''] * self.length + if not self.input.has_key(key + '.bn'): self.input[key + '.bn'] = [''] * self.length + if not self.input.has_key(key + '.fn'): self.input[key + '.fn'] = [''] * self.length + if not self.input.has_key(key + '.ext'): self.input[key + '.ext'] = [''] * self.length + + job = self.jobs[index] + srcfile = os.path.abspath(infile) + bn = os.path.basename(srcfile) + infile = os.path.join (self.indir, bn) + fn, ext = os.path.splitext (os.path.basename(infile)) + if not os.path.exists (infile): + os.symlink (srcfile, infile) + else: + log ('Input file already exists: %s' % infile, 'warning') + + if multi: + if not isinstance(self.input[key][index], list): + self.input[key][index] = [infile] + self.input[key + '.bn'][index] = [bn] + self.input[key + '.fn'][index] = [fn] + self.input[key + '.ext'][index] = [ext] + else: + self.input[key][index].append(infile) + self.input[key + '.bn'][index].append(bn) + self.input[key + '.fn'][index].append(fn) + self.input[key + '.ext'][index].append(ext) + job.input['files'].append(infile) + else: + self.input[key][index] = infile + self.input[key + '.bn'][index] = bn + self.input[key + '.fn'][index] = fn + self.input[key + '.ext'][index] = ext + job.input['file'].append(infile) + def _buildProps (self): #print getsource(self.input.values()[0]) if isinstance (self.retcodes, int): self.props['retcodes'] = [self.retcodes] if isinstance (self.retcodes, str): - self.props['retcodes'] = [int(i) for i in split(r'\s*,\s*', self.retcodes)] + self.props['retcodes'] = [int(i) for i in utils.split(self.retcodes, ',')] key = self.id + '.' + self.tag if key in proc.ids and proc.ids[key] != self: raise Exception ('A proc with id %s and tag %s already exists.' % (self.id, self.tag)) - else: - proc.ids[key] = self + proc.ids[key] = self if not 'workdir' in self.sets and not self.workdir: self.props['workdir'] = os.path.join(self.tmpdir, "PyPPL.%s.%s.%s" % (self.id, self.tag, self._suffix())) @@ -306,117 +309,94 @@ def _buildProps (self): def _buildInput (self): # if config.input is list, build channel from depends # else read from config.input - input0 = self.config['input'] - if isinstance(input0, list): - input0 = ', '.join(input0) - if isinstance(input0, str): - cs = channel.fromChannels(*[d.channel for d in self.depends]) if self.depends else channel.fromArgv(None) - input0 = {input0: cs} + input = self.config['input'] - if not isinstance(input0, dict): - raise Exception('Expect , or as input.') - - self.props['input'] = {} - for key, val in input0.iteritems(): - if callable (val): - #print getsource(val) - val = val (*[d.channel.copy() for d in self.depends]) if self.depends else val (channel.fromArgv(None)) - if not isinstance (val, channel): - val = channel.create(val) - - keys = split(r'\s*,\s*', key) - if self.length == 0: - self.props['length'] = val.length() - elif self.length != val.length(): - raise Exception ('Expect same lengths for input channels, but got %s and %s (keys: %s).' % (self.length, val.length(), keys)) - vals = val.split() - if len(keys) > len(vals): - raise Exception('%s%s.%s: Not enough data for input variables.\nVarialbes: %s\nData: %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, keys, vals)) + argvchan = channel.fromArgv() + depdchan = channel.fromChannels (*[d.channel for d in self.depends]) + if not isinstance (input, dict): + input = ','.join(utils.alwaysList (input)) + input = {input: depdchan if self.depends else argvchan} + + # expand to one key-channel pairs + inputs = {} + for keys, vals in input.iteritems(): + keys = utils.split(keys, ',') + if callable (vals): + vals = vals (depdchan if self.depends else argv) + vals = vals.split() + elif isinstance (vals, (str, unicode)): # only for files: "/a/b/*.txt, /a/c/*.txt" + vals = utils.split(vals, ',') + elif isinstance (vals, channel): + vals = vals.split() + elif isinstance (vals, list): + vals = channel.create(vals) + vals = vals.split() + else: + raise ValueError ("%s%s.%s: Unexpected values for input. Expect dict, list, str, channel, callable." % ( + ("%s -> " % self.aggr if self.aggr else ""), + self.id, self.tag)) + + width = len (vals) + if len (keys) > width: + raise ValueError ('%s%s.%s: Not enough data for input variables.\nVarialbes: %s\nData: %s' % ( + ("%s -> " % self.aggr if self.aggr else ""), + self.id, self.tag, + keys, vals)) + for i in range(width): + key = keys[i] + toExpand = (key.endswith(':files') or key.endswith(':paths')) and isinstance(vals[i], (str, unicode)) + chan = channel.fromPath(vals[i]) if toExpand else vals[i] + if self.length == 0: self.props['length'] = chan.length() + if self.length != chan.length(): + raise ValueError ('%s%s.%s: Expect same lengths for input channels, but got %s and %s (keys: %s).' % ( + ("%s -> " % self.aggr if self.aggr else ""), + self.id, self.tag, + self.length, chan.length(), key)) + inputs[key] = chan - self.props['input']['#'] = range(self.length) - for i, k in enumerate(keys): - vv = vals[i].toList() - if k.endswith (':files') or k.endswith (':paths'): - k = k[:-6] - for j, vs in enumerate(vv): - if not isinstance(vs, list): - vv[j] = glob(vs) # allow wildcard - for m, v in enumerate(vv[j]): - if not os.path.exists (v): - raise Exception('Input file %s does not exist.' % v) - v = os.path.abspath(v) - vv[j][m] = os.path.join(self.indir, os.path.basename(v)) - if v not in self.infiles: # doesn't need to do repeatedly - self.props['infiles'].append (v) - self.props['infiletime'] = max (self.infiletime, os.path.getmtime(v)) - - if os.path.islink(vv[j][m]): - self.log('Overwriting existing input file (link) %s' % vv[j][m], 'warning') - #self.logger.info ('[WARNING] %s%s.%s: Overwriting existing input file (link) %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, vv[j][m])) - os.remove (vv[j][m]) - if os.path.exists (vv[j][m]): - self.log('Overwriting existing file/dir %s' % vv[j][m], 'warning') - #self.logger.info ('[WARNING] %s%s.%s: Overwriting existing file/dir %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, vv[j][m])) - if os.path.isfile(vv[j][m]): - os.remove (vv[j][m]) - else: - shutil.rmtree(vv[j][m]) - os.symlink (v, vv[j][m]) - self.props['input'][k] = vv - self.props['input'][k + '.bn'] = [map (lambda x: os.path.basename(x), x) for x in vv] - self.props['input'][k + '.fn'] = [map (lambda x: os.path.basename(os.path.splitext(x)[0]), x) for x in vv] - self.props['input'][k + '.ext'] = [map (lambda x: os.path.splitext(x)[1], x) for x in vv] - elif k.endswith (':file') or k.endswith(':path'): - k = k[:-5] - for j, v in enumerate(vv): - #(v, ) = v - if not os.path.exists (v): - raise Exception('Input file %s does not exist.' % v) - v = os.path.abspath(v) - vv[j] = os.path.join(self.indir, os.path.basename(v)) - if v not in self.infiles: # doesn't need to do repeatedly - self.props['infiles'].append (v) - self.props['infiletime'] = max (self.infiletime, os.path.getmtime(v)) - - if os.path.islink(vv[j]): - self.log('Overwriting existing input file (link) %s' % vv[j], 'warning') - #self.logger.info ('[WARNING] %s%s.%s: Overwriting existing input file (link) %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, vv[j])) - os.remove (vv[j]) - if os.path.exists (vv[j]): - self.log('Overwriting existing file/dir %s' % vv[j], 'warning') - #self.logger.info ('[WARNING] %s%s.%s: Overwriting existing file/dir %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, vv[j])) - if os.path.isfile(vv[j]): - os.remove (vv[j]) - else: - shutil.rmtree(vv[j]) - os.symlink (v, vv[j]) - self.props['input'][k] = vv - self.props['input'][k + '.bn'] = map (lambda x: os.path.basename(x), vv) - self.props['input'][k + '.fn'] = map (lambda x: os.path.basename(os.path.splitext(x)[0]), vv) - self.props['input'][k + '.ext'] = map (lambda x: os.path.splitext(x)[1], vv) - - else: - if k.endswith(":var"): k = k[:-4] - self.props['input'][k] = vv + self.input = {'#': []} + for i in range (self.length): + self.jobs.append (pjob (i, self.workdir)) + self.input['#'].append(i) + self.ncjobids.append (i) + + for keyinfo, chan in inputs.iteritems(): + if keyinfo.endswith (':files') or keyinfo.endswith (':paths'): + key = keyinfo[:-6] + # [([f1,f2],), ([f3,f4],)] => [[f1,f2], [f3,f4]] + for i, ch in enumerate(chan.toList()): + for infile in ch: self._prepInfile (infile, key, i, True) + + elif keyinfo.endswith (':file') or keyinfo.endswith (':path'): + key = keyinfo[:-5] + for i, ch in enumerate(chan.toList()): + self._prepInfile (ch, key, i) + else: # var + if not keyinfo.endswith(':var'): keyinfo = keyinfo + ':var' + key = keyinfo[:-4] + self.input[key] = [] + for i, ch in enumerate(chan.toList()): + job = self.jobs[i] + job.input['var'].append (ch) + self.input[key].append (ch) ridx = randint(0, self.length-1) for key, val in self.input.iteritems(): self.log ('INPUT [%s/%s]: %s => %s' % (ridx, self.length-1, key, val[ridx]), 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s INPUT [%s/%s]: %s => %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, ridx, self.length-1, key, val[ridx])) # also add proc.props, mostly scalar values + alias = {val:key for key, val in proc.alias.iteritems()} for prop, val in self.props.iteritems(): if not prop in ['id', 'tag', 'tmpdir', 'forks', 'cache', 'workdir', 'echo', 'runner', 'errorhow', 'errorntry', 'defaultSh', 'exportdir', 'exporthow', 'exportow', 'args', 'indir', 'outdir', 'length']: continue if prop == 'args': for k, v in val.iteritems(): self.props['procvars']['proc.args.' + k] = v self.log('PROC_ARGS: %s => %s' % (k, v), 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s PROC_ARGS: %s => %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, k, v)) else: + if alias.has_key (prop): prop = alias[prop] + else: self.log ('PROC_VARS: %s => %s' % (prop, val), 'debug') self.props['procvars']['proc.' + prop] = val - self.log ('PROC_VARS: %s => %s' % (prop, val), 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s PROC_VARS: %s => %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, prop, val)) - + """ Output could be: 1. list: ['output:var:{input}', 'outfile:file:{infile.bn}.txt'] @@ -433,139 +413,66 @@ def _buildOutput (self): output = self.config['output'] - if isinstance(output, list): - output = ', '.join(output) - if isinstance(output, str): - output = {output: self.props['channel']} - - if not isinstance(output, dict): - raise Exception('Expect , or as output.') - - def sanitizeKey (key): - its = [it.strip() for it in utils.split(key, ':')] + if not isinstance (output, dict): + output = ','.join(utils.alwaysList (output)) + else: + output = ','.join([key + ':' + val for key, val in output.iteritems()]) - if len(its) == 1: - its = ['__out%s__' % sanitizeKey.out_idx, 'var', its[0]] - sanitizeKey.out_idx += 1 - elif len(its) == 2: - if its[0] in ['var', 'file', 'path']: - its = ['__out%s__' % sanitizeKey.out_idx, its[0], its[1]] - sanitizeKey.out_idx += 1 - else: - its = [its[0], 'var', its[1]] - elif its[1] not in ['var', 'file', 'path']: - raise Exception ('Expect type: var, file or path instead of %s' % items[1]) - return tuple (its) - sanitizeKey.out_idx = 1 - self.props['output'] = {} - for key, val in output.iteritems(): - keys = utils.split(key, ',') + for key in utils.split(output, ','): + (oname, otype, oexp) = utils.sanitizeOutKey(key) + if otype in ['file', 'path', 'dir']: oexp = os.path.join (self.outdir, oexp) + self.props['output'][oname] = [] - for k in keys: - (oname, otype, oexp) = sanitizeKey(k) - if self.input.has_key(oname): - raise Exception ('Ouput variable name %s is already taken by input' % oname) - - if otype in ['file', 'path']: - oexp = os.path.join (self.outdir, oexp) - # build channels - chv = [] - for i in range(self.length): - data = {} - for ink, inv in self.input.iteritems(): - data[ink] = inv[i] - data.update (self.procvars) - chv.append (utils.format (oexp, data)) - if otype in ['file', 'path']: - self.props['outfiles'] += chv - chv = channel.create (chv) - try: - val.merge(chv) - except Exception as e: - raise Exception('%s.%s: %s\nChannel 1: %s\nChannel 2: %s' % (self.id, self.tag, e, val[:3], chv[:3])) - if val != self.channel: - self.props['channel'].merge (chv) - self.props['output'][oname] = chv.toList() + for i in self.input['#']: + data = {key:val[i] for key, val in self.input.iteritems()} + data.update (self.procvars) + val = utils.format(oexp, data) + if otype == 'dir' and not os.path.exists (val): + os.makedirs (val) + self.props['output'][oname].append (val) + self.props['jobs'][i].output['var' if otype == 'var' else 'file'].append(val) + self.props['channel'].merge(self.props['output'][oname]) + + utils.sanitizeOutKey.index = 0 ridx = randint(0, self.length-1) for key, val in self.output.iteritems(): self.log ('OUTPUT [%s/%s]: %s => %s' % (ridx, self.length-1, key, val[ridx]), 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s OUTPUT [%s/%s]: %s => %s' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, ridx, self.length-1, key, val[ridx])) def _buildScript (self): # make self.jobs - if not self.script: - #raise Exception ('Please specify script to run') - self.log ('No script specified', 'warning') - #self.logger.warning ('[WARNING] %s%s.%s No script specified' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag)) + if not self.script: self.log ('No script specified', 'warning') scriptdir = os.path.join (self.workdir, 'scripts') + script = self.script.strip() - script = self.script.strip() if script.startswith ('template:'): tplfile = script[9:].strip() if not os.path.exists (tplfile): - raise Exception ('Script template file %s does not exist.' % tplfile) - with open (tplfile, 'r') as f: - script = f.read().strip() + raise ValueError ('Script template file "%s" does not exist.' % tplfile) + script = open(tplfile).read().strip() if not script.startswith ("#!"): script = "#!/usr/bin/env " + self.defaultSh + "\n\n" + script - - for i in range(self.length): - data = {} - idx = self.input['#'][i] - for k,v in self.input.iteritems(): - data[k] = v[idx] - for k,v in self.output.iteritems(): - data[k] = v[idx] + for index in self.input['#']: + data = {key:val[index] for key, val in self.input.iteritems()} + data.update({key:val[index] for key, val in self.output.iteritems()}) data.update(self.procvars) - script1 = utils.format (script, data) + jscript = utils.format (script, data) - scriptfile = os.path.join (scriptdir, 'script.%s' % idx) - with open(scriptfile, 'w') as f: - f.write (script1) - - self.jobs.append (scriptfile) + scriptfile = os.path.join (scriptdir, 'script.%s' % index) + open (scriptfile, 'w').write (jscript) def _export (self): if not self.exportdir: return if not os.path.exists(self.exportdir): os.makedirs (self.exportdir) - - for outfile in self.outfiles: - bn = os.path.basename (outfile) - target = os.path.join (self.exportdir, bn) - - if os.path.exists (target): - if self.exportow == True: - if os.path.isdir (target): - shutil.rmtree (target) - else: - os.remove (target) - else: - self.log('%s (target exists, skipped)' % target, 'warning') - #self.logger.info ('[ EXPORT] %s%s.%s: %s (target exists, skipped)' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, target)) - - if not os.path.exists (target): - self.log ('%s (%s)' % (target, self.exporthow), 'info') - #self.logger.info ('[ EXPORT] %s%s.%s: %s (%s)' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, target, self.exporthow)) - if self.exporthow == 'copy': - if os.path.isdir (outfile): - shutil.copytree (outfile, target) - else: - shutil.copyfile (outfile, target) - elif self.exporthow == 'move': - shutil.move (outfile, target) - os.symlink(target, outfile) # make sure dependent proc can run - elif self.exporthow == 'symlink': - os.symlink (outfile, target) + for job in self.jobs: + job.export (self.exportdir, self.exporthow, self.exportow, self.log) def _readConfig (self, config): - conf = pycopy.copy (config) - for s in self.sets: - if conf.has_key(s): del conf[s] + conf = { key:val for key, val in config.iteritems() if key not in self.sets } self.config.update (conf) for key, val in conf.iteritems(): @@ -577,80 +484,61 @@ def _readConfig (self, config): self._suffix() ) - def _doCache (self): + def _doCache (self, jids): cachefile = os.path.join (self.tmpdir, self.cachefile) - with open (cachefile, 'w') as f: - props = pycopy.copy(self.props) - if props.has_key('logger'): - del props['logger'] - if props.has_key('depends'): - del props['depends'] - if props.has_key('nexts'): - del props['nexts'] - if props.has_key ('callback'): - del props['callback'] - if props.has_key ('callfront'): - del props['callfront'] - if props.has_key ('input'): - del props['input'] - pickle.dump(props, f) + jobsigs = [None] * self.length + for i in jids: + jobsigs[i] = self.jobs[i].signature() + with open(cachefile, 'w') as f: + pickle.dump(jobsigs, f) def _isCached (self): - - if not self.cache: + if self.cache == False: self.log ('Not cached, because proc.cache = False', 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, because proc.cache = False.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag)) - return False - - cachefile = os.path.join (self.tmpdir, self.cachefile) - if not os.path.exists(cachefile): - self.log ('Not cached, cache file %s not exists.' % cachefile, 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, cache file %s not exists.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, cachefile)) return False - - with open(cachefile, 'r') as f: - props = pickle.load(f) - self.props.update(props) - - # check input files, outputfiles - for infile in self.infiles: - if not os.path.exists(infile): - self.log ('Not cached, input file %s not exists.' % infile, 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, input file %s not exists.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, infile)) - return False - if os.path.getmtime(infile) > self.infiletime and self.infiletime != 0: - self.log ('Not cached, input file %s is newer.' % infile, 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, input file %s is newer.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, infile)) - return False - inlink = os.path.join(self.indir, os.path.basename (infile)) - if not os.path.islink (inlink): - self.log ('Not cached, input file link %s not exists.' % inlink, 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, input file link %s not exists.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, inlink)) + + sigCachedJids = [] + exCachedJids = [] + if self.cache in [True, 'export+']: + cachefile = os.path.join (self.tmpdir, self.cachefile) + if not os.path.exists(cachefile): + self.log ('Not cached, cache file %s not exists.' % cachefile, 'debug') return False + jobsigs = pickle.load(open(cachefile)) + sigCachedJids = [i for i in self.input['#'] if jobsigs[i] == self.jobs[i].signature()] + + elif self.cache in ['export', 'export+']: + exCachedJids = [i for i in self.input['#'] if self.jobs[i].exportCached(self.exportdir, self.exporthow, self.log)] + + else: + raise ValueError ('Cache option expects True/False/"export"/"export+"') - for outfile in self.outfiles: - if not os.path.exists(outfile): - self.log ('Not cached, output file %s not exists.' % outfile, 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, output file %s not exists' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, outfile)) - return False + cachedJids = [i for i in sigCachedJids if i not in exCachedJids] + exCachedJids + if not cachedJids: + self.log ('Not cached, none of the jobs are cached.', 'debug') + return False - for d in self.depends: - if not d.cached: - self.log ('Not cached, because my dependent %s.%s is not cached.' % (d.id, d.tag), 'debug') - #self.logger.debug ('[ DEBUG] %s%s.%s: Not cached, because my dependent %s is not cached.' % (("[AGGR: %s] " % self.aggr if self.aggr else ""), self.id, self.tag, d.id)) - return False - + if len (cachedJids) < self.length: + self.log ('Partly cached, only run non-cached jobs.', 'debug') + self.props['ncjobids'] = [i for i in self.input['#'] if i not in cachedJids] + self.log ('Truely cached jobs: %s' % sigCachedJids, 'debug') + self.log ('Export cached jobs: %s' % exCachedJids, 'debug') + self.log ('Jobs to be running: %s' % self.ncjobids, 'debug') + return False + + self.log ('Skip running jobs.', 'info', 'CACHED') return True def _runCmd (self, key): - if not self.props[key]: - return 0 - p = Popen (shlex.split(utils.format(self.props[key], self.procvars)), stdin=PIPE, stderr=PIPE, stdout=PIPE) + if not self.props[key]: return 0 + cmd = utils.format(self.props[key], self.procvars) + self.log ('Running <%s>: %s' % (key, cmd), 'info') + p = Popen (shlex.split(cmd), stdin=PIPE, stderr=PIPE, stdout=PIPE) if self.echo: for line in iter(p.stdout.readline, ''): - sys.stdout.write (line) + self.logger.info ('[ STDOUT] - ' + line.rstrip("\n")) for line in iter(p.stderr.readline, ''): - sys.stderr.write (line) + self.logger.error ('[ ERROR] ! ' + line.rstrip("\n")) return p.wait() def _runJobs (self): @@ -658,21 +546,23 @@ def _runJobs (self): def sworker (q): while True: q.get().submit() + sleep(.1) q.task_done() # run and wait jobs to finish def rworker(q): while True: q.get().wait () + sleep(.1) q.task_done() sq = Queue() rq = Queue() - for job in self.jobs: - rjob = proc.runners[self.runner] (job, self.props) + for i in self.ncjobids: + rjob = proc.runners[self.runner] (self.jobs[i], self.props) sq.put (rjob) rq.put (rjob) - + # submit jobs nojobs2submit = min (self.forks, len(self.jobs), int(cpu_count()/2)) for i in range (nojobs2submit): diff --git a/pyppl/helpers/utils.py b/pyppl/helpers/utils.py index 6c4cd672..7e8efdae 100644 --- a/pyppl/helpers/utils.py +++ b/pyppl/helpers/utils.py @@ -34,12 +34,9 @@ def method (self): ``` """ def varname (func, maxline = 20): - import re, random, inspect + import re, inspect frame = inspect.currentframe() frames = inspect.getouterframes(frame) - # frames[0] : this frame - # frames[1] : the func/method calling this one - # frames[2] : assignment frame = frames[2] src = ''.join(frame[4]) @@ -50,7 +47,7 @@ def varname (func, maxline = 20): m = re.search(varpat, src) if m: return m.group(2) - suffix = ''.join([random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijkllmnopqrstuvwxyz1234567890") for _ in range(8)]) + suffix = utils.randstr(8) thefunc = func if not '\\.' in func else func.split('\\.')[1] m = re.search(funcpat, src) if m: return thefunc + '_' + suffix @@ -64,6 +61,9 @@ def varname (func, maxline = 20): return thefunc + '_' + suffix +def randstr (length = 8): + import random + return ''.join([random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijkllmnopqrstuvwxyz1234567890") for _ in range(length)]) def split (s, delimter): ret = [] @@ -115,7 +115,7 @@ def split (s, delimter): start = i + 1 else: slash = 0 - ret.append (s[start:]) + ret.append (s[start:].strip()) return ret def format (tpl, args): @@ -172,3 +172,119 @@ def uid(s, l = 8, alphabet='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnop base = alphabet[i] + base return base[:l] + +def targz (tgzfile, srcdir): + from tarfile import open as taropen + from glob import glob + from os import chdir, getcwd + cwd = getcwd() + tar = taropen(tgzfile, 'w:gz') + chdir (srcdir) + for name in glob ('./*'): + tar.add(name) + tar.close() + chdir (cwd) + +def untargz (tfile, dstdir): + import tarfile + tar = tarfile.open (tfile, 'r:gz') + tar.extractall (dstdir) + tar.close() + +def gz (gzfile, srcfile): + from gzip import open as gzopen + from shutil import copyfileobj + fin = open (srcfile, 'rb') + fout = gzopen (gzfile, 'wb') + copyfileobj (fin, fout) + +def ungz (gzfile, dstfile): + from gzip import open as gzopen + from shutil import copyfileobj + fin = gzopen (gzfile, 'rb') + fout = open (dstfile, 'wb') + copyfileobj (fin, fout) + +# file signature, use absolute path and mtime +def fileSig (fn): + from os.path import realpath, abspath, getmtime + from hashlib import md5 + fn = abspath(realpath(fn)) + mtime = str(getmtime(fn)) + return md5(fn + '@' + mtime).hexdigest() + +# convert str to list separated by , +def alwaysList (data): + if isinstance(data, (str, unicode)): + ret = split (data, ',') + elif isinstance(data, list): + ret = [] + for d in data: + if ',' in d: ret += split(d, ',') + else: ret.append (d) + else: + raise ValueError('Expect string or list to convert to list.') + return map (lambda x: x.strip(), ret) + +# sanitize output key +def sanitizeOutKey (key): + parts = split(key, ':') + + if len(parts) == 1: + sanitizeOutKey.index += 1 + return ('__out.%s__' % sanitizeOutKey.index, 'var', key) + + if len(parts) == 2: + if parts[0] in ['var', 'file', 'path', 'dir']: + sanitizeOutKey.index += 1 + return ('__out.%s__' % sanitizeOutKey.index, parts[0], parts[1]) + else: + return (parts[0], 'var', parts[1]) + + if len(parts) == 3: + if parts[1] not in ['var', 'file', 'path', 'dir']: + raise ValueError ('Expect type: var, file or path instead of %s' % parts[1]) + else: + raise ValueError ('You have extra colons in output key: %s' % key) + + return tuple (parts) +sanitizeOutKey.index = 0 + +# convert script file to executable or add extract shebang to cmd line +def chmodX (thefile): + import os, stat + thefile = os.path.realpath(thefile) + ret = [thefile] + try: + st = os.stat (thefile) + os.chmod (thefile, st.st_mode | stat.S_IEXEC) + except Exception as e1: + try: + shebang = open (thefile).read().strip().split("\n")[0] + if not shebang.startswith("#!"): + raise Exception() + ret = shebang[2:].strip().split() + [thefile] + except Exception as e2: + raise Exception("Cannot change %s as executable or read the shebang from it:\n%s\n%s" % (thefile, e1, e2)) + return ret + +def getLogger (level = 'info', name='PyPPL'): + import logging + ch = logging.StreamHandler() + ch.setFormatter (logging.Formatter("[%(asctime)-15s] %(message)s")) + logger = logging.getLogger (name) + logger.setLevel (getattr(logging, level.upper())) + logger.addHandler (ch) + return logger + +def padBoth (s, length, left, right = None): + if right is None: right = left + padlen = length - len (s) + if padlen%2 == 1: + llen = (padlen - 1)/2 + rlen = (padlen + 1)/2 + else: + llen = rlen = padlen/2 + lstr = (left * (llen/len (left)))[:llen] + rstr = (right * (rlen/len(right)))[:rlen] + return lstr + s + rstr \ No newline at end of file diff --git a/pyppl/pyppl.py b/pyppl/pyppl.py index 651cd7c6..a24b5d33 100644 --- a/pyppl/pyppl.py +++ b/pyppl/pyppl.py @@ -1,10 +1,21 @@ import logging, os, sys, random, json, copy from helpers import * from runners import * -VERSION = "0.3.0" +VERSION = "0.4.0" class pyppl (object): + tips = [ + "You can find the stdout in /scripts/script..stdout", + "You can find the stderr in /scripts/script..stderr", + "You can find the script in /scripts/script.", + "Check documentation at: https://github.com/pwwang/pyppl/blob/master/docs/DOCS.md", + "You cannot have two processes with same id(variable name) and tag", + "beforeCmd and afterCmd only run locally", + "If 'workdir' is not set for a process, it will be PyPPL... under default ", + "The default will be './workdir'", + ] + def __init__(self, config = {}, cfile = None): cfile = os.path.join (os.path.expanduser('~'), ".pyppl") if cfile is None else cfile @@ -18,30 +29,15 @@ def __init__(self, config = {}, cfile = None): if config.has_key('loglevel'): loglevel = config['loglevel'] del config['loglevel'] - ch = logging.StreamHandler() - ch.setFormatter (logging.Formatter("[%(asctime)-15s] %(message)s")) - logger = logging.getLogger ('PyPPL') - logger.setLevel (getattr(logging, loglevel.upper())) - logger.addHandler (ch) - - tips = [ - "You can find the stdout in /scripts/script..stdout", - "You can find the stderr in /scripts/script..stderr", - "You can find the script in /scripts/script.", - "Check documentation at: https://github.com/pwwang/pyppl/blob/master/docs/DOCS.md", - "You cannot have two processes with same id(variable name) and tag", - "beforeCmd and afterCmd only run locally", - "If 'workdir' is not set for a process, it will be PyPPL... under default " - ] - logger.info ('[ PyPPL] Version: %s' % (VERSION)) - logger.info ('[ TIPS] %s' % (random.choice(tips))) + suffix = utils.randstr () + self.logger = utils.getLogger (loglevel, self.__class__.__name__ + suffix) + self.logger.info ('[ PyPPL] Version: %s' % (VERSION)) + self.logger.info ('[ TIPS] %s' % (random.choice(pyppl.tips))) if os.path.exists (cfile): - logger.info ('[ CONFIG] Read from %s' % cfile) + self.logger.info ('[ CONFIG] Read from %s' % cfile) - self.logger = logger self.config = config self.heads = [] - #print config, 'config--------' def starts (self, *arg): for pa in arg: @@ -61,11 +57,9 @@ def starts (self, *arg): def run (self, profile = 'local'): config = {} if self.config.has_key('proc'): - #config.update(self.config['proc']) utils.dictUpdate(config, self.config['proc']) if self.config.has_key(profile): - #config.update(self.config[profile]) utils.dictUpdate(config, self.config[profile]) if not config.has_key('runner'): @@ -75,17 +69,13 @@ def run (self, profile = 'local'): finished = [] while next2run: - #print [x.id for x in next2run] next2run2 = [] for p in next2run: - #print hex(id(p.nexts[0])), p.nexts[0].id, 'changed' - #print config, 'xxxxxxxxxxxxxxxxxxxxx' p.setLogger(self.logger) p.run (config) finished.append (p) next2run2 += p.props['nexts'] next2run = [n for n in list(set(next2run2)) if n not in finished and all(x in finished for x in n.props['depends'])] - #next2run = list(set(next2run2)) # unique self.logger.info ('[ DONE]') diff --git a/pyppl/runners/runner_local.py b/pyppl/runners/runner_local.py index 97f16046..b9273e96 100644 --- a/pyppl/runners/runner_local.py +++ b/pyppl/runners/runner_local.py @@ -7,39 +7,18 @@ import os, stat, sys, logging from subprocess import Popen, PIPE from time import sleep +from ..helpers import utils class runner_local (object): - def __init__ (self, script, config = {}): - self.index = script.split('.')[-1] - self.script = runner_local.chmod_x(script) - self.outfile = script + '.stdout' - self.errfile = script + '.stderr' - self.rcfile = script + '.rc' + def __init__ (self, job, config = {}): + self.job = job + self.script = utils.chmodX(self.job.script) self.ntry = 0 self.config = config self.p = None self.outp = 0 self.errp = 0 - - @staticmethod - def chmod_x (thefile): - thefile = os.path.realpath(thefile) - ret = [thefile] - try: - st = os.stat (thefile) - os.chmod (thefile, st.st_mode | stat.S_IEXEC) - except: - try: - shebang = '' - with open (thefile, 'r') as f: - shebang = f.read().strip().split("\n")[0] - if not shebang.startswith("#!"): - raise Exception() - ret = shebang[2:].strip().split() + [thefile] - except Exception as e: - raise Exception("Cannot change %s as executable or read the shebang from it." % thefile) - return ret def _config (self, key, default = None): if '.' in key: @@ -57,22 +36,20 @@ def _config (self, key, default = None): return self.config[key] def submit (self): - if os.path.exists(self.rcfile): - os.remove(self.rcfile) + if os.path.exists(self.job.rcfile): + os.remove(self.job.rcfile) try: self.p = Popen (self.script, stdin=PIPE, stderr=PIPE, stdout=PIPE, close_fds=True) except Exception as ex: - open (self.errfile, 'w').write(str(ex)) - open (self.rcfile, 'w').write('-1') # not able to submit - # don't retry if failed to submit - sleep (0.1) + open (self.job.errfile, 'w').write(str(ex)) + open (self.job.rcfile, 'w').write('-1') # not able to submit def wait (self): - if self.rc() == -1: return + if self.job.rc() == -1: return while self.p is None: sleep (1) - open (self.rcfile, 'w').write(str(self.p.wait())) - with open (self.outfile, 'w') as fout, open(self.errfile, 'w') as ferr: + open (self.job.rcfile, 'w').write(str(self.p.wait())) + with open (self.job.outfile, 'w') as fout, open(self.job.errfile, 'w') as ferr: for line in iter(self.p.stderr.readline, ''): ferr.write(line) if self._config('echo', False): @@ -100,17 +77,9 @@ def retry (self): self.submit() self.wait() - def rc (self): - if not os.path.exists (self.rcfile): - return -99 - rccodestr = '' - with open (self.rcfile, 'r') as f: - rccodestr = f.read().strip() - - return -99 if rccodestr == '' else int(rccodestr) def isValid (self): - return self.rc () in self._config('retcodes', [0]) + return self.job.rc () in self._config('retcodes', [0]) def flushFile (self, fn = 'stdout'): diff --git a/pyppl/runners/runner_sge.py b/pyppl/runners/runner_sge.py index 1aaab55f..f924293f 100644 --- a/pyppl/runners/runner_sge.py +++ b/pyppl/runners/runner_sge.py @@ -6,10 +6,10 @@ class runner_sge (runner_local): - def __init__ (self, script, config = {}): - super(runner_sge, self).__init__(script, config) + def __init__ (self, job, config = {}): + super(runner_sge, self).__init__(job, config) # construct an sge script - sgefile = os.path.realpath(script + '.sge') + sgefile = os.path.realpath(self.job.script + '.sge') sgesrc = [ '#!/usr/bin/env bash', @@ -18,10 +18,15 @@ def __init__ (self, script, config = {}): #'#$ -e ' + self.errfile, #'#$ -cwd' ] + defaultName = '%s_%s.%s' % ( + self._config('id'), + self._config('tag'), + self.job.index + ) conf = copy.copy (self._config ('sgeRunner', {})) if not conf.has_key ('sge_N'): - sgesrc.append('#$ -N %s_%s.%s' % (self._config('id', os.path.basename (script) [:-len(self.index)-1]), self._config('tag', 'notag'), self.index)) # + self._config('id', os.path.basename (script)) + '.' + self._config('tag', 'notag')) + sgesrc.append('#$ -N %s' % defaultName) else: sgesrc.append('#$ -N %s' % conf['sge_N']) del conf['sge_N'] @@ -38,13 +43,13 @@ def __init__ (self, script, config = {}): sgesrc.append('#$ -o %s' % conf['sge_o']) del conf['sge_o'] else: - sgesrc.append('#$ -o %s' % self.outfile) + sgesrc.append('#$ -o %s' % self.job.outfile) if conf.has_key('sge_e'): sgesrc.append('#$ -e %s' % conf['sge_e']) del conf['sge_e'] else: - sgesrc.append('#$ -e %s' % self.errfile) + sgesrc.append('#$ -e %s' % self.job.errfile) sgesrc.append('#$ -cwd') @@ -65,38 +70,35 @@ def __init__ (self, script, config = {}): src += ' ' + str(v) sgesrc.append(src) sgesrc.append ('') - sgesrc.append ('trap "status=\$?; echo \$status > %s; exit \$status" 1 2 3 6 7 8 9 10 11 12 15 16 17 EXIT' % self.rcfile) + sgesrc.append ('trap "status=\$?; echo \$status > %s; exit \$status" 1 2 3 6 7 8 9 10 11 12 15 16 17 EXIT' % self.job.rcfile) sgesrc.append (self._config('sgeRunner.preScript', '')) sgesrc.append ('') sgesrc.append (list2cmdline(self.script)) sgesrc.append (self._config('sgeRunner.postScript', '')) - with open (sgefile, 'w') as f: - f.write ('\n'.join(sgesrc) + '\n') + open (sgefile, 'w').write ('\n'.join(sgesrc) + '\n') self.script = ['qsub', sgefile] def submit (self): - if os.path.exists(self.rcfile): - os.remove(self.rcfile) + if os.path.exists(self.job.rcfile): + os.remove(self.job.rcfile) try: self.p = Popen (self.script) rc = self.p.wait() if rc != 0: - open (self.errfile, 'w').write('Failed to submit job: %s.%s#%s' % (self._config('id'), self._config('tag'), self.index)) - open (self.rcfile, 'w').write('-1') + open (self.job.errfile, 'w').write('Failed to submit job: %s.%s#%s' % (self._config('id'), self._config('tag'), self.index)) + open (self.job.rcfile, 'w').write('-1') except Exception as ex: - open (self.errfile, 'w').write(str(ex)) - open (self.rcfile, 'w').write('-1') # not able to submit - # don't retry if failed to submit - sleep (0.1) + open (self.job.errfile, 'w').write(str(ex)) + open (self.job.rcfile, 'w').write('-1') # not able to submit def wait(self): - if self.rc() == -1: return + if self.job.rc() == -1: return while self.p is None: sleep (1) - while self.rc() == -99: + while self.job.rc() == -99: if self._config('echo', False): self.flushFile('stdout') self.flushFile('stderr') diff --git a/pyppl/runners/runner_ssh.py b/pyppl/runners/runner_ssh.py index 83da2fec..584d7d3f 100644 --- a/pyppl/runners/runner_ssh.py +++ b/pyppl/runners/runner_ssh.py @@ -2,16 +2,16 @@ from time import sleep from subprocess import Popen, list2cmdline import os, shlex, random, logging - +from ..helpers import utils class runner_ssh (runner_local): serverid = 0 - def __init__ (self, script, config = {}): - super(runner_ssh, self).__init__(script, config) + def __init__ (self, job, config = {}): + super(runner_ssh, self).__init__(job, config) # construct an ssh cmd - sshfile = script + '.ssh' + sshfile = os.path.realpath(self.job.script + '.ssh') servers = self._config('sshRunner.servers') if not servers: @@ -20,15 +20,14 @@ def __init__ (self, script, config = {}): sshsrc = [ '#!/usr/bin/env bash', '' - 'trap "status=\$?; echo \$status > %s; exit \$status" 1 2 3 6 7 8 9 10 11 12 15 16 17 EXIT' % self.rcfile, + 'trap "status=\$?; echo \$status > %s; exit \$status" 1 2 3 6 7 8 9 10 11 12 15 16 17 EXIT' % self.job.rcfile, 'ssh %s "cd %s; %s"' % (servers[serverid], os.getcwd(), list2cmdline(self.script)) ] runner_ssh.serverid += 1 - with open (sshfile, 'w') as f: - f.write ('\n'.join(sshsrc) + '\n') + open (sshfile, 'w').write ('\n'.join(sshsrc) + '\n') - self.script = self.chmod_x(sshfile) + self.script = utils.chmodX(sshfile) diff --git a/tests/channel.unittest.py b/tests/channel.unittest.py index 8b8bbd00..2b115ea9 100644 --- a/tests/channel.unittest.py +++ b/tests/channel.unittest.py @@ -1,4 +1,4 @@ -import sys, unittest, os +import sys, unittest, os, glob rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, rootdir) from pyppl import channel @@ -17,14 +17,7 @@ def testCreate (self): def testFromPath (self): c = channel.fromPath (os.path.join(rootdir, 'tests', '*.py')) - self.assertEqual (sorted(c), sorted([ - (os.path.join (rootdir, 'tests', 'aggr.unittest.py'),), - (os.path.join (rootdir, 'tests', 'runner.unittest.py'),), - (os.path.join (rootdir, 'tests', 'pyppl.unittest.py'),), - (os.path.join (rootdir, 'tests', 'proc.unittest.py'),), - (os.path.join (rootdir, 'tests', 'utils.unittest.py'),), - (os.path.join (rootdir, 'tests', 'channel.unittest.py'),), - ])) + self.assertEqual (sorted(c), sorted(map(channel._tuplize, glob.glob(os.path.join(rootdir, 'tests', '*.py'))))) def testFromPairs (self): files = [ @@ -52,15 +45,17 @@ def testFromPairs (self): def testFromArgv (self): sys.argv = ["0", "11", "22", "33", "44"] - c = channel.fromArgv(None) - self.assertEqual (c, [("11", "22", "33", "44")]) - c = channel.fromArgv(2) + c = channel.fromArgv() + self.assertEqual (c, [("11",), ("22",), ("33",), ("44",)]) + sys.argv = ["0", "11,22", "33,44"] + c = channel.fromArgv() self.assertEqual (c, [("11", "22"), ("33", "44")]) - self.assertRaises (Exception, channel.fromArgv, 3) + sys.argv = ["0", "11,22", "33"] + self.assertRaises (ValueError, channel.fromArgv) def testFromChannels (self): c1 = channel.create([("abc", "def"), ("ghi", "opq")]) - c2 = channel([("abc", "def"), ("ghi", "opq")]) + c2 = channel.create([("abc", "def"), ("ghi", "opq")]) c3 = channel.fromChannels (c1, c2) self.assertEqual (c3, [("abc", "def", "abc", "def"), ("ghi", "opq", "ghi", "opq")]) @@ -120,25 +115,12 @@ def testMerge(self): def testSplit(self): - c2 = channel.create (["abc", "def", "ghi", "opq"]) - c3 = channel.create(["1", '2', '3', '4']) - c4 = channel() - c4.merge(c2, c3) - self.assertEqual (c4, [("abc", "1"), ("def", '2'), ("ghi", '3'), ("opq", '4')]) - - c5 = [5,6,7,8] - self.assertEqual (c4.copy().merge(c5), [("abc", "1", 5), ("def", '2', 6), ("ghi", '3', 7), ("opq", '4', 8)]) - c4.merge(c5) - self.assertEqual (c4, [("abc", "1", 5), ("def", '2', 6), ("ghi", '3', 7), ("opq", '4', 8)]) + c4 = channel.create( [("abc", "1", 5), ("def", '2', 6), ("ghi", '3', 7), ("opq", '4', 8)] ) c6,c7,c8 = c4.split() self.assertEqual (c6, [("abc",), ("def",), ("ghi",), ("opq",)]) self.assertEqual (c7, [("1",), ('2',), ('3',), ('4',)]) self.assertEqual (c8, [(5,), (6,), (7,), (8,)]) - c12, c9, c10 = c2.copy().merge(c3, c5).split() - self.assertEqual (c12, c6) - self.assertEqual (c9, c7) - self.assertEqual (c10, c8) c11 = channel.create ([("abc",), ("def",), ("ghi",), ("opq",)]) self.assertEqual (c11.split(), [c11]) @@ -203,13 +185,39 @@ def testInsert (self): ret = [(1, x) for x in sorted(glob.glob("./*.py"))] self.assertEqual (sorted(ch1), ret) + #print channel.create(ret).insert(None, [1]) ch1.insert (None, [1]) ret = [(1, x, 1) for x in sorted(glob.glob("./*.py"))] self.assertEqual (sorted(ch1), ret) - ch1.insert (None, range(len(ch1))) + ch1.insert (None, range(ch1.length())) ret = [(1, x, 1, i) for i,x in enumerate(sorted(glob.glob("./*.py")))] self.assertEqual (sorted(ch1), ret) + def testCbind (self): + + chan = channel.create ([1,2,3,4,5]) + col1 = [2,4,6,8,10] + chan.cbind (col1) + self.assertEqual (chan, [(1,2), (2,4), (3,6), (4,8), (5,10)]) + col2 = [5,4,3,2,1] + chan.cbindMany (col1, col2) + self.assertEqual (chan, [(1,2,2,5), (2,4,4,4), (3,6,6,3), (4,8,8,2), (5,10,10,1)]) + chan.cbind(0) + self.assertEqual (chan, [(1,2,2,5,0), (2,4,4,4,0), (3,6,6,3,0), (4,8,8,2,0), (5,10,10,1,0)]) + + self.assertEqual (channel.create([(),(),(),(),()]).cbind(1), [(1,), (1,), (1,), (1,), (1,)]) + + def testSlice (self): + chan = channel.create([(1,2,2,5), (2,4,4,4), (3,6,6,3), (4,8,8,2), (5,10,10,1)]) + self.assertEqual (chan.slice(0,0), []) + self.assertEqual (chan.slice(0,1), [(1,),(2,),(3,),(4,),(5,)]) + self.assertEqual (chan.slice(0), chan) + self.assertEqual (chan.slice(2), [(2,5),(4,4),(6,3),(8,2),(10,1)]) + self.assertEqual (chan.slice(-2), [(2,5),(4,4),(6,3),(8,2),(10,1)]) + self.assertEqual (chan.colAt(-2), [(2,),(4,),(6,),(8,),(10,)]) + + + if __name__ == '__main__': unittest.main() diff --git a/tests/job.unittest.py b/tests/job.unittest.py new file mode 100644 index 00000000..5f2f13af --- /dev/null +++ b/tests/job.unittest.py @@ -0,0 +1,116 @@ +import unittest +import sys +import os, shutil, logging +logging.basicConfig (level = logging.DEBUG) +from time import sleep +rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) +sys.path.insert(0, rootdir) +from pyppl import utils, job + +class TestJob (unittest.TestCase): + + def testInit (self): + j = job (0, "./") + self.assertEqual (j.script, "./scripts/script.0") + self.assertEqual (j.rcfile, "./scripts/script.0.rc") + self.assertEqual (j.outfile, "./scripts/script.0.stdout") + self.assertEqual (j.errfile, "./scripts/script.0.stderr") + self.assertEqual (j.input, {'var':[], 'file':[], 'files':[]}) + self.assertEqual (j.output, {'var':[], 'file':[]}) + self.assertEqual (j.index, 0) + + def testSignature (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + os.makedirs ("./test/input") + os.makedirs ("./test/output") + os.makedirs ("./test/scripts") + j = job (0, "./test/") + open (j.script, 'w').write('') + sig = j.signature() + sleep (.1) + open (j.script, 'w').write('') + self.assertNotEqual (sig, j.signature()) + shutil.rmtree ("./test/") + + def testRc (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + os.makedirs ("./test/input") + os.makedirs ("./test/output") + os.makedirs ("./test/scripts") + j = job (0, "./test/") + self.assertEqual (j.rc(), -99) + open (j.rcfile, 'w').write('') + self.assertEqual (j.rc(), -99) + open (j.rcfile, 'w').write('140') + self.assertEqual (j.rc(), 140) + + shutil.rmtree ("./test/") + + def testOutfileGenerated (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + os.makedirs ("./test/input") + os.makedirs ("./test/output") + os.makedirs ("./test/scripts") + j2 = job (1, "./test/") + j2.output['file'].append ("./test/output/out.txt") + self.assertRaisesRegexp(Exception, "Output file", j2.outfileGenerated) + open ("./test/output/out.txt", 'w').write('') + j2.outfileGenerated() + shutil.rmtree ("./test/") + + + def testExport (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + + logger = logging.getLogger() + def log (a, b="", c=""): + logger.info ("%s %s %s" % (a,b,c)) + os.makedirs ("./test/input") + os.makedirs ("./test/output") + os.makedirs ("./test/scripts") + j3 = job (1, "./test/") + j3.output['file'].append ("./test/output/out.txt") + open ("./test/output/out.txt", 'w').write('') + + + self.assertRaisesRegexp(ValueError, "Unable to use export cache", j3.exportCached, "", "symlink", log) + self.assertRaisesRegexp(ValueError, "Output files not exported", j3.exportCached, "", "copy", log) + + e = j3.exportCached ("./test/", "copy", log) + self.assertFalse (e) + + j3.export ("./test/", "copy", True, log) + self.assertTrue (os.path.exists ("./test/out.txt")) + e = j3.exportCached ("./test/", "copy", log) + self.assertTrue (e) + self.assertTrue (os.path.islink ("./test/output/out.txt")) + + shutil.rmtree ("./test/") + os.makedirs ("./test/input") + os.makedirs ("./test/output") + os.makedirs ("./test/scripts") + j4 = job (4, "./test/") + j4.output['file'].append ("./test/output/out.txt") + j4.output['file'].append ("./test/output/outdir") + open ("./test/output/out.txt", 'w').write('') + os.makedirs ("./test/output/outdir") + + j4.export ("./test/", "gzip", True, log) + + self.assertTrue (os.path.exists ("./test/out.txt.gz")) + self.assertTrue (os.path.exists ("./test/outdir.tgz")) + + e = j4.exportCached("./test/", "gzip", log) + self.assertTrue(e) + self.assertTrue (os.path.isfile("./test/output/out.txt")) + self.assertTrue (os.path.isdir("./test/output/outdir")) + + + shutil.rmtree ("./test/") + +if __name__ == '__main__': + unittest.main() diff --git a/tests/proc.unittest.py b/tests/proc.unittest.py index 13bd6c37..dcc6704a 100644 --- a/tests/proc.unittest.py +++ b/tests/proc.unittest.py @@ -1,7 +1,7 @@ import os, sys, unittest, pickle, shutil, copy rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, rootdir) -from pyppl import proc +from pyppl import proc, aggr from pyppl import channel, utils from md5 import md5 from StringIO import StringIO @@ -37,19 +37,25 @@ def testSuffix (self): p = proc ('tag_unique') config = copy.copy(p.config) del config['workdir'] - - if config.has_key ('callback'): - config['callback'] = utils.funcSig(config['callback']) - - if config.has_key ('callfront'): - config['callfront'] = utils.funcSig(config['callfront']) - + # proc is not picklable + if config.has_key('depends'): + depends = config['depends'] + pickable_depends = [] + if isinstance(depends, proc): + depends = [depends] + elif isinstance(depends, aggr): + depends = depends.procs + for depend in depends: + pickable_depends.append(depend.id + '.' + depend.tag) + config['depends'] = pickable_depends + if config.has_key ('input') and isinstance(config['input'], dict): config['input'] = copy.deepcopy(config['input']) for key, val in config['input'].iteritems(): config['input'][key] = utils.funcSig(val) if callable(val) else val - signature = pickle.dumps (config) + '@' + pickle.dumps(sorted(sys.argv)) - self.assertEqual (p._suffix(), utils.uid(signature)) + + signature = pickle.dumps (config) + self.assertEqual (len(p._suffix()), 8) def testInit (self): p = proc ('tag') @@ -82,7 +88,7 @@ def testInit (self): def testSetattr (self): p = proc ('tag') - self.assertRaises (AttributeError, p.__setattr__, 'a', 1) + self.assertRaises (ValueError, p.__setattr__, 'a', 1) p. tag = 'notag' self.assertEqual (p.tag, 'notag') self.assertEqual (p.config['tag'], 'notag') @@ -120,10 +126,35 @@ def testIscached (self): except: pass self.assertFalse (p._isCached()) - with open (cachefile, 'w') as f: - pickle.dump(config, f) + + p.input = {"a": [1,2,3,4,5]} + #p._tidyBeforeRun() + p.run() self.assertTrue (p._isCached()) - self.assertEqual (p.forks, 100) + + open (p.workdir + '/scripts/script.3', 'w').write('') + self.assertFalse (p._isCached()) + self.assertEqual (p.ncjobids, [3]) + + def testExportCache (self): + p = proc ('ec') + p.cache = 'export' + p.input = {"a": [1,2,3,4,5]} + p.output = "outfile:file:{{a}}.txt" + p._buildProps() + p._buildInput() + p._buildOutput() + exdir = "./test/" + p.exdir = exdir + if not os.path.isdir (exdir): + os.makedirs (exdir) + for i in [1,2,4,5]: + open (exdir + "%s.txt" % i, 'w').write ('') + self.assertFalse (p._isCached()) + self.assertEqual (p.ncjobids, [2]) # a==3 + shutil.rmtree (exdir) + + def testBuildProps (self): p1 = proc ('tag1') @@ -222,9 +253,9 @@ def testInputFiles (self): }) def testBuildOutput (self): - c1 = channel (["aa", "bb"]) - c2 = channel ([1, 2]) - c3 = channel (sorted(["channel.unittest.py", "proc.unittest.py"])) + c1 = channel.create (["aa", "bb"]) + c2 = channel.create ([1, 2]) + c3 = channel.create (sorted(["channel.unittest.py", "proc.unittest.py"])) pOP = proc() pOP.input = {'c1': c1, 'c2': c2, 'c3:file': c3} @@ -240,7 +271,6 @@ def testBuildOutput (self): ('aa', '1.0', os.path.join(pOP.outdir, "channel.unittest2.py")), ('bb', '4.0', os.path.join(pOP.outdir, "proc.unittest2.py")) ]) - self.assertEqual (pOP.outfiles, pOP.output['o3']) pOP.props['channel'] = channel() pOP.props['outfiles'] = [] @@ -248,7 +278,8 @@ def testBuildOutput (self): pOP._buildProps() pOP._buildInput() pOP._buildOutput() - self.assertEqual (pOP.output['__out1__'], ['aa', 'bb']) + print pOP.output + self.assertEqual (pOP.output['__out.1__'], ['aa', 'bb']) self.assertEqual (pOP.output['o2'], ['1.0', '4.0']) self.assertEqual (pOP.output['o3'], [os.path.join(pOP.outdir, "channel.unittest2.py"), os.path.join(pOP.outdir, "proc.unittest2.py")]) @@ -256,43 +287,35 @@ def testBuildOutput (self): ('aa', '1.0', os.path.join(pOP.outdir, "channel.unittest2.py")), ('bb', '4.0', os.path.join(pOP.outdir, "proc.unittest2.py")) ]) - self.assertEqual (pOP.outfiles, pOP.output['o3']) pOP.props['channel'] = channel() pOP.props['outfiles'] = [] pOP.forks = 5 - c1 = channel() - c2 = channel() - d = [("cc:{{c1}}", c1), ("var:{{c2 | __import__('math').pow(float(_), 2.0)}}, file:{{c3.fn}}{{proc.forks}}{{c3.ext}}", c2)] + + d = ["cc:{{c1}}", "var:{{c2 | __import__('math').pow(float(_), 2.0)}}, file:{{c3.fn}}{{proc.forks}}{{c3.ext}}"] pOP.output = d pOP._buildProps() pOP._buildInput() pOP._buildOutput() self.assertEqual (pOP.output['cc'], ['aa', 'bb']) - self.assertEqual (pOP.output['__out1__'], ['1.0', '4.0']) - self.assertEqual (pOP.output['__out2__'], [os.path.join(pOP.outdir, "channel.unittest5.py"), os.path.join(pOP.outdir, "proc.unittest5.py")]) + self.assertEqual (pOP.output['__out.1__'], ['1.0', '4.0']) + self.assertEqual (pOP.output['__out.2__'], [os.path.join(pOP.outdir, "channel.unittest5.py"), os.path.join(pOP.outdir, "proc.unittest5.py")]) - self.assertEqual (c1, [('aa',), ('bb', )]) - self.assertEqual (c2, [ - ('1.0', os.path.join(pOP.outdir, "channel.unittest5.py")), - ('4.0', os.path.join(pOP.outdir, "proc.unittest5.py")) - ]) self.assertEqual (pOP.channel, [ ('aa', '1.0', os.path.join(pOP.outdir, "channel.unittest5.py")), ('bb', '4.0', os.path.join(pOP.outdir, "proc.unittest5.py")) ]) - self.assertEqual (pOP.outfiles, pOP.output['__out2__']) def testBuildScript(self): ps = proc ('script') - self.assertRaises (Exception, ps._tidyBeforeRun) + # empty script does not raise Exception any more + #self.assertRaises (Exception, ps._tidyBeforeRun) ps.input = {"input": ["input"]} ps.script = "ls" - scriptdir = os.path.join (ps.workdir, 'scripts') - if os.path.exists (scriptdir): - shutil.rmtree (scriptdir) + ps._tidyBeforeRun() + scriptdir = os.path.join (ps.workdir, 'scripts') self.assertTrue (os.path.exists(scriptdir)) ps.script = "template:" + __file__ + '_notexist' @@ -305,8 +328,8 @@ def testBuildScript(self): ps.props['jobs'] = [] ps._tidyBeforeRun () - self.assertEqual (ps.jobs, [os.path.join(scriptdir, 'script.0')]) - self.assertTrue (open(ps.jobs[0]).read().startswith("#!/usr/bin/env bash")) + self.assertEqual (map(lambda x: x.script, ps.jobs), [os.path.join(scriptdir, 'script.0')]) + self.assertTrue (open(ps.jobs[0].script).read().startswith("#!/usr/bin/env bash")) os.remove (tplfile) with open (tplfile, 'w') as f: @@ -315,8 +338,8 @@ def testBuildScript(self): ps.props['jobs'] = [] ps._tidyBeforeRun () - self.assertEqual (ps.jobs, [os.path.join(scriptdir, 'script.0')]) - self.assertTrue (open(ps.jobs[0]).read().startswith("#!/usr/bin/env bash")) + self.assertEqual (map(lambda x: x.script, ps.jobs), [os.path.join(scriptdir, 'script.0')]) + self.assertTrue (open(ps.jobs[0].script).read().startswith("#!/usr/bin/env bash")) os.remove (tplfile) ps.output = "output:var:{{input}}2" @@ -324,8 +347,8 @@ def testBuildScript(self): ps.script = "ls {{proc.workdir}}\necho {{#}} {{input}}\necho {{output}}\necho {{proc.args.var1}} {{proc.args.var2}}" ps.props['jobs'] = [] ps._tidyBeforeRun () - self.assertEqual (ps.jobs, [os.path.join(scriptdir, 'script.0')]) - self.assertEqual (open(ps.jobs[0]).read(), "#!/usr/bin/env bash\n\nls %s\necho 0 input\necho input2\necho 1 2" % ps.workdir) + self.assertEqual (map(lambda x: x.script, ps.jobs), [os.path.join(scriptdir, 'script.0')]) + self.assertEqual (open(ps.jobs[0].script).read(), "#!/usr/bin/env bash\n\nls %s\necho 0 input\necho input2\necho 1 2" % ps.workdir) @@ -344,29 +367,16 @@ def testRunCmd (self): self.assertEqual (prc._runCmd('beforeCmd'), 0) self.assertEqual (prc._runCmd('afterCmd'), 1) - saved_stdout = sys.stdout - try: - out = StringIO() - sys.stdout = out - prc.beforeCmd = 'ls' - prc.echo = True - prc._tidyBeforeRun () - self.assertEqual (prc._runCmd('beforeCmd'), 0) - self.assertTrue ("proc.unittest.py" in out.getvalue()) - finally: - sys.stdout = saved_stdout - - saved_stderr = sys.stderr - try: - out = StringIO() - sys.stderr = out - prc.afterCmd = 'bash -c "echo 2 >&2; exit 1"' - prc.echo = False # anyway print stderr - prc._tidyBeforeRun () - self.assertEqual (prc._runCmd('afterCmd'), 1) - self.assertTrue ("2" in out.getvalue()) - finally: - sys.stderr = saved_stderr + prc.beforeCmd = 'ls' + prc.echo = True + prc._tidyBeforeRun () + self.assertEqual (prc._runCmd('beforeCmd'), 0) + + + prc.afterCmd = 'bash -c "echo 2 >&2; exit 1"' + prc.echo = False # anyway print stderr + prc._tidyBeforeRun () + self.assertEqual (prc._runCmd('afterCmd'), 1) def testRunJobs (self): pr = proc() @@ -391,17 +401,25 @@ def testExport (self): p.input = {'infile:file': channel.fromPath ("*.py")} p.output = 'outfile:file:{{infile.fn}}2{{infile.ext}}, var:{{infile.fn}}2{{infile.ext}}' - p.exportdir = rootdir + testdir = "./test/" + if not os.path.exists(testdir): + os.makedirs (testdir) + p.exdir = testdir p._tidyBeforeRun () self.assertEqual (p.forks, 1) p._runJobs() p._export() - for (_, bn) in p.channel: - exfile = os.path.join (rootdir, bn) - self.assertTrue (os.path.exists (exfile)) - os.remove(exfile) + for outfile in p.output['outfile']: + self.assertTrue (os.path.exists (os.path.join(testdir, os.path.basename(outfile)))) + + p.exhow = 'gzip' + p._export() + for outfile in p.output['outfile']: + self.assertTrue (os.path.exists (os.path.join(testdir, os.path.basename(outfile) + '.gz'))) + + shutil.rmtree(testdir) def testCheckStatus (self): p = proc('cs') @@ -425,14 +443,12 @@ def testDoCache (self): p._readConfig({}) cachefile = os.path.join (p.tmpdir, p.cachefile) if os.path.exists (cachefile): - self.assertTrue (p._isCached()) - #os.utime (p.infiles[0], None) - #self.assertFalse (p._isCached()) - else: - self.assertFalse (p._isCached()) - p._tidyBeforeRun() - p._runJobs() - p._tidyAfterRun() + os.remove (cachefile) + self.assertFalse (p._isCached()) + p._tidyBeforeRun() + p._runJobs() + p._tidyAfterRun() + self.assertTrue (p._isCached()) def testCopy (self): p = proc('copy') @@ -446,8 +462,20 @@ def testCopy (self): self.assertEqual (pCopy.exportdir, rootdir) self.assertEqual (pCopy.script, p.script) - - + def testAlias (self): + p = proc ('alias') + p.input = {'a':[1]} + testv = {} + for k,v in proc.alias.iteritems(): + testv[v] = utils.randstr() + if v == 'retcodes': testv[v] = [0,1,2] + p.__setattr__ (k, testv[v]) + p._tidyBeforeRun() + for k,v in proc.alias.iteritems(): + val1 = p.__getattr__(k) + val2 = p.__getattr__(v) + self.assertEqual (val1, testv[v]) + self.assertEqual (val2, testv[v]) if __name__ == '__main__': unittest.main() diff --git a/tests/pyppl.unittest.py b/tests/pyppl.unittest.py index 77c36b19..e88bdd80 100644 --- a/tests/pyppl.unittest.py +++ b/tests/pyppl.unittest.py @@ -9,7 +9,7 @@ from pyppl import aggr class TestPipelineMethods (unittest.TestCase): - + def test_init (self): ppl = pyppl({}, '') @@ -18,13 +18,14 @@ def test_init (self): self.assertEqual (ppl.heads, []) def test_factory (self): + ppl = pyppl ({'proc' : {'tmpdir': '/local2/tmp/m161047/abc'}}) p1 = proc('TAG') self.assertTrue (isinstance (p1, proc)) self.assertEqual (p1.tag, 'TAG') - - inch = channel(['a', 'b', 'c']) + + inch = channel.create(['a', 'b', 'c']) p1.tag = 'CREATE_FILE' p1.input = {'input':inch} p1.script = "echo {{input}} > {{outfile}}" @@ -34,26 +35,21 @@ def test_factory (self): p2 = proc("MOVE_FILE") p2.input = "input, infile:file" p2.output = "outfile:file:{{infile.fn}}-2.txt" - p2.script = "mv {{infile}} {{outfile}}" + p2.script = "mv {{infile}} {{outfile}}; ln -s {{outfile}} {{infile}}" p2.depends = p1 - p2.exportdir = './' + p2.exportdir = './test/' p2.cache = False - + ppl.starts (p1) + ppl.run() - - self.assertTrue (os.path.exists('./a-2.txt')) - self.assertTrue (os.path.exists('./b-2.txt')) - self.assertTrue (os.path.exists('./c-2.txt')) - - os.remove ('./a-2.txt') - os.remove ('./b-2.txt') - os.remove ('./c-2.txt') - - self.assertFalse (os.path.exists('./a-2.txt')) - self.assertFalse (os.path.exists('./b-2.txt')) - self.assertFalse (os.path.exists('./c-2.txt')) - + + self.assertTrue (os.path.exists('./test/a-2.txt')) + self.assertTrue (os.path.exists('./test/b-2.txt')) + self.assertTrue (os.path.exists('./test/c-2.txt')) + + shutil.rmtree('./test') + def test_dot (self): self.maxDiff = None ppl = pyppl () @@ -67,7 +63,7 @@ def test_dot (self): p8 = proc("H") p9 = proc("I") p1.script = "echo 1" - p1.input = {"input": channel(['a'])} + p1.input = {"input": channel.create(['a'])} p1.output = "{{input}}" p2.script = "echo 1" p2.output = "{{input}}" @@ -127,7 +123,7 @@ def test_dot (self): "p4.D" [shape=box, style=filled, color="#f0f998", fontcolor=red] } """.split("\n"))) - + def test_multideps (self): ppl = pyppl () pr1 = proc("A") @@ -136,8 +132,8 @@ def test_multideps (self): p1ch = [('a',), ('b',), ('c',)] p2ch = [(1,), (2,), (3,)] - pr1.input = {'input': channel(p1ch)} - pr2.input = {'input': channel(p2ch)} + pr1.input = {'input': channel.create(p1ch)} + pr2.input = {'input': channel.create(p2ch)} pr1.output = '{{input}}' pr2.output = '{{input}}' pr3.input = 'in1, in2' @@ -161,15 +157,15 @@ def test_multideps (self): def test_sge (self): ppl = pyppl () p1 = proc () - p1.input = {"input": channel([('a')] * 10)} + p1.input = {"input": channel.create([('a')] * 10)} p1.workdir = './test-sge' p1.forks = 3 p1.script = "echo {input}" #ppl.add(p1).run('sge') - + def test_batchjobs (self): p = proc ('batch') - p.input = {'input': channel([5, 2, 5, 2, 5, 2])} + p.input = {'input': channel.create([5, 2, 5, 2, 5, 2])} p.script = "cat {{proc.workdir}}/scripts/script.{{#}}.ssh | grep franklin" p.echo = True p.cache = False @@ -187,17 +183,17 @@ def test_batchjobs (self): 'loglevel': 'debug' }).starts(p).run() shutil.rmtree ('./test_batchjobs') - + def testCallback (self): p1 = proc ('callback') p2 = proc ('callback') - + def callback2 (s): ch = channel.create ([('a1','b'), ('x', 'y')]) s.channel.merge (ch) - - sys.argv = [0, 1, 2] - p1.input = {"input": channel.fromArgv(1)} + argv = sys.argv[:] + sys.argv = ['0', '1', '2'] + p1.input = {"input": channel.fromArgv()} p1.output = "output:{{input}}2" p1.script = "echo {{output}}" p1.callback = callback2 @@ -206,27 +202,8 @@ def callback2 (s): p2.script = "echo {{output}}" p2.output = "output:{{input}}.{{in1}}.{{in2}}" pyppl ().starts(p1).run() - - def testCallfront (self): - p1 = proc ('callfront') - p2 = proc ('callfront') - - sys.argv = [0, 1, 2] - p1.input = {"input": channel.fromArgv(1)} - p1.output = "output:{{input}}2" - p1.script = "echo {{output}}" - - def callfront (s): - ch = channel.create ([('a1','b'), ('x', 'y')]) - p1.channel.merge(ch) - - p2.depends = p1 - p2.input = "input, in1, in2" - p2.script = "echo {{output}}" - p2.output = "output:{{input}}.{{in1}}.{{in2}}" - p2.callfront = callfront - pyppl ({'loglevel':'debug'}).starts(p1).run() - + sys.argv = argv[:] + def testAggr (self): pa = proc ('aggr') pb = proc ('aggr') diff --git a/tests/runner.unittest.py b/tests/runner.unittest.py index 926b85d6..f85e04f9 100644 --- a/tests/runner.unittest.py +++ b/tests/runner.unittest.py @@ -1,8 +1,8 @@ import unittest, os -import sys +import sys, shutil rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, rootdir) -from pyppl import runner_local, runner_sge, runner_ssh +from pyppl import runner_local, runner_sge, runner_ssh, utils, job def tryRemove (file): try: @@ -14,7 +14,9 @@ class TestRunner (unittest.TestCase): def setUp (self): super(TestRunner, self).setUp() - tmpdir = os.path.dirname(__file__) + tmpdir = os.path.join(os.path.dirname(__file__), 'test') + if not os.path.exists (os.path.join(tmpdir, 'scripts')): + os.makedirs (os.path.join(tmpdir, 'scripts')) self.scripts = [ os.path.join(tmpdir, 'runner1.py'), os.path.join(tmpdir, 'runner2.py'), @@ -37,6 +39,12 @@ def setUp (self): f.write ('print "2"\n') f.write ('sys.stderr.write("3")\n') f.write ('sys.exit(1)\n') + + self.jobs = [] + for i, script in enumerate(self.scripts): + if not os.path.exists(os.path.join(tmpdir, 'scripts', 'script.%s' % i)): + os.symlink (os.path.abspath(script), os.path.join(tmpdir, 'scripts', 'script.%s' % i)) + self.jobs.append (job (i, tmpdir)) self.configs = [ {}, @@ -47,127 +55,121 @@ def setUp (self): def tearDown (self): super(TestRunner, self).tearDown() - for script in self.scripts: - tryRemove (script) - tryRemove (script + '.rc') - tryRemove (script + '.stderr') - tryRemove (script + '.stdout') - tryRemove (script + '.sge') - tryRemove (script + '.ssh') + shutil.rmtree (os.path.join(os.path.dirname(__file__), 'test')) def testLocalInit (self): - for script in self.scripts[:2]: - r = runner_local(script) + for j in self.jobs[:2]: + r = runner_local(j) self.assertTrue (isinstance(r, runner_local)) - self.assertRaises (Exception, runner_local, self.scripts[3]) + self.assertRaises (Exception, runner_local, self.jobs[3]) def testSGEInit (self): - for script in self.scripts[:2]: + for script in self.jobs[:2]: r = runner_sge(script) self.assertTrue (isinstance(r, runner_sge)) - self.assertRaises (Exception, runner_sge, self.scripts[3]) + self.assertRaises (Exception, runner_sge, self.jobs[3]) def testChmodX (self): - self.assertEqual ([os.path.realpath(self.scripts[0])], runner_local.chmod_x(self.scripts[0])) - self.assertEqual ([os.path.realpath(self.scripts[1])], runner_local.chmod_x(self.scripts[1])) - self.assertEqual (['/usr/bin/python', self.scripts[2]], runner_local.chmod_x(self.scripts[2])) - self.assertRaises (Exception, runner_local.chmod_x, self.scripts[3]) + self.assertEqual ([os.path.realpath(self.scripts[0])], utils.chmodX(self.scripts[0])) + self.assertEqual ([os.path.realpath(self.scripts[1])], utils.chmodX(self.scripts[1])) + self.assertEqual (['/usr/bin/python', self.scripts[2]], utils.chmodX(self.scripts[2])) + self.assertRaises (Exception, utils.chmodX, self.scripts[3]) def testConfig (self): - r0 = runner_local(self.scripts[0], self.configs[0]) - r1 = runner_local(self.scripts[1], self.configs[1]) + r0 = runner_local(self.jobs[0], self.configs[0]) + r1 = runner_local(self.jobs[1], self.configs[1]) self.assertEqual (r0._config('retcodes'), None) self.assertEqual (r0._config('retcodes', [0]), [0]) self.assertEqual (r1._config('retcodes', [0]), [0, 1]) - r2 = runner_ssh(self.scripts[0], { + r2 = runner_ssh(self.jobs[0], { 'sshRunner': {'servers': ['franklin01']} }) self.assertEqual (r2._config('sshRunner.servers', ['franklin02']), ['franklin01']) def testLocalRun (self): - r0 = runner_local(self.scripts[0]) - r1 = runner_local(self.scripts[1]) - r2 = runner_local(self.scripts[4]) + r0 = runner_local(self.jobs[0]) + r1 = runner_local(self.jobs[1]) + r2 = runner_local(self.jobs[4]) r0.submit() r0.wait() - self.assertTrue (os.path.exists(r0.rcfile)) - self.assertTrue (os.path.exists(r0.outfile)) - self.assertTrue (os.path.exists(r0.errfile)) - self.assertEqual (r0.rc(), 0) - self.assertEqual (open(r0.outfile).read().strip(), '0') + self.assertTrue (os.path.exists(r0.job.rcfile)) + self.assertTrue (os.path.exists(r0.job.outfile)) + self.assertTrue (os.path.exists(r0.job.errfile)) + self.assertEqual (r0.job.rc(), 0) + self.assertEqual (open(r0.job.outfile).read().strip(), '0') self.assertTrue (r0.isValid()) r1.submit() r1.wait() - self.assertTrue (os.path.exists(r1.rcfile)) - self.assertTrue (os.path.exists(r1.outfile)) - self.assertTrue (os.path.exists(r1.errfile)) - self.assertEqual (r1.rc(), 0) - self.assertEqual (open(r1.outfile).read().strip(), '1\n2') + self.assertTrue (os.path.exists(r1.job.rcfile)) + self.assertTrue (os.path.exists(r1.job.outfile)) + self.assertTrue (os.path.exists(r1.job.errfile)) + self.assertEqual (r1.job.rc(), 0) + self.assertEqual (open(r1.job.outfile).read().strip(), '1\n2') self.assertTrue (r1.isValid()) r2.submit() r2.wait() - self.assertTrue (os.path.exists(r2.rcfile)) - self.assertTrue (os.path.exists(r2.outfile)) - self.assertTrue (os.path.exists(r2.errfile)) - self.assertEqual (r2.rc(), 1) - self.assertEqual (open(r2.outfile).read().strip(), '2') - self.assertEqual (open(r2.errfile).read().strip(), '3') + self.assertTrue (os.path.exists(r2.job.rcfile)) + self.assertTrue (os.path.exists(r2.job.outfile)) + self.assertTrue (os.path.exists(r2.job.errfile)) + self.assertEqual (r2.job.rc(), 1) + self.assertEqual (open(r2.job.outfile).read().strip(), '2') + self.assertEqual (open(r2.job.errfile).read().strip(), '3') self.assertFalse (r2.isValid()) def testSSHRun (self): - r0 = runner_ssh(self.scripts[0], { + r0 = runner_ssh(self.jobs[0], { 'sshRunner': {'servers': ['franklin01']} }) - r1 = runner_ssh(self.scripts[1], { + r1 = runner_ssh(self.jobs[1], { 'sshRunner': {'servers': ['franklin02']} }) - r2 = runner_ssh(self.scripts[4], { + r2 = runner_ssh(self.jobs[4], { 'sshRunner': {'servers': ['franklin03']} }) r0.submit() r0.wait() - self.assertTrue (os.path.exists(r0.rcfile)) - self.assertTrue (os.path.exists(r0.outfile)) - self.assertTrue (os.path.exists(r0.errfile)) - self.assertEqual (r0.rc(), 0) - self.assertEqual (open(r0.outfile).read().strip(), '0') + self.assertTrue (os.path.exists(r0.job.rcfile)) + self.assertTrue (os.path.exists(r0.job.outfile)) + self.assertTrue (os.path.exists(r0.job.errfile)) + self.assertEqual (r0.job.rc(), 0) + self.assertEqual (open(r0.job.outfile).read().strip(), '0') self.assertTrue (r0.isValid()) r1.submit() r1.wait() - self.assertTrue (os.path.exists(r1.rcfile)) - self.assertTrue (os.path.exists(r1.outfile)) - self.assertTrue (os.path.exists(r1.errfile)) - self.assertEqual (r1.rc(), 0) - self.assertEqual (open(r1.outfile).read().strip(), '1\n2') + self.assertTrue (os.path.exists(r1.job.rcfile)) + self.assertTrue (os.path.exists(r1.job.outfile)) + self.assertTrue (os.path.exists(r1.job.errfile)) + self.assertEqual (r1.job.rc(), 0) + self.assertEqual (open(r1.job.outfile).read().strip(), '1\n2') self.assertTrue (r1.isValid()) r2.submit() r2.wait() - self.assertTrue (os.path.exists(r2.rcfile)) - self.assertTrue (os.path.exists(r2.outfile)) - self.assertTrue (os.path.exists(r2.errfile)) - self.assertEqual (r2.rc(), 1) - self.assertEqual (open(r2.outfile).read().strip(), '2') - self.assertEqual (open(r2.errfile).read().strip(), '3') + self.assertTrue (os.path.exists(r2.job.rcfile)) + self.assertTrue (os.path.exists(r2.job.outfile)) + self.assertTrue (os.path.exists(r2.job.errfile)) + self.assertEqual (r2.job.rc(), 1) + self.assertEqual (open(r2.job.outfile).read().strip(), '2') + self.assertEqual (open(r2.job.errfile).read().strip(), '3') self.assertFalse (r2.isValid()) - #@unittest.skip("Skipping SGE test...") + @unittest.skip("Skipping SGE test...") def testSGERun (self): - r0 = runner_sge(self.scripts[0], { + r0 = runner_sge(self.jobs[0], { 'sgeRunner': {'sge_N': 'job_r0', 'sge_q': '1-hour', 'sge_M': 'Wang.Panwen@mayo.edu'} }) - r1 = runner_sge(self.scripts[1], { + r1 = runner_sge(self.jobs[1], { 'echo': True, 'sgeRunner': {'sge_N': 'job_r1', 'sge_q': '1-hour'} }) - r2 = runner_sge(self.scripts[4], { + r2 = runner_sge(self.jobs[4], { 'sgeRunner': {'sge_N': 'job_r4', 'sge_q': '1-hour'} }) diff --git a/tests/utils.unittest.py b/tests/utils.unittest.py index f8cf7cec..d1a8184b 100644 --- a/tests/utils.unittest.py +++ b/tests/utils.unittest.py @@ -1,6 +1,6 @@ import unittest import sys -import os +import os, shutil rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, rootdir) from pyppl import utils @@ -64,6 +64,128 @@ def method (self): objMethod = obj.method() self.assertEqual (objMethod, 'objMethod') + def testDictUpdate (self): + ref1 = {"c": 3, "d": 9} + ref2 = {"c": 4} + orig = {"a":1, "b":ref1} + newd = {"b":ref2, "c":8} + utils.dictUpdate (orig, newd) + self.assertEqual (orig, {"a":1, "b":{"c":4, "d":9}, "c":8}) + orig2 = {"a":1, "b":ref1} + newd2 = {"b":ref2, "c":8} + orig2.update(newd2) + self.assertEqual (orig2, {"a":1, "b":ref2, "c":8}) + + def testFuncSig (self): + def func1 (): + pass + + func2 = lambda x: x + func3 = "" + self.assertEqual (utils.funcSig(func1).strip(), "def func1 ():\n\t\t\tpass") + self.assertEqual (utils.funcSig(func2).strip(), "func2 = lambda x: x") + self.assertEqual (utils.funcSig(func3), "None") + + def testUid (self): + import random, string + + def randomword(length): + return ''.join(random.choice(string.lowercase) for i in range(length)) + + uids = {} + for i in range (10000): + s = randomword (10) + uid = utils.uid (s) + uids[uid] = 1 + self.assertEqual (len (uids.keys()), 10000) + + def testTargz (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + testdir = "./test/tgzfiles/tgzdir" + if not os.path.exists (testdir): + os.makedirs (testdir) + tgzfile = "./test/test.tgz" + open("./test/tgzfiles/a.txt", 'w').close() + open("./test/tgzfiles/b.txt", 'w').close() + open("./test/tgzfiles/tgzdir/b.txt", 'w').close() + open("./test/tgzfiles/tgzdir/c.txt", 'w').close() + utils.targz (tgzfile, "./test/tgzfiles") + self.assertTrue (os.path.exists(tgzfile)) + + shutil.rmtree ("./test/tgzfiles") + os.makedirs ("./test/tgzfiles") + utils.untargz (tgzfile, "./test/tgzfiles") + self.assertTrue (os.path.exists("./test/tgzfiles/a.txt")) + self.assertTrue (os.path.exists("./test/tgzfiles/b.txt")) + self.assertTrue (os.path.exists("./test/tgzfiles/tgzdir/b.txt")) + self.assertTrue (os.path.exists("./test/tgzfiles/tgzdir/c.txt")) + shutil.rmtree ("./test/") + + def testGz (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + os.makedirs ("./test/") + orfile = "./test/gz.txt" + gzfile = orfile + '.gz' + open(orfile, 'w').close() + utils.gz (gzfile, orfile) + self.assertTrue (os.path.exists(gzfile)) + os.remove (orfile) + self.assertFalse (os.path.exists(orfile)) + utils.ungz (gzfile, orfile) + self.assertTrue (os.path.exists(orfile)) + shutil.rmtree ("./test/") + + def testFileSig (self): + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + os.makedirs ("./test/") + thefile = "./test/filesig.txt" + open(thefile, 'w').write('') + sig = utils.fileSig (thefile) + from time import sleep + sleep (.1) + open(thefile, 'w').write('') + self.assertNotEqual (sig, utils.fileSig(thefile)) + shutil.rmtree ("./test/") + + def testAlwaysList(self): + string = "a,b, c, 'd,e'" + l = utils.alwaysList (string) + self.assertEqual (l, ['a', 'b', 'c', "'d,e'"]) + string = ["o1:var:{{c1}}", "o2:var:{{c2 | __import__('math').pow(float(_), 2.0)}}", "o3:file:{{c3.fn}}2{{c3.ext}}"] + l = utils.alwaysList (string) + self.assertEqual (l, string) + + def testSanitizeOutKey (self): + testdata = [ + ["a", ('__out.1__', 'var', 'a')], + ["key:val", ('key', 'var', 'val')], + ["file:val", ('__out.2__', 'file', 'val')], + ["a:var:c", ("a", "var", "c")] + ] + for data in testdata: + self.assertEqual(utils.sanitizeOutKey(data[0]), data[1]) + + def testChmodX (self): + + if os.path.exists ("./test/"): + shutil.rmtree ("./test/") + os.makedirs ("./test/") + thefile = "./test/chmodx.txt" + open(thefile, 'w').close() + self.assertEqual ([os.path.realpath(thefile)], utils.chmodX (thefile)) + shutil.rmtree ("./test/") + + def testLogger (self): + logger1 = utils.getLogger(name='logger1') + logger2 = utils.getLogger(name='logger2') + logger1.info ('logger1') + logger2.info ('logger2') + + logger2 = logger1 + logger2.info ('logger3') if __name__ == '__main__': unittest.main()