From b0b39ad7700ac4cfee8c0de4927b50188f8e1299 Mon Sep 17 00:00:00 2001 From: Alek Storm Date: Sun, 6 Apr 2014 17:04:17 -0700 Subject: [PATCH 1/3] Refactor Stream to ensure header blocks are decoded in the correct order: Previously, incoming frames were placed on per-Stream queues, which were read from lazily, i.e. whenever HTTP20Response.getheaders() or .read() were called. However, the HPACK algorithm depends on header blocks being encoded/decoded in the same order on both sides of the connection; otherwise, the codec states on each peer will get out of sync. This commit rearranges header decoding by moving it from Stream.getresponse() to Stream.receive_frame(), which is executed on the Connection's event loop. DATA frame receiving was refactored in a similar way for consistency. --- .gitignore | 1 + hyper/http20/connection.py | 1 + hyper/http20/frame.py | 2 +- hyper/http20/stream.py | 124 +++++++++++++++++++------------------ test/test_hyper.py | 46 +++++++++----- 5 files changed, 98 insertions(+), 76 deletions(-) diff --git a/.gitignore b/.gitignore index 61a166a9..c2ffe46a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +build/ env/ dist/ *.egg-info/ diff --git a/hyper/http20/connection.py b/hyper/http20/connection.py index adbb57b4..b29f9ac7 100644 --- a/hyper/http20/connection.py +++ b/hyper/http20/connection.py @@ -12,6 +12,7 @@ DataFrame, HeadersFrame, SettingsFrame, Frame, WindowUpdateFrame, GoAwayFrame ) +from .response import HTTP20Response from .window import FlowControlManager from .exceptions import ConnectionError diff --git a/hyper/http20/frame.py b/hyper/http20/frame.py index afe41d12..99244112 100644 --- a/hyper/http20/frame.py +++ b/hyper/http20/frame.py @@ -21,7 +21,7 @@ class Frame(object): defined_flags = [] # The type of the frame. - type = 0 + type = -1 def __init__(self, stream_id): self.stream_id = stream_id diff --git a/hyper/http20/stream.py b/hyper/http20/stream.py index b0705c98..08acc219 100644 --- a/hyper/http20/stream.py +++ b/hyper/http20/stream.py @@ -56,7 +56,10 @@ def __init__(self, self.stream_id = stream_id self.state = STATE_IDLE self.headers = [] - self._queued_frames = collections.deque() + + self.response_headers = None + self.header_data = [] + self.data = [] # There are two flow control windows: one for data we're sending, # one for data being sent to us. @@ -110,6 +113,34 @@ def file_iterator(fobj): for chunk in chunks: self._send_chunk(chunk, final) + @property + def _local_closed(self): + return self.state in (STATE_CLOSED, STATE_HALF_CLOSED_LOCAL) + + @property + def _remote_closed(self): + return self.state in (STATE_CLOSED, STATE_HALF_CLOSED_REMOTE) + + @property + def _local_open(self): + return self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE) + + @property + def _remote_open(self): + return self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL) + + def _close_local(self): + self.state = ( + STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN + else STATE_CLOSED + ) + + def _close_remote(self): + self.state = ( + STATE_HALF_CLOSED_REMOTE if self.state == STATE_OPEN + else STATE_CLOSED + ) + def _read(self, amt=None): """ Read data from the stream. Unlike a normal read behaviour, this @@ -118,35 +149,30 @@ def _read(self, amt=None): if self.state == STATE_CLOSED: return b'' - assert self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL) + assert self._remote_open def listlen(list): return sum(map(len, list)) - data = [] - - # Begin by processing frames off the queue. - while amt is None or listlen(data) < amt: - try: - frame = self._queued_frames.popleft() - except IndexError: - # No frames on the queue. Try to read one and try again. - self._recv_cb() - continue + # Keep reading until the stream is closed or we get enough data. + while not self._remote_closed and (amt is None or listlen(self.data) < amt): + self._recv_cb() - # All queued frames at this point should be data frames. - assert isinstance(frame, DataFrame) + result = b''.join(self.data) + self.data = [] + return result + def receive_frame(self, frame): + """ + Handle a frame received on this stream. + """ + if isinstance(frame, WindowUpdateFrame): + self._out_flow_control_window += frame.window_increment + elif isinstance(frame, (HeadersFrame, ContinuationFrame)): + self.header_data.append(frame.data) + elif isinstance(frame, DataFrame): # Append the data to the buffer. - data.append(frame.data) - - # If that was the last frame, we're done here. - if 'END_STREAM' in frame.flags: - self.state = ( - STATE_HALF_CLOSED_REMOTE if self.state == STATE_OPEN - else STATE_CLOSED - ) - break + self.data.append(frame.data) # Increase the window size. Only do this if the data frame contains # actual data. @@ -155,18 +181,14 @@ def listlen(list): w = WindowUpdateFrame(self.stream_id) w.window_increment = increment self._data_cb(w) + else: # pragma: no cover + raise ValueError('Unexpected frame type: %i' % frame.type) - return b''.join(data) + if 'END_HEADERS' in frame.flags: + self.response_headers = self._decoder.decode(b''.join(self.header_data)) - def receive_frame(self, frame): - """ - Handle a frame received on this stream. If this is a window update - frame, immediately update the window accordingly. - """ - if isinstance(frame, WindowUpdateFrame): - self._out_flow_control_window += frame.window_increment - else: - self._queued_frames.append(frame) + if 'END_STREAM' in frame.flags: + self._close_remote() def open(self, end): """ @@ -211,38 +233,19 @@ def getresponse(self): Once all data has been sent on this connection, returns a HTTP20Response object wrapping this stream. """ - assert self.state == STATE_HALF_CLOSED_LOCAL - header_data = [] - - # At this stage, the only things in the frame queue should be HEADERS - # and CONTINUATION frames. Grab them all, reading more frames off the - # connection if necessary. - while True: - try: - frame = self._queued_frames.popleft() - except IndexError: - self._recv_cb() - continue - - assert isinstance(frame, (HeadersFrame, ContinuationFrame)) + assert self._local_closed - header_data.append(frame.data) - - if 'END_HEADERS' in frame.flags: - if 'END_STREAM' in frame.flags: - self.state = STATE_CLOSED - break - - # Decode the headers. - headers = self._decoder.decode(b''.join(header_data)) + # Keep reading until all headers are received. + while self.response_headers is None: + self._recv_cb() # Find the Content-Length header if present. self._in_window_manager.document_size = ( - int(get_from_key_value_set(headers, 'content-length', 0)) + int(get_from_key_value_set(self.response_headers, 'content-length', 0)) ) # Create the HTTP response. - return HTTP20Response(headers, self) + return HTTP20Response(self.response_headers, self) def close(self): """ @@ -264,7 +267,7 @@ def _send_chunk(self, data, final): (determined by being of size less than MAX_CHUNK) and no more data is to be sent. """ - assert self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE) + assert self._local_open f = DataFrame(self.stream_id) f.data = data @@ -286,5 +289,4 @@ def _send_chunk(self, data, final): # If no more data is to be sent on this stream, transition our state. if len(data) < MAX_CHUNK and final: - self.state = (STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN - else STATE_CLOSED) + self._close_local() diff --git a/test/test_hyper.py b/test/test_hyper.py index 7fb23df7..3ce5b453 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -922,9 +922,7 @@ def test_we_can_read_from_the_socket(self): c._recv_cb() s = c.recent_stream - assert len(s._queued_frames) == 1 - assert isinstance(s._queued_frames[0], DataFrame) - assert s._queued_frames[0].data == b'testdata' + assert s.data == [b'testdata'] def test_putrequest_sends_data(self): sock = DummySocket() @@ -1029,6 +1027,28 @@ def test_connections_handle_resizing_header_tables_properly(self): assert f2.stream_id == 0 assert f2.flags == set(['ACK']) + def test_read_headers_out_of_order(self): + # If header blocks aren't decoded in the same order they're received, + # regardless of the stream they belong to, the decoder state will become + # corrupted. + e = Encoder() + h1 = HeadersFrame(1) + h1.data = e.encode({':status': 200, 'content-type': 'foo/bar'}) + h1.flags |= set(['END_HEADERS', 'END_STREAM']) + h3 = HeadersFrame(3) + h3.data = e.encode({':status': 200, 'content-type': 'baz/qux'}) + h3.flags |= set(['END_HEADERS', 'END_STREAM']) + sock = DummySocket() + sock.buffer = BytesIO(h1.serialize() + h3.serialize()) + + c = HTTP20Connection('www.google.com') + c._sock = sock + r1 = c.request('GET', '/a') + r3 = c.request('GET', '/b') + + assert c.getresponse(r3).getheaders() == [('content-type', 'baz/qux')] + assert c.getresponse(r1).getheaders() == [('content-type', 'foo/bar')] + def test_receive_unexpected_frame(self): # RSTSTREAM frames are never defined on connections, so send one of # those. @@ -1065,11 +1085,6 @@ def data_callback(frame): assert s.state == STATE_HALF_CLOSED_LOCAL - def test_receiving_a_frame_queues_it(self): - s = Stream(1, None, None, None, None, None, None) - s.receive_frame(Frame(0)) - assert len(s._queued_frames) == 1 - def test_file_objects_can_be_sent(self): def data_callback(frame): assert isinstance(frame, DataFrame) @@ -1158,7 +1173,7 @@ def inner(): s.receive_frame(in_frames.pop(0)) return inner - s = Stream(1, send_cb, None, None, None, None, None) + s = Stream(1, send_cb, None, None, None, None, FlowControlManager(65535)) s._recv_cb = recv_cb(s) s.state = STATE_HALF_CLOSED_LOCAL @@ -1170,7 +1185,9 @@ def inner(): data = s._read() assert data == b'hi there!' - assert len(out_frames) == 0 + assert len(out_frames) == 1 + assert isinstance(out_frames[0], WindowUpdateFrame) + assert out_frames[0].window_increment == len(b'hi there!') def test_can_read_multiple_frames_from_streams(self): out_frames = [] @@ -1200,9 +1217,10 @@ def inner(): data = s._read() assert data == b'hi there!hi there again!' - assert len(out_frames) == 1 - assert isinstance(out_frames[0], WindowUpdateFrame) - assert out_frames[0].window_increment == len(b'hi there!') + assert len(out_frames) == 2 + for frame, data in zip(out_frames, [b'hi there!', b'hi there again!']): + assert isinstance(frame, WindowUpdateFrame) + assert frame.window_increment == len(data) def test_partial_reads_from_streams(self): out_frames = [] @@ -1238,7 +1256,7 @@ def inner(): # Now we'll get the entire of the second frame. data = s._read(4) assert data == b'hi there again!' - assert len(out_frames) == 1 + assert len(out_frames) == 2 assert s.state == STATE_CLOSED From 7b17cbccab80f17885fb99707c2566391a1f424e Mon Sep 17 00:00:00 2001 From: Alek Storm Date: Mon, 7 Apr 2014 10:28:30 -0700 Subject: [PATCH 2/3] Revert a change that caused WINDOW_UPDATE frames to be sent on closed streams --- hyper/http20/stream.py | 8 ++++---- test/test_hyper.py | 13 +++++-------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/hyper/http20/stream.py b/hyper/http20/stream.py index 08acc219..4e0d6e56 100644 --- a/hyper/http20/stream.py +++ b/hyper/http20/stream.py @@ -166,6 +166,9 @@ def receive_frame(self, frame): """ Handle a frame received on this stream. """ + if 'END_STREAM' in frame.flags: + self._close_remote() + if isinstance(frame, WindowUpdateFrame): self._out_flow_control_window += frame.window_increment elif isinstance(frame, (HeadersFrame, ContinuationFrame)): @@ -177,7 +180,7 @@ def receive_frame(self, frame): # Increase the window size. Only do this if the data frame contains # actual data. increment = self._in_window_manager._handle_frame(len(frame.data)) - if increment: + if increment and not self._remote_closed: w = WindowUpdateFrame(self.stream_id) w.window_increment = increment self._data_cb(w) @@ -187,9 +190,6 @@ def receive_frame(self, frame): if 'END_HEADERS' in frame.flags: self.response_headers = self._decoder.decode(b''.join(self.header_data)) - if 'END_STREAM' in frame.flags: - self._close_remote() - def open(self, end): """ Open the stream. Does this by encoding and sending the headers: no more diff --git a/test/test_hyper.py b/test/test_hyper.py index 3ce5b453..1a2140c5 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -1185,9 +1185,7 @@ def inner(): data = s._read() assert data == b'hi there!' - assert len(out_frames) == 1 - assert isinstance(out_frames[0], WindowUpdateFrame) - assert out_frames[0].window_increment == len(b'hi there!') + assert len(out_frames) == 0 def test_can_read_multiple_frames_from_streams(self): out_frames = [] @@ -1217,10 +1215,9 @@ def inner(): data = s._read() assert data == b'hi there!hi there again!' - assert len(out_frames) == 2 - for frame, data in zip(out_frames, [b'hi there!', b'hi there again!']): - assert isinstance(frame, WindowUpdateFrame) - assert frame.window_increment == len(data) + assert len(out_frames) == 1 + assert isinstance(out_frames[0], WindowUpdateFrame) + assert out_frames[0].window_increment == len(b'hi there!') def test_partial_reads_from_streams(self): out_frames = [] @@ -1256,7 +1253,7 @@ def inner(): # Now we'll get the entire of the second frame. data = s._read(4) assert data == b'hi there again!' - assert len(out_frames) == 2 + assert len(out_frames) == 1 assert s.state == STATE_CLOSED From ede2d3452cfb6fda2b292650e7ff2d4a2ec1be0b Mon Sep 17 00:00:00 2001 From: Alek Storm Date: Mon, 7 Apr 2014 10:28:51 -0700 Subject: [PATCH 3/3] Test receiving an unexpected frame on a stream --- hyper/http20/stream.py | 2 +- test/test_hyper.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/hyper/http20/stream.py b/hyper/http20/stream.py index 4e0d6e56..2f97b27e 100644 --- a/hyper/http20/stream.py +++ b/hyper/http20/stream.py @@ -184,7 +184,7 @@ def receive_frame(self, frame): w = WindowUpdateFrame(self.stream_id) w.window_increment = increment self._data_cb(w) - else: # pragma: no cover + else: raise ValueError('Unexpected frame type: %i' % frame.type) if 'END_HEADERS' in frame.flags: diff --git a/test/test_hyper.py b/test/test_hyper.py index 1a2140c5..ce26a53e 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -1256,6 +1256,14 @@ def inner(): assert len(out_frames) == 1 assert s.state == STATE_CLOSED + def test_receive_unexpected_frame(self): + # SETTINGS frames are never defined on streams, so send one of those. + s = Stream(1, None, None, None, None, None, None) + f = SettingsFrame(0) + + with pytest.raises(ValueError): + s.receive_frame(f) + class TestResponse(object): def test_status_is_stripped_from_headers(self):