Skip to content

Commit

Permalink
moved GraphFunc to symboltable.py and refactore. fixed race condition…
Browse files Browse the repository at this point in the history
… with condition variables.
  • Loading branch information
kuitang committed Jul 18, 2011
1 parent 0a8d981 commit afa12fa
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 93 deletions.
114 changes: 28 additions & 86 deletions flowconduit/flowconduit.py
@@ -1,5 +1,5 @@
from symboltable import SymbolTable
from topsort import topsort
from symboltable import SymbolTable, SymbolTableFunc
from topsort import topsort_from_srcs, dfs_visit, TopsortState

import sys
from threading import Lock, RLock, Condition
Expand All @@ -17,77 +17,6 @@ def transpose(adj):
newadj[v].add(u)
return newadj

class GraphFunc(object):
def __init__(self, f, inputs=None, outputs=None):
self.name = f.__name__
if isinstance(inputs, Mapping):
def pf_mapping(syms):
d = dict((k, syms[v]) for k,v in inputs.items())
return f(**d)
self.prepared_f = pf_mapping
self.input_syms = inputs.values()
elif isinstance(inputs, (str, unicode)):
self.prepared_f = lambda syms: f(syms[inputs])
self.input_syms = [ inputs ]
elif isinstance(inputs, Iterable):
self.prepared_f = lambda syms: f(*[syms[k] for k in inputs])
self.input_syms = inputs
elif inputs is None:
self.prepared_f = lambda _: f()
self.input_syms = None
else:
raise ArgumentError("GraphFunc's inputs must be Mapping, str, unicode, Iterable, or None")

if isinstance(outputs, Mapping):
def osym_mapping(symtable, retval):
newtable = symtable.new_child()
if isinstance(retval, Mapping):
d = retval
elif hasattr(retval, '__dict__'):
d = retval.__dict__
else:
raise ArgumentError('GraphFunc called with a Mapping outputs expects f to return a Mapping or object with __dict__ attribute')

for k,v in outputs.items(): newtable[v] = d[k]
return newtable
self.output = osym_mapping
self.output_syms = outputs.values()
elif isinstance(outputs, str):
def osym_scalar(symtable, retval):
newsyms = symtable.new_child()
newsyms[outputs] = retval
return newsyms
self.output = osym_scalar
self.output_syms = [ outputs ]
elif isinstance(outputs, Iterable):
def osym_iterable(symtable, retval):
newtable = symtable.new_child()
if isinstance(retval, Mapping):
for k in outputs: newtable[k] = retval[k]
elif hasattr(retval, '__dict__'):
for k in outputs: newtable[k] = retval.__dict__[k]
elif isinstance(retval, Iterable):
for k,v in izip(outputs, retval): newtable[k] = v
else:
raise ArgumentError('GraphFunc called with a Iterable output expects f to return a Mapping or object with __dict__ attribute or a Iterable')
return newtable
self.output = osym_iterable
self.output_syms = outputs
elif outputs is None:
self.output = lambda syms, _: syms
self.output_syms = None
else:
raise ArgumentError("GraphFunc's outputs must be Mapping, str, unicode, Iterable, or None")

def get_func(self):
def f(symtable):
newsyms = self.output(symtable, self.prepared_f(symtable))
return newsyms
return f

def __call__(self, symtable):
newsyms = self.output(symtable, self.prepared_f(symtable))
return newsyms

class SynchronizedCounter(object):
"""A counter to share between threads."""
Expand Down Expand Up @@ -147,7 +76,7 @@ def __init__(self):

def add_func(self, f, inputs=None, outputs=None):
self.finalized = False
gf = GraphFunc(f, inputs, outputs)
gf = SymbolTableFunc(f, inputs, outputs)
self.supplier.update((sym, gf) for sym in gf.output_syms)
symdeps = (gf, gf.input_syms)
self.symdeps.append(symdeps)
Expand All @@ -159,15 +88,20 @@ def finalize(self):
self.depends[gf] = set(self.supplier[s] for s in syms)
self.preceded_by = transpose(self.depends)

def _topsort_subgraph(self, symnames):
suppliers = set(self.supplier[s] for s in symnames)
depends_toporder = topsort_from_srcs(self.depends, suppliers)
toporder = topsort_from_srcs(self.depends, depends_toporder)
return toporder

def run(self, symnames, init_symbols=None, N=4):
if isinstance(symnames, (str, unicode)):
symnames = [ symnames ]

self.finalize()
toporder = topsort(self.preceded_by)
sources = [ v for v in toporder if len(self.depends[v]) == 0 ]
toporder = self._topsort_subgraph(symnames)
sources = ( v for v in toporder if len(self.depends[v]) == 0 )
remain_to_submit = SynchronizedCounter(len(toporder))

finished_deps = defaultdict(SynchronizedCounter)
p = ThreadPool(N)

Expand All @@ -177,7 +111,13 @@ def run(self, symnames, init_symbols=None, N=4):
syms = init_symbols

parentlock = RLock()
cv_done_submitting = Condition()

done_submitting = Condition()
# If the child thread notifies before the parent thread reaches the
# wait statement, then the parent will never receive the notification
# and will block forever. To fix this, the child will decrement this
# counter to zero, and the parent will check this before waiting.
done_submitting_helper = SynchronizedCounter(1)
# The callback runs within the thread. Don't know how to fix.
def make_apply_callback(gf):
def finished(new_syms):
Expand Down Expand Up @@ -205,20 +145,22 @@ def finished(new_syms):
# Queue doesn't need to be locked
p.add_task(next_gf, args=(symtable,), callback=make_apply_callback(next_gf))
if remain_to_submit.dec() == 0:
print >> sys.stderr, "All jobs have been submitted. Notifying parent thread."
cv_done_submitting.acquire()
cv_done_submitting.notify()
cv_done_submitting.release()
print >> sys.stderr, "All jobs have been submitted. Waiting for parent thread to be ready to receive done_submitting"
done_submitting.acquire()
done_submitting.notify()
done_submitting.release()
done_submitting_helper.dec()
return finished

for s in sources:
remain_to_submit.dec()
p.add_task(s, args=(SymbolTable(),), callback=make_apply_callback(s))

cv_done_submitting.acquire()
print >> sys.stderr, "PARENT THREAD: Awaiting condition variable"
cv_done_submitting.wait()
cv_done_submitting.release()
if done_submitting_helper.get() > 0:
done_submitting.acquire()
print >> sys.stderr, "PARENT THREAD: Awaiting condition variable"
done_submitting.wait()
done_submitting.release()
print >> sys.stderr, "PARENT THREAD: Joining the thread pool"
p.wait_completion()

Expand Down
7 changes: 1 addition & 6 deletions flowconduit/symboltable.py
Expand Up @@ -60,9 +60,6 @@ def __iter__(self, chain_from_iterable=chain.from_iterable):
def __contains__(self, key, any=any):
return any(key in m for m in self.maps())

# def __repr__(self):
# return '<symboltable.SymbolTable object at %x>'%id(self)
#
def __repr__(self, repr=repr):
return 'SymbolTable(map=%s, parents=%s)'%(repr(self.map), repr(self.parents))

Expand All @@ -77,9 +74,7 @@ def pf_mapping(syms):
prepared_f = lambda syms: f(syms[inputs])
input_syms = [ inputs ]
elif isinstance(inputs, Iterable):
#prepared_f = lambda syms: f(*[syms[k] for k in inputs])
def prepared_f(syms):
print "PREPARED_F: syms = ", syms
prepared_f = lambda syms: f(*[syms[k] for k in inputs])
input_syms = inputs
elif inputs is None:
prepared_f = lambda _: f()
Expand Down
2 changes: 1 addition & 1 deletion flowconduit/threadpool.py
Expand Up @@ -27,7 +27,7 @@ def __init__(self, num_threads):
for _ in xrange(num_threads): Worker(self.tasks)

def add_task(self, func, args, kwargs={}, callback=lambda x: x):
print "add_task: ", locals()
# print >> sys.stderr, "add_task: ", locals()
self.tasks.put((func, args, kwargs, callback))

def wait_completion(self):
Expand Down

0 comments on commit afa12fa

Please sign in to comment.