-
Notifications
You must be signed in to change notification settings - Fork 1
/
dataflow.py
27 lines (22 loc) · 880 Bytes
/
dataflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _resolve(arg, assignments={}):
if isinstance(arg, DelayedFunctionCall):
return arg.execute(assignments)
elif isinstance(arg, Variable):
return assignments[arg]
return arg
class DelayedFunctionCall:
    """A function call whose arguments are bound now and evaluated later.

    Each positional and keyword argument is passed through ``_resolve``
    when ``execute`` is called, so arguments may be plain values,
    ``Variable`` placeholders, or other ``DelayedFunctionCall`` objects.
    """

    def __init__(self, func, *args, **kwargs):
        """Store ``func`` and the arguments it will later be called with."""
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def execute(self, assigments=None):
        """Resolve the bound arguments and call the stored function.

        Args:
            assigments: Mapping handed to ``_resolve`` for every bound
                argument. The parameter name keeps its original spelling
                so existing keyword callers continue to work. Defaults to
                an empty mapping.

        Returns:
            The return value of ``self.func`` called with the resolved
            arguments.
        """
        # Create the empty mapping per call instead of sharing one mutable
        # default dict between every invocation.
        if assigments is None:
            assigments = {}
        # step 1: resolve input parameters
        args = [_resolve(arg, assigments) for arg in self.args]
        kwargs = {k: _resolve(v, assigments) for k, v in self.kwargs.items()}
        # step 2: compute function call given input parameters
        return self.func(*args, **kwargs)
def dataflow(func):
    """Decorator that makes calls to ``func`` lazy.

    Calling the decorated function does not run ``func``; it returns a
    ``DelayedFunctionCall`` holding ``func`` and the supplied arguments.

    Args:
        func: The callable to defer.

    Returns:
        A wrapper carrying ``func``'s metadata (name, docstring, ...) that
        returns a ``DelayedFunctionCall`` when called.
    """
    # Function-scope import: no module-level import block is visible here.
    import functools

    @functools.wraps(func)
    def bind(*args, **kwargs):
        # "calling" the function will instead bind the input parameters
        return DelayedFunctionCall(func, *args, **kwargs)

    return bind
class Variable:
    """Placeholder for a value that is supplied when a call is executed.

    Instances carry no state of their own. They are used as keys of the
    assignments mapping and, since no ``__eq__``/``__hash__`` is defined,
    are compared and hashed by identity.
    """