diff --git a/HISTORY.rst b/HISTORY.rst index 8b1f739..7685e1b 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ History ------- +2.2.1-2014.2 (2014-06-04) ++++++++++++++++++++++++++ + +* Properly support sending and receiving large messages. + 2.2.1-2014.1 (2014-02-11) +++++++++++++++++++++++++ diff --git a/pygazebo/pygazebo.py b/pygazebo/pygazebo.py index aeafc68..22120d3 100755 --- a/pygazebo/pygazebo.py +++ b/pygazebo/pygazebo.py @@ -143,6 +143,11 @@ class _Connection(object): server. In either case, it provides methods to read and write structured data on the socket. """ + + # Do all raw socket reads and writes in amounts no larger than + # this. + BUF_SIZE = 16384 + def __init__(self): self.address = None self.socket = None @@ -174,9 +179,16 @@ def read_raw(self): except ValueError: raise ParseError('invalid header: ' + header) - data = self.socket.recv(size) - if len(data) < size: - return None + data = '' + + # Read in BUF_SIZE increments. + while len(data) < size: + this_size = min(size - len(data), self.BUF_SIZE) + this_data = self.socket.recv(this_size) + if len(this_data) == 0: + return None + data += this_data + return data def read(self): @@ -187,13 +199,19 @@ def read(self): packet = msg.packet_pb2.Packet.FromString(data) return packet + def send_pieces(self, data): + start = 0 + while start < len(data): + self.socket.send(data[start:start + self.BUF_SIZE]) + start += self.BUF_SIZE + def write(self, message): self._socket_ready.wait() data = message.SerializeToString() header = '%08X' % len(data) - self.socket.send(header + data) + self.send_pieces(header + data) def write_packet(self, name, message): packet = msg.packet_pb2.Packet() diff --git a/setup.py b/setup.py index f0c4c42..1f2976b 100755 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def run_tests(self): setup( name='pygazebo', - version='2.2.1-2014.1', + version='2.2.1-2014.2', description='Python bindings for the Gazebo multi-robot simulator.', long_description=readme + '\n\n' + history, author='Josh Pieper', diff --git a/tests/test_pygazebo.py b/tests/test_pygazebo.py index a1a8112..38797a7 100755 --- a/tests/test_pygazebo.py +++ b/tests/test_pygazebo.py @@ -43,13 +43,20 @@ def send(self, data): self.write(data) def write(self, data): + assert len(data) <= 16384 for x in data: assert len(x) == 1 self.other.queue.put(x, block=True, timeout=1.0) - def write_frame(self, data): - header = '%08X' % len(data) - self.write(header + data) + def write_frame(self, payload): + header = '%08X' % len(payload) + + data = header + payload + + start = 0 + while start < len(data): + self.write(data[start:start + 1000]) + start += 1000 def write_packet(self, name, message): packet = packet_pb2.Packet() @@ -60,6 +67,7 @@ def write_packet(self, name, message): self.write_frame(packet.SerializeToString()) def recv(self, length): + assert length <= 16384 result = '' for x in range(length): data = self.queue.get(block=True, timeout=1.0) @@ -77,9 +85,14 @@ def read_frame(self): except ValueError: return None - data = self.recv(size) - if len(data) < size: - return None + data = '' + while len(data) < size: + this_size = min(size - len(data), 1000) + this_data = self.recv(this_size) + if len(this_data) == 0: + return None + data += this_data + return data @@ -250,11 +263,20 @@ def test_send(self, manager): # At this point, anything we "publish" should end up being # written to this pipe. - read_data = eventlet.spawn(pipe.endpointb.read_frame) + read_data1 = eventlet.spawn(pipe.endpointb.read_frame) sample_message = gz_string_pb2.GzString() sample_message.data = 'testdata' publisher.publish(sample_message) - data_frame = read_data.wait() + data_frame = read_data1.wait() + assert data_frame == sample_message.SerializeToString() + + # Test sending a very large message, it should require no + # individual writes which are too large. + read_data2 = eventlet.spawn(pipe.endpointb.read_frame) + sample_message.data = ' ' * 100000 + publisher.publish(sample_message) + + data_frame = read_data2.wait() assert data_frame == sample_message.SerializeToString()