Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
313 lines (253 sloc) 10 KB
defmodule Indexer.Fetcher.InternalTransaction do
@moduledoc """
Fetches and indexes `t:Explorer.Chain.InternalTransaction.t/0`.
See `async_fetch/1` for details on configuring limits.
"""
use Indexer.Fetcher
use Spandex.Decorators
require Logger
import Indexer.Block.Fetcher, only: [async_import_coin_balances: 2]
alias Explorer.Chain
alias Explorer.Chain.{Block, Hash}
alias Explorer.Chain.Cache.{Accounts, Blocks}
alias Indexer.{BufferedTask, Tracer}
alias Indexer.Transform.Addresses
@behaviour BufferedTask
@max_batch_size 10
@max_concurrency 4
@defaults [
flush_interval: :timer.seconds(3),
max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size,
task_supervisor: Indexer.Fetcher.InternalTransaction.TaskSupervisor,
metadata: [fetcher: :internal_transaction]
]
@doc """
Asynchronously fetches internal transactions.
## Limiting Upstream Load
Internal transactions are an expensive upstream operation. The number of
results to fetch is configured by `@max_batch_size` and represents the number
of transaction hashes to request internal transactions in a single JSONRPC
request. Defaults to `#{@max_batch_size}`.
The `@max_concurrency` attribute configures the number of concurrent requests
of `@max_batch_size` to allow against the JSONRPC. Defaults to `#{@max_concurrency}`.
*Note*: The internal transactions for individual transactions cannot be paginated,
so the total number of internal transactions that could be produced is unknown.
"""
@spec async_fetch([%{required(:block_number) => Block.block_number(), required(:hash) => Hash.Full.t()}]) :: :ok
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do
entries = Enum.map(transactions_fields, &entry/1)
BufferedTask.buffer(__MODULE__, entries, timeout)
end
@doc """
Asynchronously fetches internal transactions.
## Limiting Upstream Load
Internal transactions are an expensive upstream operation. The number of
results to fetch is configured by `@max_batch_size` and represents the number
of transaction hashes to request internal transactions in a single JSONRPC
request. Defaults to `#{@max_batch_size}`.
The `@max_concurrency` attribute configures the number of concurrent requests
of `@max_batch_size` to allow against the JSONRPC. Defaults to `#{@max_concurrency}`.
*Note*: The internal transactions for individual transactions cannot be paginated,
so the total number of internal transactions that could be produced is unknown.
"""
@spec async_block_fetch([%{required(:block_number) => Block.block_number()}]) :: :ok
def async_block_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do
entries = Enum.map(transactions_fields, &block_entry/1)
BufferedTask.buffer(__MODULE__, entries, timeout)
end
@doc false
def child_spec([init_options, gen_server_options]) do
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments)
unless state do
raise ArgumentError,
":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <>
"to allow for json_rpc calls when running."
end
merged_init_opts =
@defaults
|> Keyword.merge(mergeable_init_options)
|> Keyword.put(:state, state)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial, reducer, json_rpc_named_arguments) do
{:ok, final} =
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity ->
Chain.stream_blocks_with_unfetched_internal_transactions(
[:number],
initial,
fn block_fields, acc ->
block_fields
|> block_entry()
|> reducer.(acc)
end
)
_ ->
Chain.stream_transactions_with_unfetched_internal_transactions(
[:block_number, :hash, :index],
initial,
fn transaction_fields, acc ->
transaction_fields
|> entry()
|> reducer.(acc)
end
)
end
final
end
defp entry(%{block_number: block_number, hash: %Hash{bytes: bytes}, index: index}) when is_integer(block_number) do
{block_number, bytes, index}
end
defp params({block_number, hash_bytes, index}) when is_integer(block_number) do
{:ok, hash} = Hash.Full.cast(hash_bytes)
%{block_number: block_number, hash_data: to_string(hash), transaction_index: index}
end
defp block_entry(%{number: block_number}) when is_integer(block_number) do
block_number
end
defp block_params(block_number) when is_integer(block_number) do
%{number: block_number}
end
@impl BufferedTask
@decorate trace(
name: "fetch",
resource: "Indexer.Fetcher.InternalTransaction.run/2",
service: :indexer,
tracer: Tracer
)
def run(entries, json_rpc_named_arguments) do
variant = Keyword.fetch!(json_rpc_named_arguments, :variant)
unique_entries = unique_entries(entries, variant)
unique_entries_count = Enum.count(unique_entries)
Logger.metadata(count: unique_entries_count)
Logger.debug("fetching internal transactions for transactions")
variant
|> case do
EthereumJSONRPC.Parity ->
unique_entries
|> EthereumJSONRPC.fetch_block_internal_transactions(json_rpc_named_arguments)
_ ->
unique_entries
|> Enum.map(&params/1)
|> EthereumJSONRPC.fetch_internal_transactions(json_rpc_named_arguments)
end
|> case do
{:ok, internal_transactions_params} ->
import_internal_transaction(internal_transactions_params, json_rpc_named_arguments, unique_entries)
{:error, reason} ->
Logger.error(fn -> ["failed to fetch internal transactions for transactions: ", inspect(reason)] end,
error_count: unique_entries_count
)
# re-queue the de-duped entries
{:retry, unique_entries}
:ignore ->
:ok
end
end
defp import_internal_transaction(internal_transactions_params, json_rpc_named_arguments, unique_entries) do
internal_transactions_indexed_at_blocks =
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity -> Enum.map(unique_entries, &block_params/1)
_ -> []
end
unique_entries_count = Enum.count(unique_entries)
internal_transactions_params_without_failed_creations = remove_failed_creations(internal_transactions_params)
addresses_params =
Addresses.extract_addresses(%{
internal_transactions: internal_transactions_params_without_failed_creations
})
address_hash_to_block_number =
Enum.into(addresses_params, %{}, fn %{fetched_coin_balance_block_number: block_number, hash: hash} ->
{hash, block_number}
end)
imports =
Chain.import(%{
addresses: %{params: addresses_params},
internal_transactions: %{params: internal_transactions_params_without_failed_creations},
internal_transactions_indexed_at_blocks: %{
params: internal_transactions_indexed_at_blocks,
with: :number_only_changeset
},
timeout: :infinity
})
case imports do
{:ok, imported} ->
Accounts.drop(imported[:addreses])
Blocks.drop_nonconsensus(imported[:remove_consensus_of_missing_transactions_blocks])
async_import_coin_balances(imported, %{
address_hash_to_fetched_balance_block_number: address_hash_to_block_number
})
{:error, step, reason, _changes_so_far} ->
Logger.error(
fn ->
[
"failed to import internal transactions for transactions: ",
inspect(reason)
]
end,
step: step,
error_count: unique_entries_count
)
# re-queue the de-duped entries
{:retry, unique_entries}
end
end
defp unique_entries(entries, EthereumJSONRPC.Parity), do: Enum.uniq(entries)
# Protection and improved reporting for https://github.com/poanetwork/blockscout/issues/289
defp unique_entries(entries, _) do
entries_by_hash_bytes = Enum.group_by(entries, &elem(&1, 1))
if map_size(entries_by_hash_bytes) < length(entries) do
{unique_entries, duplicate_entries} =
entries_by_hash_bytes
|> Map.values()
|> uniques_and_duplicates()
Logger.error(fn ->
duplicate_entries
|> Stream.with_index()
|> Enum.reduce(
["Duplicate entries being used to fetch internal transactions:\n"],
fn {entry, index}, acc ->
[acc, " ", to_string(index + 1), ". ", inspect(entry), "\n"]
end
)
end)
unique_entries
else
entries
end
end
defp uniques_and_duplicates(groups) do
Enum.reduce(groups, {[], []}, fn group, {acc_uniques, acc_duplicates} ->
case group do
[unique] ->
{[unique | acc_uniques], acc_duplicates}
[unique | _] = duplicates ->
{[unique | acc_uniques], duplicates ++ acc_duplicates}
end
end)
end
defp remove_failed_creations(internal_transactions_params) do
internal_transactions_params
|> Enum.map(fn internal_transaction_params ->
internal_transaction_params[:trace_address]
failed_parent_index =
Enum.find(internal_transaction_params[:trace_address], fn trace_address ->
parent = Enum.at(internal_transactions_params, trace_address)
!is_nil(parent[:error])
end)
failed_parent = failed_parent_index && Enum.at(internal_transactions_params, failed_parent_index)
if failed_parent do
internal_transaction_params
|> Map.delete(:created_contract_address_hash)
|> Map.delete(:created_contract_code)
|> Map.delete(:gas_used)
|> Map.delete(:output)
|> Map.put(:error, failed_parent[:error])
else
internal_transaction_params
end
end)
end
end
You can’t perform that action at this time.