Skip to content

Commit

Permalink
Major update to tornado.platform.twisted.
Browse files Browse the repository at this point in the history
Significantly improved compatibility (most important changes are in
TornadoReactor._invoke_callback) and expanded test coverage.
  • Loading branch information
bdarnell committed Jan 17, 2012
1 parent 3622650 commit af940f4
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 128 deletions.
169 changes: 93 additions & 76 deletions tornado/platform/twisted.py
Expand Up @@ -13,14 +13,35 @@
# License for the specific language governing permissions and limitations
# under the License.

"""
A twisted-style reactor for the Tornado IOLoop.
# Note: This module's docs are not currently extracted automatically,
# so changes must be made manually to twisted.rst
# TODO: refactor doc build process to use an appropriate virtualenv
"""A Twisted reactor built on the Tornado IOLoop.
This module lets you run applications and libraries written for
Twisted in a Tornado application. To use it, simply call `install` at
the beginning of the application::
import tornado.platform.twisted
tornado.platform.twisted.install()
from twisted.internet import reactor
When the app is ready to start, call `IOLoop.instance().start()`
instead of `reactor.run()`. This will allow you to use a mixture of
Twisted and Tornado code in the same process.
It is also possible to create a non-global reactor by calling
`tornado.platform.twisted.TornadoReactor(io_loop)`. However, if
the `IOLoop` and reactor are to be short-lived (such as those used in
unit tests), additional cleanup may be required. Specifically, it is
recommended to call::
reactor.fireSystemEvent('shutdown')
reactor.disconnectAll()
To use it, add the following to your twisted application:
before closing the `IOLoop`.
import tornado.platform.twisted
tornado.platform.twisted.install()
from twisted.internet import reactor
This module has been tested with Twisted versions 11.0.0 and 11.1.0.
"""

from __future__ import with_statement, absolute_import
Expand All @@ -32,7 +53,7 @@
from twisted.internet.posixbase import PosixReactorBase
from twisted.internet.interfaces import \
IReactorFDSet, IDelayedCall, IReactorTime
from twisted.python import failure
from twisted.python import failure, log
from twisted.internet import error

from zope.interface import implements
Expand All @@ -44,9 +65,7 @@


class TornadoDelayedCall(object):
"""
DelayedCall object for Tornado.
"""
"""DelayedCall object for Tornado."""
implements(IDelayedCall)

def __init__(self, reactor, seconds, f, *args, **kw):
Expand Down Expand Up @@ -89,23 +108,35 @@ def active(self):
return self._active

class TornadoReactor(PosixReactorBase):
"""
Twisted style reactor for Tornado.
"""Twisted reactor built on the Tornado IOLoop.
Since it is intented to be used in applications where the top-level
event loop is ``io_loop.start()`` rather than ``reactor.run()``,
it is implemented a little differently than other Twisted reactors.
We override `mainLoop` instead of `doIteration` and must implement
timed call functionality on top of `IOLoop.add_timeout` rather than
using the implementation in `PosixReactorBase`.
"""
implements(IReactorTime, IReactorFDSet)

def __init__(self, io_loop=None):
if not io_loop:
io_loop = tornado.ioloop.IOLoop.instance()
self._io_loop = io_loop
self._readers = {}
self._writers = {}
self._readers = {} # map of reader objects to fd
self._writers = {} # map of writer objects to fd
self._fds = {} # a map of fd to a (reader, writer) tuple
self._delayedCalls = {}
self._running = False
self._closed = False
PosixReactorBase.__init__(self)

# IOLoop.start() bypasses some of the reactor initialization.
# Fire off the necessary events if they weren't already triggered
# by reactor.run().
def start_if_necessary():
if not self._started:
self.fireSystemEvent('startup')
self._io_loop.add_callback(start_if_necessary)

# IReactorTime
def seconds(self):
return time.time()
Expand All @@ -124,9 +155,7 @@ def _removeDelayedCall(self, dc):

# IReactorThreads
def callFromThread(self, f, *args, **kw):
"""
See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
"""
"""See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
assert callable(f), "%s is not callable" % f
p = functools.partial(f, *args, **kw)
self._io_loop.add_callback(p)
Expand All @@ -142,25 +171,36 @@ def wakeUp(self):
# IReactorFDSet
def _invoke_callback(self, fd, events):
(reader, writer) = self._fds[fd]
if events & IOLoop.READ and reader:
reader.doRead()
if events & IOLoop.WRITE and writer:
writer.doWrite()
if events & IOLoop.ERROR:
if reader:
reader.readConnectionLost(failure.Failure(error.ConnectionLost()))
if writer:
writer.connectionLost(failure.Failure(error.ConnectionLost()))
if reader:
err = None
if reader.fileno() == -1:
err = error.ConnectionLost()
elif events & IOLoop.READ:
err = log.callWithLogger(reader, reader.doRead)
if err is None and events & IOLoop.ERROR:
err = error.ConnectionLost()
if err is not None:
self.removeReader(reader)
reader.readConnectionLost(failure.Failure(err))
if writer:
err = None
if writer.fileno() == -1:
err = error.ConnectionLost()
elif events & IOLoop.WRITE:
err = log.callWithLogger(writer, writer.doWrite)
if err is None and events & IOLoop.ERROR:
err = error.ConnectionLost()
if err is not None:
self.removeWriter(writer)
writer.writeConnectionLost(failure.Failure(err))

def addReader(self, reader):
"""
Add a FileDescriptor for notification of data available to read.
"""
"""Add a FileDescriptor for notification of data available to read."""
if reader in self._readers:
# Don't add the reader if it's already there
return
self._readers[reader] = True
fd = reader.fileno()
self._readers[reader] = fd
if fd in self._fds:
(_, writer) = self._fds[fd]
self._fds[fd] = (reader, writer)
Expand All @@ -175,13 +215,11 @@ def addReader(self, reader):
IOLoop.READ)

def addWriter(self, writer):
"""
Add a FileDescriptor for notification of data available to write.
"""
"""Add a FileDescriptor for notification of data available to write."""
if writer in self._writers:
return
self._writers[writer] = True
fd = writer.fileno()
self._writers[writer] = fd
if fd in self._fds:
(reader, _) = self._fds[fd]
self._fds[fd] = (reader, writer)
Expand All @@ -196,13 +234,9 @@ def addWriter(self, writer):
IOLoop.WRITE)

def removeReader(self, reader):
"""
Remove a Selectable for notification of data available to read.
"""
fd = reader.fileno()
"""Remove a Selectable for notification of data available to read."""
if reader in self._readers:
del self._readers[reader]
if self._closed: return
fd = self._readers.pop(reader)
(_, writer) = self._fds[fd]
if writer:
# We have a writer so we need to update the IOLoop for
Expand All @@ -217,13 +251,9 @@ def removeReader(self, reader):
self._io_loop.remove_handler(fd)

def removeWriter(self, writer):
"""
Remove a Selectable for notification of data available to write.
"""
fd = writer.fileno()
"""Remove a Selectable for notification of data available to write."""
if writer in self._writers:
del self._writers[writer]
if self._closed: return
fd = self._writers.pop(writer)
(reader, _) = self._fds[fd]
if reader:
# We have a reader so we need to update the IOLoop for
Expand All @@ -246,47 +276,30 @@ def getReaders(self):
def getWriters(self):
return self._writers.keys()

# The following functions are mainly used in twisted-style test cases;
# it is expected that most users of the TornadoReactor will call
# IOLoop.start() instead of Reactor.run().
def stop(self):
"""
Implement L{IReactorCore.stop}.
"""
self._running = False
PosixReactorBase.stop(self)
self.runUntilCurrent()
try:
self._io_loop.stop()
self._io_loop.close()
except:
# Ignore any exceptions thrown by IOLoop
pass
self._closed = True
self._io_loop.stop()

def crash(self):
if not self._running:
return
self._running = False
PosixReactorBase.crash(self)
self.runUntilCurrent()
try:
self._io_loop.stop()
self._io_loop.close()
except:
# Ignore any exceptions thrown by IOLoop
pass
self._closed = True
self._io_loop.stop()

def doIteration(self, delay):
raise NotImplementedError("doIteration")

def mainLoop(self):
self._running = True
self._io_loop.start()
if self._stopped:
self.fireSystemEvent("shutdown")

class _TestReactor(TornadoReactor):
"""Subclass of TornadoReactor for use in unittests.
This can't go in the test.py file because of import-order dependencies
with the twisted reactor test builder.
with the Twisted reactor test builder.
"""
def __init__(self):
# always use a new ioloop
Expand All @@ -299,12 +312,16 @@ def listenTCP(self, port, factory, backlog=50, interface=''):
return super(_TestReactor, self).listenTCP(
port, factory, backlog=backlog, interface=interface)

def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
if not interface:
interface = '127.0.0.1'
return super(_TestReactor, self).listenUDP(
port, protocol, interface=interface, maxPacketSize=maxPacketSize)



def install(io_loop=None):
"""
Install the Tornado reactor.
"""
"""Install this package as the default Twisted reactor."""
if not io_loop:
io_loop = tornado.ioloop.IOLoop.instance()
reactor = TornadoReactor(io_loop)
Expand Down

0 comments on commit af940f4

Please sign in to comment.