Skip to content

Commit

Permalink
Consolidate HTTP/2 window_update increments
Browse files Browse the repository at this point in the history
All increments for a single call to `Mint.HTTP2.stream` or recv are
sent in single frames per-stream plus the frame for the connection-level
increment. Any RST_STREAM frames and HEADER or DATA frames with the
End Stream flag set discard the stream-level window size increment.

This doesn't appear to have any effect in practice.
  • Loading branch information
the-mikedavis committed Jan 27, 2024
1 parent 321c830 commit c77a13a
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 66 deletions.
146 changes: 80 additions & 66 deletions lib/mint/http2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ defmodule Mint.HTTP2 do
alias Mint.Types
alias Mint.Core.Util
alias Mint.HTTP2.Frame
alias Mint.HTTP2.WindowSizeIncrements

require Logger
require Integer
Expand Down Expand Up @@ -1382,21 +1383,21 @@ defmodule Mint.HTTP2 do

defp maybe_concat_and_handle_new_data(conn, data) do
data = maybe_concat(conn.buffer, data)
{conn, responses} = handle_new_data(conn, data, [])
{conn, responses} = handle_new_data(conn, data, [], WindowSizeIncrements.new())
{:ok, conn, Enum.reverse(responses)}
end

defp handle_new_data(%__MODULE__{} = conn, data, responses) do
defp handle_new_data(%__MODULE__{} = conn, data, responses, window_size_increments) do
case Frame.decode_next(data, conn.client_settings.max_frame_size) do
{:ok, frame, rest} ->
log(conn, :debug, "Received frame: #{Frame.inspect(frame)}")
conn = validate_frame(conn, frame)
{conn, responses} = handle_frame(conn, frame, responses)
handle_new_data(conn, rest, responses)
{conn, responses, window_size_increments} = handle_frame(conn, frame, responses, window_size_increments)
handle_new_data(conn, rest, responses, window_size_increments)

:more ->
conn = put_in(conn.buffer, data)
handle_consumed_all_frames(conn, responses)
handle_consumed_all_frames(conn, responses, window_size_increments)

{:error, :payload_too_big} ->
debug_data = "frame payload exceeds connection's max frame size"
Expand All @@ -1415,7 +1416,7 @@ defmodule Mint.HTTP2 do
:throw, {:mint, _conn, _error, _responses} = thrown -> throw(thrown)
end

defp handle_consumed_all_frames(%{state: state} = conn, responses) do
defp handle_consumed_all_frames(%{state: state} = conn, responses, window_size_increments) do
case state do
{:goaway, :no_error, _debug_data} ->
{conn, responses}
Expand All @@ -1425,10 +1426,19 @@ defmodule Mint.HTTP2 do
throw({:mint, conn, error, responses})

_ ->
conn = refill_client_windows(conn, window_size_increments)
{conn, responses}
end
end

defp refill_client_windows(conn, window_size_increments) do
if open?(conn) do
send!(conn, WindowSizeIncrements.frames(window_size_increments))
else
conn
end
end

defp validate_frame(conn, unknown()) do
# Unknown frames MUST be ignored:
# https://datatracker.ietf.org/doc/html/rfc7540#section-4.1
Expand Down Expand Up @@ -1520,33 +1530,29 @@ defmodule Mint.HTTP2 do
for frame_name <- stream_level_frames ++ connection_level_frames ++ [:window_update, :unknown] do
function_name = :"handle_#{frame_name}"

defp handle_frame(conn, Frame.unquote(frame_name)() = frame, responses) do
unquote(function_name)(conn, frame, responses)
defp handle_frame(conn, Frame.unquote(frame_name)() = frame, responses, window_size_increments) do
unquote(function_name)(conn, frame, responses, window_size_increments)
end
end

defp handle_unknown(conn, _frame, responses) do
defp handle_unknown(conn, _frame, responses, window_size_increments) do
# Implementations MUST ignore and discard any frame that has a type that is unknown.
# see: https://datatracker.ietf.org/doc/html/rfc7540#section-4.1

{conn, responses}
{conn, responses, window_size_increments}
end

# DATA

defp handle_data(conn, frame, responses) do
defp handle_data(conn, frame, responses, window_size_increments) do
data(stream_id: stream_id, flags: flags, data: data, padding: padding) = frame

# Regardless of whether we have the stream or not, we need to abide by flow
# control rules so we still refill the client window for the stream_id we got.
window_size_increment = byte_size(data) + byte_size(padding || "")
size_increment = byte_size(data) + byte_size(padding || "")

conn =
if window_size_increment > 0 do
refill_client_windows(conn, stream_id, window_size_increment)
else
conn
end
window_size_increments =
WindowSizeIncrements.increment_stream(window_size_increments, stream_id, size_increment)

case Map.fetch(conn.streams, stream_id) do
{:ok, stream} ->
Expand All @@ -1555,9 +1561,10 @@ defmodule Mint.HTTP2 do

if flag_set?(flags, :data, :end_stream) do
conn = close_stream!(conn, stream.id, :no_error)
{conn, [{:done, stream.ref} | responses]}
window_size_increments = WindowSizeIncrements.discard_stream(window_size_increments, stream_id)
{conn, [{:done, stream.ref} | responses], window_size_increments}
else
{conn, responses}
{conn, responses, window_size_increments}
end

:error ->
Expand All @@ -1566,35 +1573,32 @@ defmodule Mint.HTTP2 do
end
end

defp refill_client_windows(conn, stream_id, data_size) do
connection_frame = window_update(stream_id: 0, window_size_increment: data_size)
stream_frame = window_update(stream_id: stream_id, window_size_increment: data_size)

if open?(conn) do
send!(conn, [Frame.encode(connection_frame), Frame.encode(stream_frame)])
else
conn
end
end

# HEADERS

defp handle_headers(conn, frame, responses) do
defp handle_headers(conn, frame, responses, window_size_increments) do
headers(stream_id: stream_id, flags: flags, hbf: hbf) = frame

stream = Map.get(conn.streams, stream_id)
end_stream? = flag_set?(flags, :headers, :end_stream)

window_size_increments =
if end_stream? do
WindowSizeIncrements.discard_stream(window_size_increments, stream_id)
else
window_size_increments
end

if stream do
assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
end

if flag_set?(flags, :headers, :end_headers) do
decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?)
{conn, responses} = decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?)
{conn, responses, window_size_increments}
else
callback = &decode_hbf_and_add_responses(&1, &2, &3, &4, end_stream?)
conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
{conn, responses}
{conn, responses, window_size_increments}
end
end

Expand Down Expand Up @@ -1746,14 +1750,14 @@ defmodule Mint.HTTP2 do
# PRIORITY

# For now we ignore all PRIORITY frames. This shouldn't cause practical trouble.
defp handle_priority(conn, frame, responses) do
defp handle_priority(conn, frame, responses, window_size_increments) do
log(conn, :warning, "Ignoring PRIORITY frame: #{inspect(frame)}")
{conn, responses}
{conn, responses, window_size_increments}
end

# RST_STREAM

defp handle_rst_stream(conn, frame, responses) do
defp handle_rst_stream(conn, frame, responses, window_size_increments) do
rst_stream(stream_id: stream_id, error_code: error_code) = frame

# If we receive RST_STREAM on a closed stream, we ignore it.
Expand All @@ -1764,31 +1768,34 @@ defmodule Mint.HTTP2 do
# it, so that if we get things like DATA on that stream we error out.
conn = delete_stream(conn, stream)

window_size_increments =
WindowSizeIncrements.discard_stream(window_size_increments, stream_id)

if error_code == :no_error do
{conn, [{:done, stream.ref} | responses]}
{conn, [{:done, stream.ref} | responses], window_size_increments}
else
error = wrap_error({:server_closed_request, error_code})
{conn, [{:error, stream.ref, error} | responses]}
{conn, [{:error, stream.ref, error} | responses], window_size_increments}
end

:error ->
{conn, responses}
{conn, responses, window_size_increments}
end
end

# SETTINGS

defp handle_settings(conn, frame, responses) do
defp handle_settings(conn, frame, responses, window_size_increments) do
settings(flags: flags, params: params) = frame

if flag_set?(flags, :settings, :ack) do
conn = apply_client_settings(conn)
{conn, responses}
{conn, responses, window_size_increments}
else
conn = apply_server_settings(conn, params)
frame = settings(flags: set_flags(:settings, [:ack]), params: [])
conn = send!(conn, Frame.encode(frame))
{conn, responses}
{conn, responses, window_size_increments}
end
end

Expand Down Expand Up @@ -1882,13 +1889,14 @@ defmodule Mint.HTTP2 do
defp handle_push_promise(
%__MODULE__{client_settings: %{enable_push: false}} = conn,
push_promise(),
_responses
_responses,
_window_size_increments
) do
debug_data = "received PUSH_PROMISE frame when SETTINGS_ENABLE_PUSH was false"
send_connection_error!(conn, :protocol_error, debug_data)
end

defp handle_push_promise(conn, push_promise() = frame, responses) do
defp handle_push_promise(conn, push_promise() = frame, responses, window_size_increments) do
push_promise(
stream_id: stream_id,
flags: flags,
Expand All @@ -1902,17 +1910,19 @@ defmodule Mint.HTTP2 do
assert_stream_in_state(conn, stream, [:open, :half_closed_local])

if flag_set?(flags, :push_promise, :end_headers) do
decode_push_promise_headers_and_add_response(
conn,
responses,
hbf,
stream,
promised_stream_id
)
{conn, responses} =
decode_push_promise_headers_and_add_response(
conn,
responses,
hbf,
stream,
promised_stream_id
)
{conn, responses, window_size_increments}
else
callback = &decode_push_promise_headers_and_add_response(&1, &2, &3, &4, promised_stream_id)
conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
{conn, responses}
{conn, responses, window_size_increments}
end
end

Expand Down Expand Up @@ -1958,15 +1968,16 @@ defmodule Mint.HTTP2 do

# PING

defp handle_ping(conn, Frame.ping() = frame, responses) do
defp handle_ping(conn, Frame.ping() = frame, responses, window_size_increments) do
Frame.ping(flags: flags, opaque_data: opaque_data) = frame

if flag_set?(flags, :ping, :ack) do
handle_ping_ack(conn, opaque_data, responses)
{conn, responses} = handle_ping_ack(conn, opaque_data, responses)
{conn, responses, window_size_increments}
else
ack = Frame.ping(stream_id: 0, flags: set_flags(:ping, [:ack]), opaque_data: opaque_data)
conn = send!(conn, Frame.encode(ack))
{conn, responses}
{conn, responses, window_size_increments}
end
end

Expand All @@ -1988,7 +1999,7 @@ defmodule Mint.HTTP2 do

# GOAWAY

defp handle_goaway(conn, frame, responses) do
defp handle_goaway(conn, frame, responses, _window_size_increments) do
goaway(
last_stream_id: last_stream_id,
error_code: error_code,
Expand Down Expand Up @@ -2016,47 +2027,49 @@ defmodule Mint.HTTP2 do
log(conn, :debug, "#{message} (with debug data: #{inspect(debug_data)})")

conn = put_in(conn.state, {:goaway, error_code, debug_data})
{conn, unprocessed_request_responses ++ responses}
{conn, unprocessed_request_responses ++ responses, WindowSizeIncrements.new()}
end

# WINDOW_UPDATE

defp handle_window_update(
conn,
window_update(stream_id: 0, window_size_increment: wsi),
responses
responses,
window_size_increments
) do
new_window_size = conn.window_size + wsi

if new_window_size > @max_window_size do
send_connection_error!(conn, :flow_control_error, "window size too big")
else
conn = put_in(conn.window_size, new_window_size)
{conn, responses}
{conn, responses, window_size_increments}
end
end

defp handle_window_update(
conn,
window_update(stream_id: stream_id, window_size_increment: wsi),
responses
responses,
window_size_increments
) do
stream = fetch_stream!(conn, stream_id)
new_window_size = conn.streams[stream_id].window_size + wsi

if new_window_size > @max_window_size do
conn = close_stream!(conn, stream_id, :flow_control_error)
error = wrap_error({:flow_control_error, "window size too big"})
{conn, [{:error, stream.ref, error} | responses]}
{conn, [{:error, stream.ref, error} | responses], window_size_increments}
else
conn = put_in(conn.streams[stream_id].window_size, new_window_size)
{conn, responses}
{conn, responses, window_size_increments}
end
end

# CONTINUATION

defp handle_continuation(conn, frame, responses) do
defp handle_continuation(conn, frame, responses, window_size_increments) do
continuation(stream_id: stream_id, flags: flags, hbf: hbf_chunk) = frame
stream = Map.get(conn.streams, stream_id)

Expand All @@ -2069,10 +2082,11 @@ defmodule Mint.HTTP2 do
if flag_set?(flags, :continuation, :end_headers) do
hbf = IO.iodata_to_binary([hbf_acc, hbf_chunk])
conn = put_in(conn.headers_being_processed, nil)
callback.(conn, responses, hbf, stream)
{conn, responses} = callback.(conn, responses, hbf, stream)
{conn, responses, window_size_increments}
else
conn = put_in(conn.headers_being_processed, {stream_id, [hbf_acc, hbf_chunk], callback})
{conn, responses}
{conn, responses, window_size_increments}
end
end

Expand Down
Loading

0 comments on commit c77a13a

Please sign in to comment.