Go-style concurrency for Twisted

Twisted CSP

Communicating sequential processes for Twisted. Channels like Go, or Clojurescript's core.async.

WARNING: This is currently alpha software.

This is a very close port of Clojurescript's core.async. The significant difference is that light-weight processes are implemented using generators (yield) instead of macros.

  • Channel operations must happen inside "light-weight processes" (code flows, not actual threads).
  • Light-weight processes are spawned by calling go, go_channel, go_deferred or by using their decorator equivalents.
  • Most channel operations must follow the form of yield do_sth(...).
def slow_pipe(input, output):
    while True:
        value = yield take(input)
        yield sleep(0.5)
        if value is None: # input closed
            close(output)
            yield stop()
        else:
            yield put(output, value)

go(slow_pipe, chan1, chan2)
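
To illustrate how generators can stand in for light-weight processes, here is a toy round-robin driver. It is only a sketch and not this library's implementation: it uses no Twisted, and the names ToyChannel, run, producer and consumer are hypothetical. The put and take functions below are local stand-ins that merely return a description of the requested operation; the driver performs the operation and resumes the generator with the result.

```python
from collections import deque


class ToyChannel(object):
    """Hypothetical stand-in for a channel: an unbounded FIFO queue."""

    def __init__(self):
        self.items = deque()


def put(channel, value):
    # Only describes the operation; the driver carries it out.
    return ("put", channel, value)


def take(channel):
    return ("take", channel, None)


def run(*processes):
    """Resume each generator in turn until none is runnable."""
    ready = deque((p, None) for p in processes)  # (generator, value to send)
    waiting = []  # (generator, channel) pairs blocked on an empty channel
    while ready:
        proc, to_send = ready.popleft()
        try:
            op, channel, value = proc.send(to_send)
        except StopIteration:
            continue  # this process has finished
        if op == "put":
            channel.items.append(value)
            ready.append((proc, None))
        else:  # "take"
            waiting.append((proc, channel))
        # Wake every taker whose channel now holds a value.
        still_waiting = []
        for taker, ch in waiting:
            if ch.items:
                ready.append((taker, ch.items.popleft()))
            else:
                still_waiting.append((taker, ch))
        waiting = still_waiting


def producer(channel, n):
    for i in range(n):
        yield put(channel, i)


def consumer(channel, n, out):
    for _ in range(n):
        value = yield take(channel)
        out.append(value)


received = []
c = ToyChannel()
run(producer(c, 3), consumer(c, 3, received))
```

In this sketch a process that stays blocked on an empty channel is simply dropped when nothing else is runnable. A real dispatcher would keep it parked until a value arrives.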

Examples

Function returning channel (http://talks.golang.org/2012/concurrency.slide#25).

import random


def boring(message):
    c = Channel()
    def counter():
        i = 0
        while True:
            yield put(c, "%s %d" % (message, i))
            yield sleep(random.random())
            i += 1
    go(counter)
    return c


def main():
    b = boring("boring!")
    for i in range(5):
        print "You say: \"%s\"" % (yield take(b))
    print "You are boring; I'm leaving."

Pingpong (http://talks.golang.org/2013/advconc.slide#6).

class Ball:
    hits = 0


@process
def player(name, table):
    while True:
        ball = yield take(table)
        ball.hits += 1
        print name, ball.hits
        yield sleep(0.1)
        yield put(table, ball)


def main():
    table = Channel()

    player("ping", table)
    player("pong", table)

    yield put(table, Ball())
    yield sleep(1)

Timeout using alts (select in Go) (http://talks.golang.org/2012/concurrency.slide#35).

def main():
    c = boring("Joe")
    while True:
        value, chan = yield alts([c, timeout(0.8)])
        if chan is c:
            print value
        else:
            print "You're too slow."
            yield stop()

Running the examples

Use the run script, for example:

./run example.go.timeout_for_whole_conversation_using_select

Examples under example/go are ports of Go examples from:

Playing around in a REPL

Python 2.7.5+ (default, Sep 19 2013, 13:48:49)
[GCC 4.8.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import thread
>>> from csp import *
>>> from twisted.internet import reactor
>>> thread.start_new_thread(reactor.run, (), {"installSignalHandlers": False})
140038185355008
>>> class Ball:
...     hits = 0
...
>>> def player(name, table):
...     while True:
...         ball = yield take(table)
...         if ball is None: # channel's closed
...             print name, "Ball's gone"
...             break
...         ball.hits += 1
...         print name, ball.hits
...         yield sleep(0.1)
...         yield put(table, ball)
...
>>> def main():
...     table = Channel()
...     go(player, "ping", table)
...     go(player, "pong", table)
...     yield put(table, Ball())
...     yield sleep(1)
...     close(table)
...
>>> reactor.callFromThread(lambda: go(main))
>>> ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping Ball's gone
pong Ball's gone

>>>

Limitations

  • Does not work in a multi-threaded environment, at all (this is fixable though).
  • The normal channel API cannot be used outside of a process (more precisely, outside of a process's generator function).
  • Generator functions must be used to spawn processes. This makes it less composable than in Go (where the constructs are built-in), or Clojurescript (where macros rule).
  • Forgetting to yield can cause subtle bugs.
  • Scheduling is cooperative: a process that never yields blocks every other process (not sure if this is a big problem though).
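
The "forgetting to yield" pitfall can be sketched without the library. The example assumes, as the yield-based API suggests, that an operation only describes the work and that nothing happens until the description is yielded to whatever drives the process. The put and drive functions here are hypothetical stand-ins, not the library's own.

```python
def put(channel, value):
    # Hypothetical stand-in: returns a description of the operation
    # instead of performing it.
    return ("put", channel, value)


def drive(process):
    """Minimal driver: perform every instruction the generator yields."""
    for op, channel, value in process:
        if op == "put":
            channel.append(value)


def correct(channel):
    yield put(channel, "a")
    yield put(channel, "b")


def buggy(channel):
    yield put(channel, "a")
    put(channel, "b")  # BUG: the instruction is built, then silently discarded


good, bad = [], []
drive(correct(good))
drive(buggy(bad))
```

No exception is raised in the buggy version; the second value just never arrives, which is what makes this kind of mistake hard to spot.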

TODO

  • Multiplexing, mixing, publishing/subscribing.
  • Channel operations (map, filter, reduce...).
  • Support multi-threaded environment (porting Clojure's core.async not Clojurescript's).
  • Write tests.
  • Think of a sensible error handling strategy (I think this should be decided by client code not library code though).
    • Should there be a separate error channel?
    • Should channels deliver (result, error) tuples?
    • Should errors be treated as special values (caught exceptions re-thrown when taken)?
  • Support other reactors, e.g. Tornado (should be easy, as the dispatcher is the only thing that depends on twisted).
  • More documentation.
  • More examples (focusing on leveraging Twisted's rich network capabilities).
  • put_then_callback and take_then_callback execute the supplied callback in the same tick if the result is immediately available. This can cause problems (especially if they are public API).
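
Two of the error handling options listed above can be compared in a small sketch. It uses a plain deque as a stand-in for a channel, and the helpers safe_call and unwrap are hypothetical names introduced for illustration; neither is part of this library.

```python
from collections import deque


def safe_call(func, *args):
    """Option: deliver (result, error) tuples."""
    try:
        return (func(*args), None)
    except Exception as e:
        return (None, e)


def unwrap(value):
    """Option: treat errors as special values, re-raised when taken."""
    if isinstance(value, Exception):
        raise value
    return value


channel = deque()  # stand-in for a real channel

# (result, error) tuples: the taker must check the error slot every time.
channel.append(safe_call(int, "42"))
channel.append(safe_call(int, "oops"))
result, error = channel.popleft()
result2, error2 = channel.popleft()

# Errors as values: the producer puts the exception itself on the channel
# and the taker re-raises it on the consuming side.
channel.append(ValueError("boom"))
try:
    unwrap(channel.popleft())
    caught = None
except ValueError as e:
    caught = str(e)
```

The tuple form keeps errors explicit at every take, while the re-raising form keeps the happy path shorter but means exceptions can no longer be sent as ordinary values.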

Inspiration

Other Python CSP libraries:

These libraries use threads or processes (except for pycsp, which also supports greenlets, which are not portable). In principle this makes implementation simpler at the cost of efficiency, though managing threads and processes can be a chore. On the other hand, they are much more generic and support networked channels (which is not necessarily a good thing). TODO: More elaborate comparison.
