-
Notifications
You must be signed in to change notification settings - Fork 496
/
ledgers_processor.go
93 lines (81 loc) · 2.14 KB
/
ledgers_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package processors
import (
"context"
"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
// ledgerInfo accumulates the per-ledger data that LedgersProcessor gathers
// while transactions are processed and later hands to the batch builder.
type ledgerInfo struct {
	// header is the ledger header captured from the LedgerCloseMeta the first
	// time the ledger is seen.
	header xdr.LedgerHeaderHistoryEntry
	// successTxCount and failedTxCount count processed transactions whose
	// result was successful / not successful, respectively.
	successTxCount, failedTxCount int
	// opCount sums the operations of successful transactions only, while
	// txSetOpCount sums the operations of every processed transaction.
	opCount, txSetOpCount int
}
// LedgersProcessor collects per-ledger transaction and operation counts and
// writes them out through a history.LedgerBatchInsertBuilder on Flush.
type LedgersProcessor struct {
	// batch receives one Add call per tracked ledger when Flush runs.
	batch history.LedgerBatchInsertBuilder
	// ledgers holds the accumulated info for each ledger, keyed by ledger
	// sequence number.
	ledgers map[uint32]*ledgerInfo
	// ingestVersion is forwarded unchanged with every ledger added to batch.
	ingestVersion int
}
// NewLedgerProcessor returns a LedgersProcessor that adds ledgers to batch,
// passing ingestVersion along with each one. The processor starts with no
// tracked ledgers.
func NewLedgerProcessor(batch history.LedgerBatchInsertBuilder, ingestVersion int) *LedgersProcessor {
	processor := new(LedgersProcessor)
	processor.batch = batch
	processor.ingestVersion = ingestVersion
	// Writing to a nil map panics, so the map is allocated up front.
	processor.ledgers = make(map[uint32]*ledgerInfo)
	return processor
}
// Name returns the fixed identifier string of this processor.
func (p *LedgersProcessor) Name() string {
	const processorName = "processors.LedgersProcessor"
	return processorName
}
// ProcessLedger returns the ledgerInfo tracked for the ledger described by
// lcm. If the ledger's sequence has not been seen yet, a new entry holding the
// ledger header (with all counters at zero) is registered and returned.
func (p *LedgersProcessor) ProcessLedger(lcm xdr.LedgerCloseMeta) *ledgerInfo {
	seq := lcm.LedgerSequence()
	if known, found := p.ledgers[seq]; found {
		return known
	}

	// First sighting of this sequence: capture the header once and remember it.
	created := &ledgerInfo{header: lcm.LedgerHeaderHistoryEntry()}
	p.ledgers[seq] = created
	return created
}
// ProcessTransaction folds transaction into the counters of the ledger
// described by lcm, registering that ledger first if necessary.
//
// The transaction's operation count is always added to txSetOpCount. It is
// added to opCount only when the transaction was successful. Exactly one of
// successTxCount / failedTxCount is incremented. The returned error is always
// nil.
func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
	info := p.ProcessLedger(lcm)

	numOps := len(transaction.Envelope.Operations())
	info.txSetOpCount += numOps

	if !transaction.Result.Successful() {
		// Failed transactions count toward the tx set only, not toward opCount.
		info.failedTxCount++
		return nil
	}

	info.successTxCount++
	info.opCount += numOps
	return nil
}
// Flush adds every tracked ledger to the batch builder and then executes the
// batch using session.
//
// When no ledgers are tracked it returns nil without touching the batch. An
// error from adding a ledger aborts immediately and is wrapped with that
// ledger's sequence; an error from executing the batch is wrapped with the
// range of sequences that were added. The ledgers map is iterated directly, so
// the order in which ledgers are added is unspecified. Tracked ledgers are not
// cleared by this method.
func (p *LedgersProcessor) Flush(ctx context.Context, session db.SessionInterface) error {
	if len(p.ledgers) == 0 {
		return nil
	}

	// lowest/highest are only used to describe the batch in the Exec error.
	var lowest, highest uint32
	for seq, info := range p.ledgers {
		if err := p.batch.Add(
			info.header,
			info.successTxCount,
			info.failedTxCount,
			info.opCount,
			info.txSetOpCount,
			p.ingestVersion,
		); err != nil {
			return errors.Wrapf(err, "error adding ledger %d to batch", seq)
		}

		// Zero doubles as the "not set yet" marker for the lower bound.
		if lowest == 0 || seq < lowest {
			lowest = seq
		}
		// For unsigned values, starting from zero needs no separate
		// "not set yet" check on the upper bound.
		if seq > highest {
			highest = seq
		}
	}

	err := p.batch.Exec(ctx, session)
	if err != nil {
		return errors.Wrapf(err, "error flushing ledgers %d - %d", lowest, highest)
	}
	return nil
}