Skip to content

Commit

Permalink
Use separate read and write Kombu connections
Browse files Browse the repository at this point in the history
Eventlet does not like file handles to be shared among greenlets. Using
an independent connection in the listening thread addresses this
problem. (fixes #13)
  • Loading branch information
miguelgrinberg committed Jan 15, 2016
1 parent 155fd5b commit 0c35757
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions socketio/kombu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,24 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
'virtualenv).')
self.kombu = kombu.Connection(url)
self.exchange = kombu.Exchange(channel, type='fanout', durable=False)
self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange)
super(KombuManager, self).__init__(channel=channel,
write_only=write_only)
super(KombuManager, self).__init__(channel=channel)
self.url = url
self.writer_conn = kombu.Connection(self.url)
self.writer_queue = self._queue(self.writer_conn)

def _queue(self, conn=None):
exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
queue = kombu.Queue(str(uuid.uuid4()), exchange)
return queue

def _publish(self, data):
with self.kombu.SimpleQueue(self.queue) as queue:
with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
queue.put(pickle.dumps(data))

def _listen(self):
with self.kombu.SimpleQueue(self.queue) as queue:
reader_conn = kombu.Connection(self.url)
reader_queue = self._queue(reader_conn)
with reader_conn.SimpleQueue(reader_queue) as queue:
while True:
message = queue.get(block=True)
message.ack()
Expand Down

0 comments on commit 0c35757

Please sign in to comment.