Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Round robin pool selection #45

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
elixir: ['1.7.4', '1.10.2']
include:
- elixir: '1.7.4'
otp: '19.x'
otp: '21.2'
- elixir: '1.10.2'
otp: '22.x'
runs-on: ubuntu-16.04
Expand Down
32 changes: 27 additions & 5 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Finch do
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)

alias Finch.PoolManager
alias Finch.{Pool, PoolManager}

use Supervisor

Expand All @@ -30,6 +30,7 @@ defmodule Finch do
@atom_to_method Enum.zip(@atom_methods, @methods) |> Enum.into(%{})
@default_pool_size 10
@default_pool_count 1
@pool_selection_strategies [:random, :round_robin]

@pool_config_schema [
protocol: [
Expand All @@ -49,9 +50,22 @@ defmodule Finch do
],
conn_opts: [
type: :keyword_list,
doc:
"These options are passed to `Mint.HTTP.connect/4` whenever a new connection is established. `:mode` is not configurable as Finch must control this setting. Typically these options are used to configure proxying, https settings, or connect timeouts.",
doc: """
These options are passed to `Mint.HTTP.connect/4` whenever a new connection is established.
`:mode` is not configurable as Finch must control this setting.
Typically these options are used to configure proxying, https settings, or connect timeouts.
""",
default: []
],
strategy: [
type: {:one_of, @pool_selection_strategies},
doc: """
How requests will be routed to your pools. Only relevant when count > 1.
The following options are available: `#{
Enum.map_join(@pool_selection_strategies, "`, `", &inspect/1)
}`
""",
default: :random
]
]

Expand All @@ -66,7 +80,7 @@ defmodule Finch do
The following atom methods are supported: `#{Enum.map_join(@atom_methods, "`, `", &inspect/1)}`.
You can use any arbitrary method by providing it as a `String.t()`.
"""
@type http_method() :: :get | :post | :head | :patch | :delete | :options | :put | String.t()
@type http_method() :: :get | :post | :put | :patch | :delete | :head | :options | String.t()

@typedoc """
A Uniform Resource Locator, the address of a resource on the Web.
Expand Down Expand Up @@ -151,7 +165,7 @@ defmodule Finch do

shp = {uri.scheme, uri.host, uri.port}

{pool, pool_mod} = PoolManager.get_pool(name, shp)
{pool, {pool_mod, _}} = PoolManager.get_pool(name, shp)
pool_mod.request(pool, req, opts)
end
end
Expand Down Expand Up @@ -254,11 +268,19 @@ defmodule Finch do
size: valid[:size],
count: valid[:count],
conn_opts: valid[:conn_opts],
strategy: pool_strategy(valid[:strategy]),
protocol: valid[:protocol]
}
end

defp supervisor_name(name), do: :"#{name}.Supervisor"
defp manager_name(name), do: :"#{name}.PoolManager"
defp pool_supervisor_name(name), do: :"#{name}.PoolSupervisor"

defp pool_strategy(type) do
case type do
:round_robin -> Pool.SelectionStrategy.RoundRobin
:random -> Pool.SelectionStrategy.Random
end
end
end
2 changes: 1 addition & 1 deletion lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Finch.Conn do
scheme: scheme,
host: host,
port: port,
opts: opts.conn_opts,
opts: opts,
parent: parent,
last_checkin: System.monotonic_time(),
mint: nil
Expand Down
41 changes: 24 additions & 17 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ defmodule Finch.HTTP1.Pool do
}
end

def start_link({shp, registry_name, pool_size, conn_opts}) do
opts = [worker: {__MODULE__, {registry_name, shp, conn_opts}}, pool_size: pool_size]
def start_link({shp, registry_name, pool_config}) do
state = %{
shp: shp,
conn_opts: pool_config.conn_opts,
registry_value: pool_config.registry_value
}

opts = [worker: {__MODULE__, {registry_name, state}}, pool_size: pool_config.size]
NimblePool.start_link(opts)
end

Expand Down Expand Up @@ -57,44 +63,45 @@ defmodule Finch.HTTP1.Pool do
end

@impl NimblePool
def init_pool({registry, shp, opts}) do
# Register our pool with our module name as the key. This allows the caller
# to determine the correct pool module to use to make the request
{:ok, _} = Registry.register(registry, shp, __MODULE__)
{:ok, {shp, opts}}
def init_pool({registry, state}) do
# Register our pool with our module name and the registry value as the key.
# This allows the caller to determine the correct pool module to use to make the request,
# and use alternative pool selection strategies.
{:ok, _} = Registry.register(registry, state.shp, {__MODULE__, state.registry_value})
{:ok, state}
end

@impl NimblePool
def init_worker({{scheme, host, port}, opts} = pool_state) do
{:ok, Conn.new(scheme, host, port, opts, self()), pool_state}
def init_worker(%{shp: {scheme, host, port}} = pool_state) do
{:ok, Conn.new(scheme, host, port, pool_state.conn_opts, self()), pool_state}
end

@impl NimblePool
def handle_checkout(:checkout, _, %{mint: nil} = conn) do
def handle_checkout(:checkout, _, %{mint: nil} = conn, pool_state) do
idle_time = System.monotonic_time() - conn.last_checkin
{:ok, {conn, idle_time}, conn}
{:ok, {conn, idle_time}, conn, pool_state}
end

def handle_checkout(:checkout, {pid, _}, conn) do
def handle_checkout(:checkout, {pid, _}, conn, pool_state) do
idle_time = System.monotonic_time() - conn.last_checkin

with {:ok, conn} <- Conn.set_mode(conn, :passive),
:ok <- Conn.transfer(conn, pid) do
{:ok, {conn, idle_time}, conn}
{:ok, {conn, idle_time}, conn, pool_state}
else
{:error, _error} ->
{:remove, :closed}
{:remove, :closed, pool_state}
end
end

@impl NimblePool
def handle_checkin(state, _from, conn) do
def handle_checkin(state, _from, conn, pool_state) do
with :prechecked <- state,
{:ok, conn} <- Conn.set_mode(conn, :active) do
{:ok, %{conn | last_checkin: System.monotonic_time()}}
{:ok, %{conn | last_checkin: System.monotonic_time()}, pool_state}
else
_ ->
{:remove, :closed}
{:remove, :closed, pool_state}
end
end

Expand Down
46 changes: 28 additions & 18 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule Finch.HTTP2.Pool do
# get the process unstuck.
fail_safe_timeout = if is_integer(timeout), do: max(2000, timeout * 2), else: :infinity
start_time = Telemetry.start(:response, metadata)

try do
result = response_waiting_loop(ref, monitor, %Response{}, fail_safe_timeout)

Expand Down Expand Up @@ -94,13 +95,19 @@ defmodule Finch.HTTP2.Pool do
end
end

def start_link(opts) do
:gen_statem.start_link(__MODULE__, opts, [])
def start_link({shp, registry, pool_config}) do
state = %{
shp: shp,
conn_opts: pool_config.conn_opts,
registry_value: pool_config.registry_value
}

:gen_statem.start_link(__MODULE__, {registry, state}, [])
end

@impl true
def init({{scheme, host, port}=shp, registry, _pool_size, pool_opts}) do
{:ok, _} = Registry.register(registry, shp, __MODULE__)
def init({registry, %{shp: {scheme, host, port}} = state}) do
{:ok, _} = Registry.register(registry, state.shp, {__MODULE__, state.registry_value})

data = %{
conn: nil,
Expand All @@ -110,7 +117,7 @@ defmodule Finch.HTTP2.Pool do
requests: %{},
backoff_base: 500,
backoff_max: 10_000,
connect_opts: pool_opts[:conn_opts] || [],
connect_opts: state.conn_opts
}

{:ok, :disconnected, data, {:next_event, :internal, {:connect, 0}}}
Expand All @@ -126,9 +133,10 @@ defmodule Finch.HTTP2.Pool do
# When entering a disconnected state we need to fail all of the pending
# requests
def disconnected(:enter, _, data) do
:ok = Enum.each(data.requests, fn {ref, from} ->
send(from, {:error, ref, %{reason: :connection_closed}})
end)
:ok =
Enum.each(data.requests, fn {ref, from} ->
send(from, {:error, ref, %{reason: :connection_closed}})
end)

data =
data
Expand All @@ -144,9 +152,11 @@ defmodule Finch.HTTP2.Pool do
metadata = %{
scheme: data.scheme,
host: data.host,
port: data.port,
port: data.port
}

start = Telemetry.start(:connect)

case HTTP2.connect(data.scheme, data.host, data.port, data.connect_opts) do
{:ok, conn} ->
Telemetry.stop(:connect, start, metadata)
Expand All @@ -156,6 +166,7 @@ defmodule Finch.HTTP2.Pool do
{:error, error} ->
metadata = Map.put(metadata, :error, error)
Telemetry.stop(:connect, start, metadata)

Logger.error([
"Failed to connect to #{data.scheme}://#{data.host}:#{data.port}: ",
Exception.message(error)
Expand All @@ -177,14 +188,13 @@ defmodule Finch.HTTP2.Pool do
{:keep_state_and_data, {:reply, from, {:error, %{reason: :disconnected}}}}
end

# We cancel all request timeouts as soon as we enter the :disconnected state, but
# some timeouts might fire while changing states, so we need to handle them here.
# Since we replied to all pending requests when entering the :disconnected state,
# we can just do nothing here.
def disconnected({:timeout, {:request_timeout, _ref}}, _content, _data) do
:keep_state_and_data
end

# We cancel all request timeouts as soon as we enter the :disconnected state, but
# some timeouts might fire while changing states, so we need to handle them here.
# Since we replied to all pending requests when entering the :disconnected state,
# we can just do nothing here.
def disconnected({:timeout, {:request_timeout, _ref}}, _content, _data) do
:keep_state_and_data
end

@doc false
def connected(event, content, data)
Expand All @@ -195,7 +205,7 @@ defmodule Finch.HTTP2.Pool do

# Issue request to the upstream server. We store a ref to the request so we
# know who to respond to when we've completed everything
def connected({:call, {from_pid, _}=from}, {:request, req, opts}, data) do
def connected({:call, {from_pid, _} = from}, {:request, req, opts}, data) do
case HTTP2.request(data.conn, req.method, req.path, req.headers, req.body) do
{:ok, conn, ref} ->
data =
Expand Down
1 change: 1 addition & 0 deletions lib/finch/pool.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Finch.Pool do
@moduledoc false

# Defines a behaviour that both http1 and http2 pools need to implement.

@callback request(pid(), map(), list()) :: {:ok, Finch.Response.t()} | {:error, term()}
Expand Down
23 changes: 23 additions & 0 deletions lib/finch/pool/selection_strategy.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Finch.Pool.SelectionStrategy do
@moduledoc """
Strategies for choosing which pool to route a request to.
"""

@type pool_config :: map()
@type pool_mod :: module()
@type registry_value :: map()
@type pool_list :: [{pid(), {pool_mod(), registry_value()}}]

@doc """
Returns a map that will be stored alongside the pool pids in the Regsitry.
Must include at least a `:strategy` key with a Strategy module as the value,
but can also include more information to be used by the strategy implementation.
"""
@callback registry_value(pool_config()) :: registry_value()

@doc """
Receives the list of pool pids that is stored in the Registry and must
return a single pool pid.
"""
@callback choose_pool(pool_list()) :: pid()
end
15 changes: 15 additions & 0 deletions lib/finch/pool/selection_strategy/random.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Finch.Pool.SelectionStrategy.Random do
@moduledoc """
Randomly chooses a pool. Minimal overhead, but not necessarily optimal.
"""

@behaviour Finch.Pool.SelectionStrategy

@impl true
def registry_value(_), do: %{strategy: __MODULE__}

@impl true
def choose_pool(pids) do
Enum.random(pids)
end
end
19 changes: 19 additions & 0 deletions lib/finch/pool/selection_strategy/round_robin.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Finch.Pool.SelectionStrategy.RoundRobin do
@moduledoc """
Cycle through the pools one by one, in a consistent order.
"""
@behaviour Finch.Pool.SelectionStrategy

@impl true
def registry_value(%{count: count}) do
atomics = :atomics.new(1, signed: false)
%{strategy: __MODULE__, count: count, atomics: atomics}
end

@impl true
def choose_pool([{_, {_, registry_value}} | _] = pools) do
%{atomics: atomics, count: count} = registry_value
index = :atomics.add_get(atomics, 1, 1)
Enum.at(pools, rem(index, count))
end
end
17 changes: 12 additions & 5 deletions lib/finch/pool_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ defmodule Finch.PoolManager do
pool

pools ->
# TODO implement alternative strategies
Enum.random(pools)
choose_pool(pools)
end
end

Expand All @@ -57,13 +56,13 @@ defmodule Finch.PoolManager do

defp do_start_pools(shp, config) do
pool_config = pool_config(config, shp)
pool_args = {shp, config.registry_name, pool_config.size, pool_config}
registry_value = pool_registry_value(pool_config)
pool_args = {shp, config.registry_name, Map.put(pool_config, :registry_value, registry_value)}
pool_mod = pool_mod(pool_config.protocol)

Enum.map(1..pool_config.count, fn _ ->
# Choose pool type here...
{:ok, pid} = DynamicSupervisor.start_child(config.supervisor_name, {pool_mod, pool_args})
{pid, pool_mod}
{pid, {pool_mod, registry_value}}
end)
|> hd()
end
Expand All @@ -77,4 +76,12 @@ defmodule Finch.PoolManager do

defp pool_mod(:http1), do: Finch.HTTP1.Pool
defp pool_mod(:http2), do: Finch.HTTP2.Pool

defp pool_registry_value(%{strategy: strategy} = pool_config) do
strategy.registry_value(pool_config)
end

defp choose_pool([{_, {_, %{strategy: strategy}}} | _] = pools) do
strategy.choose_pool(pools)
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule Finch.MixProject do
[
{:mint, "~> 1.0"},
{:castore, "~> 0.1.5"},
{:nimble_pool, "~> 0.1.0"},
# {:nimble_pool, "~> 0.1.0"},
{:nimble_pool, github: "dashbitco/nimble_pool"},
{:nimble_options, "~> 0.2.0"},
{:telemetry, "~> 0.4.0"},
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
Expand Down
Loading