-
Notifications
You must be signed in to change notification settings - Fork 0
/
accounts_processor.go
136 lines (115 loc) · 3.69 KB
/
accounts_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package processors
import (
"context"
"github.com/guregu/null/zero"
"github.com/lantah/go/ingest"
"github.com/lantah/go/services/orbitr/internal/db2/history"
"github.com/lantah/go/support/errors"
"github.com/lantah/go/xdr"
)
// AccountsProcessor buffers account ledger-entry changes in a change
// compactor and writes the resulting rows through a history.QAccounts
// when Commit is called.
type AccountsProcessor struct {
	// accountsQ is the query interface Commit uses to upsert and remove
	// account rows.
	accountsQ history.QAccounts

	// cache holds the changes added by ProcessChange until they are
	// committed; reset replaces it with a fresh compactor.
	cache *ingest.ChangeCompactor
}
// NewAccountsProcessor returns an AccountsProcessor that writes through
// accountsQ and starts with a fresh, empty change cache.
func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor {
	processor := new(AccountsProcessor)
	processor.accountsQ = accountsQ
	// reset installs the initial change compactor.
	processor.reset()
	return processor
}
// reset discards the current change cache by replacing it with a newly
// created compactor.
func (p *AccountsProcessor) reset() {
	fresh := ingest.NewChangeCompactor()
	p.cache = fresh
}
// ProcessChange adds an account ledger-entry change to the cache.
//
// Changes whose type is not xdr.LedgerEntryTypeAccount are ignored. Once the
// cache size exceeds maxBatchSize, the buffered changes are committed and the
// cache is reset. Any error from adding to the cache or from Commit is
// returned wrapped.
func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
	// Only account entries are handled by this processor.
	if change.Type != xdr.LedgerEntryTypeAccount {
		return nil
	}

	if addErr := p.cache.AddChange(change); addErr != nil {
		return errors.Wrap(addErr, "error adding to ledgerCache")
	}

	// Nothing more to do until the batch grows past the threshold.
	if p.cache.Size() <= maxBatchSize {
		return nil
	}

	if commitErr := p.Commit(ctx); commitErr != nil {
		return errors.Wrap(commitErr, "error in Commit")
	}
	// The cache is only cleared after a successful commit.
	p.reset()
	return nil
}
// Commit writes the cached account changes through accountsQ.
//
// Each cached change is first checked with AccountChangedExceptSigners;
// changes reporting false are skipped. A change with a Post entry is
// converted to a row and upserted; a change with only a Pre entry has its
// account address queued for removal. A change with neither Pre nor Post is
// rejected with an error.
//
// Upserts are issued before removals, and each call is made only when its
// batch is non-empty. If the number of rows removed differs from the number
// of addresses requested, an ingest state error is returned. Commit does not
// clear the cache itself.
func (p *AccountsProcessor) Commit(ctx context.Context) error {
	var (
		upserts  []history.AccountEntry
		removals []string
	)

	for _, change := range p.cache.GetChanges() {
		changed, err := change.AccountChangedExceptSigners()
		if err != nil {
			return errors.Wrap(err, "Error running change.AccountChangedExceptSigners")
		}
		if !changed {
			continue
		}

		// Created and updated entries both carry a Post state.
		if change.Post != nil {
			upserts = append(upserts, p.ledgerEntryToRow(*change.Post))
			continue
		}

		// From here on Post is nil, so a missing Pre means the change is empty.
		if change.Pre == nil {
			return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil")
		}

		// Removed: only the Pre state exists.
		removed := change.Pre.Data.MustAccount()
		removals = append(removals, removed.AccountId.Address())
	}

	// Upsert accounts
	if len(upserts) > 0 {
		if err := p.accountsQ.UpsertAccounts(ctx, upserts); err != nil {
			return errors.Wrap(err, "errors in UpsertAccounts")
		}
	}

	if len(removals) == 0 {
		return nil
	}

	rowsAffected, err := p.accountsQ.RemoveAccounts(ctx, removals)
	if err != nil {
		return errors.Wrap(err, "error in RemoveAccounts")
	}
	// Every queued address must correspond to exactly one removed row.
	if rowsAffected != int64(len(removals)) {
		return ingest.NewStateError(errors.Errorf(
			"%d rows affected when removing %d accounts",
			rowsAffected,
			len(removals),
		))
	}
	return nil
}
// ledgerEntryToRow converts an account ledger entry into a
// history.AccountEntry row.
//
// The entry's data is read with MustAccount, so the caller is expected to
// pass an account entry (NOTE(review): Must-style accessors conventionally
// panic on a type mismatch — confirm against the xdr package). The inflation
// destination is left as the empty string when the account has none.
func (p *AccountsProcessor) ledgerEntryToRow(entry xdr.LedgerEntry) history.AccountEntry {
	acct := entry.Data.MustAccount()
	liab := acct.Liabilities()

	// Empty unless the account names an inflation destination.
	var inflationDest string
	if acct.InflationDest != nil {
		inflationDest = acct.InflationDest.Address()
	}

	row := history.AccountEntry{
		AccountID:            acct.AccountId.Address(),
		Balance:              int64(acct.Balance),
		BuyingLiabilities:    int64(liab.Buying),
		SellingLiabilities:   int64(liab.Selling),
		SequenceNumber:       int64(acct.SeqNum),
		SequenceLedger:       zero.IntFrom(int64(acct.SeqLedger())),
		SequenceTime:         zero.IntFrom(int64(acct.SeqTime())),
		NumSubEntries:        uint32(acct.NumSubEntries),
		InflationDestination: inflationDest,
		Flags:                uint32(acct.Flags),
		HomeDomain:           string(acct.HomeDomain),
		MasterWeight:         acct.MasterKeyWeight(),
		ThresholdLow:         acct.ThresholdLow(),
		ThresholdMedium:      acct.ThresholdMedium(),
		ThresholdHigh:        acct.ThresholdHigh(),
		LastModifiedLedger:   uint32(entry.LastModifiedLedgerSeq),
		Sponsor:              ledgerEntrySponsorToNullString(entry),
		NumSponsored:         uint32(acct.NumSponsored()),
		NumSponsoring:        uint32(acct.NumSponsoring()),
	}
	return row
}