/
ingest_manager.go
64 lines (54 loc) · 2 KB
/
ingest_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package ingest
import (
sdk "github.com/cosmos/cosmos-sdk/types"
)
// IngestManager is an interface that defines the methods for the ingest manager.
// The ingest manager handles the processing of blocks and the ingestion of data
// into the various sinks that are defined by the Ingester interface.
type IngestManager interface {
	// RegisterIngester registers an ingester with the manager.
	RegisterIngester(ingester Ingester)
	// ProcessBlock processes the block and ingests data into the registered sinks.
	// Must never panic. If a panic occurs, it is logged and otherwise ignored.
	// If an ingester returns an error, it is logged and otherwise ignored.
	ProcessBlock(ctx sdk.Context)
}
// Ingester is an interface that defines the methods for an ingester.
// An ingester ingests data into a sink.
type Ingester interface {
	// ProcessBlock processes the block and ingests data into a sink.
	// Returns an error if the ingester fails to ingest data.
	ProcessBlock(ctx sdk.Context) error
	// GetName returns the name of the ingester. The ingest manager uses it to
	// identify the ingester in log output when ProcessBlock returns an error.
	GetName() string
}
// ingestManagerImpl is an implementation of IngestManager.
type ingestManagerImpl struct {
	// ingesters holds the registered ingesters in registration order.
	ingesters []Ingester
}
// Compile-time check that *ingestManagerImpl satisfies IngestManager.
var _ IngestManager = (*ingestManagerImpl)(nil)
// NewIngestManager returns an IngestManager that has no ingesters registered.
// Ingesters are added afterwards via RegisterIngester.
func NewIngestManager() IngestManager {
	manager := new(ingestManagerImpl)
	// Start from an empty, non-nil slice of ingesters.
	manager.ingesters = make([]Ingester, 0)
	return manager
}
// RegisterIngester implements IngestManager.
// The ingester is added after all previously registered ingesters.
func (im *ingestManagerImpl) RegisterIngester(ingester Ingester) {
	updated := append(im.ingesters, ingester)
	im.ingesters = updated
}
// ProcessBlock implements IngestManager.
//
// It invokes ProcessBlock on every registered ingester, in registration order.
// Each ingester runs under its own panic recovery, so a panic or an error in
// one ingester is logged and does not prevent the remaining ingesters from
// processing the block. ProcessBlock itself never returns an error.
func (im *ingestManagerImpl) ProcessBlock(ctx sdk.Context) {
	// Ingesters must be set in the app. If not, we do nothing.
	for _, ingester := range im.ingesters {
		im.processIngester(ctx, ingester)
	}
}

// processIngester runs a single ingester for the block, logging (and otherwise
// ignoring) any error it returns or any panic it raises.
func (im *ingestManagerImpl) processIngester(ctx sdk.Context, ingester Ingester) {
	// The recover lives here rather than in ProcessBlock so that a panic only
	// aborts the ingester that raised it, not the whole loop over ingesters.
	defer func() {
		if r := recover(); r != nil {
			// Panics are logged and ignored.
			ctx.Logger().Error("panic while processing block during ingest", "err", r)
		}
	}()

	if err := ingester.ProcessBlock(ctx); err != nil {
		// The error is logged and ignored.
		ctx.Logger().Error("error processing block during ingest", "err", err, "ingester", ingester.GetName())
	}
}