Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to run analyzers in parallel #405

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion analyzer/aggregate_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (a *AggregateStatsAnalyzer) Name() string {
return AggregateStatsAnalyzerName
}

func NewAggregateStatsAnalyzer(cfg *config.AggregateStatsConfig, target storage.TargetStorage, logger *log.Logger) (*AggregateStatsAnalyzer, error) {
func NewAggregateStatsAnalyzer(cfg *config.AggregateStatsConfig, target storage.TargetStorage, logger *log.Logger) (Analyzer, error) {
logger.Info("starting aggregate_stats analyzer")
return &AggregateStatsAnalyzer{
target: target,
Expand Down
7 changes: 7 additions & 0 deletions analyzer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ type BlockRange struct {
To int64
}

type BlockAnalysisMode string

const (
FastSyncMode BlockAnalysisMode = "fast-sync"
SlowSyncMode BlockAnalysisMode = "slow-sync"
)

// RuntimeConfig specifies configuration parameters for
// processing the runtime layer.
type RuntimeConfig struct {
Expand Down
79 changes: 40 additions & 39 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,19 @@ func OpenSignedTxNoVerify(signedTx *transaction.SignedTransaction) (*transaction
return &tx, nil
}

// Main is the main Analyzer for the consensus layer.
type Main struct {
// consensusAnalyzer is the main Analyzer for the consensus layer.
type consensusAnalyzer struct {
cfg analyzer.ConsensusConfig
mode analyzer.BlockAnalysisMode
target storage.TargetStorage
logger *log.Logger
metrics metrics.DatabaseMetrics
}

var _ analyzer.Analyzer = (*Main)(nil)
var _ analyzer.Analyzer = (*consensusAnalyzer)(nil)

// NewMain returns a new main analyzer for the consensus layer.
func NewMain(cfg *config.BlockBasedAnalyzerConfig, genesisChainContext string, sourceClient *source.ConsensusClient, target storage.TargetStorage, logger *log.Logger) (*Main, error) {
// NewConsensusAnalyzer returns a new main analyzer for the consensus layer.
func NewConsensusAnalyzer(cfg config.BlockRange, mode analyzer.BlockAnalysisMode, genesisChainContext string, sourceClient *source.ConsensusClient, target storage.TargetStorage, logger *log.Logger) (analyzer.Analyzer, error) {
// Configure analyzer.
blockRange := analyzer.BlockRange{
From: cfg.From,
Expand All @@ -79,17 +80,17 @@ func NewMain(cfg *config.BlockBasedAnalyzerConfig, genesisChainContext string, s
Source: sourceClient,
}

logger.Info("Starting consensus analyzer", "config", ac)
return &Main{
return &consensusAnalyzer{
cfg: ac,
mode: mode,
target: target,
logger: logger.With("analyzer", ConsensusAnalyzerName),
metrics: metrics.NewDefaultDatabaseMetrics(ConsensusAnalyzerName),
}, nil
}

// Start starts the main consensus analyzer.
func (m *Main) Start(ctx context.Context) {
func (m *consensusAnalyzer) Start(ctx context.Context) {
// Get block to be indexed.
var height int64

Expand Down Expand Up @@ -170,12 +171,12 @@ func (m *Main) Start(ctx context.Context) {
}

// Name returns the name of the Main.
func (m *Main) Name() string {
func (m *consensusAnalyzer) Name() string {
return ConsensusAnalyzerName
}

// source returns the source storage for the provided block height.
func (m *Main) source(height int64) (storage.ConsensusSourceStorage, error) {
func (m *consensusAnalyzer) source(height int64) (storage.ConsensusSourceStorage, error) {
r := m.cfg
if height >= r.Range.From && (r.Range.To == 0 || height <= r.Range.To) {
return r.Source, nil
Expand All @@ -185,7 +186,7 @@ func (m *Main) source(height int64) (storage.ConsensusSourceStorage, error) {
}

// latestBlock returns the latest block processed by the consensus analyzer.
func (m *Main) latestBlock(ctx context.Context) (int64, error) {
func (m *consensusAnalyzer) latestBlock(ctx context.Context) (int64, error) {
var latest int64
if err := m.target.QueryRow(
ctx,
Expand All @@ -199,7 +200,7 @@ func (m *Main) latestBlock(ctx context.Context) (int64, error) {
return latest, nil
}

func (m *Main) isGenesisProcessed(ctx context.Context, chainContext string) (bool, error) {
func (m *consensusAnalyzer) isGenesisProcessed(ctx context.Context, chainContext string) (bool, error) {
var processed bool
if err := m.target.QueryRow(
ctx,
Expand All @@ -211,7 +212,7 @@ func (m *Main) isGenesisProcessed(ctx context.Context, chainContext string) (boo
return processed, nil
}

func (m *Main) processGenesis(ctx context.Context, chainContext string) error {
func (m *consensusAnalyzer) processGenesis(ctx context.Context, chainContext string) error {
m.logger.Info("fetching genesis document")
genesisDoc, err := m.cfg.Source.GenesisDocument(ctx)
if err != nil {
Expand Down Expand Up @@ -255,7 +256,7 @@ func (m *Main) processGenesis(ctx context.Context, chainContext string) error {
// processBlock processes the provided block, retrieving all required information
// from source storage and committing an atomically-executed batch of queries
// to target storage.
func (m *Main) processBlock(ctx context.Context, height int64) error {
func (m *consensusAnalyzer) processBlock(ctx context.Context, height int64) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, ProcessBlockTimeout)
defer cancel()

Expand Down Expand Up @@ -355,7 +356,7 @@ func (m *Main) processBlock(ctx context.Context, height int64) error {
return nil
}

func (m *Main) queueBlockInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *consensusAnalyzer) queueBlockInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
batch.Queue(
queries.ConsensusBlockInsert,
data.BlockHeader.Height,
Expand All @@ -371,7 +372,7 @@ func (m *Main) queueBlockInserts(batch *storage.QueryBatch, data *storage.Consen
return nil
}

func (m *Main) queueEpochInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *consensusAnalyzer) queueEpochInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
batch.Queue(
queries.ConsensusEpochInsert,
data.Epoch,
Expand All @@ -390,7 +391,7 @@ func (m *Main) queueEpochInserts(batch *storage.QueryBatch, data *storage.Consen
return nil
}

func (m *Main) queueTransactionInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *consensusAnalyzer) queueTransactionInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
for i, txr := range data.TransactionsWithResults {
signedTx := txr.Transaction
result := txr.Result
Expand Down Expand Up @@ -468,7 +469,7 @@ func (m *Main) queueTransactionInserts(batch *storage.QueryBatch, data *storage.
}

// Enqueue DB statements to store events that were generated as the result of a TX execution.
func (m *Main) queueTxEventInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *consensusAnalyzer) queueTxEventInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
for i, txr := range data.TransactionsWithResults {
var txAccounts []staking.Address
for _, event := range txr.Result.Events {
Expand Down Expand Up @@ -502,7 +503,7 @@ func (m *Main) queueTxEventInserts(batch *storage.QueryBatch, data *storage.Cons
return nil
}

func (m *Main) queueRuntimeRegistrations(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *consensusAnalyzer) queueRuntimeRegistrations(batch *storage.QueryBatch, data *storage.RegistryData) error {
for _, runtimeEvent := range data.RuntimeRegisteredEvents {
var keyManager *string

Expand All @@ -523,7 +524,7 @@ func (m *Main) queueRuntimeRegistrations(batch *storage.QueryBatch, data *storag
return nil
}

func (m *Main) queueEntityEvents(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *consensusAnalyzer) queueEntityEvents(batch *storage.QueryBatch, data *storage.RegistryData) error {
for _, entityEvent := range data.EntityEvents {
entityID := entityEvent.Entity.ID.String()

Expand All @@ -542,7 +543,7 @@ func (m *Main) queueEntityEvents(batch *storage.QueryBatch, data *storage.Regist
return nil
}

func (m *Main) queueNodeEvents(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *consensusAnalyzer) queueNodeEvents(batch *storage.QueryBatch, data *storage.RegistryData) error {
for _, nodeEvent := range data.NodeEvents {
if nodeEvent.IsRegistration {
// A new node is registered.
Expand Down Expand Up @@ -582,7 +583,7 @@ func (m *Main) queueNodeEvents(batch *storage.QueryBatch, data *storage.Registry
return nil
}

func (m *Main) queueRegistryEventInserts(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *consensusAnalyzer) queueRegistryEventInserts(batch *storage.QueryBatch, data *storage.RegistryData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand All @@ -599,7 +600,7 @@ func (m *Main) queueRegistryEventInserts(batch *storage.QueryBatch, data *storag
return nil
}

func (m *Main) queueRootHashEventInserts(batch *storage.QueryBatch, data *storage.RootHashData) error {
func (m *consensusAnalyzer) queueRootHashEventInserts(batch *storage.QueryBatch, data *storage.RootHashData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand Down Expand Up @@ -635,15 +636,15 @@ const (
TransferTypeOther TransferType = "Other"
)

func (m *Main) queueRegularTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *consensusAnalyzer) queueRegularTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
return m.queueTransfers(batch, data, TransferTypeOther)
}

func (m *Main) queueDisbursementTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *consensusAnalyzer) queueDisbursementTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
return m.queueTransfers(batch, data, TransferTypeAccumulatorDisbursement)
}

func (m *Main) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData, targetType TransferType) error {
func (m *consensusAnalyzer) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData, targetType TransferType) error {
for _, transfer := range data.Transfers {
// Filter out transfers that are not of the target type.
typ := TransferTypeOther // type of the current transfer
Expand All @@ -667,7 +668,7 @@ func (m *Main) queueTransfers(batch *storage.QueryBatch, data *storage.StakingDa
return nil
}

func (m *Main) queueBurns(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *consensusAnalyzer) queueBurns(batch *storage.QueryBatch, data *storage.StakingData) error {
for _, burn := range data.Burns {
batch.Queue(queries.ConsensusDecreaseGeneralBalanceUpsert,
burn.Owner.String(),
Expand All @@ -678,7 +679,7 @@ func (m *Main) queueBurns(batch *storage.QueryBatch, data *storage.StakingData)
return nil
}

func (m *Main) queueEscrows(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *consensusAnalyzer) queueEscrows(batch *storage.QueryBatch, data *storage.StakingData) error {
for _, e := range data.AddEscrows {
owner := e.Owner.String()
escrower := e.Escrow.String()
Expand Down Expand Up @@ -745,7 +746,7 @@ func (m *Main) queueEscrows(batch *storage.QueryBatch, data *storage.StakingData
return nil
}

func (m *Main) queueAllowanceChanges(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *consensusAnalyzer) queueAllowanceChanges(batch *storage.QueryBatch, data *storage.StakingData) error {
for _, allowanceChange := range data.AllowanceChanges {
if allowanceChange.Allowance.IsZero() {
batch.Queue(queries.ConsensusAllowanceChangeDelete,
Expand All @@ -767,7 +768,7 @@ func (m *Main) queueAllowanceChanges(batch *storage.QueryBatch, data *storage.St
return nil
}

func (m *Main) queueStakingEventInserts(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *consensusAnalyzer) queueStakingEventInserts(batch *storage.QueryBatch, data *storage.StakingData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand All @@ -784,7 +785,7 @@ func (m *Main) queueStakingEventInserts(batch *storage.QueryBatch, data *storage
return nil
}

func (m *Main) queueValidatorUpdates(batch *storage.QueryBatch, data *storage.SchedulerData) error {
func (m *consensusAnalyzer) queueValidatorUpdates(batch *storage.QueryBatch, data *storage.SchedulerData) error {
for _, validator := range data.Validators {
batch.Queue(queries.ConsensusValidatorNodeUpdate,
validator.ID.String(),
Expand All @@ -795,7 +796,7 @@ func (m *Main) queueValidatorUpdates(batch *storage.QueryBatch, data *storage.Sc
return nil
}

func (m *Main) queueCommitteeUpdates(batch *storage.QueryBatch, data *storage.SchedulerData) error {
func (m *consensusAnalyzer) queueCommitteeUpdates(batch *storage.QueryBatch, data *storage.SchedulerData) error {
batch.Queue(queries.ConsensusCommitteeMembersTruncate)
for namespace, committees := range data.Committees {
runtime := namespace.String()
Expand All @@ -820,7 +821,7 @@ func (m *Main) queueCommitteeUpdates(batch *storage.QueryBatch, data *storage.Sc
return nil
}

func (m *Main) queueSubmissions(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *consensusAnalyzer) queueSubmissions(batch *storage.QueryBatch, data *storage.GovernanceData) error {
for _, submission := range data.ProposalSubmissions {
if submission.Content.Upgrade != nil {
batch.Queue(queries.ConsensusProposalSubmissionInsert,
Expand Down Expand Up @@ -852,7 +853,7 @@ func (m *Main) queueSubmissions(batch *storage.QueryBatch, data *storage.Governa
return nil
}

func (m *Main) queueExecutions(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *consensusAnalyzer) queueExecutions(batch *storage.QueryBatch, data *storage.GovernanceData) error {
for _, execution := range data.ProposalExecutions {
batch.Queue(queries.ConsensusProposalExecutionsUpdate,
execution.ID,
Expand All @@ -862,7 +863,7 @@ func (m *Main) queueExecutions(batch *storage.QueryBatch, data *storage.Governan
return nil
}

func (m *Main) queueFinalizations(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *consensusAnalyzer) queueFinalizations(batch *storage.QueryBatch, data *storage.GovernanceData) error {
for _, finalization := range data.ProposalFinalizations {
batch.Queue(queries.ConsensusProposalUpdate,
finalization.ID,
Expand All @@ -877,7 +878,7 @@ func (m *Main) queueFinalizations(batch *storage.QueryBatch, data *storage.Gover
return nil
}

func (m *Main) queueVotes(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *consensusAnalyzer) queueVotes(batch *storage.QueryBatch, data *storage.GovernanceData) error {
for _, vote := range data.Votes {
batch.Queue(queries.ConsensusVoteInsert,
vote.ID,
Expand All @@ -889,7 +890,7 @@ func (m *Main) queueVotes(batch *storage.QueryBatch, data *storage.GovernanceDat
return nil
}

func (m *Main) queueGovernanceEventInserts(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *consensusAnalyzer) queueGovernanceEventInserts(batch *storage.QueryBatch, data *storage.GovernanceData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand All @@ -906,7 +907,7 @@ func (m *Main) queueGovernanceEventInserts(batch *storage.QueryBatch, data *stor
return nil
}

func (m *Main) queueSingleEventInserts(batch *storage.QueryBatch, eventData *parsedEvent, height int64) error {
func (m *consensusAnalyzer) queueSingleEventInserts(batch *storage.QueryBatch, eventData *parsedEvent, height int64) error {
accounts := extractUniqueAddresses(eventData.relatedAddresses)
body, err := json.Marshal(eventData.rawBody)
if err != nil {
Expand Down Expand Up @@ -941,7 +942,7 @@ func extractUniqueAddresses(accounts []staking.Address) []string {
}

// extractEventData extracts the type, the body (JSON-serialized), and the related accounts of an event.
func (m *Main) extractEventData(event nodeapi.Event) parsedEvent {
func (m *consensusAnalyzer) extractEventData(event nodeapi.Event) parsedEvent {
eventData := parsedEvent{
ty: event.Type,
rawBody: event.RawBody,
Expand Down
Loading
Loading