-
Notifications
You must be signed in to change notification settings - Fork 496
/
account_data_processor.go
112 lines (94 loc) · 2.93 KB
/
account_data_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package processors
import (
"context"
"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
type AccountDataProcessor struct {
dataQ history.QData
batchInsertBuilder history.AccountDataBatchInsertBuilder
dataToUpdate []history.Data
dataToDelete []history.AccountDataKey
}
func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor {
p := &AccountDataProcessor{dataQ: dataQ}
p.reset()
return p
}
func (p *AccountDataProcessor) reset() {
p.batchInsertBuilder = p.dataQ.NewAccountDataBatchInsertBuilder()
p.dataToUpdate = []history.Data{}
p.dataToDelete = []history.AccountDataKey{}
}
func (p *AccountDataProcessor) Name() string {
return "processors.AccountDataProcessor"
}
func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
// We're interested in data only
if change.Type != xdr.LedgerEntryTypeData {
return nil
}
switch {
case change.Pre == nil && change.Post != nil:
// Created
err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.Wrap(err, "Error adding to AccountDataBatchInsertBuilder")
}
case change.Pre != nil && change.Post == nil:
// Removed
data := change.Pre.Data.MustData()
key := history.AccountDataKey{
AccountID: data.AccountId.Address(),
DataName: string(data.DataName),
}
p.dataToDelete = append(p.dataToDelete, key)
default:
// Updated
p.dataToUpdate = append(p.dataToUpdate, p.ledgerEntryToRow(change.Post))
}
if p.batchInsertBuilder.Len()+len(p.dataToUpdate)+len(p.dataToDelete) > maxBatchSize {
if err := p.Commit(ctx); err != nil {
return errors.Wrap(err, "error in Commit")
}
}
return nil
}
func (p *AccountDataProcessor) Commit(ctx context.Context) error {
defer p.reset()
err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "Error executing AccountDataBatchInsertBuilder")
}
if len(p.dataToUpdate) > 0 {
if err := p.dataQ.UpsertAccountData(ctx, p.dataToUpdate); err != nil {
return errors.Wrap(err, "error executing upsert")
}
}
if len(p.dataToDelete) > 0 {
count, err := p.dataQ.RemoveAccountData(ctx, p.dataToDelete)
if err != nil {
return errors.Wrap(err, "error executing removal")
}
if count != int64(len(p.dataToDelete)) {
return ingest.NewStateError(errors.Errorf(
"%d rows affected when deleting %d account data",
count,
len(p.dataToDelete),
))
}
}
return nil
}
func (p *AccountDataProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Data {
data := entry.Data.MustData()
return history.Data{
AccountID: data.AccountId.Address(),
Name: string(data.DataName),
Value: history.AccountDataValue(data.DataValue),
LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq),
Sponsor: ledgerEntrySponsorToNullString(*entry),
}
}