forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
278 lines (240 loc) · 8.13 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"time"
"github.com/op/go-logging"
)
var logger *logging.Logger // package-level logger
func init() {
logger = logging.MustGetLogger("consensus/util/events")
}
// Event is a type meant to clearly convey that the return type or parameter to a function will be supplied to/from an events.Manager
type Event interface{}
// Receiver is a consumer of events, ProcessEvent will be called serially
// as events arrive
type Receiver interface {
// ProcessEvent delivers an event to the Receiver, if it returns non-nil, the return is the next processed event
ProcessEvent(e Event) Event
}
// ------------------------------------------------------------
//
// Threaded object
//
// ------------------------------------------------------------
// threaded holds an exit channel to allow threads to break from a select
type threaded struct {
exit chan struct{}
}
// halt tells the threaded object's thread to exit
func (t *threaded) Halt() {
select {
case <-t.exit:
logger.Warning("Attempted to halt a threaded object twice")
default:
close(t.exit)
}
}
// ------------------------------------------------------------
//
// Event Manager
//
// ------------------------------------------------------------
// Manager provides a serialized interface for submitting events to
// a Receiver on the other side of the queue
type Manager interface {
Inject(Event) // A temporary interface to allow the event manager thread to skip the queue
Queue() chan<- Event // Get a write-only reference to the queue, to submit events
SetReceiver(Receiver) // Set the target to route events to
Start() // Starts the Manager thread TODO, these thread management things should probably go away
Halt() // Stops the Manager thread
}
// managerImpl is an implementation of Manger
type managerImpl struct {
threaded
receiver Receiver
events chan Event
}
// NewManagerImpl creates an instance of managerImpl
func NewManagerImpl() Manager {
return &managerImpl{
events: make(chan Event),
threaded: threaded{make(chan struct{})},
}
}
// SetReceiver sets the destination for events
func (em *managerImpl) SetReceiver(receiver Receiver) {
em.receiver = receiver
}
// Start creates the go routine necessary to deliver events
func (em *managerImpl) Start() {
go em.eventLoop()
}
// queue returns a write only reference to the event queue
func (em *managerImpl) Queue() chan<- Event {
return em.events
}
// SendEvent performs the event loop on a receiver to completion
func SendEvent(receiver Receiver, event Event) {
next := event
for {
// If an event returns something non-nil, then process it as a new event
next = receiver.ProcessEvent(next)
if next == nil {
break
}
}
}
// Inject can only safely be called by the managerImpl thread itself, it skips the queue
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil {
SendEvent(em.receiver, event)
}
}
// eventLoop is where the event thread loops, delivering events
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
case <-em.exit:
logger.Debug("eventLoop told to exit")
return
}
}
}
// ------------------------------------------------------------
//
// Event Timer
//
// ------------------------------------------------------------
// Timer is an interface for managing time driven events
// the special contract Timer gives which a traditional golang
// timer does not, is that if the event thread calls stop, or reset
// then even if the timer has already fired, the event will not be
// delivered to the event queue
type Timer interface {
SoftReset(duration time.Duration, event Event) // start a new countdown, only if one is not already started
Reset(duration time.Duration, event Event) // start a new countdown, clear any pending events
Stop() // stop the countdown, clear any pending events
Halt() // Stops the Timer thread
}
// TimerFactory abstracts the creation of Timers, as they may
// need to be mocked for testing
type TimerFactory interface {
CreateTimer() Timer // Creates an Timer which is stopped
}
// TimerFactoryImpl implements the TimerFactory
type timerFactoryImpl struct {
manager Manager // The Manager to use in constructing the event timers
}
// NewTimerFactoryImpl creates a new TimerFactory for the given Manager
func NewTimerFactoryImpl(manager Manager) TimerFactory {
return &timerFactoryImpl{manager}
}
// CreateTimer creates a new timer which deliver events to the Manager for this factory
func (etf *timerFactoryImpl) CreateTimer() Timer {
return newTimerImpl(etf.manager)
}
// timerStart is used to deliver the start request to the eventTimer thread
type timerStart struct {
hard bool // Whether to reset the timer if it is running
event Event // What event to push onto the event queue
duration time.Duration // How long to wait before sending the event
}
// timerImpl is an implementation of Timer
type timerImpl struct {
threaded // Gives us the exit chan
timerChan <-chan time.Time // When non-nil, counts down to preparing to do the event
startChan chan *timerStart // Channel to deliver the timer start events to the service go routine
stopChan chan struct{} // Channel to deliver the timer stop events to the service go routine
manager Manager // The event manager to deliver the event to after timer expiration
}
// newTimer creates a new instance of timerImpl
func newTimerImpl(manager Manager) Timer {
et := &timerImpl{
startChan: make(chan *timerStart),
stopChan: make(chan struct{}),
threaded: threaded{make(chan struct{})},
manager: manager,
}
go et.loop()
return et
}
// softReset tells the timer to start a new countdown, only if it is not currently counting down
// this will not clear any pending events
func (et *timerImpl) SoftReset(timeout time.Duration, event Event) {
et.startChan <- &timerStart{
duration: timeout,
event: event,
hard: false,
}
}
// reset tells the timer to start counting down from a new timeout, this also clears any pending events
func (et *timerImpl) Reset(timeout time.Duration, event Event) {
et.startChan <- &timerStart{
duration: timeout,
event: event,
hard: true,
}
}
// stop tells the timer to stop, and not to deliver any pending events
func (et *timerImpl) Stop() {
et.stopChan <- struct{}{}
}
// loop is where the timer thread lives, looping
func (et *timerImpl) loop() {
var eventDestChan chan<- Event
var event Event
for {
// A little state machine, relying on the fact that nil channels will block on read/write indefinitely
select {
case start := <-et.startChan:
if et.timerChan != nil {
if start.hard {
logger.Debug("Resetting a running timer")
} else {
continue
}
}
logger.Debug("Starting timer")
et.timerChan = time.After(start.duration)
if eventDestChan != nil {
logger.Debug("Timer cleared pending event")
}
event = start.event
eventDestChan = nil
case <-et.stopChan:
if et.timerChan == nil && eventDestChan == nil {
logger.Debug("Attempting to stop an unfired idle timer")
}
et.timerChan = nil
logger.Debug("Stopping timer")
if eventDestChan != nil {
logger.Debug("Timer cleared pending event")
}
eventDestChan = nil
event = nil
case <-et.timerChan:
logger.Debug("Event timer fired")
et.timerChan = nil
eventDestChan = et.manager.Queue()
case eventDestChan <- event:
logger.Debug("Timer event delivered")
eventDestChan = nil
case <-et.exit:
logger.Debug("Halting timer")
return
}
}
}