Add request-body streaming to the HTTP/2 pool.

Finch.HTTP2.Pool now tracks each in-flight request as a Finch.HTTP2.RequestStream
struct instead of a bare caller pid, and sends the request body in chunks no
larger than the smaller of the connection-level and request-level send windows.
Two HTTP/2 tests are added: one with a streamed body, one with a large binary body.

diff --git a/lib/finch.ex b/lib/finch.ex
index 4a1f1418..2f6259a3 100644
--- a/lib/finch.ex
+++ b/lib/finch.ex
@@ -197,9 +197,9 @@ defmodule Finch do
   @doc """
   Builds an HTTP request to be sent with `request/3` or `stream/4`.
 
-  When making HTTP/1.x requests, it is possible to send the request body in a streaming fashion.
-  In order to do so, the `body` parameter needs to take form of a tuple `{:stream, body_stream}`,
-  where `body_stream` is a `Stream`. This feature is not yet supported for HTTP/2 requests.
+  It is possible to send the request body in a streaming fashion. In order to do so, the
+  `body` parameter needs to take form of a tuple `{:stream, body_stream}`, where `body_stream`
+  is a `Stream`.
   """
   @spec build(Request.method(), Request.url(), Request.headers(), Request.body()) :: Request.t()
   defdelegate build(method, url, headers \\ [], body \\ nil, opts \\ []), to: Request
diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex
index ddc24982..f13c73ce 100644
--- a/lib/finch/http2/pool.ex
+++ b/lib/finch/http2/pool.ex
@@ -9,6 +9,7 @@ defmodule Finch.HTTP2.Pool do
   alias Finch.Error
   alias Finch.Telemetry
   alias Finch.SSL
+  alias Finch.HTTP2.RequestStream
 
   require Logger
 
@@ -152,8 +153,8 @@ defmodule Finch.HTTP2.Pool do
   # requests
   def disconnected(:enter, _, data) do
     :ok =
-      Enum.each(data.requests, fn {ref, from} ->
-        send(from, {:error, ref, Error.exception(:connection_closed)})
+      Enum.each(data.requests, fn {ref, request} ->
+        send(request.from_pid, {:error, ref, Error.exception(:connection_closed)})
       end)
 
     # It's possible that we're entering this state before we are alerted of the
@@ -246,43 +247,30 @@ defmodule Finch.HTTP2.Pool do
 
   # Issue request to the upstream server. We store a ref to the request so we
   # know who to respond to when we've completed everything
-  def connected({:call, {from_pid, _} = from}, {:request, req, opts}, data) do
-    case HTTP2.request(
-           data.conn,
-           req.method,
-           Finch.Request.request_path(req),
-           req.headers,
-           req.body
-         ) do
-      {:ok, conn, ref} ->
-        data =
-          data
-          |> put_in([:conn], conn)
-          |> put_in([:requests, ref], from_pid)
-
-        # Set a timeout to close the request after a given timeout
-        actions = [
-          {:reply, from, {:ok, ref}},
-          {{:timeout, {:request_timeout, ref}}, opts[:receive_timeout], nil}
-        ]
-
-        {:keep_state, data, actions}
-
-      {:error, conn, %HTTPError{reason: :closed_for_writing}} ->
-        data = put_in(data.conn, conn)
+  def connected({:call, from}, {:request, req, opts}, data) do
+    request = RequestStream.new(req.body, from)
+
+    with {:ok, data, ref} <- request(data, req),
+         data = put_in(data.requests[ref], request),
+         {:ok, data, actions} <- continue_request(data, ref) do
+      # Set a timeout to close the request after a given timeout
+      request_timeout = {{:timeout, {:request_timeout, ref}}, opts[:receive_timeout], nil}
+
+      {:keep_state, data, actions ++ [request_timeout]}
+    else
+      {:error, data, %HTTPError{reason: :closed_for_writing}} ->
         actions = [{:reply, from, {:error, "read_only"}}]
 
-        if HTTP2.open?(conn, :read) && Enum.any?(data.requests) do
+        if HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) do
           {:next_state, :connected_read_only, data, actions}
         else
           {:next_state, :disconnected, data, actions}
         end
 
-      {:error, conn, error} ->
-        data = put_in(data.conn, conn)
+      {:error, data, error} ->
         actions = [{:reply, from, {:error, error}}]
 
-        if HTTP2.open?(conn) do
+        if HTTP2.open?(data.conn) do
           {:keep_state, data, actions}
         else
           {:next_state, :disconnected, data, actions}
@@ -306,17 +294,18 @@ defmodule Finch.HTTP2.Pool do
     case HTTP2.stream(data.conn, message) do
       {:ok, conn, responses} ->
         data = put_in(data.conn, conn)
-        {data, actions} = handle_responses(data, responses)
+        {data, response_actions} = handle_responses(data, responses)
 
         cond do
-          HTTP2.open?(conn, :write) ->
-            {:keep_state, data, actions}
+          HTTP2.open?(data.conn, :write) ->
+            {data, streaming_actions} = continue_requests(data)
+            {:keep_state, data, response_actions ++ streaming_actions}
 
-          HTTP2.open?(conn, :read) && Enum.any?(data.requests) ->
-            {:next_state, :connected_read_only, data, actions}
+          HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) ->
+            {:next_state, :connected_read_only, data, response_actions}
 
           true ->
-            {:next_state, :disconnected, data, actions}
+            {:next_state, :disconnected, data, response_actions}
         end
 
       {:error, conn, error, responses} ->
@@ -341,10 +330,10 @@ defmodule Finch.HTTP2.Pool do
   end
 
   def connected({:timeout, {:request_timeout, ref}}, _content, data) do
-    with {:pop, {from, data}} when not is_nil(from) <- {:pop, pop_in(data.requests[ref])},
+    with {:pop, {request, data}} when not is_nil(request) <- {:pop, pop_in(data.requests[ref])},
          {:ok, conn} <- HTTP2.cancel_request(data.conn, ref) do
       data = put_in(data.conn, conn)
-      send(from, {:error, ref, Error.exception(:request_timeout)})
+      send(request.from_pid, {:error, ref, Error.exception(:request_timeout)})
       {:keep_state, data}
     else
       {:error, conn, _error} ->
@@ -373,8 +362,20 @@ defmodule Finch.HTTP2.Pool do
   @doc false
   def connected_read_only(event, content, data)
 
-  def connected_read_only(:enter, _old_state, _data) do
-    :keep_state_and_data
+  def connected_read_only(:enter, _old_state, data) do
+    {actions, data} =
+      Enum.flat_map_reduce(data.requests, data, fn
+        # request is awaiting a response and should stay in state
+        {_ref, %{status: :done}}, data ->
+          {[], data}
+
+        # request is still sending data and should be discarded
+        {ref, %{status: :streaming} = request}, data ->
+          {^request, data} = pop_in(data.requests[ref])
+          {[{:reply, request.from, {:error, Error.exception(:read_only)}}], data}
+      end)
+
+    {:keep_state, data, actions}
   end
 
   # If we're in a read only state than respond with an error immediately
@@ -432,11 +433,11 @@ defmodule Finch.HTTP2.Pool do
     # We might get a request timeout that fired in the moment when we received the
     # whole request, so we don't have the request in the state but we get the
     # timer event anyways. In those cases, we don't do anything.
-    {from, data} = pop_in(data.requests[ref])
+    {request, data} = pop_in(data.requests[ref])
 
     # Its possible that the request doesn't exist so we guard against that here.
-    if from != nil do
-      send(from, {:error, ref, Error.exception(:request_timeout)})
+    if request != nil do
+      send(request.from_pid, {:error, ref, Error.exception(:request_timeout)})
     end
 
     # If we're out of requests then we should enter the disconnected state.
@@ -456,22 +457,22 @@ defmodule Finch.HTTP2.Pool do
 
   defp handle_response(data, {kind, ref, _value} = response, actions)
        when kind in [:status, :headers, :data] do
-    if from = data.requests[ref] do
-      send(from, response)
+    if request = data.requests[ref] do
+      send(request.from_pid, response)
     end
 
     {data, actions}
   end
 
   defp handle_response(data, {:done, ref} = response, actions) do
-    {pid, data} = pop_in(data.requests[ref])
-    if pid, do: send(pid, response)
+    {request, data} = pop_in(data.requests[ref])
+    if request, do: send(request.from_pid, response)
     {data, [cancel_request_timeout_action(ref) | actions]}
   end
 
   defp handle_response(data, {:error, ref, _error} = response, actions) do
-    {pid, data} = pop_in(data.requests[ref])
-    if pid, do: send(pid, response)
+    {request, data} = pop_in(data.requests[ref])
+    if request, do: send(request.from_pid, response)
     {data, [cancel_request_timeout_action(ref) | actions]}
   end
 
@@ -498,4 +499,82 @@ defmodule Finch.HTTP2.Pool do
     max_sleep = trunc(min(max_backoff, base_backoff * factor))
     :rand.uniform(max_sleep)
   end
+
+  # a wrapper around Mint.HTTP2.request/5
+  # wrapping allows us to more easily encapsulate the conn within `data`
+  defp request(data, req) do
+    body = if req.body == nil, do: nil, else: :stream
+
+    case HTTP2.request(data.conn, req.method, Finch.Request.request_path(req), req.headers, body) do
+      {:ok, conn, ref} -> {:ok, put_in(data.conn, conn), ref}
+      {:error, conn, reason} -> {:error, put_in(data.conn, conn), reason}
+    end
+  end
+
+  # this is also a wrapper (Mint.HTTP2.stream_request_body/3)
+  defp stream_request_body(data, ref, body) do
+    case HTTP2.stream_request_body(data.conn, ref, body) do
+      {:ok, conn} -> {:ok, put_in(data.conn, conn)}
+      {:error, conn, reason} -> {:error, put_in(data.conn, conn), reason}
+    end
+  end
+
+  defp stream_chunks(data, ref, body) do
+    with {:ok, data} <- stream_request_body(data, ref, body) do
+      if data.requests[ref].status == :done do
+        stream_request_body(data, ref, :eof)
+      else
+        {:ok, data}
+      end
+    end
+  end
+
+  defp continue_requests(data) do
+    Enum.reduce(data.requests, {data, []}, fn {ref, request}, {data, actions} ->
+      with true <- request.status == :streaming,
+           true <- HTTP2.open?(data.conn, :write),
+           {:ok, data, new_actions} <- continue_request(data, ref) do
+        {data, new_actions ++ actions}
+      else
+        false ->
+          {data, actions}
+
+        {:error, data, %HTTPError{reason: :closed_for_writing}} ->
+          {data, [{:reply, request.from, {:error, "read_only"}} | actions]}
+
+        {:error, data, reason} ->
+          {data, [{:reply, request.from, {:error, reason}} | actions]}
+      end
+    end)
+  end
+
+  defp continue_request(data, ref) do
+    request = data.requests[ref]
+    reply_action = {:reply, request.from, {:ok, ref}}
+
+    with :streaming <- request.status,
+         window = smallest_window(data.conn, ref),
+         {request, chunks} = RequestStream.next_chunk(request, window),
+         data = put_in(data.requests[ref], request),
+         {:ok, data} <- stream_chunks(data, ref, chunks) do
+      actions = if request.status == :done, do: [reply_action], else: []
+
+      {:ok, data, actions}
+    else
+      :done ->
+        {:ok, data, [reply_action]}
+
+      {:error, data, reason} ->
+        {_from, data} = pop_in(data.requests[ref])
+
+        {:error, data, reason}
+    end
+  end
+
+  defp smallest_window(conn, ref) do
+    min(
+      HTTP2.get_window_size(conn, :connection),
+      HTTP2.get_window_size(conn, {:request, ref})
+    )
+  end
 end
diff --git a/lib/finch/http2/request_stream.ex b/lib/finch/http2/request_stream.ex
new file mode 100644
index 00000000..91a179ab
--- /dev/null
+++ b/lib/finch/http2/request_stream.ex
@@ -0,0 +1,91 @@
+defmodule Finch.HTTP2.RequestStream do
+  @moduledoc false
+
+  defstruct [:body, :from, :from_pid, :status, :buffer, :continuation]
+
+  def new(body, {from_pid, _from_ref} = from) do
+    enumerable =
+      case body do
+        {:stream, stream} -> stream
+        data -> List.wrap(data)
+      end
+
+    enumerable =
+      Stream.map(enumerable, fn
+        binary when is_binary(binary) ->
+          {binary, byte_size(binary)}
+
+        io_data ->
+          binary = IO.iodata_to_binary(io_data)
+          {binary, byte_size(binary)}
+      end)
+
+    reducer = &reduce_with_suspend/2
+
+    %__MODULE__{
+      body: body,
+      from: from,
+      from_pid: from_pid,
+      status: if(body == nil, do: :done, else: :streaming),
+      buffer: <<>>,
+      continuation: &Enumerable.reduce(enumerable, &1, reducer)
+    }
+  end
+
+  defp reduce_with_suspend(
+         {message, message_size},
+         {message_buffer, message_buffer_size, window}
+       )
+       when message_size + message_buffer_size > window do
+    {:suspend,
+     {[{message, message_size} | message_buffer], message_size + message_buffer_size, window}}
+  end
+
+  defp reduce_with_suspend(
+         {message, message_size},
+         {message_buffer, message_buffer_size, window}
+       ) do
+    {:cont, {[message | message_buffer], message_size + message_buffer_size, window}}
+  end
+
+  # gets the next chunk of data that will fit into the given window size
+  def next_chunk(request, window)
+
+  # when the buffer is empty, continue reducing the stream
+  def next_chunk(%__MODULE__{buffer: <<>>} = request, window) do
+    continue_reduce(request, {[], 0, window})
+  end
+
+  def next_chunk(%__MODULE__{buffer: buffer} = request, window) do
+    case buffer do
+      <<bytes_to_send::binary-size(window), rest::binary>> ->
+        # when the buffer contains more bytes than a window, send as much of the
+        # buffer as we can
+        {put_in(request.buffer, rest), bytes_to_send}
+
+      _ ->
+        # when the buffer can fit in the windows, continue reducing using the buffer
+        # as the accumulator
+        continue_reduce(request, {[buffer], byte_size(buffer), window})
+    end
+  end
+
+  defp continue_reduce(request, acc) do
+    case request.continuation.({:cont, acc}) do
+      {finished, {messages, _size, _window}} when finished in [:done, :halted] ->
+        {put_in(request.status, :done), Enum.reverse(messages)}
+
+      {:suspended,
+       {[{overload_message, overload_message_size} | messages_that_fit], total_size, window_size},
+       next_continuation} ->
+        fittable_size = window_size - (total_size - overload_message_size)
+
+        <<fittable_binary::binary-size(fittable_size), overload_binary::binary>> =
+          overload_message
+
+        request = %{request | continuation: next_continuation, buffer: overload_binary}
+
+        {request, Enum.reverse([fittable_binary | messages_that_fit])}
+    end
+  end
+end
diff --git a/test/finch_test.exs b/test/finch_test.exs
index ad267bfa..1f839540 100644
--- a/test/finch_test.exs
+++ b/test/finch_test.exs
@@ -216,7 +216,7 @@ defmodule FinchTest do
       end)
     end
 
-    test "successful post streaming request, with streaming body and query string", %{
+    test "successful post HTTP/1 streaming request, with streaming body and query string", %{
       bypass: bypass
     } do
       start_supervised!({Finch, name: finch_name()})
@@ -253,6 +253,111 @@ defmodule FinchTest do
       end)
     end
 
+    test "successful post HTTP/2 streaming request, with streaming body and query string", %{
+      bypass: bypass
+    } do
+      start_supervised!(
+        {Finch,
+         name: finch_name(),
+         pools: %{
+           default: [
+             protocol: :http2,
+             count: 1,
+             conn_opts: [
+               transport_opts: [
+                 verify: :verify_none
+               ]
+             ]
+           ]
+         }}
+      )
+
+      data = :crypto.strong_rand_bytes(1_000)
+      # 1MB of data
+      req_stream = Stream.repeatedly(fn -> data end) |> Stream.take(1_000)
+      req_body = req_stream |> Enum.join("")
+      response_body = data
+      header_key = "content-type"
+      header_val = "application/octet-stream"
+      query_string = "query=value"
+
+      Bypass.expect_once(bypass, "POST", "/", fn conn ->
+        assert conn.query_string == query_string
+        assert {:ok, ^req_body, conn} = Plug.Conn.read_body(conn)
+
+        conn
+        |> Plug.Conn.put_resp_header(header_key, header_val)
+        |> Plug.Conn.send_resp(200, response_body)
+      end)
+
+      assert {:ok, %Response{status: 200, headers: headers, body: ^response_body}} =
+               Finch.build(
+                 :post,
+                 endpoint(bypass, "?" <> query_string),
+                 [{header_key, header_val}],
+                 {:stream, req_stream}
+               )
+               |> Finch.request(finch_name())
+
+      assert {_, ^header_val} =
+               Enum.find(headers, fn
+                 {^header_key, _} -> true
+                 _ -> false
+               end)
+    end
+
+    test "successful post HTTP/2 with a large binary body", %{
+      bypass: bypass
+    } do
+      start_supervised!(
+        {Finch,
+         name: finch_name(),
+         pools: %{
+           default: [
+             protocol: :http2,
+             count: 1,
+             conn_opts: [
+               transport_opts: [
+                 verify: :verify_none
+               ]
+             ]
+           ]
+         }}
+      )
+
+      data = :crypto.strong_rand_bytes(1_000)
+      # 2MB of data
+      req_body = :binary.copy(data, 2_000)
+      response_body = data
+      header_key = "content-type"
+      header_val = "application/octet-stream"
+      query_string = "query=value"
+
+      Bypass.expect_once(bypass, "POST", "/", fn conn ->
+        assert conn.query_string == query_string
+        assert {:ok, ^req_body, conn} = Plug.Conn.read_body(conn)
+
+        conn
+        |> Plug.Conn.put_resp_header(header_key, header_val)
+        |> Plug.Conn.send_resp(200, response_body)
+      end)
+
+      assert {:ok, %Response{status: 200, headers: headers, body: ^response_body}} =
+               Finch.build(
+                 :post,
+                 endpoint(bypass, "?" <> query_string),
+                 [{header_key, header_val}],
+                 req_body
+               )
+               |> Finch.request(finch_name())
+
+      assert {_, ^header_val} =
+               Enum.find(headers, fn
+                 {^header_key, _} -> true
+                 _ -> false
+               end)
+    end
+
     test "successful get request, with query string, when given a %URI{}", %{bypass: bypass} do
       start_supervised!({Finch, name: finch_name()})
       query_string = "query=value"