From 455d6966658e5bd57d8bf99c1adef195512628df Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 27 Feb 2015 10:44:00 +0000 Subject: [PATCH 1/2] Initial buffered socket readline proposal. --- hyper/http20/bufsocket.py | 96 ++++++++++++++++++++++++++++++++------ hyper/http20/exceptions.py | 15 ++++++ test/test_socket.py | 61 ++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 13 deletions(-) diff --git a/hyper/http20/bufsocket.py b/hyper/http20/bufsocket.py index 128f93fe..8ab4eb5d 100644 --- a/hyper/http20/bufsocket.py +++ b/hyper/http20/bufsocket.py @@ -11,7 +11,7 @@ process. """ import select -from .exceptions import ConnectionResetError +from .exceptions import ConnectionResetError, LineTooLongError class BufferedSocket(object): """ @@ -76,6 +76,25 @@ def can_read(self): return False + def new_buffer(self): + """ + This method moves all the data in the backing buffer to the start of + a new, fresh buffer. This gives the ability to read much more data. + """ + def read_all_from_buffer(): + end = self._index + self._bytes_in_buffer + return self._buffer_view[self._index:end] + + new_buffer = bytearray(self._buffer_size) + new_buffer_view = memoryview(new_buffer) + new_buffer_view[0:self._bytes_in_buffer] = read_all_from_buffer() + + self._index = 0 + self._backing_buffer = new_buffer + self._buffer_view = new_buffer_view + + return + def recv(self, amt): """ Read some data from the socket. @@ -85,10 +104,6 @@ def recv(self, amt): bytes. The data *must* be copied out by the caller before the next call to this function. """ - def read_all_from_buffer(): - end = self._index + self._bytes_in_buffer - return self._buffer_view[self._index:end] - # In this implementation you can never read more than the number of # bytes in the buffer. if amt > self._buffer_size: @@ -96,15 +111,9 @@ def read_all_from_buffer(): # If the amount of data we've been asked to read is less than the # remaining space in the buffer, we need to clear out the buffer and - # start over. Copy the data into the new array. + # start over. if amt > self._remaining_capacity: - new_buffer = bytearray(self._buffer_size) - new_buffer_view = memoryview(new_buffer) - new_buffer_view[0:self._bytes_in_buffer] = read_all_from_buffer() - - self._index = 0 - self._backing_buffer = new_buffer - self._buffer_view = new_buffer_view + self.new_buffer() # If there's still some room in the buffer, opportunistically attempt # to read into it. @@ -136,5 +145,66 @@ def read_all_from_buffer(): return data + def readline(self): + """ + Read up to a newline from the network and returns it. The implicit + maximum line length is the buffer size of the buffered socket. + + :returns: A ``memoryview`` object containing the appropriate number of + bytes. The data *must* be copied out by the caller before the next + call to this function. + """ + # First, check if there's anything in the buffer. This is one of those + # rare circumstances where this will work correctly on all platforms. + index = self._backing_buffer.find( + b'\n', + self._index, + self._index + self._bytes_in_buffer + ) + + if index != -1: + length = index + 1 - self._index + data = self._buffer_view[self._index:self._index+length] + self._index += length + self._bytes_in_buffer -= length + return data + + # In this case, we didn't find a newline in the buffer. To fix that, + # read some data into the buffer. To do our best to satisfy the read, + # we should shunt the data down in the buffer so that it's right at + # the start. We don't bother if we're already at the start of the + # buffer. + if self._index != 0: + self.new_buffer() + + while self._bytes_in_buffer < self._buffer_size: + count = self._sck.recv_into(self._buffer_view[self._buffer_end:]) + if not count: + raise ConnectionResetError() + + # We have some more data. Again, look for a newline in that gap. + first_new_byte = self._buffer_end + self._bytes_in_buffer += count + index = self._backing_buffer.find( + b'\n', + first_new_byte, + first_new_byte + count, + ) + + if index != -1: + # The length of the buffer is the index into the + # buffer at which we found the newline plus 1, minus the start + # index of the buffer, which really should be zero. + assert not self._index + length = index + 1 + data = self._buffer_view[:length] + self._index += length + self._bytes_in_buffer -= length + return data + + # If we got here, it means we filled the buffer without ever getting + # a newline. Time to throw an exception. + raise LineTooLongError() + def __getattr__(self, name): return getattr(self._sck, name) diff --git a/hyper/http20/exceptions.py b/hyper/http20/exceptions.py index 725603df..6956c2ab 100644 --- a/hyper/http20/exceptions.py +++ b/hyper/http20/exceptions.py @@ -5,6 +5,21 @@ This defines exceptions used in the HTTP/2 portion of hyper. """ +class SocketError(Exception): + """ + An error occurred during socket operation. + """ + pass + + +class LineTooLongError(Exception): + """ + An attempt to read a line from a socket failed because no newline was + found. + """ + pass + + class HTTP20Error(Exception): """ The base class for all of ``hyper``'s HTTP/2-related exceptions. diff --git a/test/test_socket.py b/test/test_socket.py index 447b6be0..aab07d13 100644 --- a/test/test_socket.py +++ b/test/test_socket.py @@ -120,6 +120,67 @@ def test_oversized_read(self, monkeypatch): d = b.recv(1200).tobytes() assert d == b'a' * 600 + def test_readline_from_buffer(self, monkeypatch): + monkeypatch.setattr( + hyper.http20.bufsocket.select, 'select', dummy_select + ) + s = DummySocket() + b = BufferedSocket(s) + + one = b'hi there\r\n' + two = b'this is another line\r\n' + three = b'\r\n' + combined = b''.join([one, two, three]) + b._buffer_view[0:len(combined)] = combined + b._bytes_in_buffer += len(combined) + + assert b.readline().tobytes() == one + assert b.readline().tobytes() == two + assert b.readline().tobytes() == three + + def test_readline_from_socket(self, monkeypatch): + monkeypatch.setattr( + hyper.http20.bufsocket.select, 'select', dummy_select + ) + s = DummySocket() + b = BufferedSocket(s) + + one = b'hi there\r\n' + two = b'this is another line\r\n' + three = b'\r\n' + combined = b''.join([one, two, three]) + + for i in range(0, len(combined), 5): + s.inbound_packets.append(combined[i:i+5]) + + assert b.readline().tobytes() == one + assert b.readline().tobytes() == two + assert b.readline().tobytes() == three + + def test_readline_both(self, monkeypatch): + monkeypatch.setattr( + hyper.http20.bufsocket.select, 'select', dummy_select + ) + s = DummySocket() + b = BufferedSocket(s) + + one = b'hi there\r\n' + two = b'this is another line\r\n' + three = b'\r\n' + combined = b''.join([one, two, three]) + + split_index = int(len(combined) / 2) + + b._buffer_view[0:split_index] = combined[0:split_index] + b._bytes_in_buffer += split_index + + for i in range(split_index, len(combined), 5): + s.inbound_packets.append(combined[i:i+5]) + + assert b.readline().tobytes() == one + assert b.readline().tobytes() == two + assert b.readline().tobytes() == three + class DummySocket(object): def __init__(self): From d0182494a4edf1f4515f409274ade4c7ca9c41aa Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 27 Feb 2015 11:03:13 +0000 Subject: [PATCH 2/2] Code coverage of readline. --- hyper/http20/bufsocket.py | 3 +++ test/test_socket.py | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/hyper/http20/bufsocket.py b/hyper/http20/bufsocket.py index 8ab4eb5d..84e8e503 100644 --- a/hyper/http20/bufsocket.py +++ b/hyper/http20/bufsocket.py @@ -150,6 +150,9 @@ def readline(self): Read up to a newline from the network and returns it. The implicit maximum line length is the buffer size of the buffered socket. + Note that, unlike recv, this method absolutely *does* block until it + can read the line. + :returns: A ``memoryview`` object containing the appropriate number of bytes. The data *must* be copied out by the caller before the next call to this function. diff --git a/test/test_socket.py b/test/test_socket.py index aab07d13..79680432 100644 --- a/test/test_socket.py +++ b/test/test_socket.py @@ -5,8 +5,11 @@ Test the BufferedSocket implementation in hyper. """ +import pytest + import hyper.http20.bufsocket from hyper.http20.bufsocket import BufferedSocket +from hyper.http20.exceptions import ConnectionResetError, LineTooLongError # Patch the select method in bufsocket to make sure that it always returns # the dummy socket as readable. @@ -181,6 +184,29 @@ def test_readline_both(self, monkeypatch): assert b.readline().tobytes() == two assert b.readline().tobytes() == three + def test_socket_error_on_readline(self, monkeypatch): + monkeypatch.setattr( + hyper.http20.bufsocket.select, 'select', dummy_select + ) + s = DummySocket() + b = BufferedSocket(s) + + with pytest.raises(ConnectionResetError): + b.readline() + + def test_socket_readline_too_long(self, monkeypatch): + monkeypatch.setattr( + hyper.http20.bufsocket.select, 'select', dummy_select + ) + s = DummySocket() + b = BufferedSocket(s) + + b._buffer_view[0:b._buffer_size] = b'0' * b._buffer_size + b._bytes_in_buffer = b._buffer_size + + with pytest.raises(LineTooLongError): + b.readline() + class DummySocket(object): def __init__(self):