In [1]:
from functools import wraps

In [2]:
# Decorator that primes a coroutine as soon as it is created
def coroutine(func):
    """Return a version of generator function ``func`` that is already primed.

    Calling the decorated function builds the generator and advances it to
    its first ``yield``, so callers can use ``send`` straight away.
    """
    @wraps(func)
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        # Advance to the first yield here so the caller never has to prime it.
        next(gen)
        return gen
    return primed

In [3]:
def example_grep(pattern):
    """Coroutine that prints every line sent to it that contains ``pattern``.

    It announces the pattern when first advanced and prints a farewell
    message when it is closed. It must be primed (``next``) before ``send``.
    """
    print('Looking for {}'.format(pattern))
    try:
        while True:
            candidate = yield
            if pattern not in candidate:
                continue
            print(candidate)
    except GeneratorExit:
        # Raised at the paused yield when close() is called on the generator.
        print('Closing. No more grep for you !')

In [4]:
g = example_grep('python')
# Prime the coroutine by hand: run it up to its first yield.
# (Or just use the decorator above for auto priming.)
g.send(None)

Looking for python


In [5]:
# Feed a few lines to the grep coroutine; only those containing 'python' are printed.
for sample in ('Hello World',
               'I am a pythonista.',
               'c++ or python',
               'never mind'):
    g.send(sample)

I am a pythonista.
c++ or python


In [6]:
g.close() # Stop the coroutine: GeneratorExit is raised inside it at the paused yield, which example_grep catches to print its farewell

Closing. No more grep for you !


In [7]:
@coroutine
def printer():
    """Sink coroutine: print every value that is sent to it."""
    while True:
        print((yield))

In [8]:
@coroutine
def broadcast(targets):
    """Fan-out coroutine: forward each received value to every coroutine in ``targets``.

    ``targets`` is iterated again for every incoming value, in order.
    """
    while True:
        message = yield
        for sink in targets:
            sink.send(message)

In [9]:
@coroutine
def grep(pattern, publisher):
    """Filter coroutine: pass lines containing ``pattern`` on to ``publisher``.

    Matching lines are forwarded with the pattern appended in parentheses,
    so the receiver can tell which filter produced them.
    """
    while True:
        text = yield
        if pattern not in text:
            continue
        tagged = text + ' ({})'.format(pattern)
        publisher.send(tagged)

In [10]:
# Build the pipeline: broadcast -> two grep filters -> one shared printer.
p = printer()
x, y = (grep(word, p) for word in ('python', 'ml'))
b = broadcast([x, y])

In [11]:
# Push lines into the head of the pipeline; each grep prints its own matches.
for message in (
    'Hello',
    'All hail the BDFL',
    'python is cool',
    'ml is in demand',
    'C++ or Fortran',
    'ml uses python',  # This will be repeated
):
    b.send(message)

python is cool (python)
ml is in demand (ml)
ml uses python (python)
ml uses python (ml)


In [12]:
import xml.sax

In [13]:
class EventHandler(xml.sax.ContentHandler):
    """SAX content handler that forwards parse events to a coroutine.

    Each SAX callback is converted into an ``(event, value)`` tuple and
    pushed into ``target`` with ``send``:

    * ``('start', (name, attrs_dict))`` when an element opens,
    * ``('text', text)`` for character data,
    * ``('end', name)`` when an element closes.

    ``target`` is any object with a ``send`` method, typically an already
    primed coroutine.
    """
    def __init__(self, target):
        # Let ContentHandler initialise its own state before adding ours.
        super().__init__()
        self.target = target
    def startElement(self, name, attrs):
        # Copy the attributes through the public mapping interface rather
        # than reaching into the private ``_attrs`` dict.
        self.target.send(('start', (name, dict(attrs))))
    def characters(self, text):
        self.target.send(('text', text))
    def endElement(self, name):
        self.target.send(('end', name))

# Output is too long - don't run 
# xml.sax.parse('coroutines/allroutes.xml', EventHandler(printer())) 

## OS: a cooperative task scheduler built on coroutines

In [14]:
from collections import defaultdict

In [15]:
class Task:
    """Wrapper around a coroutine that gives it an id and a pending send value."""

    # Class-wide counter; the most recently issued task id.
    taskid = 0

    def __init__(self, target):
        Task.taskid = Task.taskid + 1
        # tid: task id, target: wrapped coroutine,
        # sendval: value delivered to the coroutine on the next run().
        self.tid, self.target, self.sendval = Task.taskid, target, None

    def run(self):
        """Resume the coroutine with ``sendval`` and return what it yields next."""
        coro = self.target
        return coro.send(self.sendval)

In [16]:
from queue import Queue

In [17]:
class SystemCall:
    """Base class for requests a task can yield to the scheduler.

    The scheduler's main loop sets ``task`` and ``sched`` on the instance
    and then calls ``handle``; subclasses override it.
    """
    def handle(self):
        """Do nothing by default."""
        return None
    
class GeTid(SystemCall):
    """System call that returns the calling task's own id to it."""
    def handle(self):
        caller = self.task
        # The tid becomes the value the task receives from its ``yield``.
        caller.sendval = caller.tid
        self.sched.schedule(caller)

In [18]:
# System call a task yields to spawn a sub-task
class NewTask(SystemCall):
    """Register ``target`` as a new task and hand its tid back to the caller."""
    def __init__(self, target):
        self.target = target

    def handle(self):
        scheduler, caller = self.sched, self.task
        # The new tid becomes the result of the caller's ``yield``.
        caller.sendval = scheduler.new(self.target)
        scheduler.schedule(caller)

In [19]:
# System call a task yields to kill another task
class KillTask(SystemCall):
    """Close the coroutine of the task with id ``tid``, if it is still known.

    The caller receives ``True`` when the task was found and closed,
    ``False`` otherwise, and is rescheduled either way.
    """
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        victim = self.sched.taskmap.get(self.tid)
        found = bool(victim)
        if found:
            victim.target.close()
        self.task.sendval = found
        self.sched.schedule(self.task)

In [20]:
class WaitTask(SystemCall):
    """System call that suspends the caller until task ``tid`` exits.

    The caller's ``sendval`` is set to the scheduler's answer: ``True`` when
    it was parked on the target task, ``False`` when the target is unknown.
    """
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        caller, scheduler = self.task, self.sched
        parked = scheduler.waitforexit(caller, self.tid)
        caller.sendval = parked
        if parked:
            # The scheduler reschedules the caller once the target exits.
            return
        # Nothing to wait for: resume the caller right away.
        scheduler.schedule(caller)
        

In [21]:
class Scheduler:
    """Round-robin scheduler for coroutine-based tasks.

    Tasks are run one step at a time (up to their next ``yield``) in FIFO
    order. A task that yields a ``SystemCall`` has that call handled by the
    scheduler; any other yielded value simply puts the task back in the
    ready queue.
    """
    def __init__(self):
        self.ready = Queue() # Queue of tasks ready to run
        self.taskmap = {}    # tid -> Task, for every task that has not exited
        self.exit_waiting = defaultdict(list) # tid -> tasks waiting for that tid to exit

    def new(self, target):
        """Wrap coroutine ``target`` in a Task, schedule it and return its tid."""
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask # Keeps track of all active tasks using their taskid
        self.schedule(newtask)
        return newtask.tid

    # Puts something to the ready queue
    def schedule(self, task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def exit(self, task):
        """Forget a finished ``task`` and wake every task waiting for it."""
        del self.taskmap[task.tid] # The task has finished running. Remove it from taskmap
        print('Task {} terminated.'.format(task.tid))
        # pop() discards the waiter list instead of leaving (or, via the
        # defaultdict, creating) an entry for every tid that ever exited.
        for waiter in self.exit_waiting.pop(task.tid, []):
            self.schedule(waiter)

    def waitforexit(self, waiting_task, waittid):
        """Park ``waiting_task`` until task ``waittid`` exits.

        Returns True if the task was parked, False if ``waittid`` is not an
        active task (nothing is recorded in that case).
        """
        if waittid in self.taskmap:
            # Add waiting_task to the list of tasks waiting for waittid
            self.exit_waiting[waittid].append(waiting_task)
            return True
        else:
            return False

    def mainloop(self):
        """Run tasks until none are left in ``taskmap``.

        Note: ``Queue.get`` blocks, so this never returns if tasks remain
        registered but none of them is ready to run.
        """
        while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()  # Only runs to the next yield
            except StopIteration:
                self.exit(task)
            else:
                if isinstance(result, SystemCall): # If task returns a SystemCall, do something with it
                    # Give the call its context before letting it act.
                    result.task = task
                    result.sched = self
                    result.handle()
                else:
                    self.schedule(task)

### Examples

#### A few tasks

In [22]:
def foo():
    """Two-step demo task: print a message, then yield, twice."""
    for announcement in ('foo - Part 1', 'foo -Part 2'):
        print(announcement)
        yield
    

In [23]:
def bar():
    """Demo task that yields after each of three messages, then prints a last one."""
    chatter = (
        "I'm bar",
        "I am friends with foo.",
        "What's the deal with C++ anyway?!",
    )
    for remark in chatter:
        print(remark)
        yield
    # The final message is not followed by a yield: the task ends right after it.
    print('Tri la la!')

In [24]:
def foo_with_return():
    """Demo task: ask the scheduler for its own tid, then report it 7 times."""
    mytid = yield GeTid()
    report = 'I am foo, running with tid {}'.format(mytid)
    for _ in range(7):
        print(report)
        yield
        
def bar_with_return():
    """Demo task: ask the scheduler for its own tid, then report it twice."""
    mytid = yield GeTid()
    report = 'I am bar, running with tid {}'.format(mytid)
    for _ in range(2):
        print(report)
        yield

In [25]:
def parent():
    """Demo task: spawn a child, run three steps of its own, then kill the child."""
    child_tid = yield NewTask(foo_with_return())
    step = 0
    while step < 3:
        print('parent : iteration {}'.format(step))
        yield
        step += 1
    yield KillTask(child_tid)
    print('parent complete')

In [26]:
def waiting_parent():
    """Demo task: spawn a child running ``foo`` and wait until it has exited."""
    child_tid = yield NewTask(foo())
    print('Waiting for child')
    # Yielding WaitTask hands control back to the scheduler with a wait request.
    wait_request = WaitTask(child_tid)
    yield wait_request
    print('Child done')

#### Task Example

In [27]:
# Drive a Task by hand: each run() advances foo() to its next yield.
t1 = Task(foo())
for banner in ('Starting foo()', 'Resuming foo()'):
    print(banner)
    t1.run()
# foo() only yields twice, so a third run() makes the coroutine finish.
try:
    t1.run()
except StopIteration:
    print('Task finished')

Starting foo()
foo - Part 1
Resuming foo()
foo -Part 2
Task finished


#### Scheduler example

In [28]:
# Run foo() and bar() side by side; the scheduler alternates between them.
sched = Scheduler()
for job in (foo(), bar()):
    sched.new(job)
sched.mainloop()

foo - Part 1
I'm bar
foo -Part 2
I am friends with foo.
Task 2 terminated.
What's the deal with C++ anyway?!
Tri la la!
Task 3 terminated.


#### System call example

In [29]:
# Both tasks start by yielding GeTid() to learn their own task id.
sched = Scheduler()
for job in (foo_with_return(), bar_with_return()):
    sched.new(job)
sched.mainloop()

I am foo, running with tid 4
I am bar, running with tid 5
I am foo, running with tid 4
I am bar, running with tid 5
I am foo, running with tid 4
Task 5 terminated.
I am foo, running with tid 4
I am foo, running with tid 4
I am foo, running with tid 4
I am foo, running with tid 4
Task 4 terminated.


In [30]:
# parent() spawns foo_with_return() through NewTask, runs three iterations of
# its own, then stops the child with KillTask.
sched = Scheduler()
sched.new(parent())
sched.mainloop()

parent : iteration 0
I am foo, running with tid 7
parent : iteration 1
I am foo, running with tid 7
parent : iteration 2
I am foo, running with tid 7
Task 7 terminated.
parent complete
Task 6 terminated.


In [31]:
# waiting_parent() spawns foo() through NewTask and yields WaitTask on it,
# so 'Child done' is only printed after the child task has terminated.
sched = Scheduler()
sched.new(waiting_parent())
sched.mainloop()

foo - Part 1
Waiting for child
foo -Part 2
Task 9 terminated.
Child done
Task 8 terminated.
