Skip to content

Commit

Permalink
integrate message reading loop into twisted reactor loop
Browse files Browse the repository at this point in the history
  • Loading branch information
daa committed May 19, 2016
1 parent 2843571 commit 661f332
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions txzmq/connection.py
Expand Up @@ -8,7 +8,7 @@

from zope.interface import implementer

from twisted.internet import reactor
from twisted.internet import reactor, task
from twisted.internet.interfaces import IFileDescriptor, IReadDescriptor
from twisted.python import log

Expand Down Expand Up @@ -171,6 +171,7 @@ def __init__(self, factory, endpoint=None, identity=None):
if endpoint:
self.addEndpoints([endpoint])

self._read_loop = None
self.factory.connections.add(self)

self.factory.reactor.addReader(self)
Expand Down Expand Up @@ -263,7 +264,12 @@ def doRead(self):
if not self.read_scheduled.called:
self.read_scheduled.cancel()
self.read_scheduled = None
if self._read_loop is None:
self._read_loop = task.cooperate(self._read_messages()).\
whenDone().\
addBoth(self._read_done)

def _read_messages(self):
while True:
if self.factory is None: # disconnected
return
Expand All @@ -277,11 +283,14 @@ def doRead(self):
message = self._readMultipart()
except error.ZMQError as e:
if e.errno == constants.EAGAIN:
continue

raise e
pass
raise
else:
log.callWithLogger(self, self.messageReceived, message)
yield

log.callWithLogger(self, self.messageReceived, message)
def _read_done(self, _):
self._read_loop = None

def logPrefix(self):
"""
Expand Down

0 comments on commit 661f332

Please sign in to comment.