Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 109 lines (92 sloc) 3.508 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
# coding: utf-8
import sys
import time
import Queue
import threading
import multiprocessing

def coroutine(func):
    """ Decorator for priming co-routines that use (yield) """
    def wrapper(*args, **kwargs):
        c = func(*args, **kwargs)
        c.next() # prime it for iteration
        return c
    return wrapper

class CoroutineProcess(multiprocessing.Process):
    """ Will run a coroutine in its own process, using the multiprocessing
library. The coroutine thread runs as a daemon, and is closed automatically
when it is no longer needed. Because it exposes send and close methods, a
CoroutineProcess wrapped coroutine can be dropped in for a regular
coroutine."""

    def __init__(self, target_func):
        multiprocessing.Process.__init__(self)
        self.in_queue = multiprocessing.Queue()
        self.processor = target_func
        self.daemon = True
        # Allows the thread to close correctly
        self.shutdown = multiprocessing.Event()

    def send(self, item):
        if self.shutdown.is_set():
            raise StopIteration
        self.in_queue.put(item)

    def __call__(self, *args, **kwargs):
        # Prime the wrapped coroutine.
        self.processor = self.processor(*args, **kwargs)
        self.processor.next()
        self.start()
        return self

    def run(self): # this is the isolated 'process' being run after start() is called
        try:
            while True:
                item = self.in_queue.get()
                self.processor.send(item) # throws StopIteration if close() has been called
        except StopIteration:
            pass
        self.close()

    def close(self):
        self.processor.close()
        self.shutdown.set()

def coroutine_process(func):
    def wrapper(*args, **kwargs):
        cp = CoroutineProcess(func)
        cp = cp(*args, **kwargs)
        # XXX(todo): use @CoroutineProcess on an individual function, then wrap
        # with @coroutine, too. Don't start until .next().
        return cp
    return wrapper

class CoroutineThread(threading.Thread):
    """ Wrapper for coroutines; runs in their own threads. """
    def __init__(self, target_func):
        threading.Thread.__init__(self) # creates a thread
        self.setDaemon(True)
        self.in_queue = Queue.Queue() # creates a queue for cross-thread communication
        self.processor = target_func # the function to process incoming data
        self.shutdown = threading.Event() # watch for close

    def send(self, item):
        if self.shutdown.isSet():
            raise StopIteration
        self.in_queue.put(item)

    def __call__(self, *args, **kwargs):
        # Prime the wrapped coroutine.
        self.processor = self.processor(*args, **kwargs)
        self.processor.next()
        self.start()
        return self

    def run(self): # this is running in its own thread after it is created
        try:
            while True:
                item = self.in_queue.get()
                if self.shutdown.is_set(): break
                self.processor.send(item)
        except StopIteration:
            pass
        self.shutdown.set()

    def close(self):
        self.shutdown.set()

def coroutine_thread(func):
    def wrapper(*args, **kwargs):
        cp = CoroutineThread(func)
        cp = cp(*args, **kwargs)
        # XXX(todo): use @CoroutineProcess on an individual function, then wrap
        # with @coroutine, too. Don't start until .next().
        return cp
    return wrapper


Something went wrong with that request. Please try again.