Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Avoid issuing too many RST_STREAM frames #150

Merged
merged 3 commits into from
Jul 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions hyper/http20/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ def __init_state(self):
# Streams are stored in a dictionary keyed off their stream IDs. We
# also save the most recent one for easy access without having to walk
# the dictionary.
# Finally, we add a set of all streams that we or the remote party
# forcefully closed with RST_STREAM, to avoid encountering issues where
# frames were already in flight before the RST was processed.
self.streams = {}
self.recent_stream = None
self.next_stream_id = 1
self.reset_streams = set()

# Header encoding/decoding is at the connection scope, so we embed a
# header encoder and a decoder. These get passed to child stream
Expand Down Expand Up @@ -652,6 +656,13 @@ def _consume_frame_payload(self, frame, data):
# the stream and go about our business.
self._send_rst_frame(frame.promised_stream_id, 7)

# If this frame was received on a stream that has been reset, drop it.
if frame.stream_id in self.reset_streams:
log.info(
"Stream %s has been reset, dropping frame.", frame.stream_id
)
return

# Work out to whom this frame should go.
if frame.stream_id != 0:
try:
Expand All @@ -663,6 +674,11 @@ def _consume_frame_payload(self, frame, data):
log.warning(
"Unexpected stream identifier %d" % (frame.stream_id)
)

# If this is a RST_STREAM frame, we may get more than one (because
# of frames in flight). Keep track.
if frame.type == RstStreamFrame.type:
self.reset_streams.add(frame.stream_id)
else:
self.receive_frame(frame)

Expand Down Expand Up @@ -703,6 +719,10 @@ def _send_rst_frame(self, stream_id, error_code):
"Stream with id %d does not exist: %s",
stream_id, e)

# Keep track of the fact that we reset this stream in case there are
# other frames in flight.
self.reset_streams.add(stream_id)

# The following two methods are the implementation of the context manager
# protocol.
def __enter__(self):
Expand Down
7 changes: 7 additions & 0 deletions hyper/http20/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,10 @@ class ProtocolError(HTTP20Error):
The remote party violated the HTTP/2 protocol.
"""
pass


class StreamResetError(HTTP20Error):
"""
A stream was forcefully reset by the remote party.
"""
pass
7 changes: 5 additions & 2 deletions hyper/http20/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from ..common.headers import HTTPHeaderMap
from ..packages.hyperframe.frame import (
FRAME_MAX_LEN, FRAMES, HeadersFrame, DataFrame, PushPromiseFrame,
WindowUpdateFrame, ContinuationFrame, BlockedFrame
WindowUpdateFrame, ContinuationFrame, BlockedFrame, RstStreamFrame
)
from .exceptions import ProtocolError
from .exceptions import ProtocolError, StreamResetError
from .util import h2_safe_headers
import collections
import logging
Expand Down Expand Up @@ -233,6 +233,9 @@ def receive_frame(self, frame):
w = WindowUpdateFrame(self.stream_id)
w.window_increment = increment
self._data_cb(w, True)
elif frame.type == RstStreamFrame.type:
self.close(0)
raise StreamResetError("Stream forcefully closed.")
elif frame.type in FRAMES:
# This frame isn't valid at this point.
raise ValueError("Unexpected frame %s." % frame)
Expand Down
99 changes: 97 additions & 2 deletions test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
from hyper.contrib import HTTP20Adapter
from hyper.packages.hyperframe.frame import (
Frame, SettingsFrame, WindowUpdateFrame, DataFrame, HeadersFrame,
GoAwayFrame,
GoAwayFrame, RstStreamFrame
)
from hyper.packages.hpack.hpack import Encoder
from hyper.packages.hpack.huffman import HuffmanEncoder
from hyper.packages.hpack.huffman_constants import (
REQUEST_CODES, REQUEST_CODES_LENGTH
)
from hyper.http20.exceptions import ConnectionError
from hyper.http20.exceptions import ConnectionError, StreamResetError
from server import SocketLevelTest

# Turn off certificate verification for the tests.
Expand Down Expand Up @@ -506,6 +506,101 @@ def socket_handler(listener):

self.tear_down()

def test_resetting_stream_with_frames_in_flight(self):
"""
Hyper emits only one RST_STREAM frame, despite the other frames in
flight.
"""
self.set_up()

recv_event = threading.Event()

def socket_handler(listener):
sock = listener.accept()[0]

# We get two messages for the connection open and then a HEADERS
# frame.
receive_preamble(sock)
sock.recv(65535)

# Now, send the headers for the response. This response has no
# body.
f = build_headers_frame(
[(':status', '204'), ('content-length', '0')]
)
f.flags.add('END_STREAM')
f.stream_id = 1
sock.send(f.serialize())

# Wait for the message from the main thread.
recv_event.wait()
sock.close()

self._start_server(socket_handler)
conn = self.get_connection()
stream_id = conn.request('GET', '/')

# Now, trigger the RST_STREAM frame by closing the stream.
conn._send_rst_frame(stream_id, 0)

# Now, eat the Headers frame. This should not cause an exception.
conn._recv_cb()

# However, attempting to get the response should.
with pytest.raises(KeyError):
conn.get_response(stream_id)

# Awesome, we're done now.
recv_event.set()

self.tear_down()

def test_stream_can_be_reset_multiple_times(self):
"""
Confirm that hyper gracefully handles receiving multiple RST_STREAM
frames.
"""
self.set_up()

recv_event = threading.Event()

def socket_handler(listener):
sock = listener.accept()[0]

# We get two messages for the connection open and then a HEADERS
# frame.
receive_preamble(sock)
sock.recv(65535)

# Now, send two RST_STREAM frames.
for _ in range(0, 2):
f = RstStreamFrame(1)
sock.send(f.serialize())

# Wait for the message from the main thread.
recv_event.wait()
sock.close()

self._start_server(socket_handler)
conn = self.get_connection()
conn.request('GET', '/')

# Now, eat the RstStream frames. The first one throws a
# StreamResetError.
with pytest.raises(StreamResetError):
conn._consume_single_frame()

# The next should throw no exception.
conn._consume_single_frame()

assert conn.reset_streams == set([1])

# Awesome, we're done now.
recv_event.set()

self.tear_down()


class TestRequestsAdapter(SocketLevelTest):
# This uses HTTP/2.
h2 = True
Expand Down