diff --git a/txzmq/connection.py b/txzmq/connection.py index f59f01a..b20cf23 100644 --- a/txzmq/connection.py +++ b/txzmq/connection.py @@ -66,6 +66,7 @@ class ZmqConnection(object): :vartype multicastRate: int :var highWaterMark: hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer + :var topicSep: bytes :vartype highWaterMark: int :var tcpKeepalive: if set to 1, enable TCP keepalive, otherwise leave it as default @@ -99,6 +100,7 @@ class ZmqConnection(object): allowLoopbackMulticast = False multicastRate = 100 highWaterMark = 0 + topicSep = b'\0' # Only supported by zeromq3 and pyzmq>=2.2.0.1 tcpKeepalive = 0 diff --git a/txzmq/pubsub.py b/txzmq/pubsub.py index 5813cdd..9066d6c 100644 --- a/txzmq/pubsub.py +++ b/txzmq/pubsub.py @@ -23,7 +23,11 @@ def publish(self, message, tag=b''): :param tag: message tag :type tag: str """ - self.send(tag + b'\0' + message) + if isinstance(tag, str): + tag = tag.encode() + if isinstance(message, str): + message = message.encode() + self.send(tag + self.topicSep + message) class ZmqSubConnection(ZmqConnection): @@ -44,7 +48,7 @@ def subscribe(self, tag): :param tag: message tag :type tag: str """ - self.socket.set(constants.SUBSCRIBE, tag) + self.socket.setsockopt(constants.SUBSCRIBE, tag) def unsubscribe(self, tag): """ @@ -55,7 +59,7 @@ def unsubscribe(self, tag): :param tag: message tag :type tag: str """ - self.socket.set(constants.UNSUBSCRIBE, tag) + self.socket.setsockopt(constants.UNSUBSCRIBE, tag) def messageReceived(self, message): """ @@ -71,7 +75,7 @@ def messageReceived(self, message): # of multi-part message self.gotMessage(message[1], message[0]) else: - self.gotMessage(*reversed(message[0].split(b'\0', 1))) + self.gotMessage(*reversed(message[0].split(self.topicSep, 1))) def gotMessage(self, message, tag): """