# Imports

In [1]:
from pyiron_contrib.tinybase.task import AbstractTask, FunctionTask, SeriesTask, LoopTask





In [2]:
from pyiron_contrib.tinybase.executor import FuturesSubmitter, Submitter

In [3]:
from concurrent.futures import ThreadPoolExecutor

In [4]:
from concurrent.futures import ProcessPoolExecutor

In [70]:
process = FuturesSubmitter(ProcessPoolExecutor(max_workers=4))
thread = FuturesSubmitter(ThreadPoolExecutor(max_workers=4))

In [5]:
import logging
# show INFO messages and above from the root logger (logging.INFO == 20)
logging.getLogger().setLevel(logging.INFO)

In [6]:
import numpy as np

# Function Task

## Basic

In [7]:
def calc_fib(n):
    """Return a Fibonacci number, computed deliberately slowly.

    Starting from the pair (1, 1), the recurrence is advanced `n` times with a
    0.1 s pause per step, so a call takes roughly `0.1 * n` seconds.

    Parameters
    ----------
    n : int
        Number of recurrence steps to take.

    Returns
    -------
    int
        The newest term after `n` steps, e.g. 2 for n=1 and 144 for n=10.
        If no step is taken (n <= 0) the seed value 1 is returned instead of
        failing on an unassigned local.
    """
    import time
    n1 = n2 = 1
    for _ in range(n):
        time.sleep(.1)
        # n2 always holds the most recently computed term
        n1, n2 = n2, n1 + n2
    return n2

In [8]:
f = FunctionTask(calc_fib)

In [9]:
f.input.args

[]

In [10]:
f.input.kwargs

{}

In [11]:
f.input.kwargs['n'] = 10

In [12]:
f.input.kwargs

{'n': 10}

In [13]:
f.execute()

(ReturnStatus(Code.DONE, None), FunctionOutput(result=144))

## We can use an executor to distribute the task to any compute resource

### Directly in the foreground

In [14]:
exe = Submitter().submit([f])

In [15]:
exe.run()

In [16]:
exe.status[0]

ReturnStatus(Code.DONE, None)

In [17]:
exe.output[0].result

144

### Do the same but in the background

In [18]:
f = FunctionTask(calc_fib)

In [19]:
f.input.kwargs['n'] = 100

In [72]:
exe = thread.submit([f])

In [73]:
exe.run()

In [74]:
exe._run_machine.state

<Code.RUNNING: 'running'>

In [23]:
exe.wait()

In [24]:
exe.output[0].result

927372692193078999176

### Do the same but in a background process

In [25]:
f = FunctionTask(calc_fib)

In [26]:
f.input.kwargs['n'] = 100

In [27]:
exe = process.submit([f])

In [28]:
exe.run()

In [29]:
exe._run_machine.state

<Code.RUNNING: 'running'>

In [30]:
exe.wait()

In [31]:
exe.output[0].result

927372692193078999176

# Executors handle single Tasks and lists of them on the same footing

In [32]:
tasks = [FunctionTask(calc_fib) for _ in range(10)]

In [33]:
# hand each task its own step count: 3 for the first task, 4 for the second, ...
for step_count, task in enumerate(tasks, start=3):
    task.input.kwargs['n'] = step_count

## With the basic executor

In [34]:
exe = Submitter().submit(tasks)
exe.run()

In [35]:
exe.output

(FunctionOutput(result=5),
 FunctionOutput(result=8),
 FunctionOutput(result=13),
 FunctionOutput(result=21),
 FunctionOutput(result=34),
 FunctionOutput(result=55),
 FunctionOutput(result=89),
 FunctionOutput(result=144),
 FunctionOutput(result=233),
 FunctionOutput(result=377))

In [36]:
exe.output[1].result

8

## With the process executor

In [75]:
exe = process.submit(tasks)

In [76]:
exe.run()

In [77]:
exe.wait()

In [78]:
exe.status

[ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None),
 ReturnStatus(Code.DONE, None)]

In [41]:
exe.output

[FunctionOutput(result=5),
 FunctionOutput(result=8),
 FunctionOutput(result=13),
 FunctionOutput(result=21),
 FunctionOutput(result=34),
 FunctionOutput(result=55),
 FunctionOutput(result=89),
 FunctionOutput(result=144),
 FunctionOutput(result=233),
 FunctionOutput(result=377)]

In [42]:
exe.output[5].result

55

# SeriesTask

In [43]:
s = SeriesTask()

In [44]:
f1 = FunctionTask(calc_fib)

In [45]:
f2 = FunctionTask(np.sqrt)

In [46]:
def transfer(input, output):
    """Set `input.args` to a one-element list holding `output.result`.

    Passed to `.then(...)` below as the connection between two tasks.
    """
    upstream_result = output.result
    input.args = [upstream_result]

In [47]:
s.input.first(f1).then(f2, transfer)

SeriesInput(tasks=[<pyiron_contrib.tinybase.task.FunctionTask object at 0x7f641e7a9450>, <pyiron_contrib.tinybase.task.FunctionTask object at 0x7f641e7a6a10>], connections=[<function transfer at 0x7f641e7b0360>])

In [48]:
s.input.tasks[0].input.kwargs['n'] = 10

In [49]:
status, output = s.execute()

In [50]:
status

ReturnStatus(Code.DONE, None)

In [51]:
output.result

12.0

# Loop Task

## Simple repeat loop

In [52]:
l = LoopTask()

In [53]:
l.input.task = FunctionTask(lambda: np.random.rand())

In [54]:
l.input.repeat(10, restart=lambda output, input, scratch: print(output.result))

In [55]:
l.execute()

0.25206094698683046
0.7580366794891185
0.3684721838966667
0.6114443141823555
0.701768145318245
0.1082013931806125
0.1991682408970753
0.2954001242249057
0.602000592148809
0.13324613153490172


(ReturnStatus(Code.DONE, None), FunctionOutput(result=0.7176369370748094))

In [59]:
exe = thread.submit([l])
exe.run()
exe.wait()

0.8015059839851431
0.5016843345204535
0.1366176125775127
0.8682887572803999
0.35995686077477174
0.20630244622625882
0.024072735251068123
0.1047725596732394
0.8964585655685834
0.6529667772087503


In [60]:
exe.output[0].result

0.20599900230508372

## Loop with a termination condition

In [61]:
l = LoopTask()

In [62]:
l.input.task = FunctionTask(lambda: np.random.rand())

In [63]:
l.input.control_with(
    condition=lambda task, output, scratch: output.result < .15,
    restart=lambda output, input, scratch: print(output.result)
)

In [67]:
l.execute()

0.2309565676155162
0.7973850614628536
0.1566921745270966
0.4129120332177768
0.8722993665925096
0.7014044109954554
0.5356448112961381
0.3716950282432553


(ReturnStatus(Code.DONE, None), FunctionOutput(result=0.10941147936096285))

# Implementation Examples

As a deliberately oversimplified example, let's write a task that waits `n` times for `time` seconds, where each of the `n` waits is a separate, independent task.  In tinybase speak such a construct is a `TaskGenerator`, because it internally generates a sequence of atomic tasks that can be scheduled by an executor in whatever order.  From a user's perspective, however, a task generator behaves exactly like a task (and it implements the same internal interface).

To write such a class, we need to

1. define an input class;
2. define an output class;
3. and combine them in the actual generator.

For the waiting, we'll reuse the already introduced `FunctionTask` to wrap `time.sleep` from the standard library.

In [68]:
from pyiron_contrib.tinybase.task import TaskGenerator, FunctionTask, ReturnStatus
from pyiron_contrib.tinybase.container import AbstractInput, AbstractOutput, StorageAttribute
import time

class WaitInput(AbstractInput):
    # this defines the input parameters
    # `time` is what WaitGenerator passes to `time.sleep` for every single wait (seconds)
    time: float = 10.0
    # `n` is the number of wait tasks WaitGenerator creates
    n: int = 10

class WaitOutput(AbstractOutput):
    # Output container returned by WaitGenerator once all waits are finished.
    # we have no meaningful output, so we'll leave it empty.
    pass

class WaitGenerator(TaskGenerator):
    """Task generator that waits `input.n` times for `input.time` seconds.

    Every wait is its own `FunctionTask` wrapping `time.sleep`; all of them are
    yielded in a single batch and the generator then returns a done status
    together with an empty `WaitOutput`.
    """
    # here our task generator class, needs to advertise which input and output classes it is going to use
    def _get_input(self):
        return WaitInput()

    def __iter__(self):
        # the main computation in a generator is defined in its __iter__ method.
        # executors will iterate over the results yielded here and inject back the results
        # in each iteration the generator can dynamically return new tasks depending on the
        # results that came back from an executor.

        # in our case we just have `n` independent waiting tasks, so we create them in a loop
        # and yield them in one iteration; then discard their (anyway empty) output and return
        # our own return status
        tasks = []
        for _ in range(self.input.n):
            wait_task = FunctionTask(time.sleep)
            wait_task.input.args = [self.input.time]
            tasks.append(wait_task)
        # The value sent back into the generator is intentionally ignored.  Splitting it with
        # `statuses, outputs = zip(*results)` would fail to unpack for an empty result list,
        # and neither part is needed here.
        # NOTE(review): the sub-task return statuses are not inspected; done is reported
        # unconditionally.
        yield tasks
        return ReturnStatus.done(), WaitOutput()

Passing `capture_exceptions=False` means tinybase will not catch any exceptions
and instead gives us the direct stack trace of where an exception occurred.  This is useful
for debugging a new implementation in a notebook like here.  By default tinybase captures
exceptions and sets the return status to aborted automatically.

In [69]:
%%time
wait = WaitGenerator(capture_exceptions=False)
wait.input.time = 2.0
wait.input.n = 10
wait.execute()

CPU times: user 2.22 ms, sys: 1.08 ms, total: 3.3 ms
Wall time: 20 s


(ReturnStatus(Code.DONE, None), WaitOutput())

Calling `execute` on a task generator will simply execute one task after the other.
We therefore expect the run time to be 2 * 10 s.

If we run with the process executor, but only give one core, we expect the run time
to stay the same.

In [82]:
%%time
exe = FuturesSubmitter(ProcessPoolExecutor(max_workers=1)).submit([wait])
exe.run()
exe.wait()

CPU times: user 11.2 ms, sys: 19.4 ms, total: 30.6 ms
Wall time: 20.1 s


If we allow multiple cores to wait in parallel, the run time goes down accordingly,
modulo overhead from the process pool.  With the 4 workers of `process`, the 10 waits
of 2 s each run in three rounds, so we expect roughly 6 s.

In [83]:
%%time
exe = process.submit([wait])
exe.run()
exe.wait()

CPU times: user 4.68 ms, sys: 6.71 ms, total: 11.4 ms
Wall time: 6.01 s


Since we are just waiting here, even running in separate threads gives the same speed-up,
regardless of the GIL, because `time.sleep` releases it while waiting.

In [81]:
%%time
exe = thread.submit([wait])
exe.run()
exe.wait()

CPU times: user 7.92 ms, sys: 1.47 ms, total: 9.39 ms
Wall time: 6.01 s
