This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
forked from stellar/go
-
Notifications
You must be signed in to change notification settings - Fork 3
/
account_data_processor.go
107 lines (92 loc) · 2.64 KB
/
account_data_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package processors
import (
"context"
"github.com/xdbfoundation/go/ingest"
"github.com/xdbfoundation/go/services/frontier/internal/db2/history"
"github.com/xdbfoundation/go/support/errors"
"github.com/xdbfoundation/go/xdr"
)
// AccountDataProcessor buffers data ledger entry changes in a change
// compactor and persists them through a history.QData handle on Commit.
type AccountDataProcessor struct {
// dataQ is the query handle used to upsert and remove account data rows.
dataQ history.QData
// cache accumulates changes between commits; reset replaces it with a new one.
cache *ingest.ChangeCompactor
}
// NewAccountDataProcessor returns an AccountDataProcessor that writes through
// dataQ and starts out with a freshly initialised change cache.
func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor {
	processor := new(AccountDataProcessor)
	processor.dataQ = dataQ
	// Install the initial change cache so the processor is immediately usable.
	processor.reset()
	return processor
}
// reset replaces the processor's change cache with a new compactor obtained
// from ingest.NewChangeCompactor, dropping the reference to the previous one.
func (p *AccountDataProcessor) reset() {
	fresh := ingest.NewChangeCompactor()
	p.cache = fresh
}
// ProcessChange adds change to the processor's cache when it is a data ledger
// entry change; changes of any other type are ignored and nil is returned.
//
// Once the cache size exceeds maxBatchSize the cached changes are flushed via
// Commit and the cache is reset. An error from adding to the cache or from
// Commit is returned wrapped; in the Commit failure case the cache is not reset.
func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
	// Only data ledger entries are relevant to this processor.
	if change.Type != xdr.LedgerEntryTypeData {
		return nil
	}

	if addErr := p.cache.AddChange(change); addErr != nil {
		return errors.Wrap(addErr, "error adding to ledgerCache")
	}

	// Nothing more to do until the batch grows past the limit.
	if p.cache.Size() <= maxBatchSize {
		return nil
	}

	if commitErr := p.Commit(ctx); commitErr != nil {
		return errors.Wrap(commitErr, "error in Commit")
	}
	p.reset()
	return nil
}
// Commit writes the changes currently held in the cache to the database.
//
// A change with a pre-image but no post-image is treated as a removal and is
// deleted by its (account ID, data name) key. Every other change is treated as
// a create or update and its post-image is upserted. Upserts are executed
// before removals, and each query is skipped when it has no rows to act on.
//
// An ingest state error is returned when the number of rows removed differs
// from the number of keys requested. Commit does not call reset itself.
func (p *AccountDataProcessor) Commit(ctx context.Context) error {
	var upserts []history.Data
	var removals []history.AccountDataKey

	for _, change := range p.cache.GetChanges() {
		if change.Pre != nil && change.Post == nil {
			// Removed: only the key of the previous entry is needed.
			entry := change.Pre.Data.MustData()
			removals = append(removals, history.AccountDataKey{
				AccountID: entry.AccountId.Address(),
				DataName:  string(entry.DataName),
			})
			continue
		}

		// Created or updated: persist the post-image.
		upserts = append(upserts, p.ledgerEntryToRow(change.Post))
	}

	if len(upserts) > 0 {
		err := p.dataQ.UpsertAccountData(ctx, upserts)
		if err != nil {
			return errors.Wrap(err, "error executing upsert")
		}
	}

	if len(removals) == 0 {
		return nil
	}

	rowsAffected, err := p.dataQ.RemoveAccountData(ctx, removals)
	if err != nil {
		return errors.Wrap(err, "error executing removal")
	}

	// Every requested key must have matched exactly one row; anything else
	// is reported as a state error.
	if rowsAffected != int64(len(removals)) {
		mismatch := errors.Errorf(
			"%d rows affected when deleting %d account data",
			rowsAffected,
			len(removals),
		)
		return ingest.NewStateError(mismatch)
	}
	return nil
}
// ledgerEntryToRow converts a data ledger entry into a history.Data row,
// copying the account address, data name, data value, last modified ledger
// sequence and sponsor. entry must be non-nil and hold a data entry, since
// MustData is called on it.
func (p *AccountDataProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Data {
	dataEntry := entry.Data.MustData()

	var row history.Data
	row.AccountID = dataEntry.AccountId.Address()
	row.Name = string(dataEntry.DataName)
	row.Value = history.AccountDataValue(dataEntry.DataValue)
	row.LastModifiedLedger = uint32(entry.LastModifiedLedgerSeq)
	row.Sponsor = ledgerEntrySponsorToNullString(*entry)
	return row
}