add itemBasedAnalyzer; refactor evm analyzers accordingly #511

Merged
merged 1 commit on Sep 12, 2023
25 changes: 8 additions & 17 deletions analyzer/block/block_test.go
@@ -14,6 +14,7 @@ import (
"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/block"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/analyzer/util"
analyzerCmd "github.com/oasisprotocol/nexus/cmd/analyzer"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
@@ -128,16 +129,6 @@ func setupAnalyzer(t *testing.T, testDb *postgres.Client, p *mockProcessor, cfg
return analyzer
}

// closingChannel returns a channel that closes when the wait group `wg` is done.
func closingChannel(wg *sync.WaitGroup) <-chan struct{} {
c := make(chan struct{})
go func() {
wg.Wait()
close(c)
}()
return c
}

// exactlyOneTrue checks a list of boolean values and returns an error unless exactly one is true.
func exactlyOneTrue(bools ...bool) error {
count := 0
@@ -173,7 +164,7 @@ func TestFastSyncBlockAnalyzer(t *testing.T) {
}()

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -213,7 +204,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -263,7 +254,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -306,7 +297,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -339,7 +330,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) {
}()

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -378,7 +369,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) {
}()

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
@@ -419,7 +410,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) {
}

// Wait for all analyzers to finish.
analyzersDone := closingChannel(&wg)
analyzersDone := util.ClosingChannel(&wg)
select {
case <-time.After(testsTimeout):
t.Fatal("timed out waiting for analyzer to finish")
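The tests above now call `util.ClosingChannel(&wg)` in place of the deleted local `closingChannel` helper. A minimal sketch of the shared helper, assuming it simply relocates the removed test code into the `analyzer/util` package unchanged:

```go
package util

import "sync"

// ClosingChannel returns a channel that closes once the wait group `wg` is done.
// Sketch only: assumes the shared util version keeps the exact behavior of the
// closingChannel test helper removed in this PR.
func ClosingChannel(wg *sync.WaitGroup) <-chan struct{} {
	c := make(chan struct{})
	go func() {
		wg.Wait()
		close(c)
	}()
	return c
}
```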
158 changes: 34 additions & 124 deletions analyzer/evmcontractcode/evm_contract_code.go
@@ -3,16 +3,14 @@ package evmcontractcode
import (
"context"
"fmt"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/item"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/common"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
@@ -27,40 +25,40 @@

const (
evmContractCodeAnalyzerPrefix = "evm_contract_code_"
maxDownloadBatch = 20
downloadTimeout = 61 * time.Second
)

type oasisAddress string

type main struct {
runtime common.Runtime
source nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
queueLengthMetric prometheus.Gauge
type processor struct {
runtime common.Runtime
source nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
}

var _ analyzer.Analyzer = (*main)(nil)
var _ item.ItemProcessor[*ContractCandidate] = (*processor)(nil)

func NewMain(
func NewAnalyzer(
runtime common.Runtime,
cfg config.ItemBasedAnalyzerConfig,
sourceClient nodeapi.RuntimeApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
m := &main{
logger = logger.With("analyzer", evmContractCodeAnalyzerPrefix+runtime)
p := &processor{
runtime: runtime,
source: sourceClient,
target: target,
logger: logger.With("analyzer", evmContractCodeAnalyzerPrefix+runtime),
queueLengthMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("%s%s_queue_length", evmContractCodeAnalyzerPrefix, runtime),
Help: "count of stale analysis.evm_contract_code rows",
}),
logger: logger,
}
prometheus.MustRegister(m.queueLengthMetric)
return m, nil
return item.NewAnalyzer[*ContractCandidate](
evmContractCodeAnalyzerPrefix+string(runtime),
cfg,
p,
target,
logger,
)
}

type ContractCandidate struct {
@@ -69,9 +67,9 @@ type ContractCandidate struct {
DownloadRound uint64
}

func (m main) getContractCandidates(ctx context.Context, limit int) ([]ContractCandidate, error) {
var candidates []ContractCandidate
rows, err := m.target.Query(ctx, queries.RuntimeEVMContractCodeAnalysisStale, m.runtime, limit)
func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*ContractCandidate, error) {
var candidates []*ContractCandidate
rows, err := p.target.Query(ctx, queries.RuntimeEVMContractCodeAnalysisStale, p.runtime, limit)
if err != nil {
return nil, fmt.Errorf("querying contract candidates: %w", err)
}
@@ -85,134 +83,46 @@ func (m main) getContractCandidates(ctx context.Context, limit int) ([]ContractC
); err != nil {
return nil, fmt.Errorf("scanning contract candidate: %w", err)
}
candidates = append(candidates, cc)
candidates = append(candidates, &cc)
}
return candidates, nil
}

func (m main) processContractCandidate(ctx context.Context, batch *storage.QueryBatch, candidate ContractCandidate) error {
m.logger.Info("downloading code", "addr", candidate.Addr, "eth_addr", candidate.EthAddr.Hex())
code, err := m.source.EVMGetCode(ctx, candidate.DownloadRound, candidate.EthAddr.Bytes())
func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, candidate *ContractCandidate) error {
p.logger.Info("downloading code", "addr", candidate.Addr, "eth_addr", candidate.EthAddr.Hex())
code, err := p.source.EVMGetCode(ctx, candidate.DownloadRound, candidate.EthAddr.Bytes())
if err != nil {
// Write nothing into the DB; we'll try again later.
return fmt.Errorf("downloading code for %x: %w", candidate.EthAddr, err)
}
if len(code) == 0 {
batch.Queue(
queries.RuntimeEVMContractCodeAnalysisSetIsContract,
m.runtime,
p.runtime,
candidate.Addr,
false, // is_contract
)
} else {
batch.Queue(
queries.RuntimeEVMContractCodeAnalysisSetIsContract,
m.runtime,
p.runtime,
candidate.Addr,
true, // is_contract
)
batch.Queue(
queries.RuntimeEVMContractRuntimeBytecodeUpsert,
m.runtime,
p.runtime,
candidate.Addr,
code,
)
}
return nil
}

func (m main) sendQueueLengthMetric(ctx context.Context) error {
func (p *processor) QueueLength(ctx context.Context) (int, error) {
var queueLength int
if err := m.target.QueryRow(ctx, queries.RuntimeEVMContractCodeAnalysisStaleCount, m.runtime).Scan(&queueLength); err != nil {
return fmt.Errorf("querying number of stale contract code entries: %w", err)
}
m.queueLengthMetric.Set(float64(queueLength))
return nil
}

func (m main) processBatch(ctx context.Context) (int, error) {
contractCandidates, err := m.getContractCandidates(ctx, maxDownloadBatch)
if err != nil {
return 0, fmt.Errorf("getting contract candidates: %w", err)
}
m.logger.Info("processing", "num_contract_candidates", len(contractCandidates))
if len(contractCandidates) == 0 {
return 0, nil
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, downloadTimeout)
defer cancel()
group, groupCtx := errgroup.WithContext(ctxWithTimeout)

batches := make([]*storage.QueryBatch, 0, len(contractCandidates))

for _, cc := range contractCandidates {
cc := cc // Redeclare for unclobbered use within goroutine.
batch := &storage.QueryBatch{}
batches = append(batches, batch)
group.Go(func() error {
return m.processContractCandidate(groupCtx, batch, cc)
})
}

if err := group.Wait(); err != nil {
return 0, err
}

batch := &storage.QueryBatch{}
for _, b := range batches {
batch.Extend(b)
}
if err := m.target.SendBatch(ctx, batch); err != nil {
return 0, fmt.Errorf("sending batch: %w", err)
}
return len(contractCandidates), nil
}

func (m main) Start(ctx context.Context) {
backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected round time. All runtimes currently have the same round time.
6*time.Second,
)
if err != nil {
m.logger.Error("error configuring backoff policy",
"err", err,
)
return
if err := p.target.QueryRow(ctx, queries.RuntimeEVMContractCodeAnalysisStaleCount, p.runtime).Scan(&queueLength); err != nil {
return 0, fmt.Errorf("querying number of stale contract code entries: %w", err)
}

for {
select {
case <-time.After(backoff.Timeout()):
// Process next block.
case <-ctx.Done():
m.logger.Warn("shutting down evm_contract_code analyzer", "reason", ctx.Err())
return
}

if err := m.sendQueueLengthMetric(ctx); err != nil {
m.logger.Warn("error sending queue length", "err", err)
}

numProcessed, err := m.processBatch(ctx)
if err != nil {
m.logger.Error("error processing batch", "err", err)
backoff.Failure()
continue
}

if numProcessed == 0 {
// Count this as a failure to reduce the polling when we are
// running faster than the block analyzer can find new contract candidates.
backoff.Failure()
continue
}

backoff.Success()
}
}

func (m main) Name() string {
return evmContractCodeAnalyzerPrefix + string(m.runtime)
return queueLength, nil
}
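For callers, the constructor change means passing an `ItemBasedAnalyzerConfig` instead of registering Prometheus metrics and hand-rolling a polling loop. A hypothetical wiring helper, assuming `NewAnalyzer` still returns an `analyzer.Analyzer` whose `Start` blocks until the context is canceled; the function and parameter names below are illustrative and not part of this PR:

```go
package wiring

import (
	"context"

	"github.com/oasisprotocol/nexus/analyzer/evmcontractcode"
	"github.com/oasisprotocol/nexus/common"
	"github.com/oasisprotocol/nexus/config"
	"github.com/oasisprotocol/nexus/log"
	"github.com/oasisprotocol/nexus/storage"
	"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

// startContractCodeAnalyzer is a hypothetical wiring helper, not part of this PR.
func startContractCodeAnalyzer(
	ctx context.Context,
	runtime common.Runtime,
	cfg config.ItemBasedAnalyzerConfig,
	source nodeapi.RuntimeApiLite,
	target storage.TargetStorage,
	logger *log.Logger,
) error {
	a, err := evmcontractcode.NewAnalyzer(runtime, cfg, source, target, logger)
	if err != nil {
		return err
	}
	// Start presumably blocks until ctx is canceled, as the previous
	// analyzer loop did before this refactor.
	a.Start(ctx)
	return nil
}
```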