# An Asyncio of My Own

Obviously, that's pure hubris. This won't be that fleshed out, but I plan on using simple generators and queues to write an event loop.

I should model my idea of a coroutine first. I want my generators-acting-like-coroutines-or-tasks to yield something like a future. A future, as I understand it, is an object that holds the state of a coroutine or task: you can use it to send a message to the task, get its result, and check on its status.

My idea is to yield a dictionary with a few fields. I could write a class, but I think I can get by with a dict for the scope of this project. Here's some pseudocode, with a | between the options for each field.

`my_future = {"status": "PENDING|IN-PROGRESS|FINISHED|FAILED",
    "result": None|Object,
    "kill": True|False,
    "paused": True|False
    }`

Kill marks it for the event loop to delete references to it. Paused will tell the event loop to ignore it. Not sure if that's enough. But let's try it!
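One way to avoid retyping those four fields everywhere is a small factory function. This is only a sketch: `make_future` and `STATUSES` are names I made up for illustration, and it uses the string statuses from the pseudocode rather than the integer constants in the code that follows.

```python
# Hypothetical helper: builds the future dict described above.
STATUSES = ("PENDING", "IN-PROGRESS", "FINISHED", "FAILED")

def make_future(status="PENDING", result=None, kill=False, paused=False):
    """Return a new future dict, rejecting unknown status values."""
    if status not in STATUSES:
        raise ValueError("unknown status: %r" % (status,))
    return {"status": status, "result": result, "kill": kill, "paused": paused}

# A fresh future, and one that has finished and should be ignored by the loop.
fut = make_future()
done = make_future("FINISHED", result=42, paused=True)
```

Validating the status up front would catch a typo like "FINSHED" at creation time instead of leaving a task that the loop never recognizes as done.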

I will also write a few silly coroutines that are light on actual work, heavy on "doing stuff SLOW."

In [59]:
import time
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

PENDING, IN_PROGRESS, FINISHED, FAILED = range(4)

def wait(seconds):
    """Will complete after seconds, and return time overrun. Otherwise, returns an IN-PROGRESS future dict"""
    start = time.time()
    logging.debug("Start is %f", start)
    yield {"status": IN_PROGRESS, "result": None, "kill": False, "paused": False}
    now = time.time()
    while now - start < seconds:
        logging.debug("Time elapsed %f", now-start)
        yield {"status": IN_PROGRESS, "result": None, "kill": False, "paused": False}
        now = time.time()
    logging.info("Finished waiting for %f, plus %f seconds", seconds, now-start-seconds)
    yield {"status": FINISHED, "result": now-start-seconds, "kill": False, "paused": True}

So now we have a function that takes an arbitrary amount of time and yields control while it's waiting. I can imagine network APIs or I/O calls being implemented the same way: yield until the relevant API (perhaps the kernel) says the work is finished.
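Here's a sketch of what I imagine such an I/O call could look like. It assumes a `socket.socketpair()` standing in for a real network connection, and uses `select.select` with a zero timeout to ask the kernel whether data is ready without blocking. `read_when_ready` is a made-up name, and the status constants are repeated so the snippet runs on its own.

```python
import select
import socket

PENDING, IN_PROGRESS, FINISHED, FAILED = range(4)

def read_when_ready(sock, nbytes=1024):
    """Yield IN_PROGRESS futures until sock is readable, then yield the data."""
    while True:
        # A timeout of 0 turns select() into a poll: it returns immediately.
        readable, _, _ = select.select([sock], [], [], 0)
        if readable:
            break
        yield {"status": IN_PROGRESS, "result": None, "kill": False, "paused": False}
    data = sock.recv(nbytes)
    yield {"status": FINISHED, "result": data, "kill": False, "paused": True}

# Drive it by hand: nothing to read at first, then the peer sends something.
left, right = socket.socketpair()
reader = read_when_ready(left)
first = next(reader)                    # no data yet, so still in progress
right.sendall(b"ping")
second = next(reader)
while second["status"] != FINISHED:     # keep polling until the data shows up
    second = next(reader)
left.close()
right.close()
```

The generator never blocks in `recv`, because it only calls it after `select` has reported the socket as readable.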

Let's call it by hand and see if we can generalize this pattern with an event loop.

In [60]:
waiter = wait(5)
fut = next(waiter)
print(fut)
time.sleep(2)
fut = next(waiter)
print(fut)
time.sleep(4)
fut = next(waiter)
print(fut)

{'status': 1, 'result': None, 'kill': False, 'paused': False}
{'status': 1, 'result': None, 'kill': False, 'paused': False}


INFO:root:Finished waiting for 5.000000, plus 1.007902 seconds


{'status': 2, 'result': 1.007901668548584, 'kill': False, 'paused': True}


My event loop will be a list that just iterates and runs generators until one of them has something to do. I'm not going to worry about efficient waiting or anything (which might rely on generators to share a bit more about their task). Just busy-wait checking things over and over until it finds something.
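For the curious, here's a rough sketch of what "sharing a bit more" could mean. It assumes each future carries an extra, hypothetical `wake_at` timestamp, so a loop can sleep until the earliest deadline instead of spinning. `wait_with_deadline` and `run_sleepy` are illustrative names only, and none of this is used in the loop I actually build.

```python
import time

PENDING, IN_PROGRESS, FINISHED, FAILED = range(4)

def wait_with_deadline(seconds):
    """Like a wait coroutine, but each future advertises when to wake it."""
    wake_at = time.monotonic() + seconds
    while time.monotonic() < wake_at:
        # "wake_at" is a hypothetical extra field, not part of my future dicts.
        yield {"status": IN_PROGRESS, "result": None, "kill": False,
               "paused": False, "wake_at": wake_at}
    yield {"status": FINISHED, "result": seconds, "kill": False,
           "paused": True, "wake_at": None}

def run_sleepy(gens):
    """Run generators to completion, sleeping until the earliest deadline."""
    pending = list(gens)
    finished = []
    steps = 0                      # how many times any generator was resumed
    while pending:
        deadlines = []
        still_pending = []
        for gen in pending:
            fut = next(gen)
            steps += 1
            if fut["status"] == FINISHED:
                finished.append(fut["result"])
            else:
                deadlines.append(fut["wake_at"])
                still_pending.append(gen)
        pending = still_pending
        if deadlines:
            # Sleep instead of spinning; never pass a negative duration.
            time.sleep(max(0.0, min(deadlines) - time.monotonic()))
    return finished, steps

results, steps = run_sleepy([wait_with_deadline(0.2), wait_with_deadline(0.1)])
```

With a busy-wait the generators would be resumed many thousands of times over those 0.2 seconds; here `steps` stays at a handful because the loop sleeps between rounds.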

Each entry in the list will be a future with a reference to its coroutine added. Like asyncio, I'll call the futures wrapping a coroutine/generator "tasks."

In [75]:
class Looper(object):
    def __init__(self):
        self.loop = []
        
    def add_task(self, gen):
        task = {"status": PENDING, "result": None, "kill": False, "paused": False, "coro": gen}
        self.loop.append(task)
        
    def update_task(self, task_index):
        task = self.loop[task_index]
        gen = task["coro"]
        if not task["paused"]:
            try:
                t = next(gen)
                t["coro"] = gen
                self.loop[task_index] = t
            except StopIteration:
                self.loop[task_index] = {"status": FAILED, "result": None, "kill": False, "paused": True, "coro": gen}
        
        
    def _get_actionable_tasks(self):
        return [task for task in self.loop if (task["status"] == PENDING or task["status"] == IN_PROGRESS) \
            and not task["paused"]]
    
    
    def _cull_list(self):
        kill_list = [task for task in self.loop if task["kill"]]
        for t in kill_list:
            del t["coro"]
            del t
        self.loop = [task for task in self.loop if not task["kill"]]
        
    def run_until_complete(self):
        logging.debug(self.loop)
        todo_list = self._get_actionable_tasks()
        while todo_list:            
            logging.debug("loop started")
            for task_index, task in enumerate(self.loop):
                if (task["status"] == PENDING or task["status"] == IN_PROGRESS) and not task["paused"]:
                    self.update_task(task_index)
                    if self.loop[task_index]["status"] == FINISHED:
                        logging.debug(self.loop[task_index])
            self._cull_list()
            todo_list = self._get_actionable_tasks()
        # implicitly todo_list is null or empty here
        logging.info("Loop finished")
        return
                
        

Okay, now let's load it up and see if it can handle some waiting.

In [79]:
logger.setLevel(logging.INFO)
looper = Looper()
looper.add_task(wait(5))
looper.add_task(wait(10))
looper.add_task(wait(3))
looper.run_until_complete()

INFO:root:Finished waiting for 3.000000, plus 0.000160 seconds
INFO:root:Finished waiting for 5.000000, plus 0.003700 seconds
INFO:root:Finished waiting for 10.000000, plus 0.003966 seconds
INFO:root:Loop finished


Well, that seems to work!  A few tasks, not in any specific order, multiplexed on one thread.  Concurrency through task switching achieved.
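For comparison, here's roughly how the same three waits look with the real asyncio, with the durations scaled down to fractions of a second so it runs quickly. The `wait` coroutine is my own stand-in built on `asyncio.sleep`, not something the library provides.

```python
import asyncio
import time

async def wait(seconds):
    """Sleep without blocking the loop, then report the time overrun."""
    start = time.monotonic()
    await asyncio.sleep(seconds)
    return time.monotonic() - start - seconds

async def main():
    # gather() runs the three waits concurrently and returns the results
    # in argument order, regardless of which one finished first.
    return await asyncio.gather(wait(0.3), wait(0.5), wait(0.1))

start = time.monotonic()
overruns = asyncio.run(main())
elapsed = time.monotonic() - start
```

The total elapsed time is close to the longest wait rather than the sum of all three, which is the same multiplexing effect my loop gets, minus the busy-waiting.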

But can we add to the loop on the fly?  We shouldn't add or remove items from self.loop mid-loop, as changing a list while iterating over it can skip entries or throw off the indexes.  So new tasks go into a separate queue, which gets merged into the loop list between rounds of updates.
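To see why changing a list mid-iteration is risky, here's a tiny standalone example (plain strings, nothing to do with the loop itself). Deleting an element while enumerating shifts everything after it down by one, so the next element is silently skipped.

```python
items = ["a", "b", "c", "d"]
seen = []
for i, item in enumerate(items):
    seen.append(item)
    if item == "b":
        # Removing the current element shifts "c" into index 1,
        # which the iterator has already passed.
        del items[i]

print(seen)   # ['a', 'b', 'd']
```

Nothing raises here, which is arguably worse than an error: "c" simply never gets visited.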

Sadly, my future objects (just dicts, really) don't update on their own, so I added a search function, as well.

In [101]:
class Looper(object):
    def __init__(self):
        self.loop = []
        self.to_add = []
        
    def add_task(self, gen):
        task = {"status": PENDING, "result": None, "kill": False, "paused": False, "coro": gen}
        self.to_add.append(task)
        
    def update_task(self, task_index):
        task = self.loop[task_index]
        gen = task["coro"]
        if not task["paused"]:
            try:
                t = next(gen)
                t["coro"] = gen
                self.loop[task_index] = t
            except StopIteration:
                self.loop[task_index] = {"status": FAILED, "result": None, "kill": False, "paused": True, "coro": gen}
        
        
    def _get_actionable_tasks(self):
        return [task for task in self.loop if (task["status"] == PENDING or task["status"] == IN_PROGRESS) \
            and not task["paused"]]
    
    def get_task(self, coro):
        """Search for a task that holds coroutine.

        Return None if not found.
        """
        l = [t for t in self.loop if t["coro"] is coro]
        if not l:
            # also check pending list
            l = [t for t in self.to_add if t["coro"] is coro]
        if l:
            return l[0]
        else:
            return None
        
    def kill_task(self, coro):
        """Mark every task with coro as something to kill"""
        logging.debug("Marking to kill %s", coro)
        l = [t for t in self.loop if t["coro"] is coro]
        logging.debug("Will mark %s", l)
        for e in l:
            e["kill"] = True
    
    def _cull_list(self):
        kill_list = [task for task in self.loop if task["kill"]]
        logging.debug(kill_list)
        for t in kill_list:
            logging.debug("Killing task %s", t)
            del t["coro"]
            del t
        self.loop = [task for task in self.loop if not task["kill"]]
        
    def run_until_complete(self):
        logging.debug(self.loop)
        self.loop.extend(self.to_add)
        self.to_add = []
        todo_list = self._get_actionable_tasks()
        while todo_list:
            logging.debug("loop started")
            for task_index, task in enumerate(self.loop):
                if (task["status"] == PENDING or task["status"] == IN_PROGRESS) and not task["paused"]:
                    self.update_task(task_index)
                    if self.loop[task_index]["status"] == FINISHED:
                        logging.debug(self.loop[task_index])
            # update list before next round
            self._cull_list()
            self.loop.extend(self.to_add)
            self.to_add = []
            todo_list = self._get_actionable_tasks()
        # implicitly todo_list is null or empty here, so there are no actionable tasks
        logging.info("Loop finished")
        return

Now we can just pass a reference to "looper" in a task that wants to add more tasks. It's turtles all the way down!

Let's schedule a function to be called once a wait finishes.

In [106]:
def wait_wrapper(f, wait_seconds, loop):
    w_coro = wait(wait_seconds)
    loop.add_task(w_coro)
    while True:
        fut = loop.get_task(w_coro)
        if fut and fut["status"] == FINISHED:
            result = f()
            loop.kill_task(w_coro)
            yield {"status": FINISHED, "result": result, "kill": False, "paused": True}
        elif not fut:
            yield {"status": FAILED, "result": None, "kill": False, "paused": True}
        else:
            yield {"status": IN_PROGRESS, "result": None, "kill": False, "paused": False}

In [109]:
logger.setLevel(logging.INFO)
hello = lambda: print("Hello, it was worth the wait")
looper = Looper()
looper.add_task(wait_wrapper(hello, 5, looper))
looper.run_until_complete()

INFO:root:Finished waiting for 5.000000, plus 0.006356 seconds
INFO:root:Loop finished


Hello, it was worth the wait


And there we have it!  A basic asynchronous, single-threaded event loop.  It's interesting to see the basic concepts play out, although I'm sure the asyncio implementation is more robust in all kinds of ways.  For one, it thought through the futures concept better. I could probably keep using dictionaries if I were more careful about updating the same dict objects rather than spawning new ones.
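Here's a minimal sketch of what updating the same dict could look like, assuming the task dict is mutated with `dict.update` instead of being replaced. `countdown` and `step` are hypothetical names for illustration. Because the dict keeps its identity, anyone holding a reference sees the new status without searching the loop.

```python
PENDING, IN_PROGRESS, FINISHED, FAILED = range(4)

def countdown(n):
    """Toy coroutine: yields IN_PROGRESS n times, then FINISHED."""
    for _ in range(n):
        yield {"status": IN_PROGRESS, "result": None, "kill": False, "paused": False}
    yield {"status": FINISHED, "result": n, "kill": False, "paused": True}

def step(task):
    """Advance the task's generator and update the same dict in place."""
    try:
        # The yielded dict has no "coro" key, so update() leaves it alone.
        task.update(next(task["coro"]))
    except StopIteration:
        task.update(status=FAILED, result=None, paused=True)

task = {"status": PENDING, "result": None, "kill": False, "paused": False,
        "coro": countdown(2)}
handle = task                 # a caller keeps this reference
while not task["paused"]:
    step(task)

print(handle["status"] == FINISHED, handle["result"])   # True 2
```

With this approach a helper like `get_task` would be unnecessary, since the reference handed out when the task was created stays current.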