Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/2: check window sizes, implement streaming #158

Merged
merged 8 commits into from
Dec 12, 2021
6 changes: 3 additions & 3 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ defmodule Finch do
@doc """
Builds an HTTP request to be sent with `request/3` or `stream/4`.

When making HTTP/1.x requests, it is possible to send the request body in a streaming fashion.
In order to do so, the `body` parameter needs to take form of a tuple `{:stream, body_stream}`,
where `body_stream` is a `Stream`. This feature is not yet supported for HTTP/2 requests.
It is possible to send the request body in a streaming fashion. In order to do so, the
`body` parameter needs to take form of a tuple `{:stream, body_stream}`, where `body_stream`
is a `Stream`.
"""
@spec build(Request.method(), Request.url(), Request.headers(), Request.body()) :: Request.t()
defdelegate build(method, url, headers \\ [], body \\ nil, opts \\ []), to: Request
Expand Down
177 changes: 128 additions & 49 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Finch.HTTP2.Pool do
alias Finch.Error
alias Finch.Telemetry
alias Finch.SSL
alias Finch.HTTP2.RequestStream

require Logger

Expand Down Expand Up @@ -152,8 +153,8 @@ defmodule Finch.HTTP2.Pool do
# requests
def disconnected(:enter, _, data) do
:ok =
Enum.each(data.requests, fn {ref, from} ->
send(from, {:error, ref, Error.exception(:connection_closed)})
Enum.each(data.requests, fn {ref, request} ->
send(request.from_pid, {:error, ref, Error.exception(:connection_closed)})
end)

# It's possible that we're entering this state before we are alerted of the
Expand Down Expand Up @@ -246,43 +247,30 @@ defmodule Finch.HTTP2.Pool do

# Issue request to the upstream server. We store a ref to the request so we
# know who to respond to when we've completed everything
def connected({:call, {from_pid, _} = from}, {:request, req, opts}, data) do
case HTTP2.request(
data.conn,
req.method,
Finch.Request.request_path(req),
req.headers,
req.body
) do
{:ok, conn, ref} ->
data =
data
|> put_in([:conn], conn)
|> put_in([:requests, ref], from_pid)

# Set a timeout to close the request after a given timeout
actions = [
{:reply, from, {:ok, ref}},
{{:timeout, {:request_timeout, ref}}, opts[:receive_timeout], nil}
]

{:keep_state, data, actions}

{:error, conn, %HTTPError{reason: :closed_for_writing}} ->
data = put_in(data.conn, conn)
def connected({:call, from}, {:request, req, opts}, data) do
request = RequestStream.new(req.body, from)

with {:ok, data, ref} <- request(data, req),
data = put_in(data.requests[ref], request),
{:ok, data, actions} <- continue_request(data, ref) do
# Set a timeout to close the request after a given timeout
request_timeout = {{:timeout, {:request_timeout, ref}}, opts[:receive_timeout], nil}

{:keep_state, data, actions ++ [request_timeout]}
else
{:error, data, %HTTPError{reason: :closed_for_writing}} ->
actions = [{:reply, from, {:error, "read_only"}}]

if HTTP2.open?(conn, :read) && Enum.any?(data.requests) do
if HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) do
{:next_state, :connected_read_only, data, actions}
else
{:next_state, :disconnected, data, actions}
end

{:error, conn, error} ->
data = put_in(data.conn, conn)
{:error, data, error} ->
actions = [{:reply, from, {:error, error}}]

if HTTP2.open?(conn) do
if HTTP2.open?(data.conn) do
{:keep_state, data, actions}
else
{:next_state, :disconnected, data, actions}
Expand All @@ -306,17 +294,18 @@ defmodule Finch.HTTP2.Pool do
case HTTP2.stream(data.conn, message) do
{:ok, conn, responses} ->
data = put_in(data.conn, conn)
{data, actions} = handle_responses(data, responses)
{data, response_actions} = handle_responses(data, responses)

cond do
HTTP2.open?(conn, :write) ->
{:keep_state, data, actions}
HTTP2.open?(data.conn, :write) ->
{data, streaming_actions} = continue_requests(data)
{:keep_state, data, response_actions ++ streaming_actions}

HTTP2.open?(conn, :read) && Enum.any?(data.requests) ->
{:next_state, :connected_read_only, data, actions}
HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) ->
{:next_state, :connected_read_only, data, response_actions}

true ->
{:next_state, :disconnected, data, actions}
{:next_state, :disconnected, data, response_actions}
end

{:error, conn, error, responses} ->
Expand All @@ -341,10 +330,10 @@ defmodule Finch.HTTP2.Pool do
end

def connected({:timeout, {:request_timeout, ref}}, _content, data) do
with {:pop, {from, data}} when not is_nil(from) <- {:pop, pop_in(data.requests[ref])},
with {:pop, {request, data}} when not is_nil(request) <- {:pop, pop_in(data.requests[ref])},
{:ok, conn} <- HTTP2.cancel_request(data.conn, ref) do
data = put_in(data.conn, conn)
send(from, {:error, ref, Error.exception(:request_timeout)})
send(request.from_pid, {:error, ref, Error.exception(:request_timeout)})
{:keep_state, data}
else
{:error, conn, _error} ->
Expand Down Expand Up @@ -373,8 +362,20 @@ defmodule Finch.HTTP2.Pool do
@doc false
def connected_read_only(event, content, data)

def connected_read_only(:enter, _old_state, _data) do
:keep_state_and_data
def connected_read_only(:enter, _old_state, data) do
{actions, data} =
Enum.flat_map_reduce(data.requests, data, fn
# request is awaiting a response and should stay in state
{_ref, %{status: :done}}, data ->
{[], data}

# request is still sending data and should be discarded
{ref, %{status: :streaming} = request}, data ->
{^request, data} = pop_in(data.requests[ref])
{[{:reply, request.from, {:error, Error.exception(:read_only)}}], data}
end)

{:keep_state, data, actions}
end

# If we're in a read only state than respond with an error immediately
Expand Down Expand Up @@ -432,11 +433,11 @@ defmodule Finch.HTTP2.Pool do
# We might get a request timeout that fired in the moment when we received the
# whole request, so we don't have the request in the state but we get the
# timer event anyways. In those cases, we don't do anything.
{from, data} = pop_in(data.requests[ref])
{request, data} = pop_in(data.requests[ref])

# Its possible that the request doesn't exist so we guard against that here.
if from != nil do
send(from, {:error, ref, Error.exception(:request_timeout)})
if request != nil do
send(request.from_pid, {:error, ref, Error.exception(:request_timeout)})
end

# If we're out of requests then we should enter the disconnected state.
Expand All @@ -456,22 +457,22 @@ defmodule Finch.HTTP2.Pool do

defp handle_response(data, {kind, ref, _value} = response, actions)
when kind in [:status, :headers, :data] do
if from = data.requests[ref] do
send(from, response)
if request = data.requests[ref] do
send(request.from_pid, response)
end

{data, actions}
end

defp handle_response(data, {:done, ref} = response, actions) do
{pid, data} = pop_in(data.requests[ref])
if pid, do: send(pid, response)
{request, data} = pop_in(data.requests[ref])
if request, do: send(request.from_pid, response)
{data, [cancel_request_timeout_action(ref) | actions]}
end

defp handle_response(data, {:error, ref, _error} = response, actions) do
{pid, data} = pop_in(data.requests[ref])
if pid, do: send(pid, response)
{request, data} = pop_in(data.requests[ref])
if request, do: send(request.from_pid, response)
{data, [cancel_request_timeout_action(ref) | actions]}
end

Expand All @@ -498,4 +499,82 @@ defmodule Finch.HTTP2.Pool do
max_sleep = trunc(min(max_backoff, base_backoff * factor))
:rand.uniform(max_sleep)
end

# a wrapper around Mint.HTTP2.request/5
# wrapping allows us to more easily encapsulate the conn within `data`
defp request(data, req) do
body = if req.body == nil, do: nil, else: :stream

case HTTP2.request(data.conn, req.method, Finch.Request.request_path(req), req.headers, body) do
{:ok, conn, ref} -> {:ok, put_in(data.conn, conn), ref}
{:error, conn, reason} -> {:error, put_in(data.conn, conn), reason}
end
end

# this is also a wrapper (Mint.HTTP2.stream_request_body/3)
defp stream_request_body(data, ref, body) do
case HTTP2.stream_request_body(data.conn, ref, body) do
{:ok, conn} -> {:ok, put_in(data.conn, conn)}
{:error, conn, reason} -> {:error, put_in(data.conn, conn), reason}
end
end

defp stream_chunks(data, ref, body) do
with {:ok, data} <- stream_request_body(data, ref, body) do
if data.requests[ref].status == :done do
stream_request_body(data, ref, :eof)
else
{:ok, data}
end
end
end

defp continue_requests(data) do
Enum.reduce(data.requests, {data, []}, fn {ref, request}, {data, actions} ->
with true <- request.status == :streaming,
true <- HTTP2.open?(data.conn, :write),
{:ok, data, new_actions} <- continue_request(data, ref) do
{data, new_actions ++ actions}
else
false ->
{data, actions}

{:error, data, %HTTPError{reason: :closed_for_writing}} ->
{data, [{:reply, request.from, {:error, "read_only"}} | actions]}

{:error, data, reason} ->
{data, [{:reply, request.from, {:error, reason}} | actions]}
end
end)
end

defp continue_request(data, ref) do
request = data.requests[ref]
reply_action = {:reply, request.from, {:ok, ref}}

with :streaming <- request.status,
window = smallest_window(data.conn, ref),
{request, chunks} = RequestStream.next_chunk(request, window),
data = put_in(data.requests[ref], request),
{:ok, data} <- stream_chunks(data, ref, chunks) do
actions = if request.status == :done, do: [reply_action], else: []

{:ok, data, actions}
else
:done ->
{:ok, data, [reply_action]}

{:error, data, reason} ->
{_from, data} = pop_in(data.requests[ref])

{:error, data, reason}
end
end

defp smallest_window(conn, ref) do
min(
HTTP2.get_window_size(conn, :connection),
HTTP2.get_window_size(conn, {:request, ref})
)
end
end
91 changes: 91 additions & 0 deletions lib/finch/http2/request_stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
defmodule Finch.HTTP2.RequestStream do
@moduledoc false

defstruct [:body, :from, :from_pid, :status, :buffer, :continuation]

def new(body, {from_pid, _from_ref} = from) do
enumerable =
case body do
{:stream, stream} -> stream
data -> List.wrap(data)
end

enumerable =
Stream.map(enumerable, fn
binary when is_binary(binary) ->
{binary, byte_size(binary)}

io_data ->
binary = IO.iodata_to_binary(io_data)
{binary, byte_size(binary)}
end)

reducer = &reduce_with_suspend/2

%__MODULE__{
body: body,
from: from,
from_pid: from_pid,
status: if(body == nil, do: :done, else: :streaming),
buffer: <<>>,
continuation: &Enumerable.reduce(enumerable, &1, reducer)
}
end

defp reduce_with_suspend(
{message, message_size},
{message_buffer, message_buffer_size, window}
)
when message_size + message_buffer_size > window do
{:suspend,
{[{message, message_size} | message_buffer], message_size + message_buffer_size, window}}
end

defp reduce_with_suspend(
{message, message_size},
{message_buffer, message_buffer_size, window}
) do
{:cont, {[message | message_buffer], message_size + message_buffer_size, window}}
end

# gets the next chunk of data that will fit into the given window size
def next_chunk(request, window)

# when the buffer is empty, continue reducing the stream
def next_chunk(%__MODULE__{buffer: <<>>} = request, window) do
continue_reduce(request, {[], 0, window})
end

def next_chunk(%__MODULE__{buffer: buffer} = request, window) do
case buffer do
<<bytes_to_send::binary-size(window), rest::binary>> ->
# when the buffer contains more bytes than a window, send as much of the
# buffer as we can
{put_in(request.buffer, rest), bytes_to_send}

_ ->
# when the buffer can fit in the windows, continue reducing using the buffer
# as the accumulator
continue_reduce(request, {[buffer], byte_size(buffer), window})
end
end

defp continue_reduce(request, acc) do
case request.continuation.({:cont, acc}) do
{finished, {messages, _size, _window}} when finished in [:done, :halted] ->
{put_in(request.status, :done), Enum.reverse(messages)}

{:suspended,
{[{overload_message, overload_message_size} | messages_that_fit], total_size, window_size},
next_continuation} ->
fittable_size = window_size - (total_size - overload_message_size)

<<fittable_binary::binary-size(fittable_size), overload_binary::binary>> =
overload_message

request = %{request | continuation: next_continuation, buffer: overload_binary}

{request, Enum.reverse([fittable_binary | messages_that_fit])}
end
end
end
Loading