Permalink
Browse files

1. Issue with zmq_send

2. Fix tests
  • Loading branch information...
1 parent 3a88439 commit a754aed76c78d5671d852959841eb0d63fd79994 Vasiliy Evseenko committed Jun 2, 2011
View
@@ -151,7 +151,7 @@ class zmq_msg_t(Structure):
libzmq.zmq_msg_copy.argtypes = [POINTER(zmq_msg_t), POINTER(zmq_msg_t)]
libzmq.zmq_msg_data.restype = c_void_p
libzmq.zmq_msg_data.argtypes = [POINTER(zmq_msg_t)]
-libzmq.zmq_msg_size.restype = c_void_p
+libzmq.zmq_msg_size.restype = size_t
libzmq.zmq_msg_size.argtypes = [POINTER(zmq_msg_t)]
# 0MQ socket definition
@@ -441,17 +441,13 @@ def send(self, data, flags=0, copy=True, track=False):
msg_c_len = len(data)
zmq_msg_init_size(byref(msg), msg_c_len)
+ msg_buf = zmq_msg_data(byref(msg))
+ msg_buf_size = zmq_msg_size(byref(msg))
+ memmove(msg_buf, data, msg_buf_size)
- try:
- msg_buf = zmq_msg_data(byref(msg))
- msg_buf_size = zmq_msg_size(byref(msg))
- memmove(msg_buf, data, msg_buf_size)
- return zmq_send(self.handle, byref(msg), flags)
- finally:
- zmq_msg_close(byref(msg))
+ return zmq_send(self.handle, byref(msg), flags)
-
def recv(self, flags=0, copy=True, track=False):
"""s.recv(flags=0, copy=True, track=False)
@@ -51,11 +51,3 @@ def test_term(self):
def test_fail_init(self):
self.assertRaisesErrno(zmq.EINVAL, zmq.Context, 0)
- def test_term_hang(self):
- rep,req = self.create_bound_pair(zmq.XREP, zmq.XREQ)
- req.setsockopt(zmq.LINGER, 0)
- req.send('hello'.encode(), copy=False)
- req.close()
- rep.close()
- self.context.term()
-
View
@@ -1,114 +0,0 @@
-#
-# Copyright (c) 2010 Min Ragan-Kelley
-#
-# This file is part of pyzmq.
-#
-# pyzmq is free software; you can redistribute it and/or modify it under
-# the terms of the Lesser GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# pyzmq is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Lesser GNU General Public License for more details.
-#
-# You should have received a copy of the Lesser GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-
-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
-
-import time
-
-import zmq
-from zmq import devices
-from zmq.tests import BaseZMQTestCase
-from zmq.utils.strtypes import (bytes,unicode,basestring)
-
-#-----------------------------------------------------------------------------
-# Tests
-#-----------------------------------------------------------------------------
-
-
-class TestDevice(BaseZMQTestCase):
-
- def test_device_types(self):
- # a = self.context.socket(zmq.SUB)
- for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
- dev = devices.Device(devtype, zmq.PAIR,zmq.PAIR)
- self.assertEquals(dev.device_type, devtype)
- del dev
- # del a
-
- def test_device_attributes(self):
- # a = self.context.socket(zmq.SUB)
- # b = self.context.socket(zmq.PUB)
- dev = devices.Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
- self.assert_(dev.in_type == zmq.SUB)
- self.assert_(dev.out_type == zmq.PUB)
- self.assertEquals(dev.device_type, zmq.FORWARDER)
- self.assertEquals(dev.daemon, True)
- # del a
- del dev
-
- def test_tsdevice_attributes(self):
- dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
- self.assertEquals(dev.in_type, zmq.SUB)
- self.assertEquals(dev.out_type, zmq.PUB)
- self.assertEquals(dev.device_type, zmq.QUEUE)
- self.assertEquals(dev.daemon, True)
- del dev
-
-
- def test_single_socket_forwarder_connect(self):
- dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
- req = self.context.socket(zmq.REQ)
- port = req.bind_to_random_port('tcp://127.0.0.1')
- dev.connect_in('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = 'hello'.encode()
- req.send(msg)
- self.assertEquals(msg, req.recv())
- del dev
- del req
- dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
- req = self.context.socket(zmq.REQ)
- port = req.bind_to_random_port('tcp://127.0.0.1')
- dev.connect_out('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = 'hello again'.encode()
- req.send(msg)
- self.assertEquals(msg, req.recv())
- del dev
- del req
-
- def test_single_socket_forwarder_bind(self):
- dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
- req = self.context.socket(zmq.REQ)
- port = 12345
- req.connect('tcp://127.0.0.1:%i'%port)
- dev.bind_in('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = 'hello'.encode()
- req.send(msg)
- self.assertEquals(msg, req.recv())
- del dev
- del req
- dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
- req = self.context.socket(zmq.REQ)
- port = 12346
- req.connect('tcp://127.0.0.1:%i'%port)
- dev.bind_in('tcp://127.0.0.1:%i'%port)
- dev.start()
- time.sleep(.25)
- msg = 'hello again'.encode()
- req.send(msg)
- self.assertEquals(msg, req.recv())
- del dev
- del req
View
@@ -1,128 +0,0 @@
-#
-# Copyright (c) 2010 Brian E. Granger
-#
-# This file is part of pyzmq.
-#
-# pyzmq is free software; you can redistribute it and/or modify it under
-# the terms of the Lesser GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# pyzmq is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Lesser GNU General Public License for more details.
-#
-# You should have received a copy of the Lesser GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-
-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
-
-import time
-from unittest import TestCase
-
-import zmq
-from zmq.tests import BaseZMQTestCase
-
-from zmq.log import handlers
-import logging
-
-#-----------------------------------------------------------------------------
-# Tests
-#-----------------------------------------------------------------------------
-
-class TestPubLog(BaseZMQTestCase):
-
- iface = 'inproc://zmqlog'
- topic='zmq'.encode()
-
- @property
- def logger(self):
- # print dir(self)
- logger = logging.getLogger('zmqtest')
- logger.setLevel(logging.DEBUG)
- return logger
-
- def connect_handler(self):
- logger = self.logger
- pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
- handler = handlers.PUBHandler(pub)
- handler.setLevel(logging.DEBUG)
- handler.root_topic = self.topic
- logger.addHandler(handler)
- sub.setsockopt(zmq.SUBSCRIBE, self.topic)
- time.sleep(0.1)
- return logger, handler, sub
-
- def test_init_iface(self):
- logger = self.logger
- ctx = self.context
- handler = handlers.PUBHandler(self.iface)
- self.assertFalse(handler.ctx is ctx)
- handler.socket.close()
-
- handler = handlers.PUBHandler(self.iface, self.context)
- self.assertTrue(handler.ctx is ctx)
-
- handler.setLevel(logging.DEBUG)
- handler.root_topic = self.topic
- logger.addHandler(handler)
-
- # handler.socket.close()
- sub = ctx.socket(zmq.SUB)
- sub.connect(self.iface)
- sub.setsockopt(zmq.SUBSCRIBE, self.topic)
- import time; time.sleep(0.1)
- msg1 = 'message'
- logger.info(msg1)
-
- (topic, msg2) = sub.recv_multipart()
- self.assertEquals(topic, 'zmq.INFO'.encode())
- self.assertEquals(msg2, (msg1+'\n').encode())
- logger.removeHandler(handler)
- # handler.socket.close()
-
- def test_init_socket(self):
- pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
- logger = self.logger
- handler = handlers.PUBHandler(pub)
- handler.setLevel(logging.DEBUG)
- handler.root_topic = self.topic
- logger.addHandler(handler)
-
- self.assertTrue(handler.socket is pub)
- self.assertTrue(handler.ctx is pub.context)
- self.assertTrue(handler.ctx is self.context)
- # handler.socket.close()
- sub.setsockopt(zmq.SUBSCRIBE, self.topic)
- import time; time.sleep(0.1)
- msg1 = 'message'
- logger.info(msg1)
-
- (topic, msg2) = sub.recv_multipart()
- self.assertEquals(topic, 'zmq.INFO'.encode())
- self.assertEquals(msg2, (msg1+'\n').encode())
- logger.removeHandler(handler)
- # handler.socket.close()
-
- def test_root_topic(self):
- logger, handler, sub = self.connect_handler()
- handler.socket.bind(self.iface)
- sub2 = sub.context.socket(zmq.SUB)
- sub2.connect(self.iface)
- sub2.setsockopt(zmq.SUBSCRIBE, ''.encode())
- handler.root_topic = 'twoonly'.encode()
- msg1 = 'ignored'
- logger.info(msg1)
- self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
- topic,msg2 = sub2.recv_multipart()
- self.assertEquals(topic, 'twoonly.INFO'.encode())
- self.assertEquals(msg2, (msg1+'\n').encode())
-
-
-
- logger.removeHandler(handler)
-
Oops, something went wrong.

0 comments on commit a754aed

Please sign in to comment.