From 56b5b4d49973702bcb95bb36dcd1e35f40b57a1d Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Tue, 21 Jul 2015 21:10:59 +0100 Subject: [PATCH 1/3] Add exception for streams forcefully reset --- hyper/http20/exceptions.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hyper/http20/exceptions.py b/hyper/http20/exceptions.py index 0dbc4d9f..64808471 100644 --- a/hyper/http20/exceptions.py +++ b/hyper/http20/exceptions.py @@ -40,3 +40,10 @@ class ProtocolError(HTTP20Error): The remote party violated the HTTP/2 protocol. """ pass + + +class StreamResetError(HTTP20Error): + """ + A stream was forcefully reset by the remote party. + """ + pass From 927f82e5c033b15ff8d749344d3c7a269d063065 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Tue, 21 Jul 2015 21:11:33 +0100 Subject: [PATCH 2/3] Throw StreamResetError on stream reset --- hyper/http20/stream.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hyper/http20/stream.py b/hyper/http20/stream.py index 98a43f07..0f8b5564 100644 --- a/hyper/http20/stream.py +++ b/hyper/http20/stream.py @@ -16,9 +16,9 @@ from ..common.headers import HTTPHeaderMap from ..packages.hyperframe.frame import ( FRAME_MAX_LEN, FRAMES, HeadersFrame, DataFrame, PushPromiseFrame, - WindowUpdateFrame, ContinuationFrame, BlockedFrame + WindowUpdateFrame, ContinuationFrame, BlockedFrame, RstStreamFrame ) -from .exceptions import ProtocolError +from .exceptions import ProtocolError, StreamResetError from .util import h2_safe_headers import collections import logging @@ -233,6 +233,9 @@ def receive_frame(self, frame): w = WindowUpdateFrame(self.stream_id) w.window_increment = increment self._data_cb(w, True) + elif frame.type == RstStreamFrame.type: + self.close(0) + raise StreamResetError("Stream forcefully closed.") elif frame.type in FRAMES: # This frame isn't valid at this point. raise ValueError("Unexpected frame %s." % frame) From ef9b7f2c42af2f9abe834220dcd054af6b041eed Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Tue, 21 Jul 2015 21:11:47 +0100 Subject: [PATCH 3/3] Avoid overproducing RST_STREAM frames. --- hyper/http20/connection.py | 20 ++++++++ test/test_integration.py | 99 +++++++++++++++++++++++++++++++++++++- 2 files changed, 117 insertions(+), 2 deletions(-) diff --git a/hyper/http20/connection.py b/hyper/http20/connection.py index 9a45b398..a0463710 100644 --- a/hyper/http20/connection.py +++ b/hyper/http20/connection.py @@ -110,9 +110,13 @@ def __init_state(self): # Streams are stored in a dictionary keyed off their stream IDs. We # also save the most recent one for easy access without having to walk # the dictionary. + # Finally, we add a set of all streams that we or the remote party + # forcefully closed with RST_STREAM, to avoid encountering issues where + # frames were already in flight before the RST was processed. self.streams = {} self.recent_stream = None self.next_stream_id = 1 + self.reset_streams = set() # Header encoding/decoding is at the connection scope, so we embed a # header encoder and a decoder. These get passed to child stream @@ -652,6 +656,13 @@ def _consume_frame_payload(self, frame, data): # the stream and go about our business. self._send_rst_frame(frame.promised_stream_id, 7) + # If this frame was received on a stream that has been reset, drop it. + if frame.stream_id in self.reset_streams: + log.info( + "Stream %s has been reset, dropping frame.", frame.stream_id + ) + return + # Work out to whom this frame should go. if frame.stream_id != 0: try: @@ -663,6 +674,11 @@ def _consume_frame_payload(self, frame, data): log.warning( "Unexpected stream identifier %d" % (frame.stream_id) ) + + # If this is a RST_STREAM frame, we may get more than one (because + # of frames in flight). Keep track. + if frame.type == RstStreamFrame.type: + self.reset_streams.add(frame.stream_id) else: self.receive_frame(frame) @@ -703,6 +719,10 @@ def _send_rst_frame(self, stream_id, error_code): "Stream with id %d does not exist: %s", stream_id, e) + # Keep track of the fact that we reset this stream in case there are + # other frames in flight. + self.reset_streams.add(stream_id) + # The following two methods are the implementation of the context manager # protocol. def __enter__(self): diff --git a/test/test_integration.py b/test/test_integration.py index 89886e81..be3c2da1 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -15,14 +15,14 @@ from hyper.contrib import HTTP20Adapter from hyper.packages.hyperframe.frame import ( Frame, SettingsFrame, WindowUpdateFrame, DataFrame, HeadersFrame, - GoAwayFrame, + GoAwayFrame, RstStreamFrame ) from hyper.packages.hpack.hpack import Encoder from hyper.packages.hpack.huffman import HuffmanEncoder from hyper.packages.hpack.huffman_constants import ( REQUEST_CODES, REQUEST_CODES_LENGTH ) -from hyper.http20.exceptions import ConnectionError +from hyper.http20.exceptions import ConnectionError, StreamResetError from server import SocketLevelTest # Turn off certificate verification for the tests. @@ -506,6 +506,101 @@ def socket_handler(listener): self.tear_down() + def test_resetting_stream_with_frames_in_flight(self): + """ + Hyper emits only one RST_STREAM frame, despite the other frames in + flight. + """ + self.set_up() + + recv_event = threading.Event() + + def socket_handler(listener): + sock = listener.accept()[0] + + # We get two messages for the connection open and then a HEADERS + # frame. + receive_preamble(sock) + sock.recv(65535) + + # Now, send the headers for the response. This response has no + # body. + f = build_headers_frame( + [(':status', '204'), ('content-length', '0')] + ) + f.flags.add('END_STREAM') + f.stream_id = 1 + sock.send(f.serialize()) + + # Wait for the message from the main thread. + recv_event.wait() + sock.close() + + self._start_server(socket_handler) + conn = self.get_connection() + stream_id = conn.request('GET', '/') + + # Now, trigger the RST_STREAM frame by closing the stream. + conn._send_rst_frame(stream_id, 0) + + # Now, eat the Headers frame. This should not cause an exception. + conn._recv_cb() + + # However, attempting to get the response should. + with pytest.raises(KeyError): + conn.get_response(stream_id) + + # Awesome, we're done now. + recv_event.set() + + self.tear_down() + + def test_stream_can_be_reset_multiple_times(self): + """ + Confirm that hyper gracefully handles receiving multiple RST_STREAM + frames. + """ + self.set_up() + + recv_event = threading.Event() + + def socket_handler(listener): + sock = listener.accept()[0] + + # We get two messages for the connection open and then a HEADERS + # frame. + receive_preamble(sock) + sock.recv(65535) + + # Now, send two RST_STREAM frames. + for _ in range(0, 2): + f = RstStreamFrame(1) + sock.send(f.serialize()) + + # Wait for the message from the main thread. + recv_event.wait() + sock.close() + + self._start_server(socket_handler) + conn = self.get_connection() + conn.request('GET', '/') + + # Now, eat the RstStream frames. The first one throws a + # StreamResetError. + with pytest.raises(StreamResetError): + conn._consume_single_frame() + + # The next should throw no exception. + conn._consume_single_frame() + + assert conn.reset_streams == set([1]) + + # Awesome, we're done now. + recv_event.set() + + self.tear_down() + + class TestRequestsAdapter(SocketLevelTest): # This uses HTTP/2. h2 = True