Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
build/
env/
dist/
*.egg-info/
Expand Down
1 change: 1 addition & 0 deletions hyper/http20/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DataFrame, HeadersFrame, SettingsFrame, Frame, WindowUpdateFrame,
GoAwayFrame
)
from .response import HTTP20Response
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this import?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, I'll fix it up. =D

from .window import FlowControlManager
from .exceptions import ConnectionError

Expand Down
2 changes: 1 addition & 1 deletion hyper/http20/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Frame(object):
defined_flags = []

# The type of the frame.
type = 0
type = -1

def __init__(self, stream_id):
self.stream_id = stream_id
Expand Down
126 changes: 64 additions & 62 deletions hyper/http20/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def __init__(self,
self.stream_id = stream_id
self.state = STATE_IDLE
self.headers = []
self._queued_frames = collections.deque()

self.response_headers = None
self.header_data = []
self.data = []

# There are two flow control windows: one for data we're sending,
# one for data being sent to us.
Expand Down Expand Up @@ -110,6 +113,34 @@ def file_iterator(fobj):
for chunk in chunks:
self._send_chunk(chunk, final)

@property
def _local_closed(self):
return self.state in (STATE_CLOSED, STATE_HALF_CLOSED_LOCAL)

@property
def _remote_closed(self):
return self.state in (STATE_CLOSED, STATE_HALF_CLOSED_REMOTE)

@property
def _local_open(self):
return self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE)

@property
def _remote_open(self):
return self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL)

def _close_local(self):
self.state = (
STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN
else STATE_CLOSED
)

def _close_remote(self):
self.state = (
STATE_HALF_CLOSED_REMOTE if self.state == STATE_OPEN
else STATE_CLOSED
)

def _read(self, amt=None):
"""
Read data from the stream. Unlike a normal read behaviour, this
Expand All @@ -118,55 +149,46 @@ def _read(self, amt=None):
if self.state == STATE_CLOSED:
return b''

assert self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL)
assert self._remote_open

def listlen(list):
return sum(map(len, list))

data = []
# Keep reading until the stream is closed or we get enough data.
while not self._remote_closed and (amt is None or listlen(self.data) < amt):
self._recv_cb()

# Begin by processing frames off the queue.
while amt is None or listlen(data) < amt:
try:
frame = self._queued_frames.popleft()
except IndexError:
# No frames on the queue. Try to read one and try again.
self._recv_cb()
continue
result = b''.join(self.data)
self.data = []
return result

# All queued frames at this point should be data frames.
assert isinstance(frame, DataFrame)
def receive_frame(self, frame):
"""
Handle a frame received on this stream.
"""
if 'END_STREAM' in frame.flags:
self._close_remote()

if isinstance(frame, WindowUpdateFrame):
self._out_flow_control_window += frame.window_increment
elif isinstance(frame, (HeadersFrame, ContinuationFrame)):
self.header_data.append(frame.data)
elif isinstance(frame, DataFrame):
# Append the data to the buffer.
data.append(frame.data)

# If that was the last frame, we're done here.
if 'END_STREAM' in frame.flags:
self.state = (
STATE_HALF_CLOSED_REMOTE if self.state == STATE_OPEN
else STATE_CLOSED
)
break
self.data.append(frame.data)

# Increase the window size. Only do this if the data frame contains
# actual data.
increment = self._in_window_manager._handle_frame(len(frame.data))
if increment:
if increment and not self._remote_closed:
w = WindowUpdateFrame(self.stream_id)
w.window_increment = increment
self._data_cb(w)

return b''.join(data)

def receive_frame(self, frame):
"""
Handle a frame received on this stream. If this is a window update
frame, immediately update the window accordingly.
"""
if isinstance(frame, WindowUpdateFrame):
self._out_flow_control_window += frame.window_increment
else:
self._queued_frames.append(frame)
raise ValueError('Unexpected frame type: %i' % frame.type)

if 'END_HEADERS' in frame.flags:
self.response_headers = self._decoder.decode(b''.join(self.header_data))

def open(self, end):
"""
Expand Down Expand Up @@ -211,38 +233,19 @@ def getresponse(self):
Once all data has been sent on this connection, returns a
HTTP20Response object wrapping this stream.
"""
assert self.state == STATE_HALF_CLOSED_LOCAL
header_data = []

# At this stage, the only things in the frame queue should be HEADERS
# and CONTINUATION frames. Grab them all, reading more frames off the
# connection if necessary.
while True:
try:
frame = self._queued_frames.popleft()
except IndexError:
self._recv_cb()
continue

assert isinstance(frame, (HeadersFrame, ContinuationFrame))
assert self._local_closed

header_data.append(frame.data)

if 'END_HEADERS' in frame.flags:
if 'END_STREAM' in frame.flags:
self.state = STATE_CLOSED
break

# Decode the headers.
headers = self._decoder.decode(b''.join(header_data))
# Keep reading until all headers are received.
while self.response_headers is None:
self._recv_cb()

# Find the Content-Length header if present.
self._in_window_manager.document_size = (
int(get_from_key_value_set(headers, 'content-length', 0))
int(get_from_key_value_set(self.response_headers, 'content-length', 0))
)

# Create the HTTP response.
return HTTP20Response(headers, self)
return HTTP20Response(self.response_headers, self)

def close(self):
"""
Expand All @@ -264,7 +267,7 @@ def _send_chunk(self, data, final):
(determined by being of size less than MAX_CHUNK) and no more data is
to be sent.
"""
assert self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE)
assert self._local_open

f = DataFrame(self.stream_id)
f.data = data
Expand All @@ -286,5 +289,4 @@ def _send_chunk(self, data, final):

# If no more data is to be sent on this stream, transition our state.
if len(data) < MAX_CHUNK and final:
self.state = (STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN
else STATE_CLOSED)
self._close_local()
41 changes: 32 additions & 9 deletions test/test_hyper.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,9 +922,7 @@ def test_we_can_read_from_the_socket(self):
c._recv_cb()

s = c.recent_stream
assert len(s._queued_frames) == 1
assert isinstance(s._queued_frames[0], DataFrame)
assert s._queued_frames[0].data == b'testdata'
assert s.data == [b'testdata']

def test_putrequest_sends_data(self):
sock = DummySocket()
Expand Down Expand Up @@ -1029,6 +1027,28 @@ def test_connections_handle_resizing_header_tables_properly(self):
assert f2.stream_id == 0
assert f2.flags == set(['ACK'])

def test_read_headers_out_of_order(self):
# If header blocks aren't decoded in the same order they're received,
# regardless of the stream they belong to, the decoder state will become
# corrupted.
e = Encoder()
h1 = HeadersFrame(1)
h1.data = e.encode({':status': 200, 'content-type': 'foo/bar'})
h1.flags |= set(['END_HEADERS', 'END_STREAM'])
h3 = HeadersFrame(3)
h3.data = e.encode({':status': 200, 'content-type': 'baz/qux'})
h3.flags |= set(['END_HEADERS', 'END_STREAM'])
sock = DummySocket()
sock.buffer = BytesIO(h1.serialize() + h3.serialize())

c = HTTP20Connection('www.google.com')
c._sock = sock
r1 = c.request('GET', '/a')
r3 = c.request('GET', '/b')

assert c.getresponse(r3).getheaders() == [('content-type', 'baz/qux')]
assert c.getresponse(r1).getheaders() == [('content-type', 'foo/bar')]

def test_receive_unexpected_frame(self):
# RSTSTREAM frames are never defined on connections, so send one of
# those.
Expand Down Expand Up @@ -1065,11 +1085,6 @@ def data_callback(frame):

assert s.state == STATE_HALF_CLOSED_LOCAL

def test_receiving_a_frame_queues_it(self):
s = Stream(1, None, None, None, None, None, None)
s.receive_frame(Frame(0))
assert len(s._queued_frames) == 1

def test_file_objects_can_be_sent(self):
def data_callback(frame):
assert isinstance(frame, DataFrame)
Expand Down Expand Up @@ -1158,7 +1173,7 @@ def inner():
s.receive_frame(in_frames.pop(0))
return inner

s = Stream(1, send_cb, None, None, None, None, None)
s = Stream(1, send_cb, None, None, None, None, FlowControlManager(65535))
s._recv_cb = recv_cb(s)
s.state = STATE_HALF_CLOSED_LOCAL

Expand Down Expand Up @@ -1241,6 +1256,14 @@ def inner():
assert len(out_frames) == 1
assert s.state == STATE_CLOSED

def test_receive_unexpected_frame(self):
# SETTINGS frames are never defined on streams, so send one of those.
s = Stream(1, None, None, None, None, None, None)
f = SettingsFrame(0)

with pytest.raises(ValueError):
s.receive_frame(f)


class TestResponse(object):
def test_status_is_stripped_from_headers(self):
Expand Down