Skip to content

Commit

Permalink
Version:0.4.0; Add use job class for jobs in a proc; Use "1,2 3,4" fo…
Browse files Browse the repository at this point in the history
…r channel.fromArgs for multi-width channels; Add rbind, cbind, slice for channel; Add alias for some proc properties; Remove callfront for proc; Add export cache mode; Add gzip export support (#1); Unify loggers; Use job cache instead of proc cache so that a proc can be partly cached; Rewrite buildInput and buildOutput; Use job to construct runners;
  • Loading branch information
pwwang committed Apr 13, 2017
1 parent 459a834 commit af3bd3f
Show file tree
Hide file tree
Showing 16 changed files with 1,120 additions and 787 deletions.
1 change: 1 addition & 0 deletions pyppl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pyppl import pyppl
from pyppl import proc
from pyppl import job
from pyppl import aggr
from pyppl import channel
from pyppl import utils
Expand Down
1 change: 1 addition & 0 deletions pyppl/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from channel import channel
from proc import proc
from aggr import aggr
from job import job
import utils
155 changes: 78 additions & 77 deletions pyppl/helpers/channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from copy import copy as pycopy
import utils

class channel (list):

Expand All @@ -13,8 +14,6 @@ def create (l = []):
@staticmethod
def fromChannels (*args):
ret = channel.create()
if not args:
return ret
ret.merge (*args)
return ret

Expand Down Expand Up @@ -43,33 +42,30 @@ def fromPairs (pattern):
return c

@staticmethod
def fromArgv (width = 1):
def fromArgv ():
from sys import argv
ret = channel.create()
args = argv[1:]
alen = len (args)
if alen == 0: return channel()
if width == None: width = alen
if alen % width != 0:
raise Exception('Length (%s) of argv[1:] must be exactly divided by width (%s)' % (alen, width))
if alen == 0: return ret

ret = channel()
for i in xrange(0, alen, width):
tmp = ()
for j in range(width):
tmp += (args[i+j], )
ret.append (tmp)
width = None
for arg in args:
items = channel._tuplize(utils.split(arg, ','))
if width is not None and width != len(items):
raise ValueError('Width %s (%s) is not consistent with previous %s' % (len(items), arg, width))
width = len(items)
ret.append (items)
return ret

@staticmethod
def _tuplize (tu):
if isinstance(tu, str) or isinstance(tu, unicode):
if isinstance(tu, (str, unicode)):
tu = (tu, )
else:
try:
iter(tu)
except:
tu = (tu, )
return tu
try: iter(tu)
except: tu = (tu, )
return tuple(tu)

# expand the channel according to the files in <col>, other cols will keep the same
# [(dir1/dir2, 1)].expand (0, "*") will expand to
Expand Down Expand Up @@ -106,11 +102,9 @@ def copy (self):
return pycopy(self)

def width (self):
if not self:
return 0
if not self: return 0
ele = self[0]
if not isinstance(ele, tuple):
return 1
if not isinstance(ele, tuple): return 1
return len(ele)

def length (self):
Expand All @@ -124,69 +118,76 @@ def filter (self, func):

def reduce (self, func):
    """
    Fold the rows of this channel with `func` and wrap the folded value
    in a new channel via `channel.create`.
    """
    folded = reduce(func, self)
    return channel.create(folded)

def merge (self, *args):
if not args: return
maxlen = max(map(len, args))
minlen = min(map(len, args))
if maxlen != minlen:
raise Exception('Cannot merge channels with different length (%s, %s).' % (maxlen, minlen))
clen = len (self)
if clen != 0 and clen != maxlen:
raise Exception('Cannot merge channels with different length (%s, %s).' % (maxlen, clen))

for i in range(maxlen):
tu = () if clen==0 else channel._tuplize(self[i])
for arg in args:
tu += channel._tuplize (arg[i])
if clen == 0:
self.append(tu)
else:
self[i] = tu

def rbind (self, row):
    """
    Append one row to the channel (in place) and return the channel.

    The row is normalized with `channel._tuplize` first. A non-empty
    channel only accepts rows whose length equals its current width;
    otherwise a ValueError is raised.
    """
    newrow = channel._tuplize(row)
    # An empty channel has no width yet, so any row is accepted.
    has_rows = self.length() != 0
    if has_rows and self.width() != len(newrow):
        raise ValueError ('Cannot bind row (len: %s) to channel (width: %s): width is different.' % (len(newrow), self.width()))
    self.append(newrow)
    return self

def insert (self, idx, val):
idx = self.width() if idx is None else idx
if not isinstance(val, list):
val = [val]
if len (val) == 1:
val = val * len (self)
elif len(val) != len(self):
raise Exception('Cannot merge channels with different length (%s, %s).' % (len(self), len(val)))
for i in range(len(self)):
ele = list (self[i])
newele = ele[:idx] + list(channel._tuplize(val[i])) + ele[idx:]
self[i] = tuple (newele)
def rbindMany (self, *rows):
    """
    Append every given row, in order, using `rbind`; return the channel.
    """
    for one_row in rows:
        self.rbind(one_row)
    return self

def split (self):
ret = []
for i, tu in enumerate(self):
if isinstance(tu, str):
tu = (tu, )
try:
iter(tu)
except:
tu = (tu, )
if i == 0:
for t in tu: ret.append(channel())

for j, t in enumerate(tu):
ret[j].append(channel._tuplize(t))
def cbind (self, col):
    """
    Append one column to the channel (in place) and return the channel.

    `col` may be a list of values (one per row) or a single value. A
    single value (or one-element list) is repeated for every row, at
    least once. Binding to an empty channel creates one row per value.
    Raises ValueError when the column length differs from the channel
    length.
    """
    values = col if isinstance(col, list) else [col]
    if len(values) == 1:
        # Broadcast the single value over all rows (one row if empty).
        values = values * max(1, self.length())

    nrows = self.length()
    if nrows == 0:
        for value in values:
            self.append(channel._tuplize(value))
        return self

    if nrows != len(values):
        raise ValueError ('Cannot bind column (len: %s) to channel (length: %s): length is different.' % (len(values), self.length()))

    for i in range(nrows):
        self[i] += channel._tuplize(values[i])
    return self

def cbindMany (self, *cols):
    """
    Append every given column, in order, using `cbind`; return the channel.
    """
    for one_col in cols:
        self.cbind(one_col)
    return self

def merge (self, *chans):
    """
    Column-bind the columns of each given channel onto this channel
    (in place) and return this channel.

    Arguments that are not `channel` instances are converted with
    `channel.create` before their columns are extracted.
    """
    for chan in chans:
        source = chan if isinstance(chan, channel) else channel.create(chan)
        # Split into single-column channels, flatten each to a plain list.
        columns = []
        for column in source.split():
            columns.append(column.toList())
        self.cbindMany(*columns)
    return self

def colAt (self, index):
    """
    Return the single column at `index` as a new channel (a slice of
    length 1 starting at `index`).
    """
    single = self.slice(index, 1)
    return single

def slice (self, start, length = None):
    """
    Return a new channel holding `length` columns starting at column
    `start`.

    `start` of None means the channel width (which yields an empty
    channel); a negative `start` counts from the end. `length` of None
    means the channel width, i.e. everything from `start` on.
    """
    width = self.width()
    if start is None:
        start = width
    if length is None:
        length = width
    if start < 0:
        start += width
    if start >= width:
        return channel.create()

    ret = channel.create()
    if length == 0:
        return ret

    stop = start + length
    for ele in self:
        ret.rbind(tuple(ele[start:stop]))
    return ret

def insert (self, index, col):
    """
    Insert a column before column `index` (in place); return the channel.

    `col` follows the same convention as in `cbind`: a list of values or
    a single value that is repeated for every row.
    """
    values = col if isinstance(col, list) else [col]
    if len(values) == 1:
        values = values * max(1, self.length())

    # Take both halves before the channel is emptied.
    head = self.slice(0, index)
    tail = self.slice(index)

    # Rebuild in place: head columns, the new column, then tail columns.
    del self[:]
    self.merge(head)
    self.cbind(values)
    self.merge(tail)
    return self

def split (self):
    """
    Return a list with one single-column channel per column.
    """
    columns = []
    for i in range(self.width()):
        columns.append(self.colAt(i))
    return columns

def toList (self): # [(a,), (b,)] to [a, b], only applicable when width =1
if self.width() != 1:
raise Exception ('Width = %s, but expect width = 1.' % self.width())
raise ValueError ('Width = %s, but expect width = 1.' % self.width())

ret = []
for e in self:
if isinstance(e, list):
v = e # support list elements
else:
(v, ) = e
ret.append(v)
return ret
return [ e[0] if isinstance(e, tuple) else e for e in self ]



111 changes: 111 additions & 0 deletions pyppl/helpers/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from hashlib import md5
from glob import glob
from os import path, remove, symlink, makedirs
from shutil import rmtree, copytree, copyfile, move
import gzip
import utils


class job (object):
    """
    One job of a process: the paths of its script, return-code, stdout
    and stderr files under `<workdir>/scripts`, plus its input and
    output data.

    `input` is a dict with keys 'var', 'file' and 'files'; `output` is a
    dict with keys 'var' and 'file'. Both default to empty lists per key.
    """

    def __init__(self, index, workdir, input = None, output = None):
        """
        @params:
            index:   The index of the job, used in the script file names
            workdir: The working directory containing the `scripts` dir
            input:   The input dict, or None for an empty one
            output:  The output dict, or None for an empty one
        """
        self.script  = path.join (workdir, "scripts", "script.%s" % index)
        self.rcfile  = path.join (workdir, "scripts", "script.%s.rc" % index)
        self.outfile = path.join (workdir, "scripts", "script.%s.stdout" % index)
        self.errfile = path.join (workdir, "scripts", "script.%s.stderr" % index)
        self.input   = {'var':[], 'file':[], 'files':[]} if input is None else input
        # Fall back to the given output (previously this used `input`).
        self.output  = {'var':[], 'file':[]} if output is None else output
        self.index   = index

    @staticmethod
    def _removePath (fpath):
        """
        Delete a regular file, a symlink or a directory tree.
        """
        # rmtree refuses symlinks, even ones pointing at a directory,
        # so links are always unlinked with remove().
        if path.isdir (fpath) and not path.islink (fpath):
            rmtree (fpath)
        else:
            remove (fpath)

    def signature (self):
        """
        Return the md5 hex digest built from the signature of the script
        (`utils.fileSig`), the input values/files and the output
        values/files.
        """
        sSig = utils.fileSig(self.script)

        iParts = []
        for val in self.input['var']:
            iParts.append ("|var:" + md5(str(val)).hexdigest())
        for val in self.input['file']:
            iParts.append ("|file:" + utils.fileSig(val))
        iParts.append ("|files")
        for val in self.input['files']:
            for v in val:
                iParts.append (':' + utils.fileSig(v))

        oParts = []
        for val in self.output['var']:
            oParts.append ("|var:" + md5(str(val)).hexdigest())
        for val in self.output['file']:
            oParts.append ("|file:" + utils.fileSig(val))

        return md5(sSig + ';' + ''.join(iParts) + ';' + ''.join(oParts)).hexdigest()

    def rc (self):
        """
        Return the integer stored in the rc file, or -99 when the file
        does not exist or is empty.
        """
        if not path.exists (self.rcfile):
            return -99
        # Close the handle deterministically instead of leaking it.
        with open (self.rcfile) as frc:
            rccodestr = frc.read().strip()
        return int(rccodestr) if rccodestr else -99

    # whether output files are generated
    def outfileGenerated (self):
        """
        Raise an Exception for the first output file that does not exist.
        """
        for outfile in self.output['file']:
            if not path.exists (outfile):
                raise Exception ('[Job#%s]: Output file not generated: %s' % (self.index, outfile))

    # use export as cache
    def exportCached (self, exdir, how, log):
        """
        Try to restore the output files from previously exported files.

        @params:
            exdir: The export directory
            how:   The export method; 'gzip' restores from `.gz`/`.tgz`,
                   anything else symlinks the exported file back
            log:   The logging function, called as log(message, level)
        @returns:
            True if every output file was restored, False as soon as an
            exported file is missing.
        @raises:
            ValueError when `how` is 'symlink' or `exdir` is empty.
        """
        if how == 'symlink':
            raise ValueError ('Unable to use export cache when you export using symlink.')
        if not exdir:
            raise ValueError ('Output files not exported, cannot use them for caching.')

        for outfile in self.output['file']:
            exfile = path.join (exdir, path.basename(outfile))
            if how == 'gzip':
                gzfile  = exfile + '.gz'
                tgzfile = exfile + '.tgz'
                if not path.exists(gzfile) and not path.exists(tgzfile):
                    return False
                if path.exists(outfile):
                    log ('Overwrite file/dir to use export for caching: %s' % outfile, 'warning')
                    self._removePath (outfile)
                if path.exists(gzfile):
                    utils.ungz (gzfile, outfile)
                elif path.exists(tgzfile):
                    makedirs(outfile)
                    utils.untargz (tgzfile, outfile)
            else:
                if not path.exists (exfile):
                    return False
                if path.exists(outfile):
                    log ('Overwrite file/dir to use export for caching: %s' % outfile, 'warning')
                    # outfile may itself be a symlink left by a 'move' export.
                    self._removePath (outfile)
                symlink (exfile, outfile)
        return True

    def export (self, exdir, how, ow, log):
        """
        Export the output files to `exdir`.

        @params:
            exdir: The export directory
            how:   One of 'copy', 'move', 'symlink' or 'gzip'
            ow:    Whether an existing target is overwritten
            log:   The logging function, called as log(message, level[, flag])
        """
        for outfile in self.output['file']:
            target = path.join (exdir, path.basename (outfile))

            if path.exists (target):
                if ow:
                    # target may be a symlink created by a 'symlink' export.
                    self._removePath (target)
                else:
                    log('%s (target exists, skipped)' % target, 'warning')

            # Still there means overwriting was not allowed: skip it.
            if path.exists (target):
                continue

            log ('%s (%s)' % (target, how), 'info', 'EXPORT')
            if how == 'copy':
                if path.isdir (outfile):
                    copytree (outfile, target)
                else:
                    copyfile (outfile, target)
            elif how == 'move':
                move (outfile, target)
                symlink(target, outfile) # make sure dependent proc can run
            elif how == 'symlink':
                symlink (outfile, target)
            elif how == 'gzip':
                # NOTE(review): the existence/overwrite check above looks at
                # `target`, not at the '.gz'/'.tgz' file written here - confirm
                # whether an existing archive should be skipped or replaced.
                if path.isdir (outfile):
                    utils.targz (target + '.tgz', outfile)
                else:
                    utils.gz (target + '.gz', outfile)

0 comments on commit af3bd3f

Please sign in to comment.