Make the API consistent between static and dynamic graphs.
tshead committed Dec 3, 2020
1 parent 10de47a commit 5c56610
Showing 11 changed files with 331 additions and 270 deletions.
28 changes: 14 additions & 14 deletions docs/image-processing-case-study.ipynb

Large diffs are not rendered by default.

119 changes: 58 additions & 61 deletions docs/tutorial.ipynb
@@ -198,8 +198,8 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task A executing. Inputs: {}\n",
-"INFO:graphcat:Task A finished. Output: None\n"
+"INFO:graphcat.common:Task A executing. Inputs: {}\n",
+"INFO:graphcat.common:Task A finished. Output: None\n"
 ]
 },
 {
@@ -270,10 +270,10 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task B executing. Inputs: {}\n",
-"INFO:graphcat:Task B finished. Output: None\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [None, None]}\n",
-"INFO:graphcat:Task C finished. Output: None\n"
+"INFO:graphcat.common:Task B executing. Inputs: {}\n",
+"INFO:graphcat.common:Task B finished. Output: None\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: None, None: None}\n",
+"INFO:graphcat.common:Task C finished. Output: None\n"
 ]
 },
 {
@@ -404,10 +404,10 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task A executing. Inputs: {}\n",
-"INFO:graphcat:Task A finished. Output: None\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [None, None]}\n",
-"INFO:graphcat:Task C finished. Output: None\n"
+"INFO:graphcat.common:Task A executing. Inputs: {}\n",
+"INFO:graphcat.common:Task A finished. Output: None\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: None, None: None}\n",
+"INFO:graphcat.common:Task C finished. Output: None\n"
 ]
 },
 {
@@ -491,11 +491,7 @@
 "    return 3\n",
 "\n",
 "def add(graph, name, inputs):\n",
-"    result = 0\n",
-"    for key, values in inputs.items():\n",
-"        for value in values:\n",
-"            result += value\n",
-"    return result"
+"    return sum([value() for value in inputs.values()])"
 ]
 },
 {
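As a hedged illustration of the `add` hunk: the rewritten function calls each input value, which suggests that `inputs.values()` now yields zero-argument callables rather than lists of outputs. The sketch below contrasts the two styles using `FakeInputs`, a hypothetical stand-in written for this example (it is not part of Graphcat); `add_old` and `add_new` are likewise illustrative names.

```python
class FakeInputs:
    """Hypothetical stand-in for the inputs object passed to task functions."""

    def __init__(self, pairs):
        # pairs: list of (input name, upstream output) tuples.
        self._pairs = list(pairs)

    def values(self):
        # Wrap each output in a zero-argument callable, mirroring how the
        # new `add` expects to call `value()`.
        return [lambda output=output: output for _, output in self._pairs]


def add_old(graph, name, inputs):
    """Old style: inputs is a plain dict mapping names to lists of outputs."""
    result = 0
    for key, values in inputs.items():
        for value in values:
            result += value
    return result


def add_new(graph, name, inputs):
    """New style: inputs.values() yields callables that return outputs."""
    return sum(value() for value in inputs.values())


old_result = add_old(None, "C", {None: [2, 3]})
new_result = add_new(None, "C", FakeInputs([(None, 2), (None, 3)]))
print(old_result, new_result)
```

Both calls sum the same two upstream outputs; only the shape of the container differs.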
@@ -570,7 +566,7 @@
 "raw_mimetype": "text/restructuredtext"
 },
 "source": [
-"Notice that changing the functions with :meth:`graphcat.StaticGraphGraph.set_task` also marks the tasks as unfinished. This is an example of how Graphcat always ensures that changes to the graph will propagate to its results. Let's update the graph and see what happens:"
+"Notice that changing the functions with :meth:`graphcat.StaticGraph.set_task` also marks the tasks as unfinished. This is an example of how Graphcat always ensures that changes to the graph will propagate to its results. Let's update the graph and see what happens:"
 ]
 },
 {
@@ -582,12 +578,12 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task A executing. Inputs: {}\n",
-"INFO:graphcat:Task A finished. Output: 2\n",
-"INFO:graphcat:Task B executing. Inputs: {}\n",
-"INFO:graphcat:Task B finished. Output: 3\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [2, 3]}\n",
-"INFO:graphcat:Task C finished. Output: 5\n"
+"INFO:graphcat.common:Task A executing. Inputs: {}\n",
+"INFO:graphcat.common:Task A finished. Output: 2\n",
+"INFO:graphcat.common:Task B executing. Inputs: {}\n",
+"INFO:graphcat.common:Task B finished. Output: 3\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: 2, None: 3}\n",
+"INFO:graphcat.common:Task C finished. Output: 5\n"
 ]
 },
 {
@@ -646,7 +642,7 @@
 "source": [
 "Now, the full meaning of the log messages should be clearer - tasks \"A\" and \"B\" have no inputs when they execute, returning 2 and 3 respectively as their outputs. Those outputs become inputs to \"C\" when it executes, where they are summed, so that the output of \"C\" is 5, as expected.\n",
 "\n",
-"Of course, you normally want to retrieve outputs from your graph so you can do something with them. So far, we've only seen the outputs in log messages when we call :meth:`graphcat.StaticGraph.update`. To retrieve the most recent output for a task, use :meth:`graphcat.StaticGraphGraph.output` instead:"
+"Of course, you normally want to retrieve the outputs from your graph so you can do something with them. So far, all we've seen are log messages. To retrieve the most recent output for a task, use :meth:`graphcat.StaticGraph.output` instead of :meth:`graphcat.StaticGraph.update`:"
 ]
 },
 {
@@ -672,11 +668,11 @@
 "raw_mimetype": "text/restructuredtext"
 },
 "source": [
-"Note that :meth:`graphcat.StaticGraphGraph.output` automatically calls :meth:`graphcat.StaticGraph.update` for you, so you can just use the former whenever you need to execute your graph and retrieve an output.\n",
+"Note that :meth:`graphcat.StaticGraph.output` automatically calls :meth:`graphcat.StaticGraph.update` for you, so you can just use the former whenever you need to execute your graph and retrieve an output.\n",
 "\n",
 "Now that our graph is peforming a real (albeit trivial) task, let's look at some ways to simplify setting it up:\n",
 "\n",
-"First, it is extremely common for a graph to have \"parameter\" tasks that simply return a value, as tasks \"A\" and \"B\" do in our example. Having to create a separate function for every parameter, as we did for this example, would be perverse. Fortunately, Graphcat provides a helper function, :func:`graphcat.constant`, that you can use instead:"
+"First, it is extremely common for a graph to have \"parameter\" tasks that simply return a value, as tasks \"A\" and \"B\" do in our example. Having to create a separate function for every parameter would be perverse. Fortunately, Graphcat provides a helper function, :func:`graphcat.constant`, that you can use instead:"
 ]
 },
 {
@@ -688,12 +684,12 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task A executing. Inputs: {}\n",
-"INFO:graphcat:Task A finished. Output: 4\n",
-"INFO:graphcat:Task B executing. Inputs: {}\n",
-"INFO:graphcat:Task B finished. Output: 5\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [4, 5]}\n",
-"INFO:graphcat:Task C finished. Output: 9\n"
+"INFO:graphcat.common:Task A executing. Inputs: {}\n",
+"INFO:graphcat.common:Task A finished. Output: 4\n",
+"INFO:graphcat.common:Task B executing. Inputs: {}\n",
+"INFO:graphcat.common:Task B finished. Output: 5\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: 4, None: 5}\n",
+"INFO:graphcat.common:Task C finished. Output: 9\n"
 ]
 },
 {
@@ -718,7 +714,7 @@
 "source": [
 ":func:`graphcat.constant` is a factory for task functions that always return a value you provide, eliminating the need to create dedicated task functions of your own for parameters. This also means that you can easily change your workflow parameters with :meth:`graphcat.StaticGraph.set_task` any time that the inputs to your workflow change, whether due to user input, changes in the environment, network traffic, or anything else that your workflow depends upon.\n",
 "\n",
-"Next, you may wonder why it's necessary to call both :meth:`graphcat.StaticGraphGraph.add_task` and :meth:`graphcat.StaticGraph.set_task` just to create a working task. In fact, you don't - either method can create a task and assign its function in a single step:"
+"Next, you may wonder why it's necessary to call both :meth:`graphcat.StaticGraph.add_task` and :meth:`graphcat.StaticGraph.set_task` just to create a working task. In fact, you don't - either method can create a task and assign its function in a single step:"
 ]
 },
 {
@@ -727,7 +723,7 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"graph.set_task(\"D\", graphcat.constant(1.5))"
+"graph.set_task(\"D\", graphcat.constant(6))"
 ]
 },
 {
@@ -746,17 +742,17 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task D executing. Inputs: {}\n",
-"INFO:graphcat:Task D finished. Output: 1.5\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [4, 5, 1.5]}\n",
-"INFO:graphcat:Task C finished. Output: 10.5\n"
+"INFO:graphcat.common:Task D executing. Inputs: {}\n",
+"INFO:graphcat.common:Task D finished. Output: 6\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: 4, None: 5, None: 6}\n",
+"INFO:graphcat.common:Task C finished. Output: 15\n"
 ]
 },
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"Result: 10.5\n"
+"Result: 15\n"
 ]
 },
 {
@@ -824,7 +820,7 @@
 "source": [
 "## Named Inputs\n",
 "\n",
-"By now, you should have questions about the way inputs are passed to task functions. From the log message in the preceding example - `{None: [4, 5, 1.5]}` - it's obvious that the results from \"A\", \"B\", and \"D\" are passed to \"C\" using a list - `[4, 5, 1.5]`, but why is the list part of a dict, and why is the key `None`?"
+"By now, you should have questions about the way inputs are passed to task functions. From the log message in the preceding example - `{None: 4, None: 5, None: 6}` - it's obvious that the results from \"A\", \"B\", and \"D\" are passed to \"C\" using something that looks like a dict, but what's with the key `None`, and why does it appear multiple times (something that can't happen with an actual dict)?"
 ]
 },
 {
@@ -835,7 +831,7 @@
 "source": [
 "What's happening is that when you create a link between a source and a target, you also specify a *named input* for the edge. The named input becomes the key in the task function's input dict. This makes it easier for task functions with multiple inputs to tell those inputs apart. If you don't specify a named input when you create a link, it defaults to :any:`None`.\n",
 "\n",
-"Let's change our current example to see what this looks like. Instead of addition, we will create a new task function that can be used to generate a common greeting:"
+"Let's change our current example to see what this looks like. Instead of adding values, we'll create a new task function that generates greetings:"
 ]
 },
 {
@@ -845,15 +841,14 @@
 "outputs": [],
 "source": [
 "def greeting(graph, name, inputs):\n",
-"    return f\"{inputs['greeting'][0]}, {inputs['subject'][0]}!\""
+"    return f\"{inputs.get('greeting')}, {inputs.get('subject')}!\""
 ]
 },
 {
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"Note that the `greeting()` task function expects the input dict to contain inputs named \"greeting\" and \"subject\". Also note that because input dict values are always lists, the function uses `[0]` to get the first value in each list.\n",
-"\n",
+"Note that the `greeting()` task function expects inputs named \"greeting\" and \"subject\", otherwise it will fail. \n",
 "Now we can setup the parameter and greeting task functions for our existing graph:"
 ]
 },
@@ -872,7 +867,7 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"And we'll replace our existing links with links that connect to the named inputs required by \"C\"'s task function:"
+"And we'll replace our existing links with links that connect to the named inputs required by the `greeting` function:"
 ]
 },
 {
@@ -901,12 +896,12 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task A executing. Inputs: {}\n",
-"INFO:graphcat:Task A finished. Output: Hello\n",
-"INFO:graphcat:Task B executing. Inputs: {}\n",
-"INFO:graphcat:Task B finished. Output: World\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [1.5], 'greeting': ['Hello'], 'subject': ['World']}\n",
-"INFO:graphcat:Task C finished. Output: Hello, World!\n"
+"INFO:graphcat.common:Task A executing. Inputs: {}\n",
+"INFO:graphcat.common:Task A finished. Output: Hello\n",
+"INFO:graphcat.common:Task B executing. Inputs: {}\n",
+"INFO:graphcat.common:Task B finished. Output: World\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: 6, greeting: Hello, subject: World}\n",
+"INFO:graphcat.common:Task C finished. Output: Hello, World!\n"
 ]
 },
 {
@@ -982,11 +977,13 @@
 "raw_mimetype": "text/restructuredtext"
 },
 "source": [
-"Note that the notebook diagram links are labelled when they're connected to any input other than :any:`None`.\n",
+"Note that the notebook diagram links are labelled when they're connected to any inputs with names other than :any:`None`.\n",
 "\n",
+"Now, the input dict for \"C\" printed to the log should make more sense - it contains a list of outputs for each named input: in our case, \"greeting\" and \"subject\". Task \"D\" is still connected to :any:`None`, but it's ignored by the `greeting` implementation.\n",
+"\n",
-"Now, the input dict for \"C\" printed to the log should make more sense - it contains a list of outputs for each named input: in our case, \"greeting\" and \"subject\". Task \"D\" is still connected to :any:`None`, but it's ignored by the `greeting` implementation. It should also be clear now why each named input is associated with a list of values: you can connect multiple tasks to a single input, one task to multiple inputs, or any combination of the two.\n",
+"It should also be clear now why a name can appear more than once in a task's inputs: you can connect multiple tasks to a single input, one task to multiple inputs, or any combination of the two.\n",
 "\n",
-"By examining the input dict, a task function can implement any desired behavior, from very strict (failing unless the dict contains a specific set of named inputs with explicit requirements on number of inputs and types) to very permissive (adjusting functionality based on names, numbers, and types of values in the input dict)."
+"By examining the input object, a task function can implement any desired behavior, from very strict (failing unless the dict contains a specific set of named inputs with explicit requirements on number of inputs and types) to very permissive (adjusting functionality based on names, numbers, and types of values in the input dict), and anywhere in-between."
 ]
 },
 {
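The strict and permissive styles described in the tutorial text might look like the following sketch. `SketchInputs` is a hypothetical stand-in written for this example; only the method names `get`, `getone`, and `getall` come from the `NamedInputs` class added in this commit, and their behavior here is a simplified assumption. The two task functions are illustrative and do not appear in the tutorial.

```python
class SketchInputs:
    """Hypothetical stand-in: an ordered collection of (name, output) pairs."""

    def __init__(self, pairs):
        self._pairs = list(pairs)

    def getall(self, name):
        # Every output connected to the named input, possibly none.
        return [output for key, output in self._pairs if key == name]

    def getone(self, name):
        # Exactly one output, otherwise KeyError.
        values = self.getall(name)
        if len(values) != 1:
            raise KeyError(name)
        return values[0]

    def get(self, name, default=None):
        # Zero outputs: default. One output: that output. More: KeyError.
        values = self.getall(name)
        if not values:
            return default
        if len(values) > 1:
            raise KeyError(f"More than one input {name!r}")
        return values[0]


def strict_greeting(graph, name, inputs):
    """Fails unless exactly one 'greeting' and one 'subject' are connected."""
    return f"{inputs.getone('greeting')}, {inputs.getone('subject')}!"


def permissive_greeting(graph, name, inputs):
    """Falls back to a default greeting and greets every connected subject."""
    greeting = inputs.get("greeting", default="Hello")
    subjects = inputs.getall("subject") or ["World"]
    return f"{greeting}, {' and '.join(subjects)}!"


full = SketchInputs([(None, 6), ("greeting", "Hello"), ("subject", "World")])
partial = SketchInputs([("subject", "Alice"), ("subject", "Bob")])

print(strict_greeting(None, "C", full))
print(permissive_greeting(None, "C", partial))
```

Passing `partial` to `strict_greeting` raises `KeyError`, because no "greeting" input is connected; the permissive version accepts the same inputs and adapts.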
@@ -1023,8 +1020,8 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task D executing. Inputs: {}\n",
-"ERROR:graphcat:Task D failed. Exception: Whoops!\n"
+"INFO:graphcat.common:Task D executing. Inputs: {}\n",
+"ERROR:graphcat.common:Task D failed. Exception: Whoops!\n"
 ]
 },
 {
@@ -1115,8 +1112,8 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task D executing. Inputs: {}\n",
-"ERROR:graphcat:Task D failed. Exception: Whoops!\n"
+"INFO:graphcat.common:Task D executing. Inputs: {}\n",
+"ERROR:graphcat.common:Task D failed. Exception: Whoops!\n"
 ]
 },
 {
@@ -1205,10 +1202,10 @@
 "name": "stderr",
 "output_type": "stream",
 "text": [
-"INFO:graphcat:Task D executing. Inputs: {}\n",
-"INFO:graphcat:Task D finished. Output: 42\n",
-"INFO:graphcat:Task C executing. Inputs: {None: [42], 'greeting': ['Hello'], 'subject': ['World']}\n",
-"INFO:graphcat:Task C finished. Output: Hello, World!\n"
+"INFO:graphcat.common:Task D executing. Inputs: {}\n",
+"INFO:graphcat.common:Task D finished. Output: 42\n",
+"INFO:graphcat.common:Task C executing. Inputs: {None: 42, greeting: Hello, subject: World}\n",
+"INFO:graphcat.common:Task C finished. Output: Hello, World!\n"
 ]
 },
 {
3 changes: 2 additions & 1 deletion features/steps/graph.py
@@ -317,8 +317,9 @@ def step_impl(context, names):
 def step_impl(context, names, inputs):
     names = eval(names)
     inputs = eval(inputs)
+
     test.assert_equal(names, context.events.executed)
-    test.assert_equal(inputs, context.events.inputs)
+    test.assert_equal(inputs, [inputs.dict() for inputs in context.events.inputs])
 
 
 @then(u'tasks {names} are executed')
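The updated step compares expected values against `inputs.dict()`, which, judging by the `NamedInputs.dict` method added in this commit, groups outputs into one list per input name. A minimal standalone sketch of that grouping, using a hypothetical helper `group_inputs` over plain `(name, output)` pairs rather than the real class:

```python
import collections


def group_inputs(pairs):
    """Group (name, output) pairs into a dict of lists, preserving order."""
    result = collections.defaultdict(list)
    for key, value in pairs:
        result[key].append(value)
    # Convert to a plain dict so that missing keys raise KeyError again.
    return dict(result)


grouped = group_inputs([(None, 4), (None, 5), ("subject", "World")])
print(grouped)
```

This is the same shape the old implementation passed to task functions, which is presumably why the existing expected values in the feature files can still be compared unchanged.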
73 changes: 68 additions & 5 deletions graphcat/static.py
@@ -569,11 +569,13 @@ def update(self, name):
 
 try:
     # Gather inputs for the function.
-    inputs = collections.defaultdict(list)
-    for target, source, input in self._graph.out_edges(name, data="input"):
-        output = self._graph.nodes[source]["output"]
-        inputs[input].append(output)
-    inputs = dict(inputs)
+    inputs = NamedInputs(self, name)
+
+    # inputs = collections.defaultdict(list)
+    # for target, source, input in self._graph.out_edges(name, data="input"):
+    # output = self._graph.nodes[source]["output"]
+    # inputs[input].append(output)
+    # inputs = dict(inputs)
 
     # Execute the function and store the output.
     self._on_execute.send(self, name=name, inputs=inputs)
@@ -592,3 +594,64 @@ def update(self, name):
 raise exception
 
 
+class NamedInputs(object):
+    def __init__(self, graph, name):
+        def constant(value):
+            def implementation():
+                return value
+            return implementation
+
+        edges = graph._graph.out_edges(name, data="input")
+        self._keys = [input for target, source, input in edges]
+        self._values = [constant(graph._graph.nodes[source]["output"]) for target, source, input in edges]
+
+    def __contains__(self, name):
+        return name in self._keys
+
+    def __getitem__(self, key):
+        return self.getall(key)
+
+    def __len__(self):
+        return len(self._keys)
+
+    def __repr__(self):
+        inputs = ", ".join([f"{key}: {value()}" for key, value in zip(self._keys, self._values)])
+        return f"{{{inputs}}}"
+
+    def dict(self):
+        result = collections.defaultdict(list)
+        for key, value in zip(self._keys, self._values):
+            result[key].append(value())
+        return dict(result)
+
+    def get(self, name, default=None):
+        values = [value for key, value in zip(self._keys, self._values) if key == name]
+        if len(values) == 0:
+            return default
+        elif len(values) == 1:
+            return values[0]()
+        else:
+            raise KeyError(f"More than one input {name!r}")
+
+    def getall(self, name):
+        return [value() for key, value in zip(self._keys, self._values) if key == name]
+
+    def getone(self, name):
+        values = [value for key, value in zip(self._keys, self._values) if key == name]
+        if len(values) == 0:
+            raise KeyError(name)
+        elif len(values) == 1:
+            return values[0]()
+        else:
+            raise KeyError(f"More than one input {name!r}")
+
+    def items(self):
+        return zip(self._keys, self._values)
+
+    def keys(self):
+        return self._keys
+
+    def values(self):
+        return self._values
