forked from hyperledger/fabric
/
msgs.go
268 lines (226 loc) · 6.45 KB
/
msgs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package msgstore
import (
"sync"
"time"
"github.com/hyperledger/fabric/gossip/common"
)
var noopLock = func() {}
// Noop is a function that doesn't do anything
func Noop(_ interface{}) {
}
// invalidationTrigger is invoked on each message that was invalidated because of a message addition
// i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations
// then the invalidation trigger on 0 was called when 1 was added.
type invalidationTrigger func(message interface{})
// NewMessageStore returns a new MessageStore with the message replacing
// policy and invalidation trigger passed.
func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore {
return newMsgStore(pol, trigger)
}
// NewMessageStoreExpirable returns a new MessageStore with the message replacing
// policy and invalidation trigger passed. It supports old message expiration after msgTTL, during expiration first external
// lock taken, expiration callback invoked and external lock released. Callback and external lock can be nil.
func NewMessageStoreExpirable(pol common.MessageReplacingPolicy, trigger invalidationTrigger, msgTTL time.Duration, externalLock func(), externalUnlock func(), externalExpire func(interface{})) MessageStore {
store := newMsgStore(pol, trigger)
store.msgTTL = msgTTL
if externalLock != nil {
store.externalLock = externalLock
}
if externalUnlock != nil {
store.externalUnlock = externalUnlock
}
if externalExpire != nil {
store.expireMsgCallback = externalExpire
}
go store.expirationRoutine()
return store
}
func newMsgStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) *messageStoreImpl {
return &messageStoreImpl{
pol: pol,
messages: make([]*msg, 0),
invTrigger: trigger,
externalLock: noopLock,
externalUnlock: noopLock,
expireMsgCallback: func(m interface{}) {},
expiredCount: 0,
doneCh: make(chan struct{}),
}
}
// MessageStore adds messages to an internal buffer.
// When a message is received, it might:
// - Be added to the buffer
// - Discarded because of some message already in the buffer (invalidated)
// - Make a message already in the buffer to be discarded (invalidates)
// When a message is invalidated, the invalidationTrigger is invoked on that message.
type MessageStore interface {
// add adds a message to the store
// returns true or false whether the message was added to the store
Add(msg interface{}) bool
// Checks if message is valid for insertion to store
// returns true or false whether the message can be added to the store
CheckValid(msg interface{}) bool
// size returns the amount of messages in the store
Size() int
// get returns all messages in the store
Get() []interface{}
// Stop all associated go routines
Stop()
// Purge purges all messages that are accepted by
// the given predicate
Purge(func(interface{}) bool)
}
type messageStoreImpl struct {
pol common.MessageReplacingPolicy
lock sync.RWMutex
messages []*msg
invTrigger invalidationTrigger
msgTTL time.Duration
expiredCount int
externalLock func()
externalUnlock func()
expireMsgCallback func(msg interface{})
doneCh chan struct{}
stopOnce sync.Once
}
type msg struct {
data interface{}
created time.Time
expired bool
}
// add adds a message to the store
func (s *messageStoreImpl) Add(message interface{}) bool {
s.lock.Lock()
defer s.lock.Unlock()
n := len(s.messages)
for i := 0; i < n; i++ {
m := s.messages[i]
switch s.pol(message, m.data) {
case common.MessageInvalidated:
return false
case common.MessageInvalidates:
s.invTrigger(m.data)
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
}
}
s.messages = append(s.messages, &msg{data: message, created: time.Now()})
return true
}
func (s *messageStoreImpl) Purge(shouldBePurged func(interface{}) bool) {
shouldMsgBePurged := func(m *msg) bool {
return shouldBePurged(m.data)
}
if !s.isPurgeNeeded(shouldMsgBePurged) {
return
}
s.lock.Lock()
defer s.lock.Unlock()
n := len(s.messages)
for i := 0; i < n; i++ {
if !shouldMsgBePurged(s.messages[i]) {
continue
}
s.invTrigger(s.messages[i].data)
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
}
}
// Checks if message is valid for insertion to store
func (s *messageStoreImpl) CheckValid(message interface{}) bool {
s.lock.RLock()
defer s.lock.RUnlock()
for _, m := range s.messages {
if s.pol(message, m.data) == common.MessageInvalidated {
return false
}
}
return true
}
// size returns the amount of messages in the store
func (s *messageStoreImpl) Size() int {
s.lock.RLock()
defer s.lock.RUnlock()
return len(s.messages) - s.expiredCount
}
// get returns all messages in the store
func (s *messageStoreImpl) Get() []interface{} {
res := make([]interface{}, 0)
s.lock.RLock()
defer s.lock.RUnlock()
for _, msg := range s.messages {
if !msg.expired {
res = append(res, msg.data)
}
}
return res
}
func (s *messageStoreImpl) expireMessages() {
s.externalLock()
s.lock.Lock()
defer s.lock.Unlock()
defer s.externalUnlock()
n := len(s.messages)
for i := 0; i < n; i++ {
m := s.messages[i]
if !m.expired {
if time.Since(m.created) > s.msgTTL {
m.expired = true
s.expireMsgCallback(m.data)
s.expiredCount++
}
} else {
if time.Since(m.created) > (s.msgTTL * 2) {
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
s.expiredCount--
}
}
}
}
func (s *messageStoreImpl) isPurgeNeeded(shouldBePurged func(*msg) bool) bool {
s.lock.RLock()
defer s.lock.RUnlock()
for _, m := range s.messages {
if shouldBePurged(m) {
return true
}
}
return false
}
func (s *messageStoreImpl) expirationRoutine() {
for {
select {
case <-s.doneCh:
return
case <-time.After(s.expirationCheckInterval()):
hasMessageExpired := func(m *msg) bool {
if !m.expired && time.Since(m.created) > s.msgTTL {
return true
} else if time.Since(m.created) > (s.msgTTL * 2) {
return true
}
return false
}
if s.isPurgeNeeded(hasMessageExpired) {
s.expireMessages()
}
}
}
}
func (s *messageStoreImpl) Stop() {
stopFunc := func() {
close(s.doneCh)
}
s.stopOnce.Do(stopFunc)
}
func (s *messageStoreImpl) expirationCheckInterval() time.Duration {
return s.msgTTL / 100
}