ledger_buffer.go
package ledgerbackend

import (
	"bytes"
	"context"
	"io"
	"os"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/stellar/go/support/collections/heap"
	"github.com/stellar/go/support/compressxdr"
	"github.com/stellar/go/support/datastore"
	"github.com/stellar/go/xdr"
)
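
// ledgerBatchObject wraps a single compressed ledger object file together with
// the sequence of the first ledger it contains; that sequence doubles as the
// object's priority in the buffer's min-heap.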
type ledgerBatchObject struct {
	payload     []byte
	startLedger int // Ledger sequence used as the priority for the priority queue.
}
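
// ledgerBuffer prefetches ledger object files from the data store with a pool
// of workers and re-orders the downloaded batches so that they can be consumed
// strictly in ledger-sequence order.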
type ledgerBuffer struct {
	// Passed through from BufferedStorageBackend to control the lifetime of the ledgerBuffer instance
	config    BufferedStorageBackendConfig
	dataStore datastore.DataStore

	// context used to cancel workers within the ledgerBuffer
	context context.Context
	cancel  context.CancelCauseFunc
	wg      sync.WaitGroup

	// The channels and data structures below establish the ledgerBuffer invariant, which is that
	// the number of tasks (both pending and in-flight) + len(ledgerQueue) + ledgerPriorityQueue.Len()
	// is always less than or equal to config.BufferSize.
	taskQueue           chan uint32                   // Buffer next object read
	ledgerQueue         chan []byte                   // Order corrected lcm batches
	ledgerPriorityQueue *heap.Heap[ledgerBatchObject] // Priority is set to the sequence number
	priorityQueueLock   sync.Mutex

	// Keep track of the ledgers to be processed and the order in which
	// the ledgers should be buffered
	currentLedger     uint32 // The current ledger that should be popped from ledgerPriorityQueue
	nextTaskLedger    uint32 // The next task ledger that should be added to taskQueue
	ledgerRange       Range
	currentLedgerLock sync.RWMutex
}
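
// newLedgerBuffer constructs a ledgerBuffer for the given range, starts
// config.NumWorkers download workers, and primes the task queue so that the
// buffer begins filling immediately. A minimal usage sketch (a hypothetical
// call site; the real ones live in BufferedStorageBackend):
//
//	lb, err := bsb.newLedgerBuffer(ledgerRange)
//	if err != nil {
//		return nil, err
//	}
//	defer lb.close()
//	batch, err := lb.getFromLedgerQueue(ctx) // blocks until the next batch is in order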
func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBuffer, error) {
	ctx, cancel := context.WithCancelCause(context.Background())

	less := func(a, b ledgerBatchObject) bool {
		return a.startLedger < b.startLedger
	}
	pq := heap.New(less, int(bsb.config.BufferSize))

	ledgerBuffer := &ledgerBuffer{
		config:              bsb.config,
		dataStore:           bsb.dataStore,
		taskQueue:           make(chan uint32, bsb.config.BufferSize),
		ledgerQueue:         make(chan []byte, bsb.config.BufferSize),
		ledgerPriorityQueue: pq,
		currentLedger:       ledgerRange.from,
		nextTaskLedger:      ledgerRange.from,
		ledgerRange:         ledgerRange,
		context:             ctx,
		cancel:              cancel,
	}

	// Start workers to read LCM files
	ledgerBuffer.wg.Add(int(bsb.config.NumWorkers))
	for i := uint32(0); i < bsb.config.NumWorkers; i++ {
		go ledgerBuffer.worker(ctx)
	}

	// Upon initialization, the ledgerBuffer invariant is maintained because
	// we create bsb.config.BufferSize tasks while len(ledgerQueue) and ledgerPriorityQueue.Len() are 0.
	// Effectively, this is len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize,
	// which caps the total number of tasks (both pending and in-flight) at bsb.config.BufferSize.
	// Note: when a task is in-flight it is no longer in the taskQueue,
	// but for easier conceptualization, len(taskQueue) can be interpreted as both pending and in-flight tasks
	// if we assume the workers are idle and not processing any tasks.
	for i := 0; i < int(bsb.config.BufferSize); i++ {
		ledgerBuffer.pushTaskQueue()
	}

	return ledgerBuffer, nil
}
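
// pushTaskQueue enqueues the next ledger sequence for a worker to download,
// advancing nextTaskLedger by LedgersPerFile. In bounded mode it is a no-op
// once the end of the range has already been queued.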
func (lb *ledgerBuffer) pushTaskQueue() {
	// In bounded mode, don't queue past the end ledger
	if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
		return
	}
	lb.taskQueue <- lb.nextTaskLedger
	lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
}

// sleepWithContext sleeps for the duration d and returns true if the full
// duration elapsed without the context being canceled.
func (lb *ledgerBuffer) sleepWithContext(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)

	select {
	case <-ctx.Done():
		if !timer.Stop() {
			<-timer.C
		}
		return false
	case <-timer.C:
	}
	return true
}
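
// worker consumes ledger sequences from the task queue, downloads the
// corresponding ledger object (retrying up to config.RetryLimit times), and
// hands the result to storeObject. In unbounded mode a missing object is
// retried indefinitely until it appears. The worker exits when the context is
// canceled or after an unrecoverable download error.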
func (lb *ledgerBuffer) worker(ctx context.Context) {
	defer lb.wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case sequence := <-lb.taskQueue:
			for attempt := uint32(0); attempt <= lb.config.RetryLimit; {
				ledgerObject, err := lb.downloadLedgerObject(ctx, sequence)
				if err != nil {
					if errors.Is(err, os.ErrNotExist) {
						// ledgerObject not found and unbounded: retry until the object is published
						if !lb.ledgerRange.bounded {
							if !lb.sleepWithContext(ctx, lb.config.RetryWait) {
								return
							}
							continue
						}
						lb.cancel(errors.Wrapf(err, "ledger object containing sequence %v is missing", sequence))
						return
					}
					// don't bother retrying if we've received the signal to shut down
					if errors.Is(err, context.Canceled) {
						return
					}
					if attempt == lb.config.RetryLimit {
						err = errors.Wrapf(err, "maximum retries exceeded for downloading object containing sequence %v", sequence)
						lb.cancel(err)
						return
					}
					attempt++
					if !lb.sleepWithContext(ctx, lb.config.RetryWait) {
						return
					}
					continue
				}

				// When we store an object we still maintain the ledger buffer invariant because
				// at this point the current task is finished and we add 1 ledger object to the priority queue.
				// Thus, the number of tasks decreases by 1 and the priority queue length increases by 1.
				// This keeps the overall total the same (<= BufferSize). As long as the ledger buffer invariant
				// was maintained in the previous state, it is still maintained during this state transition.
				lb.storeObject(ledgerObject, sequence)
				break
			}
		}
	}
}
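
// downloadLedgerObject fetches the object file containing the given ledger
// sequence from the data store and returns its (still compressed) contents.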
func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) {
	objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)

	reader, err := lb.dataStore.GetFile(ctx, objectKey)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	objectBytes, err := io.ReadAll(reader)
	if err != nil {
		return nil, errors.Wrapf(err, "failed reading file: %s", objectKey)
	}
	return objectBytes, nil
}
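
// storeObject pushes a downloaded object onto the priority queue and then
// drains any batches that have become ready, moving them from the priority
// queue to the in-order ledgerQueue.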
func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
	lb.priorityQueueLock.Lock()
	defer lb.priorityQueueLock.Unlock()

	lb.currentLedgerLock.Lock()
	defer lb.currentLedgerLock.Unlock()

	lb.ledgerPriorityQueue.Push(ledgerBatchObject{
		payload:     ledgerObject,
		startLedger: int(sequence),
	})

	// Check whether currentLedger is the next item in the ledgerPriorityQueue.
	// The ledgerBuffer invariant is maintained here because items are transferred from the ledgerPriorityQueue to the ledgerQueue.
	// Thus the overall sum of ledgerPriorityQueue.Len() + len(lb.ledgerQueue) remains the same.
	for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) {
		item := lb.ledgerPriorityQueue.Pop()
		lb.ledgerQueue <- item.payload
		lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile
	}
}
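
// getFromLedgerQueue blocks until the next in-order batch is available,
// decompresses it, and returns the decoded xdr.LedgerCloseMetaBatch. Each
// batch consumed frees a slot in the buffer, so a new download task is queued.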
func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) (xdr.LedgerCloseMetaBatch, error) {
	for {
		select {
		case <-lb.context.Done():
			return xdr.LedgerCloseMetaBatch{}, context.Cause(lb.context)
		case <-ctx.Done():
			return xdr.LedgerCloseMetaBatch{}, ctx.Err()
		case compressedBinary := <-lb.ledgerQueue:
			// The ledger buffer invariant is maintained here because
			// we create an extra task when consuming one item from the ledger queue.
			// Thus len(ledgerQueue) decreases by 1 and the number of tasks increases by 1.
			// The overall sum below remains the same:
			// len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize
			lb.pushTaskQueue()

			lcmBatch := xdr.LedgerCloseMetaBatch{}
			decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &lcmBatch)
			_, err := decoder.ReadFrom(bytes.NewReader(compressedBinary))
			if err != nil {
				return xdr.LedgerCloseMetaBatch{}, err
			}
			return lcmBatch, nil
		}
	}
}
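
// getLatestLedgerSequence returns the sequence of the most recent ledger that
// has been buffered in order, or 0 if nothing has been buffered yet.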
func (lb *ledgerBuffer) getLatestLedgerSequence() (uint32, error) {
	lb.currentLedgerLock.Lock()
	defer lb.currentLedgerLock.Unlock()

	if lb.currentLedger == lb.ledgerRange.from {
		return 0, nil
	}

	// Subtract 1 to get the latest ledger in buffer
	return lb.currentLedger - 1, nil
}
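
// close cancels the buffer's context and blocks until all workers have exited.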
func (lb *ledgerBuffer) close() {
	lb.cancel(context.Canceled)
	// wait for all workers to finish terminating
	lb.wg.Wait()
}