Skip to content

Commit

Permalink
finish merge with http2 branch, mix format
Browse files Browse the repository at this point in the history
  • Loading branch information
sneako committed May 25, 2020
1 parent c1c1699 commit 0463566
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 76 deletions.
46 changes: 28 additions & 18 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule Finch.HTTP2.Pool do
# get the process unstuck.
fail_safe_timeout = if is_integer(timeout), do: max(2000, timeout * 2), else: :infinity
start_time = Telemetry.start(:response, metadata)

try do
result = response_waiting_loop(ref, monitor, %Response{}, fail_safe_timeout)

Expand Down Expand Up @@ -94,13 +95,19 @@ defmodule Finch.HTTP2.Pool do
end
end

def start_link(opts) do
:gen_statem.start_link(__MODULE__, opts, [])
def start_link({shp, registry, pool_config}) do
state = %{
shp: shp,
conn_opts: pool_config.conn_opts,
registry_value: pool_config.registry_value
}

:gen_statem.start_link(__MODULE__, {registry, state}, [])
end

@impl true
def init({{scheme, host, port}=shp, registry, _pool_size, pool_opts}) do
{:ok, _} = Registry.register(registry, shp, __MODULE__)
def init({registry, %{shp: {scheme, host, port}} = state}) do
{:ok, _} = Registry.register(registry, state.shp, {__MODULE__, state.registry_value})

data = %{
conn: nil,
Expand All @@ -110,7 +117,7 @@ defmodule Finch.HTTP2.Pool do
requests: %{},
backoff_base: 500,
backoff_max: 10_000,
connect_opts: pool_opts[:conn_opts] || [],
connect_opts: state.conn_opts
}

{:ok, :disconnected, data, {:next_event, :internal, {:connect, 0}}}
Expand All @@ -126,9 +133,10 @@ defmodule Finch.HTTP2.Pool do
# When entering a disconnected state we need to fail all of the pending
# requests
def disconnected(:enter, _, data) do
:ok = Enum.each(data.requests, fn {ref, from} ->
send(from, {:error, ref, %{reason: :connection_closed}})
end)
:ok =
Enum.each(data.requests, fn {ref, from} ->
send(from, {:error, ref, %{reason: :connection_closed}})
end)

data =
data
Expand All @@ -144,9 +152,11 @@ defmodule Finch.HTTP2.Pool do
metadata = %{
scheme: data.scheme,
host: data.host,
port: data.port,
port: data.port
}

start = Telemetry.start(:connect)

case HTTP2.connect(data.scheme, data.host, data.port, data.connect_opts) do
{:ok, conn} ->
Telemetry.stop(:connect, start, metadata)
Expand All @@ -156,6 +166,7 @@ defmodule Finch.HTTP2.Pool do
{:error, error} ->
metadata = Map.put(metadata, :error, error)
Telemetry.stop(:connect, start, metadata)

Logger.error([
"Failed to connect to #{data.scheme}://#{data.host}:#{data.port}: ",
Exception.message(error)
Expand All @@ -177,14 +188,13 @@ defmodule Finch.HTTP2.Pool do
{:keep_state_and_data, {:reply, from, {:error, %{reason: :disconnected}}}}
end

# We cancel all request timeouts as soon as we enter the :disconnected state, but
# some timeouts might fire while changing states, so we need to handle them here.
# Since we replied to all pending requests when entering the :disconnected state,
# we can just do nothing here.
def disconnected({:timeout, {:request_timeout, _ref}}, _content, _data) do
:keep_state_and_data
end

# We cancel all request timeouts as soon as we enter the :disconnected state, but
# some timeouts might fire while changing states, so we need to handle them here.
# Since we replied to all pending requests when entering the :disconnected state,
# we can just do nothing here.
def disconnected({:timeout, {:request_timeout, _ref}}, _content, _data) do
:keep_state_and_data
end

@doc false
def connected(event, content, data)
Expand All @@ -195,7 +205,7 @@ defmodule Finch.HTTP2.Pool do

# Issue request to the upstream server. We store a ref to the request so we
# know who to respond to when we've completed everything
def connected({:call, {from_pid, _}=from}, {:request, req, opts}, data) do
def connected({:call, {from_pid, _} = from}, {:request, req, opts}, data) do
case HTTP2.request(data.conn, req.method, req.path, req.headers, req.body) do
{:ok, conn, ref} ->
data =
Expand Down
2 changes: 1 addition & 1 deletion lib/finch/pool/selection_strategy/round_robin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Finch.Pool.SelectionStrategy.RoundRobin do

@impl true
def registry_value(%{count: count}) do
atomics = :atomics.new(1, [signed: false])
atomics = :atomics.new(1, signed: false)
%{strategy: __MODULE__, count: count, atomics: atomics}
end

Expand Down
1 change: 0 additions & 1 deletion lib/finch/pool_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ defmodule Finch.PoolManager do
pool_mod = pool_mod(pool_config.protocol)

Enum.map(1..pool_config.count, fn _ ->
# Choose pool type here...
{:ok, pid} = DynamicSupervisor.start_child(config.supervisor_name, {pool_mod, pool_args})
{pid, {pool_mod, registry_value}}
end)
Expand Down
1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ defmodule Finch.MixProject do
{:nimble_options, "~> 0.2.0"},
{:telemetry, "~> 0.4.0"},
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:credo, "~> 1.3", only: [:dev, :test]},
{:bypass, "~> 1.0", only: :test},
{:cowboy, "~> 2.0", only: [:dev, :test]},
Expand Down
2 changes: 0 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
"cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
"cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"},
"credo": {:hex, :credo, "1.4.0", "92339d4cbadd1e88b5ee43d427b639b68a11071b6f73854e33638e30a0ea11f5", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1fd3b70dce216574ce3c18bdf510b57e7c4c85c2ec9cad4bff854abaf7e58658"},
"dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"},
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
"jason": {:hex, :jason, "1.2.0", "10043418c42d2493d0ee212d3fddd25d7ffe484380afad769a0a38795938e448", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "116747dbe057794c3a3e4e143b7c8390b29f634e16c78a7f59ba75bfa6852e7f"},
"makeup": {:hex, :makeup, "1.0.1", "82f332e461dc6c79dbd82fbe2a9c10d48ed07146f0a478286e590c83c52010b5", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49736fe5b66a08d8575bf5321d716bac5da20c8e6b97714fec2bcd6febcfa1f8"},
Expand Down
57 changes: 33 additions & 24 deletions test/finch/http2/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,50 @@ defmodule Finch.HTTP2.IntegrationTest do
end

test "sends http2 requests", %{url: url} do
start_supervised({Finch, name: TestFinch, pools: %{
default: [
protocol: :http2,
count: 5,
conn_opts: [
transport_opts: [
verify: :verify_none
]
]
]
}})
start_supervised(
{Finch,
name: TestFinch,
pools: %{
default: [
protocol: :http2,
count: 5,
conn_opts: [
transport_opts: [
verify: :verify_none
]
]
]
}}
)

assert {:ok, response} = Finch.request(TestFinch, :get, url)
assert response.body == "Hello world!"
end

test "multiplexes requests over a single pool", %{url: url} do
start_supervised({Finch, name: TestFinch, pools: %{
default: [
protocol: :http2,
count: 1,
conn_opts: [
transport_opts: [
verify: :verify_none
]
]
]
}})
start_supervised(
{Finch,
name: TestFinch,
pools: %{
default: [
protocol: :http2,
count: 1,
conn_opts: [
transport_opts: [
verify: :verify_none
]
]
]
}}
)

# We create multiple requests here using a single connection. There is a delay
# in the response. But because we allow each request to run simultaneously
# they shouldn't block each other which we check with a rough time estimates
results =
(1..50)
|> Enum.map(fn _ -> Task.async(fn ->
1..50
|> Enum.map(fn _ ->
Task.async(fn ->
start = System.monotonic_time()
{:ok, _} = Finch.request(TestFinch, :get, url <> "/wait/1000")
System.monotonic_time() - start
Expand Down
Loading

0 comments on commit 0463566

Please sign in to comment.