-
Notifications
You must be signed in to change notification settings - Fork 496
/
ledger_source.go
128 lines (109 loc) · 3.36 KB
/
ledger_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package ledger
import (
"sync"
"time"
)
// Source exposes helper methods for finding out the current ledger and for
// being notified when a newer ledger is available. Call `Close` when the
// source is no longer used.
type Source interface {
// CurrentLedger returns the sequence number of the current ledger.
CurrentLedger() uint32
// NextLedger returns a channel on which a newer ledger sequence is
// delivered. The implementations in this file send at most one value per
// call, so callers that want further updates call NextLedger again.
NextLedger(currentSequence uint32) chan uint32
// Close signals that the source is no longer used.
Close()
}
// HistoryDBSource is a utility struct holding the SSE update frequency and
// the ledger state used to look up the current ledger. Its pointer type
// provides the CurrentLedger, NextLedger and Close methods of Source.
type HistoryDBSource struct {
// updateFrequency is how long the NextLedger goroutine sleeps before each
// poll of state; the sleep is skipped when it is not positive.
updateFrequency time.Duration
// state is queried through CurrentStatus for the latest ledger sequence.
state *State
// closedLock guards closed.
closedLock sync.Mutex
// closed is set to true by Close; NextLedger goroutines exit once they
// observe it.
closed bool
}
// NewHistoryDBSource builds a HistoryDBSource that reads ledger information
// from state and waits updateFrequency between polls in NextLedger.
func NewHistoryDBSource(updateFrequency time.Duration, state *State) *HistoryDBSource {
	// closedLock and closed keep their zero values: an unlocked mutex and a
	// source that has not been closed.
	src := new(HistoryDBSource)
	src.updateFrequency = updateFrequency
	src.state = state
	return src
}
// CurrentLedger returns the current ledger, taken from the ExpHistoryLatest
// field of the state's current status.
func (source *HistoryDBSource) CurrentLedger() uint32 {
	status := source.state.CurrentStatus()
	return status.ExpHistoryLatest
}
// NextLedger starts a polling goroutine and returns the channel it reports
// on. The goroutine sends the first ExpHistoryLatest value it observes that is
// larger than currentSequence and then exits, so at most one value is
// delivered per call. If the source is closed first, the goroutine exits
// without sending and the channel is left open and empty.
func (source *HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
	// Capacity 1 is required: it lets the goroutine complete its single send
	// and return even if nobody ever reads the channel. With an unbuffered
	// channel the sender would block until a receiver showed up.
	updates := make(chan uint32, 1)

	// isClosed reports whether Close has been called.
	isClosed := func() bool {
		source.closedLock.Lock()
		defer source.closedLock.Unlock()
		return source.closed
	}

	poll := func() {
		for {
			// Wait before every poll, including the first one.
			if wait := source.updateFrequency; wait > 0 {
				time.Sleep(wait)
			}
			if isClosed() {
				return
			}
			latest := source.state.CurrentStatus().ExpHistoryLatest
			if latest > currentSequence {
				updates <- latest
				return
			}
		}
	}
	go poll()

	return updates
}
// Close marks the source as closed. Goroutines started by NextLedger check
// this flag on each iteration and exit once they see it set.
func (source *HistoryDBSource) Close() {
	source.closedLock.Lock()
	source.closed = true
	source.closedLock.Unlock()
}
// TestingSource is a helper struct whose pointer type implements the Source
// interface. Ledgers are fed to it by calling AddLedger.
type TestingSource struct {
// currentLedger is the value returned by CurrentLedger; NextLedger raises it
// when a larger sequence arrives.
currentLedger uint32
// newLedgers carries sequences from AddLedger to NextLedger goroutines.
newLedgers chan uint32
// lock is held when CurrentLedger reads and NextLedger writes currentLedger.
lock *sync.RWMutex
}
// NewTestingSource returns a TestingSource whose current ledger starts at
// currentLedger.
func NewTestingSource(currentLedger uint32) *TestingSource {
	src := new(TestingSource)
	src.currentLedger = currentLedger
	// Unbuffered on purpose: a send on this channel blocks until it is read.
	src.newLedgers = make(chan uint32)
	src.lock = new(sync.RWMutex)
	return src
}
// CurrentLedger returns the current ledger, read while holding the source's
// read lock.
func (source *TestingSource) CurrentLedger() uint32 {
	source.lock.RLock()
	current := source.currentLedger
	source.lock.RUnlock()
	return current
}
// AddLedger publishes nextSequence on the newLedgers channel. With the
// unbuffered channel created by NewTestingSource, the call blocks until the
// sequence has been read from that channel.
func (source *TestingSource) AddLedger(nextSequence uint32) {
	ledgers := source.newLedgers
	ledgers <- nextSequence
}
// NextLedger returns a buffered channel that receives a single value: the
// first sequence read from the newLedgers channel (fed by AddLedger) that is
// greater than the source's current ledger. Sequences that are not greater are
// consumed and discarded. The accepted sequence becomes the current ledger
// before it is sent on the returned channel.
//
// currentSequence is not consulted; the comparison is made against the ledger
// tracked by the source itself.
func (source *TestingSource) NextLedger(currentSequence uint32) chan uint32 {
	// Capacity 1 so the goroutine's single send never blocks.
	response := make(chan uint32, 1)

	// advance stores candidate as the current ledger when it is newer and
	// reports whether it did. The comparison and the update share one critical
	// section, so concurrent NextLedger goroutines and CurrentLedger callers
	// never touch currentLedger without holding the lock.
	advance := func(candidate uint32) bool {
		source.lock.Lock()
		defer source.lock.Unlock()
		if candidate <= source.currentLedger {
			return false
		}
		source.currentLedger = candidate
		return true
	}

	go func() {
		for {
			nextLedger := <-source.newLedgers
			if advance(nextLedger) {
				response <- nextLedger
				return
			}
		}
	}()

	return response
}
// Close does nothing. It is present so that *TestingSource provides the Close
// method required by the Source interface.
func (source *TestingSource) Close() {}