Skip to content

Commit

Permalink
Merge pull request #2748 from poanetwork/ab-batch-token-updates
Browse files Browse the repository at this point in the history
rewrite token updater
  • Loading branch information
vbaranov authored Nov 25, 2019
2 parents e88e779 + 6ff74e4 commit 8ea8b43
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 89 deletions.
6 changes: 4 additions & 2 deletions apps/explorer/lib/explorer/chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2992,8 +2992,10 @@ defmodule Explorer.Chain do
reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
) :: {:ok, accumulator}
when accumulator: term()
def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
Token.cataloged_tokens()
def stream_cataloged_token_contract_address_hashes(initial, reducer, hours_ago_updated \\ 48)
when is_function(reducer, 2) do
hours_ago_updated
|> Token.cataloged_tokens()
|> order_by(asc: :updated_at)
|> Repo.stream_reduce(initial, reducer)
end
Expand Down
9 changes: 6 additions & 3 deletions apps/explorer/lib/explorer/chain/token.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ defmodule Explorer.Chain.Token do
@doc """
Builds an `Ecto.Query` to fetch the cataloged tokens.
These are tokens with cataloged field set to true.
These are tokens with cataloged field set to true and updated_at is earlier or equal than an hour ago.
"""
def cataloged_tokens do
def cataloged_tokens(hours \\ 48) do
date_now = DateTime.utc_now()
hours_ago_date = DateTime.add(date_now, -:timer.hours(hours), :millisecond)

from(
token in __MODULE__,
select: token.contract_address_hash,
where: token.cataloged == true
where: token.cataloged == true and token.updated_at <= ^hours_ago_date
)
end
end
36 changes: 33 additions & 3 deletions apps/explorer/lib/explorer/token/metadata_retriever.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,45 @@ defmodule Explorer.Token.MetadataRetriever do
It will retry to fetch each function in the Smart Contract according to :token_functions_reader_max_retries
configured in the application env case one of them raised error.
"""
@spec get_functions_of(Hash.t()) :: Map.t()
@spec get_functions_of([String.t()] | Hash.t() | String.t()) :: Map.t() | {:ok, [Map.t()]}
def get_functions_of(hashes) when is_list(hashes) do
requests =
hashes
|> Enum.flat_map(fn hash ->
@contract_functions
|> Enum.map(fn {function, args} ->
%{contract_address: hash, function_name: function, args: args}
end)
end)

updated_at = DateTime.utc_now()

fetched_result =
requests
|> Reader.query_contracts(@contract_abi)
|> Enum.chunk_every(4)
|> Enum.zip(hashes)
|> Enum.map(fn {result, hash} ->
formatted_result =
["decimals", "name", "symbol", "totalSupply"]
|> Enum.zip(result)
|> format_contract_functions_result(hash)

formatted_result
|> Map.put(:contract_address_hash, hash)
|> Map.put(:updated_at, updated_at)
end)

{:ok, fetched_result}
end

def get_functions_of(%Hash{byte_count: unquote(Hash.Address.byte_count())} = address) do
address_string = Hash.to_string(address)

get_functions_of(address_string)
end

@spec get_functions_of(String.t()) :: Map.t()
def get_functions_of(contract_address_hash) do
def get_functions_of(contract_address_hash) when is_binary(contract_address_hash) do
contract_address_hash
|> fetch_functions_from_contract(@contract_functions)
|> format_contract_functions_result(contract_address_hash)
Expand Down
19 changes: 16 additions & 3 deletions apps/explorer/test/explorer/chain/token_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,27 @@ defmodule Explorer.Chain.TokenTest do

import Explorer.Factory

alias Explorer.Chain
alias Explorer.Chain.Token
alias Explorer.Repo

describe "cataloged_tokens/0" do
test "filters only cataloged tokens" do
token = insert(:token, cataloged: true)
{:ok, date} = DateTime.now("Etc/UTC")
hours_ago_date = DateTime.add(date, -:timer.hours(60), :millisecond)
token = insert(:token, cataloged: true, updated_at: hours_ago_date)
insert(:token, cataloged: false)

assert Repo.all(Chain.Token.cataloged_tokens()) == [token.contract_address_hash]
assert Repo.all(Token.cataloged_tokens()) == [token.contract_address_hash]
end

test "filter tokens by updated_at field" do
{:ok, date} = DateTime.now("Etc/UTC")
hours_ago_date = DateTime.add(date, -:timer.hours(60), :millisecond)

token = insert(:token, cataloged: true, updated_at: hours_ago_date)
insert(:token, cataloged: true)

assert Repo.all(Token.cataloged_tokens()) == [token.contract_address_hash]
end
end
end
13 changes: 8 additions & 5 deletions apps/explorer/test/explorer/chain_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3589,24 +3589,27 @@ defmodule Explorer.ChainTest do

describe "stream_cataloged_token_contract_address_hashes/2" do
test "reduces with given reducer and accumulator" do
%Token{contract_address_hash: catalog_address} = insert(:token, cataloged: true)
today = DateTime.utc_now()
yesterday = Timex.shift(today, days: -1)
%Token{contract_address_hash: catalog_address} = insert(:token, cataloged: true, updated_at: yesterday)
insert(:token, cataloged: false)
assert Chain.stream_cataloged_token_contract_address_hashes([], &[&1 | &2]) == {:ok, [catalog_address]}
assert Chain.stream_cataloged_token_contract_address_hashes([], &[&1 | &2], 1) == {:ok, [catalog_address]}
end

test "sorts the tokens by updated_at in ascending order" do
today = DateTime.utc_now()
yesterday = Timex.shift(today, days: -1)
two_days_ago = Timex.shift(today, days: -2)

token1 = insert(:token, %{cataloged: true, updated_at: today})
token2 = insert(:token, %{cataloged: true, updated_at: yesterday})
token1 = insert(:token, %{cataloged: true, updated_at: yesterday})
token2 = insert(:token, %{cataloged: true, updated_at: two_days_ago})

expected_response =
[token1, token2]
|> Enum.sort(&(Timex.to_unix(&1.updated_at) < Timex.to_unix(&2.updated_at)))
|> Enum.map(& &1.contract_address_hash)

assert Chain.stream_cataloged_token_contract_address_hashes([], &(&2 ++ [&1])) == {:ok, expected_response}
assert Chain.stream_cataloged_token_contract_address_hashes([], &(&2 ++ [&1]), 12) == {:ok, expected_response}
end
end

Expand Down
56 changes: 56 additions & 0 deletions apps/explorer/test/explorer/token/metadata_retriever_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,62 @@ defmodule Explorer.Token.MetadataRetrieverTest do
assert MetadataRetriever.get_functions_of(token.contract_address_hash) == expected
end

test "returns results for multiple coins" do
token = insert(:token, contract_address: build(:contract_address))

expect(
EthereumJSONRPC.Mox,
:json_rpc,
1,
fn requests, _opts ->
{:ok,
Enum.map(requests, fn
%{id: id, method: "eth_call", params: [%{data: "0x313ce567", to: _}, "latest"]} ->
%{
id: id,
result: "0x0000000000000000000000000000000000000000000000000000000000000012"
}

%{id: id, method: "eth_call", params: [%{data: "0x06fdde03", to: _}, "latest"]} ->
%{
id: id,
result:
"0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000642616e636f720000000000000000000000000000000000000000000000000000"
}

%{id: id, method: "eth_call", params: [%{data: "0x95d89b41", to: _}, "latest"]} ->
%{
id: id,
result:
"0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000003424e540000000000000000000000000000000000000000000000000000000000"
}

%{id: id, method: "eth_call", params: [%{data: "0x18160ddd", to: _}, "latest"]} ->
%{
id: id,
result: "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000"
}
end)}
end
)

assert {:ok,
[
%{
name: "Bancor",
symbol: "BNT",
total_supply: 1_000_000_000_000_000_000,
decimals: 18
},
%{
name: "Bancor",
symbol: "BNT",
total_supply: 1_000_000_000_000_000_000,
decimals: 18
}
]} = MetadataRetriever.get_functions_of([token.contract_address_hash, token.contract_address_hash])
end

test "returns only the functions that were read without error" do
token = insert(:token, contract_address: build(:contract_address))

Expand Down
87 changes: 62 additions & 25 deletions apps/indexer/lib/indexer/fetcher/token_updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,90 @@ defmodule Indexer.Fetcher.TokenUpdater do
@moduledoc """
Updates metadata for cataloged tokens
"""

use GenServer
use Indexer.Fetcher

require Logger

alias Explorer.Chain
alias Explorer.Chain.Token
alias Explorer.Chain.{Hash, Token}
alias Explorer.Token.MetadataRetriever
alias Indexer.BufferedTask

def start_link([initial_state, gen_server_options]) do
GenServer.start_link(__MODULE__, initial_state, gen_server_options)
end
@behaviour BufferedTask

@max_batch_size 10
@max_concurrency 4
@defaults [
flush_interval: :timer.seconds(3),
max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size,
task_supervisor: Indexer.Fetcher.TokenUpdater.TaskSupervisor,
metadata: [fetcher: :token_updater]
]

@doc false
def child_spec([init_options, gen_server_options]) do
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments)

unless state do
raise ArgumentError,
":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <>
"to allow for json_rpc calls when running."
end

@impl true
def init(state) do
send(self(), :update_tokens)
merged_init_opts =
@defaults
|> Keyword.merge(mergeable_init_options)
|> Keyword.put(:state, state)

{:ok, state}
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end

@impl true
def handle_info(:update_tokens, state) do
{:ok, tokens} = Chain.stream_cataloged_token_contract_address_hashes([], &[&1 | &2])
@impl BufferedTask
def init(initial, reducer, _) do
metadata_updater_inverval = Application.get_env(:indexer, :metadata_updater_seconds_interval)
hour_interval = Kernel.round(metadata_updater_inverval / (60 * 60))

{:ok, tokens} = Chain.stream_cataloged_token_contract_address_hashes(initial, reducer, hour_interval)

tokens
|> Enum.reverse()
|> update_metadata()
end

@impl BufferedTask
def run(entries, _json_rpc_named_arguments) do
Logger.debug("updating tokens")

Process.send_after(self(), :update_tokens, :timer.seconds(state.update_interval))
entries
|> Enum.map(&to_string/1)
|> MetadataRetriever.get_functions_of()
|> case do
{:ok, params} ->
update_metadata(params)

{:noreply, state}
other ->
Logger.error(fn -> ["failed to update tokens: ", inspect(other)] end,
error_count: Enum.count(entries)
)

{:retry, entries}
end
end

@doc false
def update_metadata(token_addresses) when is_list(token_addresses) do
def update_metadata(metadata_list) when is_list(metadata_list) do
options = [necessity_by_association: %{[contract_address: :smart_contract] => :optional}]

Enum.each(token_addresses, fn address ->
case Chain.token_from_address_hash(address, options) do
Enum.each(metadata_list, fn %{contract_address_hash: contract_address_hash} = metadata ->
{:ok, hash} = Hash.Address.cast(contract_address_hash)

case Chain.token_from_address_hash(hash, options) do
{:ok, %Token{cataloged: true} = token} ->
update_metadata(token)
update_metadata(token, metadata)
end
end)
end

def update_metadata(%Token{contract_address_hash: contract_address_hash} = token) do
contract_functions = MetadataRetriever.get_functions_of(contract_address_hash)

Chain.update_token(%{token | updated_at: DateTime.utc_now()}, contract_functions)
def update_metadata(%Token{} = token, metadata) do
Chain.update_token(%{token | updated_at: DateTime.utc_now()}, metadata)
end
end
5 changes: 2 additions & 3 deletions apps/indexer/lib/indexer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ defmodule Indexer.Supervisor do
subscribe_named_arguments: subscribe_named_arguments
} = named_arguments

metadata_updater_inverval = Application.get_env(:indexer, :metadata_updater_seconds_interval)

block_fetcher =
named_arguments
|> Map.drop(~w(block_interval blocks_concurrency memory_monitor subscribe_named_arguments realtime_overrides)a)
Expand Down Expand Up @@ -118,12 +116,13 @@ defmodule Indexer.Supervisor do
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{TokenBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{TokenUpdater.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{ReplacedTransaction.Supervisor, [[memory_monitor: memory_monitor]]},
{StakingPools.Supervisor, [[memory_monitor: memory_monitor]]},

# Out-of-band fetchers
{CoinBalanceOnDemand.Supervisor, [json_rpc_named_arguments]},
{TokenUpdater.Supervisor, [%{update_interval: metadata_updater_inverval}]},

# Temporary workers
{UncatalogedTokenTransfers.Supervisor, [[]]},
Expand Down
Loading

0 comments on commit 8ea8b43

Please sign in to comment.