/
ledger_change_reader.go
164 lines (148 loc) · 5.22 KB
/
ledger_change_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package ingest
import (
"context"
"io"
"github.com/lantah/go/ingest/ledgerbackend"
"github.com/lantah/go/xdr"
)
// ChangeReader provides convenient, streaming access to a sequence of Changes.
type ChangeReader interface {
// Read should return the next `Change` in the ledger. If there are no more
// changes left it should return an `io.EOF` error.
Read() (Change, error)
// Close should be called when reading is finished. This is especially
// helpful when there are still some changes available, so the reader can
// stop streaming them.
Close() error
}
// ledgerChangeReaderState defines possible states of LedgerChangeReader.
//
// The declaration order of the constants below is significant:
// LedgerChangeReader.Read moves to the next phase by incrementing the state,
// so the constants must stay in the order in which the phases are streamed.
type ledgerChangeReaderState int
const (
// feeChangesState is active when LedgerChangeReader is reading fee changes.
// It is the initial state set by the constructors.
feeChangesState ledgerChangeReaderState = iota
// metaChangesState is active when LedgerChangeReader is reading transaction meta changes.
metaChangesState
// evictionChangesState is active when LedgerChangeReader is reading ledger entry evictions.
evictionChangesState
// upgradeChangesState is active when LedgerChangeReader is reading upgrade changes.
// It is the final state; Read returns io.EOF once all upgrades are consumed.
upgradeChangesState
)
// LedgerChangeReader is a ChangeReader which returns Changes from Gravity
// for a single ledger.
type LedgerChangeReader struct {
// Embedded transaction reader. Read pulls transactions from it and also
// reads eviction and upgrade data from its ledgerCloseMeta.
*LedgerTransactionReader
// state is the phase (fee, meta, eviction, upgrade) Read is currently streaming.
state ledgerChangeReaderState
// pending buffers changes that Read has collected but not yet returned.
pending []Change
// pendingIndex is the position in pending of the next change to return.
// Read resets it to 0 once the buffer has been fully drained.
pendingIndex int
// upgradeIndex is the position of the next entry in UpgradesProcessing()
// whose changes Read will buffer.
upgradeIndex int
}
// Ensure LedgerChangeReader implements ChangeReader
var _ ChangeReader = (*LedgerChangeReader)(nil)
// NewLedgerChangeReader builds a LedgerChangeReader for the ledger with the
// given sequence, using a LedgerTransactionReader created from ctx, backend
// and networkPassphrase. Any error from NewLedgerTransactionReader is
// returned unchanged. The reader starts in the fee-changes phase.
//
// The returned LedgerChangeReader is not thread safe and should not be
// shared by multiple goroutines.
func NewLedgerChangeReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerChangeReader, error) {
	txReader, err := NewLedgerTransactionReader(ctx, backend, networkPassphrase, sequence)
	if err != nil {
		return nil, err
	}

	reader := new(LedgerChangeReader)
	reader.LedgerTransactionReader = txReader
	reader.state = feeChangesState
	return reader, nil
}
// NewLedgerChangeReaderFromLedgerCloseMeta builds a LedgerChangeReader
// directly from an already available xdr.LedgerCloseMeta. Any error from
// NewLedgerTransactionReaderFromLedgerCloseMeta is returned unchanged. The
// reader starts in the fee-changes phase.
//
// The returned LedgerChangeReader is not thread safe and should not be
// shared by multiple goroutines.
func NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase string, ledger xdr.LedgerCloseMeta) (*LedgerChangeReader, error) {
	txReader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledger)
	if err != nil {
		return nil, err
	}

	reader := new(LedgerChangeReader)
	reader.LedgerTransactionReader = txReader
	reader.state = feeChangesState
	return reader, nil
}
// Read returns the next change in the stream, or io.EOF once every change of
// the ledger has been returned.
//
// Changes within a ledger are produced in this order:
//   - fee changes of all transactions,
//   - transaction meta changes of all transactions,
//   - evicted ledger entries,
//   - upgrade changes.
//
// A single transaction (or upgrade) can yield many changes, so each batch is
// buffered in r.pending and handed out one change per call. When the buffer is
// empty the loop below refills it from the current phase, or advances to the
// next phase, and then tries again.
func (r *LedgerChangeReader) Read() (Change, error) {
	for {
		// Serve buffered changes before producing new ones.
		if r.pendingIndex < len(r.pending) {
			change := r.pending[r.pendingIndex]
			r.pendingIndex++
			if r.pendingIndex == len(r.pending) {
				// Buffer drained: keep its capacity but start over.
				r.pending = r.pending[:0]
				r.pendingIndex = 0
			}
			return change, nil
		}

		switch r.state {
		case feeChangesState, metaChangesState:
			tx, err := r.LedgerTransactionReader.Read()
			if err == io.EOF {
				// The fee pass is over: rewind so the meta pass can walk
				// the same transactions again.
				if r.state == feeChangesState {
					r.LedgerTransactionReader.Rewind()
				}
				r.state++
				continue
			}
			if err != nil {
				return Change{}, err
			}

			if r.state == feeChangesState {
				r.pending = append(r.pending, tx.GetFeeChanges()...)
				continue
			}

			txChanges, err := tx.GetChanges()
			if err != nil {
				return Change{}, err
			}
			r.pending = append(r.pending, txChanges...)

		case evictionChangesState:
			evictedEntries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries()
			if err != nil {
				return Change{}, err
			}
			for i := range evictedEntries {
				// Copy the entry so every Change points at its own value.
				evicted := evictedEntries[i]
				// An evicted ledger entry is removed from the ledger, hence
				// there is no Post state.
				r.pending = append(r.pending, Change{
					Type: evicted.Data.Type,
					Pre:  &evicted,
					Post: nil,
				})
			}
			r.state++

		case upgradeChangesState:
			upgrades := r.ledgerCloseMeta.UpgradesProcessing()
			if r.upgradeIndex >= len(upgrades) {
				return Change{}, io.EOF
			}
			upgradeChanges := GetChangesFromLedgerEntryChanges(upgrades[r.upgradeIndex].Changes)
			r.pending = append(r.pending, upgradeChanges...)
			r.upgradeIndex++

		default:
			return Change{}, io.EOF
		}
	}
}
// Close should be called when reading is finished. It discards any changes
// still buffered by Read and closes the embedded LedgerTransactionReader,
// returning that reader's error.
func (r *LedgerChangeReader) Close() error {
	r.pending = nil
	// Reset the cursor together with the buffer. Read relies on
	// pendingIndex never exceeding len(pending); leaving a stale index
	// behind would make a later Read skip or never reach buffered changes.
	r.pendingIndex = 0
	return r.LedgerTransactionReader.Close()
}