-
Notifications
You must be signed in to change notification settings - Fork 179
/
tracker.go
166 lines (147 loc) · 5.24 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package tracker
import (
"unsafe"
"go.uber.org/atomic"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
)
// NewestQCTracker remembers the newest QC (highest view) it has been shown.
// All access goes through an atomic pointer, so it can be shared between
// goroutines. The pointer field is nil in the zero value; create instances
// with NewNewestQCTracker.
type NewestQCTracker struct {
	// newestQC holds a *flow.QuorumCertificate, or nil while nothing is tracked.
	newestQC *atomic.UnsafePointer
}
// NewNewestQCTracker returns a NewestQCTracker that does not track any QC yet.
func NewNewestQCTracker() *NewestQCTracker {
	var initial unsafe.Pointer // nil: no QC has been tracked so far
	return &NewestQCTracker{
		newestQC: atomic.NewUnsafePointer(initial),
	}
}
// Track records qc as the newest QC if its view is strictly greater than the
// view of the QC currently held (or if nothing is held yet). It returns true
// when qc was installed and false when the held QC already has an equal or
// higher view.
// qc is expected to be non-nil: its View is read whenever a QC is already held.
// Concurrency safe.
func (t *NewestQCTracker) Track(qc *flow.QuorumCertificate) bool {
	candidate := unsafe.Pointer(qc)
	// Another goroutine may replace the held value between our read and our
	// write. Retrying a compare-and-swap until it either succeeds or the
	// candidate turns out to be stale ensures a newer QC is never overwritten.
	for {
		// snapshot of the value currently held
		current := t.NewestQC()
		// nothing to do if the held QC is at least as new as the candidate
		if current != nil && qc.View <= current.View {
			return false
		}
		// install the candidate only if the snapshot is still current;
		// otherwise loop and re-evaluate against the fresh value
		if t.newestQC.CompareAndSwap(unsafe.Pointer(current), candidate) {
			return true
		}
	}
}
// NewestQC atomically loads and returns the QC with the highest view tracked
// so far. The result is nil if the stored pointer is nil, which is the state
// NewNewestQCTracker starts in.
// Concurrency safe.
func (t *NewestQCTracker) NewestQC() *flow.QuorumCertificate {
	raw := t.newestQC.Load()
	return (*flow.QuorumCertificate)(raw)
}
// NewestTCTracker remembers the newest TC (highest view) it has been shown.
// All access goes through an atomic pointer, so it can be shared between
// goroutines. The pointer field is nil in the zero value; create instances
// with NewNewestTCTracker.
type NewestTCTracker struct {
	// newestTC holds a *flow.TimeoutCertificate, or nil while nothing is tracked.
	newestTC *atomic.UnsafePointer
}
// NewNewestTCTracker returns a NewestTCTracker that does not track any TC yet.
func NewNewestTCTracker() *NewestTCTracker {
	var initial unsafe.Pointer // nil: no TC has been tracked so far
	return &NewestTCTracker{
		newestTC: atomic.NewUnsafePointer(initial),
	}
}
// Track records tc as the newest TC if its view is strictly greater than the
// view of the TC currently held (or if nothing is held yet). It returns true
// when tc was installed and false when the held TC already has an equal or
// higher view.
// tc is expected to be non-nil: its View is read whenever a TC is already held.
// Concurrency safe.
func (t *NewestTCTracker) Track(tc *flow.TimeoutCertificate) bool {
	candidate := unsafe.Pointer(tc)
	// Another goroutine may replace the held value between our read and our
	// write. Retrying a compare-and-swap until it either succeeds or the
	// candidate turns out to be stale ensures a newer TC is never overwritten.
	for {
		// snapshot of the value currently held
		current := t.NewestTC()
		// nothing to do if the held TC is at least as new as the candidate
		if current != nil && tc.View <= current.View {
			return false
		}
		// install the candidate only if the snapshot is still current;
		// otherwise loop and re-evaluate against the fresh value
		if t.newestTC.CompareAndSwap(unsafe.Pointer(current), candidate) {
			return true
		}
	}
}
// NewestTC atomically loads and returns the TC with the highest view tracked
// so far. The result is nil if the stored pointer is nil, which is the state
// NewNewestTCTracker starts in.
// Concurrency safe.
func (t *NewestTCTracker) NewestTC() *flow.TimeoutCertificate {
	raw := t.newestTC.Load()
	return (*flow.TimeoutCertificate)(raw)
}
// NewestBlockTracker remembers the newest block (highest view) it has been
// shown. All access goes through an atomic pointer, so it can be shared
// between goroutines. The pointer field is nil in the zero value; create
// instances with NewNewestBlockTracker.
type NewestBlockTracker struct {
	// newestBlock holds a *model.Block, or nil while nothing is tracked.
	newestBlock *atomic.UnsafePointer
}
// NewNewestBlockTracker returns a NewestBlockTracker that does not track any
// block yet.
func NewNewestBlockTracker() *NewestBlockTracker {
	var initial unsafe.Pointer // nil: no block has been tracked so far
	return &NewestBlockTracker{
		newestBlock: atomic.NewUnsafePointer(initial),
	}
}
// Track records block as the newest block if its view is strictly greater
// than the view of the block currently held (or if nothing is held yet). It
// returns true when block was installed and false when the held block already
// has an equal or higher view.
// block is expected to be non-nil: its View is read whenever a block is
// already held.
// Concurrency safe.
func (t *NewestBlockTracker) Track(block *model.Block) bool {
	candidate := unsafe.Pointer(block)
	// Another goroutine may replace the held value between our read and our
	// write. Retrying a compare-and-swap until it either succeeds or the
	// candidate turns out to be stale ensures a newer block is never overwritten.
	for {
		// snapshot of the value currently held
		current := t.NewestBlock()
		// nothing to do if the held block is at least as new as the candidate
		if current != nil && block.View <= current.View {
			return false
		}
		// install the candidate only if the snapshot is still current;
		// otherwise loop and re-evaluate against the fresh value
		if t.newestBlock.CompareAndSwap(unsafe.Pointer(current), candidate) {
			return true
		}
	}
}
// NewestBlock atomically loads and returns the block with the highest view
// tracked so far. The result is nil if the stored pointer is nil, which is the
// state NewNewestBlockTracker starts in.
// Concurrency safe.
func (t *NewestBlockTracker) NewestBlock() *model.Block {
	raw := t.newestBlock.Load()
	return (*model.Block)(raw)
}
// NewestPartialTcTracker remembers the newest partial TC (highest view) it has
// been shown. All access goes through an atomic pointer, so it can be shared
// between goroutines. The pointer field is nil in the zero value; create
// instances with NewNewestPartialTcTracker.
type NewestPartialTcTracker struct {
	// newestPartialTc holds a *hotstuff.PartialTcCreated, or nil while nothing
	// is tracked.
	newestPartialTc *atomic.UnsafePointer
}
// NewNewestPartialTcTracker returns a NewestPartialTcTracker that does not
// track any partial TC yet.
func NewNewestPartialTcTracker() *NewestPartialTcTracker {
	var initial unsafe.Pointer // nil: no partial TC has been tracked so far
	return &NewestPartialTcTracker{
		newestPartialTc: atomic.NewUnsafePointer(initial),
	}
}
// Track records partialTc as the newest partial TC if its view is strictly
// greater than the view of the partial TC currently held (or if nothing is
// held yet). It returns true when partialTc was installed and false when the
// held partial TC already has an equal or higher view.
// partialTc is expected to be non-nil: its View is read whenever a partial TC
// is already held.
// Concurrency safe.
func (t *NewestPartialTcTracker) Track(partialTc *hotstuff.PartialTcCreated) bool {
	candidate := unsafe.Pointer(partialTc)
	// Another goroutine may replace the held value between our read and our
	// write. Retrying a compare-and-swap until it either succeeds or the
	// candidate turns out to be stale ensures a newer partial TC is never
	// overwritten.
	for {
		// snapshot of the value currently held
		current := t.NewestPartialTc()
		// nothing to do if the held partial TC is at least as new as the candidate
		if current != nil && partialTc.View <= current.View {
			return false
		}
		// install the candidate only if the snapshot is still current;
		// otherwise loop and re-evaluate against the fresh value
		if t.newestPartialTc.CompareAndSwap(unsafe.Pointer(current), candidate) {
			return true
		}
	}
}
// NewestPartialTc atomically loads and returns the partial TC with the highest
// view tracked so far. The result is nil if the stored pointer is nil, which
// is the state NewNewestPartialTcTracker starts in.
// Concurrency safe.
func (t *NewestPartialTcTracker) NewestPartialTc() *hotstuff.PartialTcCreated {
	raw := t.newestPartialTc.Load()
	return (*hotstuff.PartialTcCreated)(raw)
}