add itemBasedAnalyzer; refactor evm analyzers accordingly
enable stopOnEmptyBatch

nit: pointer receivers

refactor node_stats and metadata_registry analyzers

move metadata_registry to own package

add itemBasedAnalyzer tests

adjust tests

nit

nits

test

nits

move closingChannel() to util

address comments

add interItemDelay flag

refactor item analyzer args into cfg

change stopOnEmptyBatch -> stopOnEmptyQueue

propagate error from batch; nit

linter

update tests

nit
Andrew7234 committed Sep 12, 2023
1 parent 4fb8274 · commit 15c5add
Showing 14 changed files with 1,076 additions and 749 deletions.
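The centerpiece of this commit is the new generic item-based analyzer in analyzer/item, onto which the evm analyzers below are refactored. That package is not part of this excerpt; judging purely from how it is used in the diffs that follow, its processor contract looks roughly like the sketch below (names and shapes are inferred from the call sites, so the real package may differ in detail).

// Sketch (not the actual source) of the ItemProcessor contract implied by the
// usage in analyzer/evmcontractcode below.
package item

import (
	"context"

	"github.com/oasisprotocol/nexus/storage"
)

// ItemProcessor is implemented by each item-based analyzer. The generic
// analyzer built around it repeatedly calls GetItems, runs ProcessItem for
// each item to queue DB updates, sends the combined batch to storage, and
// uses QueueLength to drive the queue-length metric.
type ItemProcessor[Item any] interface {
	GetItems(ctx context.Context, limit uint64) ([]Item, error)
	ProcessItem(ctx context.Context, batch *storage.QueryBatch, item Item) error
	QueueLength(ctx context.Context) (int, error)
}

A constructor along the lines of NewAnalyzer[Item](name, cfg config.ItemBasedAnalyzerConfig, p ItemProcessor[Item], target, logger) then wraps such a processor in the shared fetch/process/commit loop; the stopOnEmptyQueue and interItemDelay settings mentioned in the commit message presumably live in that config.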
25 changes: 8 additions & 17 deletions analyzer/block/block_test.go
@@ -14,6 +14,7 @@ import (
"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/block"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/analyzer/util"
analyzerCmd "github.com/oasisprotocol/nexus/cmd/analyzer"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
@@ -128,16 +129,6 @@ func setupAnalyzer(t *testing.T, testDb *postgres.Client, p *mockProcessor, cfg
return analyzer
}

// closingChannel returns a channel that closes when the wait group `wg` is done.
func closingChannel(wg *sync.WaitGroup) <-chan struct{} {
c := make(chan struct{})
go func() {
wg.Wait()
close(c)
}()
return c
}

// exactlyOneTrue checks a list of boolean values and returns an error unless exactly one true.
func exactlyOneTrue(bools ...bool) error {
count := 0
@@ -173,7 +164,7 @@ func TestFastSyncBlockAnalyzer(t *testing.T) {
}()

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -213,7 +204,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -263,7 +254,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -306,7 +297,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -339,7 +330,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) {
}()

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -378,7 +369,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) {
}()

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -419,7 +410,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
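Per the "move closingChannel() to util" note in the commit message, the helper deleted from block_test.go above presumably reappears in the util package as ClosingChannel (hence the util.ClosingChannel calls in the tests). Based on the removed code, it would look essentially like this:

// Sketch of the helper after its move to the util package; the body mirrors
// the closingChannel function removed from block_test.go above.
package util

import "sync"

// ClosingChannel returns a channel that closes when the wait group `wg` is done.
func ClosingChannel(wg *sync.WaitGroup) <-chan struct{} {
	c := make(chan struct{})
	go func() {
		wg.Wait()
		close(c)
	}()
	return c
}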
158 changes: 34 additions & 124 deletions analyzer/evmcontractcode/evm_contract_code.go
@@ -3,16 +3,14 @@ package evmcontractcode
import (
"context"
"fmt"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/item"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/common"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
@@ -27,40 +25,40 @@

const (
evmContractCodeAnalyzerPrefix = "evm_contract_code_"
maxDownloadBatch = 20
downloadTimeout = 61 * time.Second
)

type oasisAddress string

type main struct {
runtime common.Runtime
source nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
queueLengthMetric prometheus.Gauge
type processor struct {
runtime common.Runtime
source nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
}

var _ analyzer.Analyzer = (*main)(nil)
var _ item.ItemProcessor[*ContractCandidate] = (*processor)(nil)

func NewMain(
func NewAnalyzer(
runtime common.Runtime,
cfg config.ItemBasedAnalyzerConfig,
sourceClient nodeapi.RuntimeApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
m := &main{
logger = logger.With("analyzer", evmContractCodeAnalyzerPrefix+runtime)
p := &processor{
runtime: runtime,
source: sourceClient,
target: target,
logger: logger.With("analyzer", evmContractCodeAnalyzerPrefix+runtime),
queueLengthMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("%s%s_queue_length", evmContractCodeAnalyzerPrefix, runtime),
Help: "count of stale analysis.evm_contract_code rows",
}),
logger: logger,
}
prometheus.MustRegister(m.queueLengthMetric)
return m, nil
return item.NewAnalyzer[*ContractCandidate](
evmContractCodeAnalyzerPrefix+string(runtime),
cfg,
p,
target,
logger,
)
}

type ContractCandidate struct {
@@ -69,9 +67,9 @@ type ContractCandidate struct {
DownloadRound uint64
}

func (m main) getContractCandidates(ctx context.Context, limit int) ([]ContractCandidate, error) {
var candidates []ContractCandidate
rows, err := m.target.Query(ctx, queries.RuntimeEVMContractCodeAnalysisStale, m.runtime, limit)
func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*ContractCandidate, error) {
var candidates []*ContractCandidate
rows, err := p.target.Query(ctx, queries.RuntimeEVMContractCodeAnalysisStale, p.runtime, limit)
if err != nil {
return nil, fmt.Errorf("querying contract candidates: %w", err)
}
@@ -85,134 +83,46 @@ func (m main) getContractCandidates(ctx context.Context, limit int) ([]ContractC
); err != nil {
return nil, fmt.Errorf("scanning contract candidate: %w", err)
}
candidates = append(candidates, cc)
candidates = append(candidates, &cc)
}
return candidates, nil
}

func (m main) processContractCandidate(ctx context.Context, batch *storage.QueryBatch, candidate ContractCandidate) error {
m.logger.Info("downloading code", "addr", candidate.Addr, "eth_addr", candidate.EthAddr.Hex())
code, err := m.source.EVMGetCode(ctx, candidate.DownloadRound, candidate.EthAddr.Bytes())
func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, candidate *ContractCandidate) error {
p.logger.Info("downloading code", "addr", candidate.Addr, "eth_addr", candidate.EthAddr.Hex())
code, err := p.source.EVMGetCode(ctx, candidate.DownloadRound, candidate.EthAddr.Bytes())
if err != nil {
// Write nothing into the DB; we'll try again later.
return fmt.Errorf("downloading code for %x: %w", candidate.EthAddr, err)
}
if len(code) == 0 {
batch.Queue(
queries.RuntimeEVMContractCodeAnalysisSetIsContract,
m.runtime,
p.runtime,
candidate.Addr,
false, // is_contract
)
} else {
batch.Queue(
queries.RuntimeEVMContractCodeAnalysisSetIsContract,
m.runtime,
p.runtime,
candidate.Addr,
true, // is_contract
)
batch.Queue(
queries.RuntimeEVMContractRuntimeBytecodeUpsert,
m.runtime,
p.runtime,
candidate.Addr,
code,
)
}
return nil
}

func (m main) sendQueueLengthMetric(ctx context.Context) error {
func (p *processor) QueueLength(ctx context.Context) (int, error) {
var queueLength int
if err := m.target.QueryRow(ctx, queries.RuntimeEVMContractCodeAnalysisStaleCount, m.runtime).Scan(&queueLength); err != nil {
return fmt.Errorf("querying number of stale contract code entries: %w", err)
}
m.queueLengthMetric.Set(float64(queueLength))
return nil
}

func (m main) processBatch(ctx context.Context) (int, error) {
contractCandidates, err := m.getContractCandidates(ctx, maxDownloadBatch)
if err != nil {
return 0, fmt.Errorf("getting contract candidates: %w", err)
}
m.logger.Info("processing", "num_contract_candidates", len(contractCandidates))
if len(contractCandidates) == 0 {
return 0, nil
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, downloadTimeout)
defer cancel()
group, groupCtx := errgroup.WithContext(ctxWithTimeout)

batches := make([]*storage.QueryBatch, 0, len(contractCandidates))

for _, cc := range contractCandidates {
cc := cc // Redeclare for unclobbered use within goroutine.
batch := &storage.QueryBatch{}
batches = append(batches, batch)
group.Go(func() error {
return m.processContractCandidate(groupCtx, batch, cc)
})
}

if err := group.Wait(); err != nil {
return 0, err
}

batch := &storage.QueryBatch{}
for _, b := range batches {
batch.Extend(b)
}
if err := m.target.SendBatch(ctx, batch); err != nil {
return 0, fmt.Errorf("sending batch: %w", err)
}
return len(contractCandidates), nil
}

func (m main) Start(ctx context.Context) {
backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected round time. All runtimes currently have the same round time.
6*time.Second,
)
if err != nil {
m.logger.Error("error configuring backoff policy",
"err", err,
)
return
if err := p.target.QueryRow(ctx, queries.RuntimeEVMContractCodeAnalysisStaleCount, p.runtime).Scan(&queueLength); err != nil {
return 0, fmt.Errorf("querying number of stale contract code entries: %w", err)
}

for {
select {
case <-time.After(backoff.Timeout()):
// Process next block.
case <-ctx.Done():
m.logger.Warn("shutting down evm_contract_code analyzer", "reason", ctx.Err())
return
}

if err := m.sendQueueLengthMetric(ctx); err != nil {
m.logger.Warn("error sending queue length", "err", err)
}

numProcessed, err := m.processBatch(ctx)
if err != nil {
m.logger.Error("error processing batch", "err", err)
backoff.Failure()
continue
}

if numProcessed == 0 {
// Count this as a failure to reduce the polling when we are
// running faster than the block analyzer can find new contract candidates.
backoff.Failure()
continue
}

backoff.Success()
}
}

func (m main) Name() string {
return evmContractCodeAnalyzerPrefix + string(m.runtime)
return queueLength, nil
}
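With the old Start/backoff loop removed, a caller now constructs this analyzer through the new NewAnalyzer and runs whatever generic analyzer it returns. A hypothetical wiring is sketched below; the real registration lives in cmd/analyzer and may differ, and everything except the NewAnalyzer signature and Start(ctx) shown in this diff is an illustrative assumption.

// Hypothetical caller-side wiring, for illustration only.
package wiring // assumed package name; not part of the actual repository layout

import (
	"context"

	"github.com/oasisprotocol/nexus/analyzer/evmcontractcode"
	"github.com/oasisprotocol/nexus/common"
	"github.com/oasisprotocol/nexus/config"
	"github.com/oasisprotocol/nexus/log"
	"github.com/oasisprotocol/nexus/storage"
	"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

func startContractCodeAnalyzer(
	ctx context.Context,
	runtime common.Runtime,
	cfg config.ItemBasedAnalyzerConfig,
	source nodeapi.RuntimeApiLite,
	target storage.TargetStorage,
	logger *log.Logger,
) error {
	a, err := evmcontractcode.NewAnalyzer(runtime, cfg, source, target, logger)
	if err != nil {
		return err
	}
	go a.Start(ctx) // analyzer.Analyzer still exposes Start(ctx) and Name().
	return nil
}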
