Skip to content

Commit

Permalink
add zmq manager
Browse files Browse the repository at this point in the history
  • Loading branch information
etseidler committed Jan 6, 2017
1 parent d6f703f commit 88f3b87
Showing 1 changed file with 110 additions and 0 deletions.
110 changes: 110 additions & 0 deletions socketio/zmq_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import time
import pickle
import re

try:
import zmq
except ImportError:
zmq = None

from .pubsub_manager import PubSubManager


class ZmqManager(PubSubManager): # pragma: no cover
"""zmq based client manager.
This class implements a zmq backend for event sharing across multiple
processes. To use a zmq backend, initialize the :class:`Server` instance as
follows::
url = 'zmq+tcp://hostname:port1+port2'
server = socketio.Server(client_manager=socketio.ZmqManager(url))
:param url: The connection URL for the zmq message broker,
which will need to be provided and running.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
N.B.
a zmq message broker must be running for the zmq_manager to work.
you can write your own or adapt one from the following simple broker below.
port numbers in the broker must match port numbers in connection string.
``
import zmq
receiver = zmq.Context().socket(zmq.PULL)
receiver.bind("tcp://*:5555")
publisher = zmq.Context().socket(zmq.PUB)
publisher.bind("tcp://*:5556")
while True:
publisher.send(receiver.recv())
``
"""
name = 'zmq'

def __init__(self, url='tcp://localhost:5555+5556',
channel='socketio',
write_only=False):
if zmq is None:
raise RuntimeError('zmq package is not installed '
'(Run "pip install pyzmq" in your '
'virtualenv).')

r = re.compile(':\d+\+\d+$')
if not (url.startswith('zmq+tcp://') and r.search(url)):
raise RuntimeError('unexpected connection string: ' + url)

url = url.replace('zmq+', '')
(sink_url, sub_port) = url.split('+')
sink_port = sink_url.split(':')[-1]
sub_url = sink_url.replace(sink_port, sub_port)

sink = zmq.Context().socket(zmq.PUSH)
sink.connect(sink_url)

sub = zmq.Context().socket(zmq.SUB)
sub.setsockopt_string(zmq.SUBSCRIBE, u'')
sub.connect(sub_url)

self.sink = sink
self.sub = sub
self.channel = channel
super(ZmqManager, self).__init__(channel=channel,
write_only=write_only)

def _publish(self, data):
pickled_data = pickle.dumps(
{
'type': 'message',
'channel': self.channel,
'data': data
}
)
return self.sink.send(pickled_data)

def zmq_listen(self):
while True:
try:
response = self.sub.recv(flags=zmq.NOBLOCK)
if response is not None:
yield response
except zmq.Again:
time.sleep(0.5)

def _listen(self):
for message in self.zmq_listen():
if isinstance(message, str):
message = pickle.loads(message)
if isinstance(message, dict) and \
message['channel'] == self.channel and \
'data' in message:
yield message['data']
return

0 comments on commit 88f3b87

Please sign in to comment.