-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathengine.go
150 lines (122 loc) · 3.74 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package traces
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/onflow/flow-go-sdk"
gethCommon "github.com/onflow/go-ethereum/common"
"github.com/rs/zerolog"
"github.com/sethvargo/go-retry"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
)
// Compile-time check that *Engine implements models.Engine.
var _ models.Engine = (*Engine)(nil)
// Engine subscribes to EVM block notifications and, for every block it is
// notified about, downloads the traces of the block's transactions and stores
// them in the trace index.
type Engine struct {
// logger is tagged with the "trace-ingestion" component by the constructor.
logger zerolog.Logger
// status backs the Ready, Done and Stop methods.
status *models.EngineStatus
// blocksPublisher is the publisher the engine subscribes itself to in Run.
blocksPublisher *models.Publisher
// blocks is used to look up the Cadence block ID for an EVM block height.
blocks storage.BlockIndexer
// traces is where downloaded transaction traces are stored.
traces storage.TraceIndexer
// downloader fetches a trace given a transaction hash and a Cadence block ID.
downloader Downloader
// collector is notified whenever indexing the trace of a transaction fails.
collector metrics.Collector
}
// NewTracesIngestionEngine builds a trace ingestion Engine from its
// dependencies. The supplied logger is tagged with the "trace-ingestion"
// component and a fresh engine status is created.
//
// The returned engine does nothing until Run is called.
func NewTracesIngestionEngine(
	initEVMHeight uint64,
	blocksPublisher *models.Publisher,
	blocks storage.BlockIndexer,
	traces storage.TraceIndexer,
	downloader Downloader,
	logger zerolog.Logger,
	collector metrics.Collector,
) *Engine {
	// NOTE(review): height is written but never read, so initEVMHeight has no
	// effect on the returned engine. It is kept because it appears to be the
	// only use of the sync/atomic import in this file; drop both together.
	var height atomic.Uint64
	height.Store(initEVMHeight)

	status := models.NewEngineStatus()
	componentLogger := logger.With().Str("component", "trace-ingestion").Logger()

	engine := &Engine{
		logger:          componentLogger,
		status:          status,
		blocksPublisher: blocksPublisher,
		blocks:          blocks,
		traces:          traces,
		downloader:      downloader,
		collector:       collector,
	}
	return engine
}
// Run registers the engine as a subscriber of the blocks publisher and then
// marks the engine as ready. It returns immediately and always returns nil;
// ctx is currently unused.
func (e *Engine) Run(ctx context.Context) error {
	// subscribe to new blocks before signalling readiness
	publisher := e.blocksPublisher
	publisher.Subscribe(e)

	e.status.MarkReady()

	return nil
}
// Notify is the handler invoked for new EVM block notifications.
//
// data is expected to be a non-nil *models.Block. Anything else is logged as an
// error and ignored. For a valid block the Cadence block ID is looked up by the
// block's EVM height and trace indexing is started in a new goroutine, so
// Notify itself does not wait for any download.
//
// This method should be non-blocking.
// NOTE(review): the Cadence ID lookup still runs synchronously on the caller's
// goroutine before the background work is started.
func (e *Engine) Notify(data any) {
	block, ok := data.(*models.Block)
	// A typed nil *models.Block satisfies the type assertion, so it has to be
	// rejected explicitly; otherwise reading block.Height below would panic.
	if !ok || block == nil {
		e.logger.Error().Msg("invalid event type sent to trace ingestion")
		return
	}

	l := e.logger.With().Uint64("evm-height", block.Height).Logger()

	cadenceID, err := e.blocks.GetCadenceID(block.Height)
	if err != nil {
		l.Error().Err(err).Msg("failed to get cadence block ID")
		return
	}

	go e.indexBlockTraces(block, cadenceID)
}
// indexBlockTraces downloads and stores the trace of every transaction hash in
// evmBlock, running at most maxConcurrentDownloads of them at a time, and
// returns once all of them have finished.
//
// All transactions of the block share a single context bounded by
// downloadTimeout. A failed download is reported as retryable and retried with
// Fibonacci backoff starting at one second; an error from storing the trace is
// returned as-is (not wrapped as retryable). Any final error is logged and
// reported to the metrics collector; nothing is returned to the caller.
func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Identifier) {
	const maxConcurrentDownloads = 5 // limit number of concurrent downloads

	ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout)
	defer cancel()

	var pending sync.WaitGroup
	slots := make(chan struct{}, maxConcurrentDownloads)

	// fetch handles a single transaction: download with retries, then store.
	fetch := func(txHash gethCommon.Hash) {
		defer pending.Done()
		defer func() { <-slots }() // release a slot after done

		txLog := e.logger.With().
			Str("tx-id", txHash.String()).
			Str("cadence-block-id", cadenceBlockID.String()).
			Logger()

		attempt := func(_ context.Context) error {
			trace, err := e.downloader.Download(txHash, cadenceBlockID)
			if err != nil {
				txLog.Warn().Err(err).Msg("retrying failed download")
				return retry.RetryableError(err)
			}

			return e.traces.StoreTransaction(txHash, trace, nil)
		}

		if err := retry.Fibonacci(ctx, time.Second, attempt); err != nil {
			// Reached for both exhausted downloads and storage errors.
			e.collector.TraceDownloadFailed()
			txLog.Error().Err(err).Msg("failed to download trace")
			return
		}

		txLog.Info().Msg("trace downloaded successfully")
	}

	for _, txHash := range evmBlock.TransactionHashes {
		pending.Add(1)
		slots <- struct{}{} // acquire a slot; blocks while all slots are busy
		go fetch(txHash)
	}

	pending.Wait()
}
// ID is required by the publisher interface. It returns a newly generated
// random UUID on every call, on the assumption that this engine subscribes
// only once.
//
// NOTE(review): two calls never return the same value, so anything that looks
// the subscriber up by ID later (for example an unsubscribe) would not find
// it; confirm the publisher does not rely on a stable ID.
func (e *Engine) ID() uuid.UUID {
	id := uuid.New()
	return id
}
// Error is required by the publisher. It always returns a nil channel, which
// never yields a value, because errors are handled (logged and counted) inside
// indexBlockTraces rather than being reported to the publisher.
func (e *Engine) Error() <-chan error {
	var none <-chan error
	return none
}
// Stop marks the engine status as stopped. It does not cancel trace indexing
// that is already in flight: those goroutines run on their own context derived
// from context.Background and finish or time out independently.
func (e *Engine) Stop() {
	status := e.status
	status.MarkStopped()
}
// Done returns the done channel exposed by the engine status.
func (e *Engine) Done() <-chan struct{} {
	done := e.status.IsDone()
	return done
}
// Ready returns the ready channel exposed by the engine status. Run marks the
// status as ready after subscribing to the blocks publisher.
func (e *Engine) Ready() <-chan struct{} {
	ready := e.status.IsReady()
	return ready
}