In [1]:
import asyncio

In [2]:
from sys import version_info as pyversion
print(pyversion)
# compare tuples rather than (major, minor) separately, so that any later
# major version is accepted as well
if pyversion < (3, 5):
    # raising an exception (rather than calling the interactive exit() helper)
    # stops a "Run All" right here, and gives a non-zero status as a script
    raise RuntimeError("python 3.5 or later is required")

sys.version_info(major=3, minor=5, micro=1, releaselevel='final', serial=0)


# async control flow engine - proof of concept

### Playing with the following idea

We would like to describe a set of asynchronous jobs, together with their
dependencies - à la makefile.

It does not look like this feature is supported by the native
asyncio tools.

So, as input, we take something that describes the dependencies:
each job has a set of *requirement* jobs that must have completed before this
job can take off.

### Example:

```
A1 \ 
A2 - A4 - A5 \
A3 /    \ A6 - A7
```

The job in this case would be something like

1. find out that A1, A2 and A3 need to be started (find the roots of the graph)
1. use this to initialize the list of running tasks
1. run all running tasks until at least one completes
1. at that point, compute whether some new tasks can be started, and if so add them to the list of running tasks
1. in any case, go back to step 3, until there is nothing left to run

At first sight, the basic tool for this in the asyncio toolset is `asyncio.wait(..., return_when=FIRST_COMPLETED)`




### a generic coroutine for simple tests

In [3]:
from tests import sl, slm

Variants used for now
* `sl()` (called `raw()` in earlier versions): simplest possible (middle=False)
* `slm()` (called `rawm()` in earlier versions): sets `middle=True`
* `rawme`: not needed yet
* `rawe`: makes no sense

### Warning

In this code we use the global event loop returned by `asyncio.get_event_loop()`.

When tests fail, they can leave that global loop in a dirty state (e.g. some tasks remain in the loop as still pending) and weird things happen; restarting the kernel is one way to deal with this in this POC environment.

### run futures until all are done, using `gather`

In [4]:
def run_all(loop, *coros):
    """Run all ``coros`` concurrently on ``loop`` until every one has completed.

    Each coroutine is turned into a task bound explicitly to ``loop``. The
    tasks are then aggregated with ``asyncio.gather`` and driven by
    ``loop.run_until_complete``. The list of results, in the same order as
    ``coros``, is printed.

    :param loop: the event loop that runs the coroutines
    :param coros: coroutines (or futures bound to ``loop``) to run
    """
    # Bind every task to the loop we were given. Letting gather() choose a
    # loop on its own ties the tasks to the default loop, and running them on
    # any other loop then fails with "future belongs to a different loop".
    futures = [asyncio.ensure_future(c, loop=loop) for c in coros]
    if futures:
        finals = loop.run_until_complete(asyncio.gather(*futures))
    else:
        # gather() with no argument would fall back on the default loop
        finals = []
    print("final results has {} elts -> {}".format(len(finals), finals))

In [5]:
# expect 6 lines
run_all(asyncio.get_event_loop(), slm(1.2), slm(0.6))

14-48-958 -> sl(1.2)
14-48-958 -> sl(0.6)
14-49-258 == sl(0.6)
14-49-558 == sl(1.2)
14-49-559 <- sl(0.6)
14-50-161 <- sl(1.2)
final results has 2 elts -> [1.2, 0.6]


### out of curiosity, using another loop instance

In [6]:
#my_loop = asyncio.new_event_loop()
#try:
#    run_all(my_loop, rawm(1.2), rawm(0.6))
#except Exception as e:
#    print("not working", e)

### similarly, using `asyncio.wait`

In [7]:
def run_all_wait(loop, *coros):
    """Run all ``coros`` on ``loop`` with ``asyncio.wait`` and print their final state.

    Every done future is printed on a ``done:`` line, then every pending one
    on a ``pending:`` line. Nothing is printed when ``coros`` is empty.

    :param loop: the event loop that runs the coroutines
    :param coros: coroutines (or futures bound to ``loop``) to run
    """
    # bind the tasks to ``loop`` explicitly instead of to the default loop
    futures = [asyncio.ensure_future(c, loop=loop) for c in coros]
    if not futures:
        # asyncio.wait() rejects an empty collection with ValueError
        return
    done, pending = loop.run_until_complete(asyncio.wait(futures))
    for d in done:
        print("done:", d)
    for p in pending:
        print("pending:", p)

In [8]:
# expect 6 lines 
run_all_wait(asyncio.get_event_loop(), slm(0.4), slm(0.7))

14-50-178 -> sl(0.4)
14-50-178 -> sl(0.7)
14-50-380 == sl(0.4)
14-50-532 == sl(0.7)
14-50-584 <- sl(0.4)
14-50-883 <- sl(0.7)
done: <Task finished coro=<_sl() done, defined at /Users/parmentelat/git/apssh/engine/tests.py:19> result=0.4>
done: <Task finished coro=<_sl() done, defined at /Users/parmentelat/git/apssh/engine/tests.py:19> result=0.7>


### iteratively, manually adding stuff one at a time

In [9]:
def future_repr(future):
    """Return a compact one-line description of ``future``.

    The format is ``<ClassName result=... state=...>``. The state is one of
    ``PENDING``, ``CANCELLED`` or ``FINISHED``. The result is ``None`` unless
    the future finished normally. Only the public ``Future`` API is used, not
    the private ``_result`` / ``_state`` attributes.
    """
    if future.cancelled():
        state, result = 'CANCELLED', None
    elif future.done():
        state = 'FINISHED'
        # a future that ended with an exception has no result: show None
        # (calling exception() also marks that exception as retrieved)
        result = None if future.exception() is not None else future.result()
    else:
        state, result = 'PENDING', None
    return '<%s result=%s state=%s>' % (future.__class__.__name__, result, state)

def show_done_pending(done, pending, msg):
    """Print one line per future: first those in ``done`` (tag "D"), then those in ``pending`` (tag "P").

    Each line holds the tag, ``msg``, the future's type and its
    ``future_repr`` summary.
    """
    for tag, group in (("D", done), ("P", pending)):
        for fut in group:
            print(tag, msg, type(fut), future_repr(fut))

def run_two_then_last(loop, c1, c2, c3):
    """Show how work can be added to a set of running tasks on the fly.

    1. start ``c1`` and ``c2``, and run ``loop`` until at least one completes;
    2. schedule ``c3`` as well, and run until at least one more completes;
    3. run until whatever is left has completed.

    After each step the done / pending futures are printed with
    ``show_done_pending``.

    :param loop: the event loop that runs everything
    :param c1: first coroutine (or future bound to ``loop``)
    :param c2: second coroutine (or future bound to ``loop``)
    :param c3: coroutine (or future bound to ``loop``) added after step 1
    """
    def run_until_first(futures):
        # asyncio.wait() raises ValueError on an empty collection; this happens
        # here when several tasks completed during the same previous step
        if not futures:
            return set(), set()
        return loop.run_until_complete(
            asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED))

    # bind the tasks to ``loop`` explicitly rather than to the default loop
    futures = [asyncio.ensure_future(c, loop=loop) for c in (c1, c2)]
    print("starting t1 & t2 until one finishes")
    done, pending = run_until_first(futures)
    show_done_pending(done, pending, "1:")
    print("adding t3 and finishing one more")
    f3 = asyncio.ensure_future(c3, loop=loop)
    pending.add(f3)
    done, pending = run_until_first(pending)
    show_done_pending(done, pending, "2:")
    print("finishing last")
    done, pending = run_until_first(pending)
    show_done_pending(done, pending, "3:")
    

In [10]:
# expect 3 lines, then 2, then 1
run_two_then_last(asyncio.get_event_loop(), sl(1), sl(0.5), sl(0.7))

starting t1 & t2 until one finishes
14-50-907 -> sl(1)
14-50-907 -> sl(0.5)
14-51-412 <- sl(0.5)
D 1: <class 'asyncio.tasks.Task'> <Task result=0.5 state=FINISHED>
P 1: <class 'asyncio.tasks.Task'> <Task result=None state=PENDING>
adding t3 and finishing one more
14-51-412 -> sl(0.7)
14-51-909 <- sl(1)
D 2: <class 'asyncio.tasks.Task'> <Task result=1 state=FINISHED>
P 2: <class 'asyncio.tasks.Task'> <Task result=None state=PENDING>
finishing last
14-52-116 <- sl(0.7)
D 3: <class 'asyncio.tasks.Task'> <Task result=0.7 state=FINISHED>


# Obsolete section

That's not how it ended up at all. We do not subclass `Task`, because creating a `Task` object schedules it for execution, whereas we need to describe dependencies **before** anything is started.

### Subclassing Future

~~So far so good, but it'd probably be a good idea to be able to use our own subclass of `Task` so that we can easily model the interactions / dependencies between jobs. Using a task factory seems to be the angle here.~~

In [11]:
class Job(asyncio.Task):
    """An ``asyncio.Task`` subclass that announces its own construction.

    Every argument after ``coro`` is handed over unchanged to ``asyncio.Task``.
    """

    def __init__(self, coro, *args, **kwds):
        # trace which coroutine this job wraps, then let Task do the real work
        print("Job __init__, coro=", coro)
        super().__init__(coro, *args, **kwds)

In [12]:
# use a regular loop object and define its internal 
# factory so that it uses Job objects instead of Task
def use_jobs(loop):
    """Make ``loop`` create ``Job`` instances instead of plain ``Task`` ones.

    This installs a task factory on ``loop``. From then on, tasks created by
    ``loop.create_task`` are built as ``Job`` objects.

    :param loop: the event loop whose task factory is replaced
    """
    def factory(loop, coro, **kwargs):
        # newer Python versions may pass extra keyword arguments (such as
        # ``context``) to the task factory; forward them to Job / Task
        return Job(coro, loop=loop, **kwargs)
    loop.set_task_factory(factory)
    
### let's try to avoid this     
#use_jobs(global_loop)
#run_two_then_last(global_loop, raw(1), raw(0.5), raw(0.7))

### Using our own loop instance

~~Fine; here again though, using `asyncio.new_event_loop()` seems to break it all.
This happens whether I use `use_jobs` or not, so it's much deeper than subclassing...~~

~~We'll write our code with a loop as an argument, but will primarily mess with the global instance for now - this is not crucial, just a little odd to have to mess with a global object.~~

In [13]:
#use_jobs(my_loop)
#run_two_then_last(my_loop, raw(1), raw(0.5), raw(0.7))