Skip to content

Commit

Permalink
Merge pull request #2470 from poanetwork/pp-realtime-skipping
Browse files Browse the repository at this point in the history
Allow Realtime Fetcher to wait for small skips
  • Loading branch information
vbaranov committed Oct 14, 2019
2 parents 93e8757 + e8c2fa5 commit ee4b87f
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Current

### Features
- [#2470](https://github.com/poanetwork/blockscout/pull/2470) - Allow Realtime Fetcher to wait for small skips
- [#2733](https://github.com/poanetwork/blockscout/pull/2733) - Add cache for first page of uncles
- [#2735](https://github.com/poanetwork/blockscout/pull/2735) - Add pending transactions cache
- [#2726](https://github.com/poanetwork/blockscout/pull/2726) - Remove internal_transaction block_number setting from blocks runner
Expand Down
9 changes: 8 additions & 1 deletion apps/indexer/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ block_transformer =
transformer
end

max_skipping_distance =
case Integer.parse(System.get_env("MAX_SKIPPING_DISTANCE", "")) do
{num, ""} -> num
_ -> 5
end

config :indexer,
block_transformer: block_transformer,
ecto_repos: [Explorer.Repo],
Expand All @@ -36,7 +42,8 @@ config :indexer,
# bytes
memory_limit: 1 <<< 30,
first_block: System.get_env("FIRST_BLOCK") || "0",
last_block: System.get_env("LAST_BLOCK") || ""
last_block: System.get_env("LAST_BLOCK") || "",
max_skipping_distance: max_skipping_distance

# config :indexer, Indexer.Fetcher.ReplacedTransaction.Supervisor, disabled?: true
# config :indexer, Indexer.Fetcher.BlockReward.Supervisor, disabled?: true
Expand Down
30 changes: 16 additions & 14 deletions apps/indexer/lib/indexer/block/catchup/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,12 @@ defmodule Indexer.Block.Catchup.Fetcher do
) do
Logger.metadata(fetcher: :block_catchup)

{:ok, latest_block_number} =
case latest_block() do
nil ->
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)

number ->
{:ok, number}
end

case latest_block_number do
case latest_block(json_rpc_named_arguments) do
# let realtime indexer get the genesis block
0 ->
%{first_block_number: 0, missing_block_count: 0, shrunk: false}

_ ->
latest_block_number ->
# realtime indexer gets the current latest block
first = latest_block_number - 1
last = last_block()
Expand Down Expand Up @@ -347,12 +338,23 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end

defp latest_block do
defp latest_block(json_rpc_named_arguments) do
string_value = Application.get_env(:indexer, :last_block)

case Integer.parse(string_value) do
{integer, ""} -> integer
_ -> nil
{integer, ""} ->
integer

_ ->
{:ok, number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
# leave to realtime indexer the blocks in the skipping window
skipping_distance = Application.get_env(:indexer, :max_skipping_distance)

if number > skipping_distance do
number - skipping_distance
else
0
end
end
end
end
64 changes: 45 additions & 19 deletions apps/indexer/lib/indexer/block/realtime/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,24 @@ defmodule Indexer.Block.Realtime.Fetcher do
number = quantity_to_integer(quantity)
# Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen)
{new_previous_number, new_max_number} =
case start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do
# The number may have not been inserted if it was part of a small skip
:skip ->
Logger.debug(["#{inspect(number)} was skipped"])
{previous_number, max_number_seen}

new_max_number = new_max_number(number, max_number_seen)
_ ->
{number, new_max_number(number, max_number_seen)}
end

Process.cancel_timer(timer)
new_timer = schedule_polling()

{:noreply,
%{
state
| previous_number: number,
| previous_number: new_previous_number,
max_number_seen: new_max_number,
timer: new_timer
}}
Expand All @@ -116,7 +123,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
{number, new_max_number} =
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, number} when is_nil(max_number_seen) or number > max_number_seen ->
start_fetch_and_import(number, block_fetcher, previous_number, number)
# in case of polling the realtime fetcher should take care of all the
# blocks in the skipping window, because the cathup fetcher wont
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)

last_catchup_number = max(0, 10 - max_skipping_distance - 1)
starting_number = max(previous_number, last_catchup_number) || last_catchup_number

start_fetch_and_import(number, block_fetcher, starting_number, nil)

{max_number_seen, number}

Expand Down Expand Up @@ -212,27 +226,31 @@ defmodule Indexer.Block.Realtime.Fetcher do
end

defp start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do
start_at = determine_start_at(number, previous_number, max_number_seen)
fetching_action = determine_fetching_action(number, previous_number, max_number_seen)

for block_number_to_fetch <- start_at..number do
args = [block_number_to_fetch, block_fetcher, reorg?(number, max_number_seen)]
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
if fetching_action != :skip do
for block_number_to_fetch <- fetching_action do
args = [block_number_to_fetch, block_fetcher, reorg?(number, max_number_seen)]
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
end
end

fetching_action
end

defp determine_start_at(number, nil, nil), do: number
def determine_fetching_action(number, previous_number, max_number_seen) do
cond do
reorg?(number, max_number_seen) ->
[number]

defp determine_start_at(number, nil, max_number_seen) do
determine_start_at(number, number - 1, max_number_seen)
end
can_be_skipped?(number, max_number_seen) ->
:skip

defp determine_start_at(number, previous_number, max_number_seen) do
if reorg?(number, max_number_seen) do
# set start_at to NOT fill in skipped numbers
number
else
# set start_at to fill in skipped numbers, if any
previous_number + 1
is_nil(previous_number) ->
[number]

true ->
(previous_number + 1)..number
end
end

Expand All @@ -242,6 +260,14 @@ defmodule Indexer.Block.Realtime.Fetcher do

defp reorg?(_, _), do: false

defp can_be_skipped?(number, max_number_seen) when is_integer(max_number_seen) and number > max_number_seen + 1 do
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)

max_skipping_distance > 1 and number <= max_number_seen + max_skipping_distance
end

defp can_be_skipped?(_, _), do: false

@reorg_delay 5_000

@decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do

setup :verify_on_exit!

# run the tests without the skipping window
setup do
Application.put_env(:indexer, :max_skipping_distance, 0)
end

describe "start_link/1" do
# See https://github.com/poanetwork/blockscout/issues/597
@tag :no_geth
Expand Down
5 changes: 5 additions & 0 deletions apps/indexer/test/indexer/block/catchup/fetcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ defmodule Indexer.Block.Catchup.FetcherTest do
}
end

setup do
# run the tests without the skipping window
Application.put_env(:indexer, :max_skipping_distance, 0)
end

describe "import/1" do
test "fetches uncles asynchronously", %{json_rpc_named_arguments: json_rpc_named_arguments} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Expand Down
52 changes: 52 additions & 0 deletions apps/indexer/test/indexer/block/realtime/fetcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ defmodule Indexer.Block.Realtime.FetcherTest do
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end

setup do
# run the tests with a realistic skipping window
Application.put_env(:indexer, :max_skipping_distance, 3)
end

describe "Indexer.Block.Fetcher.fetch_and_import_range/1" do
@tag :no_geth
test "in range with internal transactions", %{
Expand Down Expand Up @@ -424,4 +429,51 @@ defmodule Indexer.Block.Realtime.FetcherTest do
}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
end
end

describe "determine_fetching_action/4" do
test "when everything (except number) is nil results in fetching only the number" do
assert [14] == Realtime.Fetcher.determine_fetching_action(14, nil, nil)
end

test "when number is also max_number_seen results in fetching only the number" do
number = 23
assert [number] == Realtime.Fetcher.determine_fetching_action(number, nil, number)
assert [number] == Realtime.Fetcher.determine_fetching_action(number, 21, number)
end

test "when max_number_seen is nil, fetching will start from previous_number" do
# note: this is a way to force this behavior, used by `poll_latest_block_number`
number = 156
previous_number = 150
old_number = 94

assert (previous_number + 1)..number == Realtime.Fetcher.determine_fetching_action(number, previous_number, nil)
assert (old_number + 1)..number == Realtime.Fetcher.determine_fetching_action(number, old_number, nil)
end

test "when number immediately follows the previous_number it is fetched" do
max_number_seen = 26
number = 27

assert [number] == Realtime.Fetcher.determine_fetching_action(number, nil, max_number_seen)
end

test "when number is inside the allowed skipping window nothing is fetched" do
max_number_seen = 26

assert :skip == Realtime.Fetcher.determine_fetching_action(28, nil, max_number_seen)
assert :skip == Realtime.Fetcher.determine_fetching_action(29, nil, max_number_seen)
end

test "when number is over the allowed skipping window all the values since the previous_number will be fetched" do
max_number_seen = 390
previous_number = 381
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance)

number = max_number_seen + max_skipping_distance + 1

assert (previous_number + 1)..number ==
Realtime.Fetcher.determine_fetching_action(number, previous_number, max_number_seen)
end
end
end
1 change: 1 addition & 0 deletions docs/env-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ $ export NETWORK=POA
| `EMISSION_FORMAT` | | Should be set to `POA` if you have block emission indentical to POA Network. This env var is used only if `CHAIN_SPEC_PATH` is set | `STANDARD` | v2.0.4+ | | |
| `REWARDS_CONTRACT_ADDRESS` | | Emission rewards contract address. This env var is used only if `EMISSION_FORMAT` is set to `POA` | `0xeca443e8e1ab29971a45a9c57a6a9875701698a5` | v2.0.4+ | | |
| `BLOCKSCOUT_PROTOCOL` | | Url scheme for blockscout | in prod env `https` is used, in dev env `http` is used | master | | |
| `MAX_SKIPPING_DISTANCE` | | The maximum distance the indexer is allowed to wait for when notified of a number not following the lask known one. | 4 | master | |

0 comments on commit ee4b87f

Please sign in to comment.