Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use closure for BufferedTask Task.Supervisor.async_nolink call #707

Merged
merged 7 commits into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
#
{Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Socket Task),
excluded_lastnames: ~w(Address DateTime Full Name Number Repo Time Unit),
excluded_lastnames: ~w(Address DateTime Fetcher Full Name Number Repo Time Unit),
priority: :low},

# For some checks, you can also set other parameters
Expand Down
2 changes: 2 additions & 0 deletions apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ defmodule EthereumJSONRPCTest do
assert is_binary(subscription_id)
end

# Infura timeouts on 2018-09-12
@tag :no_geth
test "delivers new heads to caller", %{
block_interval: block_interval,
subscribe_named_arguments: subscribe_named_arguments
Expand Down
29 changes: 15 additions & 14 deletions apps/indexer/lib/indexer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ defmodule Indexer.Application do
use Application

alias Indexer.{
CoinBalanceFetcher,
BlockFetcher,
InternalTransactionFetcher,
PendingTransactionFetcher,
TokenFetcher,
TokenBalanceFetcher
Block,
CoinBalance,
InternalTransaction,
PendingTransaction,
Token,
TokenBalance
}

@impl Application
Expand All @@ -28,14 +28,15 @@ defmodule Indexer.Application do
|> Enum.into(%{})

children = [
{Task.Supervisor, name: Indexer.TaskSupervisor},
{CoinBalanceFetcher, name: CoinBalanceFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{PendingTransactionFetcher, name: PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{InternalTransactionFetcher,
name: InternalTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{TokenFetcher, name: TokenFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{TokenBalanceFetcher, name: TokenBalanceFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{BlockFetcher.Supervisor, [block_fetcher_supervisor_named_arguments, [name: BlockFetcher.Supervisor]]}
{CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalance.Supervisor]]},
{PendingTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{InternalTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransaction.Supervisor]]},
{Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: Token.Supervisor]]},
{TokenBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalance.Supervisor]]},
{Block.Supervisor, [block_fetcher_supervisor_named_arguments, [name: Block.Supervisor]]}
]

opts = [strategy: :one_for_one, name: Indexer.Supervisor]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.Catchup.Supervisor do
defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` with exponential backoff for restarts.
"""
Expand All @@ -8,15 +8,15 @@ defmodule Indexer.BlockFetcher.Catchup.Supervisor do

require Logger

alias Indexer.{BlockFetcher, BoundInterval}
alias Indexer.BlockFetcher.Catchup
alias Indexer.{Block, BoundInterval}
alias Indexer.Block.Catchup

# milliseconds
@block_interval 5_000

@enforce_keys ~w(bound_interval catchup)a
@enforce_keys ~w(bound_interval fetcher)a
defstruct bound_interval: nil,
catchup: %Catchup{},
fetcher: %Catchup.Fetcher{},
task: nil

def child_spec(arg) do
Expand Down Expand Up @@ -45,22 +45,22 @@ defmodule Indexer.BlockFetcher.Catchup.Supervisor do
end

defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: false, callback_module: Catchup}
block_fetcher = %Block.Fetcher{common_block_fetcher | broadcast: false, callback_module: Catchup.Fetcher}

block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)
bound_interval = BoundInterval.within(minimum_interval..(minimum_interval * 10))

%__MODULE__{
catchup: %Catchup{block_fetcher: block_fetcher},
fetcher: %Catchup.Fetcher{block_fetcher: block_fetcher},
bound_interval: bound_interval
}
end

@impl GenServer
def handle_info(:catchup_index, %__MODULE__{catchup: %Catchup{} = catchup} = state) do
def handle_info(:catchup_index, %__MODULE__{fetcher: %Catchup.Fetcher{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Indexer.TaskSupervisor, Catchup, :task, [catchup])}}
%__MODULE__{state | task: Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup.Fetcher, :task, [catchup])}}
end

def handle_info(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
defmodule Indexer.BlockFetcher.Catchup do
defmodule Indexer.Block.Catchup.Fetcher do
@moduledoc """
Fetches and indexes block ranges from the block before the latest block to genesis (0) that are missing.
"""

require Logger

import Indexer.BlockFetcher, only: [fetch_and_import_range: 2]
import Indexer.Block.Fetcher, only: [fetch_and_import_range: 2]

alias Explorer.Chain

alias Indexer.{
CoinBalanceFetcher,
BlockFetcher,
InternalTransactionFetcher,
Block,
CoinBalance,
InternalTransaction,
Sequence,
TokenFetcher,
TokenBalanceFetcher
Token,
TokenBalance
}

@behaviour BlockFetcher
@behaviour Block.Fetcher

# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
Expand Down Expand Up @@ -48,14 +48,16 @@ defmodule Indexer.BlockFetcher.Catchup do
Defaults to #{@blocks_concurrency}. So upto `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections. Upto `block_concurrency * receipts_batch_size * receipts_concurrency` (defaults to
`#{@blocks_concurrency * BlockFetcher.default_receipts_batch_size() * BlockFetcher.default_receipts_batch_size()}`
`#{
@blocks_concurrency * Block.Fetcher.default_receipts_batch_size() * Block.Fetcher.default_receipts_batch_size()
}`
) receipts can be requested from the JSONRPC at once over all connections.

"""
def task(
%__MODULE__{
blocks_batch_size: blocks_batch_size,
block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments}
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
Expand Down Expand Up @@ -98,7 +100,7 @@ defmodule Indexer.BlockFetcher.Catchup do

@async_import_remaning_block_data_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a

@impl BlockFetcher
@impl Block.Fetcher
def import(_, options) when is_map(options) do
{async_import_remaning_block_data_options, chain_import_options} =
Map.split(options, @async_import_remaning_block_data_options)
Expand Down Expand Up @@ -130,20 +132,20 @@ defmodule Indexer.BlockFetcher.Catchup do
block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash))
%{address_hash: address_hash, block_number: block_number}
end)
|> CoinBalanceFetcher.async_fetch_balances()
|> CoinBalance.Fetcher.async_fetch_balances()

transaction_hashes
|> Enum.map(fn transaction_hash ->
block_number = Map.fetch!(transaction_hash_to_block_number, to_string(transaction_hash))
%{block_number: block_number, hash: transaction_hash}
end)
|> InternalTransactionFetcher.async_fetch(10_000)
|> InternalTransaction.Fetcher.async_fetch(10_000)

tokens
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
|> Token.Fetcher.async_fetch()

TokenBalanceFetcher.async_fetch(token_balances)
TokenBalance.Fetcher.async_fetch(token_balances)
end

defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
Expand All @@ -160,7 +162,7 @@ defmodule Indexer.BlockFetcher.Catchup do

# Run at state.blocks_concurrency max_concurrency when called by `stream_import/1`
defp fetch_and_import_range_from_sequence(
%__MODULE__{block_fetcher: %BlockFetcher{} = block_fetcher},
%__MODULE__{block_fetcher: %Block.Fetcher{} = block_fetcher},
_.._ = range,
sequence
) do
Expand Down
38 changes: 38 additions & 0 deletions apps/indexer/lib/indexer/block/catchup/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Indexer.Block.Catchup.Supervisor do
@moduledoc """
Supervises `Indexer.Block.Catchup.TaskSupervisor` and `Indexer.Block.Catchup.BoundIntervalSupervisor`
"""

use Supervisor

alias Indexer.Block.Catchup.BoundIntervalSupervisor

def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end

def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}

Supervisor.child_spec(default, [])
end

def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, gen_server_options)
end

@impl Supervisor
def init(bound_interval_supervisor_arguments) do
Supervisor.init(
[
{Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}
],
strategy: :one_for_one
)
end
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher do
defmodule Indexer.Block.Fetcher do
@moduledoc """
Fetches and indexes block ranges.
"""
Expand All @@ -8,7 +8,7 @@ defmodule Indexer.BlockFetcher do
alias Explorer.Chain.{Block, Import}
alias Indexer.{AddressExtraction, TokenTransfers}
alias Indexer.Address.{CoinBalances, TokenBalances}
alias Indexer.BlockFetcher.Receipts
alias Indexer.Block.Fetcher.Receipts

@type address_hash_to_fetched_balance_block_number :: %{String.t() => Block.block_number()}
@type transaction_hash_to_block_number :: %{String.t() => Block.block_number()}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
defmodule Indexer.BlockFetcher.Receipts do
defmodule Indexer.Block.Fetcher.Receipts do
@moduledoc """
Fetches transaction receipts after the transactions have been fetched with the blocks in `Indexer.BlockFetcher`.
"""

require Logger

alias Indexer.BlockFetcher
alias Indexer.Block

def fetch(%BlockFetcher{} = _state, []), do: {:ok, %{logs: [], receipts: []}}
def fetch(%Block.Fetcher{} = _state, []), do: {:ok, %{logs: [], receipts: []}}

def fetch(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state,
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state,
transaction_params
) do
Logger.debug(fn -> "fetching #{length(transaction_params)} transaction receipts" end)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.Realtime do
defmodule Indexer.Block.Realtime.Fetcher do
@moduledoc """
Fetches and indexes block ranges from latest block forward using a WebSocket.
"""
Expand All @@ -8,19 +8,19 @@ defmodule Indexer.BlockFetcher.Realtime do
require Logger

import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
import Indexer.BlockFetcher, only: [fetch_and_import_range: 2]
import Indexer.Block.Fetcher, only: [fetch_and_import_range: 2]

alias EthereumJSONRPC.Subscription
alias Explorer.Chain
alias Indexer.{AddressExtraction, BlockFetcher, TokenBalances, TokenFetcher}
alias Indexer.{AddressExtraction, Block, Token, TokenBalances}

@behaviour BlockFetcher
@behaviour Block.Fetcher

@enforce_keys ~w(block_fetcher)a
defstruct ~w(block_fetcher subscription)a

@type t :: %__MODULE__{
block_fetcher: %BlockFetcher{
block_fetcher: %Block.Fetcher{
broadcast: true,
callback_module: __MODULE__,
json_rpc_named_arguments: EthereumJSONRPC.json_rpc_named_arguments(),
Expand All @@ -35,9 +35,9 @@ defmodule Indexer.BlockFetcher.Realtime do
end

@impl GenServer
def init(%{block_fetcher: %BlockFetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments})
def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments})
when is_list(subscribe_named_arguments) do
{:ok, %__MODULE__{block_fetcher: %BlockFetcher{block_fetcher | broadcast: true, callback_module: __MODULE__}},
{:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: true, callback_module: __MODULE__}},
{:continue, {:init, subscribe_named_arguments}}}
end

Expand All @@ -54,7 +54,7 @@ defmodule Indexer.BlockFetcher.Realtime do
def handle_info(
{subscription, {:ok, %{"number" => quantity}}},
%__MODULE__{
block_fetcher: %BlockFetcher{} = block_fetcher,
block_fetcher: %Block.Fetcher{} = block_fetcher,
subscription: %Subscription{} = subscription
} = state
)
Expand Down Expand Up @@ -111,7 +111,7 @@ defmodule Indexer.BlockFetcher.Realtime do

@import_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a

@impl BlockFetcher
@impl Block.Fetcher
def import(
block_fetcher,
%{
Expand Down Expand Up @@ -154,11 +154,11 @@ defmodule Indexer.BlockFetcher.Realtime do
defp async_import_remaining_block_data(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)
|> TokenFetcher.async_fetch()
|> Token.Fetcher.async_fetch()
end

defp internal_transactions(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params, transactions_params: transactions_params}
) do
case transactions_params
Expand Down Expand Up @@ -191,7 +191,7 @@ defmodule Indexer.BlockFetcher.Realtime do
end

defp balances(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params} = options
) do
with {:ok, fetched_balances_params} <-
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.Realtime.Supervisor do
defmodule Indexer.Block.Realtime.Supervisor do
@moduledoc """
Supervises realtime block fetcher.
"""
Expand All @@ -17,17 +17,17 @@ defmodule Indexer.BlockFetcher.Realtime.Supervisor do
transport_options = Keyword.fetch!(subscribe_named_arguments, :transport_options)
url = Keyword.fetch!(transport_options, :url)
web_socket_module = Keyword.fetch!(transport_options, :web_socket)
web_socket = Indexer.BlockFetcher.Realtime.WebSocket
web_socket = Indexer.Block.Realtime.WebSocket

block_fetcher_subscribe_named_arguments =
put_in(subscribe_named_arguments[:transport_options][:web_socket_options], %{web_socket: web_socket})

[
{web_socket_module, [url, [name: web_socket]]},
{Indexer.BlockFetcher.Realtime,
{Indexer.Block.Realtime.Fetcher,
[
%{block_fetcher: block_fetcher, subscribe_named_arguments: block_fetcher_subscribe_named_arguments},
[name: Indexer.BlockFetcher.Realtime]
[name: Indexer.Block.Realtime.Fetcher]
]}
]
end
Expand Down
Loading