
Ability to run analyzers in parallel #456

Merged
merged 15 commits into main from mitjat/parallel-analyzers on Aug 24, 2023

Conversation


@mitjat mitjat commented Jun 22, 2023

Continuation of the accidentally-closed #405.

This PR

  • adds a fast_sync key to each block analyzer's config, wherein the block range and parallelism can be specified
  • runs those parallel analyzers first, then a single slow-sync analyzer
  • changes the "slow-sync" consensus analyzer (of which there is now at most one) so that it first processes the genesis state immediately preceding the first block it will analyze. If that happens to be the chain's genesis document, it grabs that; otherwise, it uses StateToGenesis at the appropriate height. This differs from how we've processed genesis so far (analyzing the newest chain's genesis document at most once, regardless of the configured range), but the new approach lets the slow analyzer do the right thing wrt genesis in any of the three scenarios:
    • it runs after fast-sync analyzers that have fully processed some contiguous range of blocks
    • it runs after a slow-sync analyzer has processed some blocks in its range (after k8s redeployment)
    • it is starting from scratch, i.e. no blocks have been analyzed yet.
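For orientation, the new per-analyzer config might look roughly like this. The field names here are illustrative guesses, not the PR's exact schema; see config/config.go in the diff for the real keys:

```yaml
analyzers:
  consensus:
    from: 8048956      # first block of the range to index
    to: 10000000       # last block of the range
    fast_sync:         # new key introduced by this PR
      to: 9900000      # fast-sync analyzers cover [from, fast_sync.to]
      parallelism: 10  # number of parallel fast-sync analyzers
```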

NOTE: Though this PR brings fast-sync mode, the mode is not fully usable yet. This PR does NOT implement the actual fast-sync behavior inside the analyzers themselves (i.e. disabling dead reckoning so that they tolerate parallelism); that happens in other PRs (#457, #389). So the only way to test this config in the short term is to run "fast-sync" analyzers (which are no different from slow ones) with a parallelism level of 1. Even then, one quickly runs into violated DB constraints because fast mode does not fetch the genesis before starting.

Resolves https://app.clickup.com/t/8669qdzbh

Manual testing:

  • Compatibility with current ops practices (slow sync only):
    • Ran slow-sync on an empty DB from block 8_048_956 (= damask genesis_height). Verified that it processed the chain genesis first.
    • Ran slow-sync for a few blocks, killed the process, started a new analyzer. Verified that it does not fetch the genesis and continues where it left off.
  • Fast sync:
    • Ran fast sync until block 8_049_000 (and parallelism=1), then continued with slow sync. Verified that all blocks are indexed and that the genesis is downloaded before slow sync starts. TODO once the fast-sync analyzer can avoid violating DB constraints.

@mitjat mitjat force-pushed the mitjat/parallel-analyzers branch 2 times, most recently from 196ba2e to f47dc6f Compare August 11, 2023 06:14
@mitjat mitjat marked this pull request as ready for review August 12, 2023 00:05
analyzer/block/block.go (outdated; resolved)
return isContiguous, maxProcessedHeight, nil
}

// Returns true if the block immediately preceding `height` has been processed in slow-sync mode.
Member:
Suggested change
// Returns true if the block immediately preceding `height` has been processed in slow-sync mode.
// Returns true if the block at `height` has been processed in slow-sync mode.

Or am i misunderstanding the code?

Collaborator Author (@mitjat):

Thank you, that was a remnant of an earlier version of the code. You are correct; fixed.

// subrange [from, X] for some X.
// - If the most recently processed block was not processed by slow-sync (i.e. by fast sync, or not
// at all), triggers a finalization of the fast-sync process.
func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (ok bool) {
Member:
Suggested change
func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (ok bool) {
func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) bool {

since the method doesn't utilize the named return.

Collaborator Author (@mitjat):
I like the name because it documents the (otherwise potentially ambiguous) return value. Unless we have some styleguide saying not to do this, I'd prefer to keep it.

var genesisDoc *genesis.Document
if r.GenesisHeight == firstSlowSyncHeight {
m.logger.Info("fetching genesis document before starting with the first block of a chain", "chain_context", r.ChainContext, "genesis_height", r.GenesisHeight)
genesisDoc, err = m.source.GenesisDocument(ctx, r.ChainContext)
Member:
Does this return a different result to what StateToGenesis would return if called at the genesis height?

Collaborator Author (@mitjat):
I think it does differ, but I would appreciate @kostko's input. Here's my hypothesis/understanding; please let me know if any of it is wrong:

  • StateToGenesis(H) represents the status immediately after block H has been applied.
  • Assume that a chain with chaincontext C has GenesisHeight(C) = H₀. Then the changes applied to the chain, in order, are: 1) block H₀ - 1 from previous chain, 2) genesis GetGenesis(C), 3) block H₀.
  • As a sanity check, the corollary is: For a chaincontext C, GetGenesis(C) != StateToGenesis(GenesisHeight(C))

}
if cfg.Analyzers.Emerald != nil {
if fastRange := cfg.Analyzers.Emerald.FastSyncRange(); fastRange != nil {
for i := 0; i < cfg.Analyzers.Emerald.FastSync.Parallelism; i++ {
Member (@ptrus) commented Aug 13, 2023:
Could this repeated initialization code be moved into a helper method, that gets the BlockBasedAnalyzerConfig and Runtime passed in?

Collaborator Author (@mitjat):
Yes ... though the helper needs access to a bunch of stuff in this current scope (ctx, cfg, fastSyncAnalyzers, err, dbclient, logger). So to make it not crazy verbose and unreadable, I made it extensively use side effects instead. And it's defined as an anonymous func so it can do that.
LMK if you prefer the old version. I'm a little on the fence and can be swayed easily, but I like the new version slightly better and think the grossness is well contained and worth it.

@mitjat (Collaborator Author) left a comment:

Thanks Peter! Responses below.

@mitjat mitjat requested a review from ptrus August 16, 2023 15:05
@Andrew7234 Andrew7234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very neat! Only one major comment.

analyzer/block/block.go (outdated; resolved)
config/config.go (outdated)
Comment on lines 274 to 275
// - State that would normally be dead-reckoned is fetched directly from
// the node via the StateToGenesis() RPC.
Collaborator:

Maybe also add

Suggested change
// - State that would normally be dead-reckoned is fetched directly from
// the node via the StateToGenesis() RPC.
// - State that would normally be dead-reckoned is calculated or fetched directly from
// the node via the StateToGenesis() RPC.

Collaborator:

for things like num_transfers or num_txs etc

Collaborator Author (@mitjat):

👍 updated for future-proofing, though at least for now, my plan is to keep dead-reckoning those fields, see below.

analyzer/block/block.go (outdated; resolved)
-- Whether the processed subrange is a contiguous range that starts at the input range.
COALESCE(
(COUNT(*) = MAX(height) - MIN(height) + 1) AND MIN(height) = $2,
TRUE -- If there are no processed blocks, we consider the range contiguous.
Collaborator:

Suggested change
TRUE -- If there are no processed blocks, we consider the range contiguous.
TRUE -- If there are no unprocessed blocks, we consider the range contiguous.

Collaborator:

or "If all blocks are processed, " to eliminate the double negative

Collaborator Author (@mitjat) commented Aug 16, 2023:

The comment is actually OK. If no blocks have been processed (= empty DB), then MAX(height) and MIN(height) will return NULL, and COALESCE will choose the TRUE.
Why we want this behavior: If a non-contiguous subrange of the range-to-be-analyzed has already been processed, that precludes slow-sync (which assumes all blocks will be processed in-order). So when there are no blocks, we do want to allow slow-sync, so we say "the already-processed range (= the empty set) is contiguous".

LMK if the comment can use rewording.

Collaborator:

Ah I misunderstood the comment then, makes sense to me now

@@ -93,6 +97,12 @@ func (m *processor) PreWork(ctx context.Context) error {
return nil
}

// Implements block.BlockProcessor interface.
func (m *processor) FinalizeFastSync(ctx context.Context, lastFastSyncHeight int64) error {
// For runtimes, fast sync does not disable any dead reckoning and does not ignore any updates.
Collaborator:

Hm, we'll need to be careful with things like evm_contracts.gas_used, evm_tokens.total_supply, and evm_tokens.num_transfers (not implemented yet). The analyzer will run without complaint because the dead reckoning will simply fail to find any rows to update, but they'll need to be re-reckoned after the fast-sync section is complete.

Here's where we dead-reckon gas_used for contracts we didn't see the evm.Create tx for. (For the ones where we do see evm.Create, we simply initialize gas_used appropriately.)

Here's how to dead-reckon the total_supply

WITH token_supply AS (
    SELECT runtime,
           token_address,
           SUM(balance) AS supply
    FROM chain.evm_token_balances
    GROUP BY runtime, token_address  -- runtime must be grouped too
)
UPDATE chain.evm_tokens
SET total_supply = token_supply.supply
FROM token_supply
WHERE chain.evm_tokens.runtime = token_supply.runtime
  AND chain.evm_tokens.token_address = token_supply.token_address
  AND chain.evm_tokens.total_supply IS NULL;

Also just fyi we'll probably start dead-reckoning the account_stats like runtimeAccount.numTxs because it's slow... but that'll be in a later PR

Collaborator Author (@mitjat):

Yes, that's a good point. I've thought of those. As of this PR, the comment is true because dead reckoning is not disabled anywhere yet :). But my intention has actually been to keep it true (= keep dead reckoning even inside fast sync) longer-term too, unless we have strong incentives to disable it for individual values.

In general, for every value where we'll disable dead reckoning, I'm concerned about how to be reasonably sure that whatever is inside FinalizeFastSync brings us to the exact same state as dead reckoning would have. So my plan is to initially disable dead reckons only in a very limited set of cases:

  • Consensus state that is easily fetchable from StateToGenesis(), so we can avoid a whole RPC during fast-sync. I think that's all or most of governance, scheduler, registry state.
  • Native balances. These are also contained in consensus state. Because every transfer touches the fee accumulator, this will reduce DB contention/deadlocks. More importantly, it will prevent us from generating negative balances, which the DB constraints disallow.
  • Staking info in consensus. That, too, is fully contained in StateToGenesis(), so it's easy to implement FinalizeFastSync(). The reason to disable it is that this is the one value where dead-reckoning updates are non-commutative: instead of just summing things, we're doing some fractional math. So if we process blocks out of order, the dead-reckoned values will be wrong anyway, so we might as well go the extra mile and just not dead-reckon them in the first place.

Later we can add more, if any dead-reckoned value turns out to cause significant slowdowns. I expect the main problem will be db contention. But yes, when we do that, we'll have to be careful to have the right code in FinalizeFastSync to counteract the disabled dead reckons.

Collaborator:

Hmm, I think that works for evmTokens since all the operations are commutative, but I think the contracts.gasUsed insert needs to be updated to make it commutative as well, e.g. if multiple evm.Calls are processed before the evm.Create is:

WITH contract_gas_used AS (
    SELECT SUM(gas_used) AS total_gas
    FROM chain.runtime_transactions
    WHERE runtime = $1 AND "to" = $2::text
)
INSERT INTO chain.evm_contracts
    (runtime, contract_address, creation_tx, creation_bytecode, gas_used)
SELECT $1, $2, $3, $4, COALESCE(total_gas, 0)  -- INSERT ... SELECT, not VALUES ... FROM
FROM contract_gas_used
ON CONFLICT (runtime, contract_address) DO UPDATE
    SET gas_used = gas_used + $5

Collaborator:

It looks like we'll need to be extra careful to make all running total updates commutative from now on; maybe we can add a comment somewhere?

Collaborator Author (@mitjat):

Huh, good point. That goes back to the discussion on Slack, where you pointed out that dead reckoning for the {chain,analysis}.{evm_tokens,evm_contracts} tables is order-sensitive wrt where in the db batch it happens. And before that, when you introduced dead reckoning for total_supply in evm_tokens, the solution of duplicating info across the chain and analysis schemas felt quite hacky (but necessary).

Summarizing from Slack, lightly edited:

In my mind, the original source of all of that trouble is that updates to contract addresses (taking contracts as an example; might apply to other tables too) can only happen after we confirm that an address is indeed a contract. Hence the sharing of data between chain and analysis tables, and the complex updating rules.
So we should try to make those two types of updates ("X is [not] a contract" and "X has used another Y of gas") independent. I suggest we make the table of contracts contain all potential contracts, with an extra bool flag to say whether this is really a contract. (Or we can (ab)use the nullness of the bytecode field for that purpose.) The evm_tokens table already works that way, IIRC; token_type == 0 means that this contract is not really a token in a meaningful way. If we track contracts this way too, then every dead-reckoning step can be an upsert, rather than an update.

If we do that, the gnarly initial INSERT would go away too, because as soon as we needed a row to increase the gas_used of an address, we'd simply create it in chain.evm_contracts.

I suggest we merge this PR as-is, and I follow up with a PR that refactors evm_contracts as suggested above so that its dead-reckoning updates really are commutative. Same for evm_tokens; while we do have copying of dead-reckoned values from analysis set up in that table once a candidate-token is confirmed to be a real token, it would be simpler if we immediately inserted into chain.evm_tokens.

Collaborator:

Sounds good!


mitjat commented Aug 24, 2023

Added tests in block_test.go, and expanded the block-grabbing logic so it allows extending the range of processed blocks backwards. For example, if only blocks in the range [1000, 2000] have been processed so far, it is now possible to request a "patch-up" and process blocks [0, 1000]. Not strictly needed, but might be very useful if we ever have a typo in the configured processing ranges, and deploy that typo against the prod db.

@mitjat mitjat merged commit cf56c92 into main Aug 24, 2023
6 checks passed
@mitjat mitjat deleted the mitjat/parallel-analyzers branch August 24, 2023 16:19