In [1]:
# Enable IPython's autoreload extension so that edits to imported modules (e.g. a local
# development copy of hpcflow) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
from tempfile import gettempdir

from hpcflow.api import (
    WorkflowTemplate, Workflow, TaskSchema, Task, Parameter, SchemaInput, load_config,
    InputValue, ElementSet, InputSource, ValueSequence, ElementPropagation
)

# Usually the user will have a known configuration directory; this step won't be
# necessary. (I am doing this to avoid hpcflow reading the config file for the old version
# of hpcflow.) Here, the configuration is loaded from the system temporary directory.
load_config(config_dir=gettempdir())

In [3]:
# Note we can also import the App instance like this:
from hpcflow.api import hpcflow

# In MatFlow, this will be *slightly* less confusing, like this: `from matflow import MatFlow`
# The App instance shouldn't normally need to be imported.

# Print the version of the hpcflow app that this notebook was run with:
print(hpcflow.version)

0.2.0a22


### Example 1: simple workflow

#### Workflow generation using the API

In [4]:
# Define some parameters (when end users use MatFlow, parameter types they/we define will
# be domain-specific and more descriptive -- not just p1, p2 etc! The same applies for
# task schemas.):
p1 = Parameter('p1')
p2 = Parameter('p2')
p3 = Parameter('p3')
p4 = Parameter('p4')

# Define some task schemas; `s2` takes `p2` as an input, which `s1` outputs, so a task
# using `s2` can consume the output of a task using `s1`.
# note: we leave `actions` empty; for real use, actions will contain the commands that need
#       to run for the schema (e.g. execute some program).
s1 = TaskSchema(objective='t1', actions=[], inputs=[p1], outputs=[p2])
s2 = TaskSchema(objective='t2', actions=[], inputs=[p2, p3], outputs=[p4])

# Define some tasks (the task names shown later, e.g. `t1`, appear to come from the
# schema objectives):
t1 = Task(schemas=[s1], inputs=[InputValue(p1, 101)])
t2 = Task(schemas=[s2], inputs=[InputValue(p3, 301)]) # note: p2 will originate from t1 output

# Generate a workflow template:
wkt = WorkflowTemplate(name='w1', tasks=[t1, t2])

# Write a persistent workflow from the template:
# note: by default the workflow directory will be the workflow name + a date-timestamp
#       here we pass the name explicitly, so there will be no date-timestamp, and we
#       overwrite. This avoids generating loads of workflow directories during testing.
wk = Workflow.from_template(wkt, name=wkt.name, overwrite=True) 

#### Examining the workflow structure

We can access tasks by their name:

In [5]:
# Tasks can be accessed by name, as attributes of `wk.tasks`:
wk.tasks.t1

WorkflowTask(name='t1')

... or by index:

In [6]:
# ... or by (zero-based) position in the workflow:
wk.tasks[0]

WorkflowTask(name='t1')

Each task is associated with one or more elements:

In [7]:
# The list of `Element` objects belonging to task `t1`:
wk.tasks.t1.elements

[Element(task=WorkflowTask(name='t1'), global_index=0)]

Each element has input and output values, for each of the input/output types defined in the schema:

In [8]:
# The inputs and outputs of the first element of `t1`, displayed together as a tuple:
wk.tasks.t1.elements[0].inputs, wk.tasks.t1.elements[0].outputs

(ElementInputs(p1), ElementOutputs(p2))

We can also define "resources", e.g. where to run a given element. Here we see the default value, since we did not specify any resources for this task:

In [9]:
# Resources of the first element; none were specified for `t1`, so this is the default:
wk.tasks.t1.elements[0].resources

[ResourceSpec(scope=ActionScope.any())]

Resources are a list of `ResourceSpec` objects, where each object has a `scope` attribute that tells us to which of the actions defined in the task schema this resource specification applies.

The parameter (input or output) values can be accessed by name:

In [12]:
# Access a parameter value by name, as an attribute of the element's `inputs`:
wk.tasks.t1.elements[0].inputs.p1

101

There is also a lower-level `Element.get` method for accessing parameter data by "path":

In [13]:
# Equivalent lower-level access via `Element.get`, using a dot-delimited parameter path:
wk.tasks.t1.elements[0].get("inputs.p1")

101

Using `get` with no path argument returns all parameter data for that element:

In [14]:
# With no path, `get` returns all of the element's parameter data as a nested dict
# (resources, inputs and outputs; the output `p2` has no value yet):
wk.tasks.t1.elements[0].get()

{'resources': {'any': {'num_cores': None, 'scratch': None}},
 'inputs': {'p1': 101},
 'outputs': {'p2': None}}

#### Examining the event log

In [15]:
# the workflow is created and then tasks are added; for each task an empty task is first
# created and then element sets are added to the task. In the formatted log, each nested
# event shows the index of its parent event in square brackets:
print(wk.event_log.format_events())

0 2023-01-27 14:08:08 (Adam-Desktop): create_workflow
1 2023-01-27 14:08:08 (Adam-Desktop): ---[0] add_task
2 2023-01-27 14:08:08 (Adam-Desktop): ------[1] add_empty_task
3 2023-01-27 14:08:08 (Adam-Desktop): ------[1] add_elements
4 2023-01-27 14:08:08 (Adam-Desktop): ---------[3] add_element_set
5 2023-01-27 14:08:08 (Adam-Desktop): ---[0] add_task
6 2023-01-27 14:08:08 (Adam-Desktop): ------[5] add_empty_task
7 2023-01-27 14:08:08 (Adam-Desktop): ------[5] add_elements
8 2023-01-27 14:08:09 (Adam-Desktop): ---------[7] add_element_set



#### Checking dependencies

We can find the dependencies/dependents between tasks like this:

In [16]:
# Upstream tasks that `t2` depends on:
wk.tasks.t2.task_dependencies

[WorkflowTask(name='t1')]

In [17]:
# Downstream tasks that depend on `t1`:
wk.tasks.t1.dependent_tasks

[WorkflowTask(name='t2')]

These dependencies derive from the `p2` parameter, which is output by task `t1` and "consumed" by task `t2`. We can also check dependency relationships at the element level:

In [18]:
# Elements that the first element of `t2` depends on:
wk.tasks.t2.elements[0].element_dependencies  # returns a list of global element indices

[0]

In [19]:
# Elements that depend on the first element of `t1`:
wk.tasks.t1.elements[0].dependent_elements  # returns a list of global element indices

[1]

#### Adding more elements to an existing workflow

We can add more elements to a given task, using the `WorkflowTask.add_elements` method:

In [20]:
# Add a new element to `t2` with a different `p3` value (`p2` again comes from `t1`).
# NOTE: this modifies the persistent workflow; re-running this cell adds another element.
wk.tasks.t2.add_elements(inputs=[InputValue(p3, 302)])

Now, two elements exist in the second task:

In [21]:
# Task `t2` now has two elements:
wk.tasks.t2.elements

[Element(task=WorkflowTask(name='t2'), global_index=1),
 Element(task=WorkflowTask(name='t2'), global_index=2)]

Both elements in task `t2` depend on the single element in task `t1`:

In [22]:
# Both elements of `t2` now depend on the single element of `t1`:
wk.tasks.t1.elements[0].dependent_elements  # returns a list of global element indices

[1, 2]

#### Accessing the original `Task` and `WorkflowTemplate` objects

When a `Task` object is added to a workflow, it is converted into a `WorkflowTask` object. However, we can still access (a copy of) the original `Task` object, using the `template` attribute:

In [23]:
# A copy of the original `Task` object from which this `WorkflowTask` was created:
wk.tasks.t1.template

Task(name='t1')

We can also access (a copy of) the original workflow template in a similar way:

In [24]:
# A copy of the original `WorkflowTemplate`:
wk.template

WorkflowTemplate(name='w1', tasks=[Task(name='t1'), Task(name='t2')], workflow=<hpcflow.sdk.app.Workflow object at 0x00000221DA6D4220>)

#### Element sets

In [None]:
# Arguments passed to the `Task` constructor (here, `inputs`) are stored on the task as
# element sets, via its `element_sets` attribute:
Task(schemas=[s1], inputs=[InputValue(p1, 101)]).element_sets

In [29]:
# The inputs recorded in the first element set of `t1`'s template:
wk.tasks.t1.template.element_sets[0].inputs

[InputValue(parameter='p1', value=101, value_group_idx=1)]

You might notice that the `Task` object accessed from a `Workflow` has no `inputs` attribute, even though we originally passed an `inputs` argument to its constructor, e.g. like this: `t1 = Task(schemas=[s1], inputs=[InputValue(p1, 101)])`. This is because arguments passed to the `Task` constructor are converted to an *element set*. Element sets, defined by the `ElementSet` class, represent the arguments that we can pass when constructing a task, or when adding new elements to an existing task. An equivalent way of constructing the above task would be like this:

In [30]:
# The usual, shorter form (for comparison):
# t1 = Task(schemas=[s1], inputs=[InputValue(p1, 101)])
# An equivalent, more verbose form that passes an explicit element set:
t1_b = Task(schemas=[s1], element_sets=[ElementSet(inputs=[InputValue(p1, 101)])])

This is more verbose, and so will not usually be used, but it shows how the arguments to the task are stored internally. Element sets are also used when adding elements to an existing task. Previously, we wrote: `wk.tasks.t2.add_elements(inputs=[InputValue(p3, 302)])`. This could be written more verbosely with an element set like this: `wk.tasks.t2.add_elements(element_sets=[ElementSet(inputs=[InputValue(p3, 302)])])`.

The element set concept exists to support adding elements after a task is initially created, and helps with knowing from where elements originate.

We can access element sets via the `Task` object (via the `WorkflowTask.template` attribute):

In [31]:
# `t2` has two element sets: one from its construction and one from `add_elements`:
wk.tasks.t2.template.element_sets

[<hpcflow.sdk.app.ElementSet at 0x221da703fa0>,
 <hpcflow.sdk.app.ElementSet at 0x221da703e50>]

Element sets contain the original parametrisation:

In [None]:
# The original parametrisation (inputs) of `t2`'s first element set:
wk.tasks.t2.template.element_sets[0].inputs

### Example 2: using default values and overriding input sources

In [32]:
# Define some parameters (redefined here so this example does not rely on the objects
# created in Example 1)
p1 = Parameter('p1')
p2 = Parameter('p2')
p3 = Parameter('p3')
p4 = Parameter('p4')

The `inputs` argument of the `TaskSchema` constructor in fact takes a list of `SchemaInput` objects, but if we pass a `Parameter` object instead, it will be transformed into a `SchemaInput`. Likewise, the `outputs` argument more correctly takes a list of `SchemaOutput` objects.

The distinction allows us to provide default values for a given input type within a particular schema:

In [33]:
# Wrap `p1` and `p2` in `SchemaInput` objects to attach schema-specific default values:
p1_input = SchemaInput(p1, default_value=10)
p2_input = SchemaInput(p2, default_value=30)

# Define some task schemas
s1 = TaskSchema(objective='t1', actions=[], inputs=[p1_input], outputs=[p2])
s2 = TaskSchema(objective='t2', actions=[], inputs=[p2_input, p3], outputs=[p4])

Now we can define a `Task` object without passing an input value for the "p1" parameter, because the default value can be used instead:

In [34]:
# Define some tasks:
# `t1` gives no value for `p1`, so the schema default (10) is used; `t2`'s `p2` is still
# sourced from `t1`'s output.
t1 = Task(schemas=[s1])
t2 = Task(schemas=[s2], inputs=[InputValue(p3, 301)])

In [35]:
# Generate the workflow (overwriting the workflow written in Example 1, which has the
# same name):
wkt = WorkflowTemplate(name='w1', tasks=[t1, t2])
wk = Workflow.from_template(wkt, name=wkt.name, overwrite=True) 

We can check that the default value was used by inspecting the relevant `ElementSet` object. In particular, the `input_sources` attribute is a map between each input parameter type and a list of `InputSource` objects:

In [36]:
# Input sources of `t1`'s first element set: `p1` uses the schema default:
wk.tasks.t1.template.element_sets[0].input_sources

{'p1': [InputSource.default()]}

There is some default behaviour regarding the input sources. If a parameter is defined locally (meaning within the task constructor), its local value will be used. If the parameter is not provided locally, but is output from an upstream task, that value will be used. If a parameter is neither defined locally nor output from an upstream task, the default value will be used, if defined. Otherwise, a `MissingInputs` exception will be raised.

In [37]:
# For `t2`, `p2` is sourced from the upstream task's output and `p3` from the local value:
wk.tasks.t2.template.element_sets[0].input_sources

{'p2': [InputSource.task(task_ref=0, task_source_type=output)],
 'p3': [InputSource.local()]}

We can override the default behaviour, and choose a different input source if required. We can demonstrate this by adding another element (set) to task `t2` and specifying input sources manually:

In [39]:
# Add a new element set to task `t2`, overriding the input source for `p2`. Although a
# local value is given for `p2`, `input_sources` requests the schema default instead.
# Parameters not listed in `input_sources` (here `p3`) keep the default behaviour.
# Note: `InputValue` is given the `Parameter` (`p2`), as everywhere else in this
# notebook, rather than the `SchemaInput` wrapper (`p2_input`).
wk.tasks.t2.add_elements(
    inputs=[
        InputValue(p2, 201),
        InputValue(p3, 301),
    ],
    input_sources={
        'p2': [InputSource.default()],
    }
)

In the above case, the locally specified value of `p2` is not used, because we specified that the default value should be used. The default behaviour will still be used for any parameter types not included in the `input_sources` dict.

In [40]:
# Input sources of the new (second) element set of `t2`:
wk.tasks.t2.template.element_sets[1].input_sources

{'p2': [InputSource.default()], 'p3': [InputSource.local()]}

### Example 3: multiple elements per task: sequences

In [41]:
# Define some parameters (redefined so this example does not rely on earlier examples)
p1 = Parameter('p1')
p2 = Parameter('p2')
p3 = Parameter('p3')
p4 = Parameter('p4')

# Define some task schemas (`s2` consumes `s1`'s output `p2`)
s1 = TaskSchema(objective='t1', actions=[], inputs=[p1], outputs=[p2])
s2 = TaskSchema(objective='t2', actions=[], inputs=[p2, p3], outputs=[p4])

Multiple elements can be generated for a task by using the `sequences` argument:

In [42]:
# TODO: this won't work yet, because hpcflow is looking in the app data for parameters
#       when loading the ValueSequence into the workflow! It would work if there was
#       a parameter "p3" defined in the "parameter.yml" file and that file was specified
#       in the config file...
# NOTE: this cell only defines the tasks; presumably the error is raised when the
#       workflow is generated from them in the next cell, not here.


# Define some tasks:
# `t2` takes its `p3` values from a sequence rather than a single `InputValue`;
# presumably one element is generated per value (TODO confirm once this runs).
t1 = Task(schemas=[s1], inputs=[InputValue(p1, 101)])
t2 = Task(
    schemas=[s2],
    sequences=[
        ValueSequence('inputs.p3', values=[301, 302, 303], nesting_order=0),        
    ]
)

# Note the "parameter path" (e.g. "inputs.p3") must be given to a `ValueSequence` object;
# this is because sequences can also be generated for resources (e.g. "resources.num_cores")

In [None]:
# Generate the workflow:
# NOTE: this is currently expected to fail (see the TODO in the previous cell): hpcflow
#       looks up the "p3" parameter in the app data when loading the `ValueSequence`.
#       The error is caught and displayed so that "Restart & Run All" can continue to the
#       following examples, which build their own workflows and do not use this one.
try:
    wkt = WorkflowTemplate(name='w1', tasks=[t1, t2])
    wk = Workflow.from_template(wkt, name=wkt.name, overwrite=True)
except Exception as err:  # TODO: remove this guard once the ValueSequence issue is fixed
    print(f"Workflow generation failed (known issue): {err!r}")

### Example 4: Propagating new elements downstream

When adding elements to a task, we can optionally propagate new elements to downstream tasks. Consider the workflow below, with three tasks, where each task depends on the previous:

In [None]:
# Three chained tasks: `t2` consumes `p2` (output by `t1`), and `t3` consumes `p4`
# (output by `t2`). The parameters `p1`-`p4` are those defined in Example 3.
s1 = TaskSchema(objective='t1', actions=[], inputs=[p1], outputs=[p2])
s2 = TaskSchema(objective='t2', actions=[], inputs=[p2, p3], outputs=[p4])
s3 = TaskSchema(objective='t3', actions=[], inputs=[p4], outputs=[])

t1 = Task(schemas=[s1], inputs=[InputValue(p1, 101)])
t2 = Task(schemas=[s2], inputs=[InputValue(p3, 301)])
t3 = Task(schemas=[s3], inputs=[])

wkt = WorkflowTemplate(name='w1', tasks=[t1, t2, t3])
wk = Workflow.from_template(wkt, name=wkt.name, overwrite=True)

There is a single element in each task:

In [None]:
# Number of elements in each task, in workflow order:
print([task.num_elements for task in wk.tasks])

Now if we add a new element to the first task, we can specify with the `propagate_to` argument that we wish to generate new elements in downstream tasks as well. The elements generated in the downstream tasks will be restricted to depending only on the new elements generated in their respective upstream tasks.

In [None]:
# Add a new element to `t1`; passing an empty `propagate_to` list requests that new
# elements are also generated in the downstream tasks.
wk.tasks.t1.add_elements(
    inputs=[InputValue(p1, 102)],
    propagate_to=[]
)

Now we have an additional element in each task:

In [None]:
# Number of elements in each task after propagating the new `t1` element:
print([task.num_elements for task in wk.tasks])

Note that it is sufficient to specify `propagate_to` as an empty list to signify that you wish to propagate new elements downstream. There are cases where you will need to specify a nesting order for the newly generated downstream elements, in which case, `propagate_to` can be specified as a list of `ElementPropagation` objects, like this:

In [None]:
# As above, but listing one `ElementPropagation` object per downstream task explicitly
# (no `nesting_order` is given, since there are no sequences in this workflow):
wk.tasks.t1.add_elements(
    inputs=[InputValue(p1, 103)],
    propagate_to=[
        ElementPropagation(task=wk.tasks.t2),
        ElementPropagation(task=wk.tasks.t3)
    ]
)

In [None]:
# Number of elements in each task after the second propagating `add_elements` call:
print([task.num_elements for task in wk.tasks])

The `ElementPropagation` object accepts a `nesting_order` argument, which in this case is not needed, because there are no `sequences` in the workflow. If an `ElementPropagation` object is missing for a given downstream task, a default object will be generated, with no nesting order specified. Note that there is not currently a way to propagate to only some downstream tasks. We should refine this behaviour later.

Let's look at the input sources used for all tasks.

In [None]:
# Report, task by task, the input sources of each element set. The labels reproduce the
# output of self-documenting f-strings (`f"{x=}"`), which show the repr of the value.
for wk_task in wk.tasks:
    print(f"task.name={wk_task.name!r}")
    for es in wk_task.template.element_sets:
        print(f"elem_set.index={es.index!r}")
        for param_name, sources in es.input_sources.items():
            print(f"\t{param_name}: {sources}")
    print()

We can see that a second (and third) element set has been generated for each task due to the `add_elements` calls on the first task. The input sources for each element set in a given task look the same, except for an additional argument `elements` in the second (and third) element sets for task input sources. For example, for the second element set of the second task, the input sources for `p2` look like this:

```
[InputSource.task(task_ref=0, task_source_type=output, elements=[3])]
```

The `elements` argument here serves as a *constraint* on the elements from which that input can be sourced. In particular, element index 3, in this case, is the newly added element from the first task.

### Example 5: inserting tasks in the middle of the workflow

In [None]:
# Start Example 5 from a fresh two-task workflow, in which `t2` consumes `t1`'s output `p2`:
s1 = TaskSchema(objective='t1', actions=[], inputs=[p1], outputs=[p2])
s2 = TaskSchema(objective='t2', actions=[], inputs=[p2, p3], outputs=[p4])

t1 = Task(schemas=[s1], inputs=[InputValue(p1, 101)])
t2 = Task(schemas=[s2], inputs=[InputValue(p3, 301)])

wkt = WorkflowTemplate(name='w1', tasks=[t1, t2])
wk = Workflow.from_template(wkt, name=wkt.name, overwrite=True)

In [None]:
# `s3` takes `p2` and outputs both `p2` and `p3`, so a task using it can supply the
# inputs of `t2` when inserted between `t1` and `t2`:
s3 = TaskSchema(objective='t3', actions=[], inputs=[p2], outputs=[p2, p3])
t3 = Task(schemas=[s3])
wk.add_task(t3, new_index=1) # this will add the task in between the existing tasks

Note that when adding a new task in the middle of the workflow, no new elements will be propagated to downstream tasks. However, in this case, we might want to add a new element to `t2` that uses outputs from the newly inserted task `t3`, which we can do like this:

In [None]:
# With no arguments, add a new element set to `t2` using the default input sources, which
# now include outputs of the newly inserted task `t3`:
wk.tasks.t2.add_elements()

We can inspect the element sets and input sources to see what this has done (note that input source `task_ref` refers to task `insert_ID`s, since this does not change as we add/remove tasks):

In [None]:
# For each task, print its name, insert ID and element count, followed by the input
# sources of each of its element sets. The labels reproduce the output of
# self-documenting f-strings (`f"{x=}"`), which show the repr of the value.
for wk_task in wk.tasks:
    print(
        f"task.name={wk_task.name!r} "
        f"task.insert_ID={wk_task.insert_ID!r} "
        f"task.num_elements={wk_task.num_elements!r}"
    )
    for es in wk_task.template.element_sets:
        print(f"elem_set.index={es.index!r}")
        for param_name, sources in es.input_sources.items():
            print(f"\t{param_name}: {sources}")
    print()

So the second element set that we added to `t2` gets its `p2` value from the newly added middle task `t3`, and its `p3` value from the first task `t1`.