Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use closure for BufferedTask Task.Supervisor.async_nolink call #707

Merged
merged 7 commits into from Sep 12, 2018

Separate usage of Task.Supervisor

Have a separate subtree for each fetcher with a supervisor supervising
the fetcher and a Task.Supervisor that is only used by the fetcher, so
that looking at the Task.Supervisor in that subtree will show which
fetcher is causing any memory, message, or blocking problems.
  • Loading branch information...
KronicDeth committed Sep 11, 2018
commit 09051ceb8ae50a0c8d6228e60c1ba50b7ad1ded0
@@ -76,7 +76,7 @@
#
{Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Socket Task),
excluded_lastnames: ~w(Address DateTime Full Name Number Repo Time Unit),
excluded_lastnames: ~w(Address DateTime Fetcher Full Name Number Repo Time Unit),
priority: :low},

# For some checks, you can also set other parameters
@@ -6,12 +6,12 @@ defmodule Indexer.Application do
use Application

alias Indexer.{
CoinBalanceFetcher,
CoinBalance,
BlockFetcher,
InternalTransactionFetcher,
PendingTransactionFetcher,
TokenFetcher,
TokenBalanceFetcher
InternalTransaction,
PendingTransaction,
Token,
TokenBalance
}

@impl Application
@@ -28,13 +28,14 @@ defmodule Indexer.Application do
|> Enum.into(%{})

children = [
{Task.Supervisor, name: Indexer.TaskSupervisor},
{CoinBalanceFetcher, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalanceFetcher]]},
{PendingTransactionFetcher, name: PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{InternalTransactionFetcher,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransactionFetcher]]},
{TokenFetcher, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenFetcher]]},
{TokenBalanceFetcher, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalanceFetcher]]},
{CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalance.Supervisor]]},
{PendingTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{InternalTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransaction.Supervisor]]},
{Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: Token.Supervisor]]},
{TokenBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalance.Supervisor]]},
{BlockFetcher.Supervisor, [block_fetcher_supervisor_named_arguments, [name: BlockFetcher.Supervisor]]}
]

@@ -10,12 +10,12 @@ defmodule Indexer.BlockFetcher.Catchup do
alias Explorer.Chain

alias Indexer.{
CoinBalanceFetcher,
BlockFetcher,
InternalTransactionFetcher,
CoinBalance,
InternalTransaction,
Sequence,
TokenFetcher,
TokenBalanceFetcher
Token,
TokenBalance
}

@behaviour BlockFetcher
@@ -130,20 +130,20 @@ defmodule Indexer.BlockFetcher.Catchup do
block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash))
%{address_hash: address_hash, block_number: block_number}
end)
|> CoinBalanceFetcher.async_fetch_balances()
|> CoinBalance.Fetcher.async_fetch_balances()

transaction_hashes
|> Enum.map(fn transaction_hash ->
block_number = Map.fetch!(transaction_hash_to_block_number, to_string(transaction_hash))
%{block_number: block_number, hash: transaction_hash}
end)
|> InternalTransactionFetcher.async_fetch(10_000)
|> InternalTransaction.Fetcher.async_fetch(10_000)

tokens
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
|> Token.Fetcher.async_fetch()

TokenBalanceFetcher.async_fetch(token_balances)
TokenBalance.Fetcher.async_fetch(token_balances)
end

defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
@@ -0,0 +1,110 @@
defmodule Indexer.BlockFetcher.Catchup.BoundIntervalSupervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` with exponential backoff for restarts.
"""

# NOT a `Supervisor` because of the `Task` restart strategies are custom.
use GenServer

require Logger

alias Indexer.{BlockFetcher, BoundInterval}
alias Indexer.BlockFetcher.Catchup

# milliseconds
@block_interval 5_000

@enforce_keys ~w(bound_interval catchup)a
defstruct bound_interval: nil,
catchup: %Catchup{},
task: nil

def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end

@doc """
Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_map(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end

@impl GenServer
def init(named_arguments) do
state = new(named_arguments)

send(self(), :catchup_index)

{:ok, state}
end

defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}

block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)
bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))

%__MODULE__{
catchup: %Catchup{block_fetcher: block_fetcher},
bound_interval: bound_interval
}
end

@impl GenServer
def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup, :task, [catchup])}}
end

def handle_info(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
} = state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")

BoundInterval.increase(bound_interval)

_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")

BoundInterval.decrease(bound_interval)
end

Process.demonitor(ref, [:flush])

interval = new_bound_interval.current

Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)

Process.send_after(self(), :catchup_index, interval)

{:noreply, %__MODULE__{state | bound_interval: new_bound_interval, task: nil}}
end

def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)

send(self(), :catchup_index)

{:noreply, %__MODULE__{state | task: nil}}
end
end
@@ -1,110 +1,38 @@
defmodule Indexer.BlockFetcher.Catchup.Supervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` with exponential backoff for restarts.
Supervises `Indexer.BlockFetcher.Catchup.TaskSupervisor` and `Indexer.BlockFetcher.Catchup.BoundIntervalSupervisor`
"""

# NOT a `Supervisor` because of the `Task` restart strategies are custom.
use GenServer
use Supervisor

require Logger
alias Indexer.BlockFetcher.Catchup.BoundIntervalSupervisor

alias Indexer.{BlockFetcher, BoundInterval}
alias Indexer.BlockFetcher.Catchup

# milliseconds
@block_interval 5_000

@enforce_keys ~w(bound_interval catchup)a
defstruct bound_interval: nil,
catchup: %Catchup{},
task: nil

def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end

@doc """
Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_map(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end

@impl GenServer
def init(named_arguments) do
state = new(named_arguments)

send(self(), :catchup_index)

{:ok, state}
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end

defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}

block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)
bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))

%__MODULE__{
catchup: %Catchup{block_fetcher: block_fetcher},
bound_interval: bound_interval
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
end

@impl GenServer
def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Indexer.TaskSupervisor, Catchup, :task, [catchup])}}
Supervisor.child_spec(default, [])
end

def handle_info(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%__MODULE__{
bound_interval: bound_interval,
task: %Task{ref: ref}
} = state
)
when is_integer(missing_block_count) do
new_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{first_block_number}-0")

BoundInterval.increase(bound_interval)

_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")

BoundInterval.decrease(bound_interval)
end

Process.demonitor(ref, [:flush])

interval = new_bound_interval.current

Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)

Process.send_after(self(), :catchup_index, interval)

{:noreply, %__MODULE__{state | bound_interval: new_bound_interval, task: nil}}
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, gen_server_options)
end

def handle_info(
{:DOWN, ref, :process, pid, reason},
%__MODULE__{task: %Task{pid: pid, ref: ref}} = state
) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)

send(self(), :catchup_index)

{:noreply, %__MODULE__{state | task: nil}}
@impl Supervisor
def init(bound_interval_supervisor_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.BlockFetcher.Catchup.TaskSupervisor},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}
],
strategy: :one_for_one
)
end
end
@@ -12,7 +12,7 @@ defmodule Indexer.BlockFetcher.Realtime do

alias EthereumJSONRPC.Subscription
alias Explorer.Chain
alias Indexer.{AddressExtraction, BlockFetcher, TokenBalances, TokenFetcher}
alias Indexer.{AddressExtraction, BlockFetcher, Token, TokenBalances}

@behaviour BlockFetcher

@@ -154,7 +154,7 @@ defmodule Indexer.BlockFetcher.Realtime do
defp async_import_remaining_block_data(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
|> Token.Fetcher.async_fetch()
end

defp internal_transactions(
@@ -158,6 +158,19 @@ defmodule Indexer.BufferedTask do
GenServer.call(server, {:buffer, entries}, timeout)
end

def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end

def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}

Supervisor.child_spec(default, [])
end

@doc false
def debug_count(server) do
GenServer.call(server, :debug_count)
@@ -198,7 +211,7 @@ defmodule Indexer.BufferedTask do
| {:state, state}
]}
) :: {:ok, pid()} | {:error, {:already_started, pid()}}
def start_link([{module, base_init_opts}, genserver_opts]) do
def start_link({module, base_init_opts}, genserver_opts \\ []) do
default_opts = Application.get_all_env(:indexer)
init_opts = Keyword.merge(default_opts, base_init_opts)

@@ -1,4 +1,4 @@
defmodule Indexer.CoinBalanceFetcher do
defmodule Indexer.CoinBalance.Fetcher do
@moduledoc """
Fetches `t:Explorer.Chain.Address.CoinBalance.t/0` and updates `t:Explorer.Chain.Address.t/0` `fetched_coin_balance` and
`fetched_coin_balance_block_number` to value at max `t:Explorer.Chain.Address.CoinBalance.t/0` `block_number` for the given `t:Explorer.Chain.Address.t/` `hash`.
@@ -19,7 +19,7 @@ defmodule Indexer.CoinBalanceFetcher do
max_batch_size: 500,
max_concurrency: 4,
init_chunk_size: 1000,
task_supervisor: Indexer.TaskSupervisor
task_supervisor: Indexer.CoinBalance.TaskSupervisor
]

@doc """
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.