forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
339 lines (286 loc) · 10.2 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package producer
import (
"fmt"
"sync"
"time"
pb "github.com/hyperledger/fabric/protos/peer"
)
//---- event hub framework ----
//handlerList is the contract for a set of handlers interested in one event
//type. The implementations in this file keep the handlers in maps and guard
//those maps with their own mutex. Note that we don't have lock/unlock wrapper
//methods as the lock of handler list has to be done under the eventProcessor
//lock. See registerHandler, deRegisterHandler. register/deRegister methods
//will be called only when a new consumer chat starts/ends respectively
//and the big lock should have no performance impact
//
type handlerList interface {
//add registers h for the interest ie; the implementations in this file
//return (true, nil) on success and (false, error) otherwise
add(ie *pb.Interest, h *handler) (bool, error)
//del removes h for the interest ie; the implementations in this file
//return (true, nil) on success and (false, error) otherwise
del(ie *pb.Interest, h *handler) (bool, error)
//foreach calls action for each handler selected for the event ie
foreach(ie *pb.Event, action func(h *handler))
}
//genericHandlerList is a handlerList that keeps one flat set of handlers;
//its foreach passes every registered handler to the action. AddEventType
//installs it for BLOCK and REJECTION events.
type genericHandlerList struct {
//guards handlers
sync.RWMutex
//set of registered handlers (add stores true as the value)
handlers map[*handler]bool
}
//chaincodeHandlerList is the handlerList AddEventType installs for CHAINCODE
//events. Handlers are indexed by chaincode ID and then by event name; foreach
//additionally delivers to handlers registered under the empty event name,
//i.e. handlers that want all events from the chaincode.
type chaincodeHandlerList struct {
//guards handlers
sync.RWMutex
//chaincode ID -> event name -> set of handlers
handlers map[string]map[string]map[*handler]bool
}
//add registers h under the chaincode ID and event name carried in ie's
//chaincode registration info. It returns (true, nil) once h is stored, and
//(false, error) when the registration info is missing, the chaincode ID is
//empty, or h is already registered for that chaincode ID and event name.
func (hl *chaincodeHandlerList) add(ie *pb.Interest, h *handler) (bool, error) {
	hl.Lock()
	defer hl.Unlock()

	reg := ie.GetChaincodeRegInfo()
	switch {
	case reg == nil:
		//chaincode registration info must be non-nil
		return false, fmt.Errorf("chaincode information not provided for registering")
	case reg.ChaincodeId == "":
		//the chaincode ID must be non-empty (even if the chaincode does not exist)
		return false, fmt.Errorf("chaincode ID not provided for registering")
	}

	//fetch the per-event map for this chaincode, creating it on first use
	byEvent, known := hl.handlers[reg.ChaincodeId]
	if !known {
		byEvent = make(map[string]map[*handler]bool)
		hl.handlers[reg.ChaincodeId] = byEvent
	}

	//fetch the handler set for this event name, creating it on first use
	set := byEvent[reg.EventName]
	if set == nil {
		set = make(map[*handler]bool)
		byEvent[reg.EventName] = set
	}

	//a handler may be registered only once per chaincode ID and event name
	if _, dup := set[h]; dup {
		return false, fmt.Errorf("handler exists for event type")
	}
	set[h] = true
	return true, nil
}
//del removes h from the handler set for the chaincode ID and event name
//carried in ie's chaincode registration info. It returns (true, nil) when h
//was removed, and (false, error) when the registration info is missing, the
//chaincode ID is empty, or nothing matching is registered. Maps left empty
//by the removal are deleted as well.
func (hl *chaincodeHandlerList) del(ie *pb.Interest, h *handler) (bool, error) {
	hl.Lock()
	defer hl.Unlock()

	reg := ie.GetChaincodeRegInfo()
	//chaincode registration info must be non-nil
	if reg == nil {
		return false, fmt.Errorf("chaincode information not provided for de-registering")
	}
	//the chaincode ID must be non-empty (even if the chaincode does not exist)
	if reg.ChaincodeId == "" {
		return false, fmt.Errorf("chaincode ID not provided for de-registering")
	}

	//without an event map for the chaincode there is nothing to remove
	byEvent, known := hl.handlers[reg.ChaincodeId]
	if !known {
		return false, fmt.Errorf("chaincode ID not registered")
	}

	//without a handler set for the event name there is nothing to remove
	set := byEvent[reg.EventName]
	if set == nil {
		return false, fmt.Errorf("event name %s not registered for chaincode ID %s", reg.EventName, reg.ChaincodeId)
	}

	//the handler itself must be registered for the event name
	if _, present := set[h]; !present {
		return false, fmt.Errorf("handler not registered for event name %s for chaincode ID %s", reg.EventName, reg.ChaincodeId)
	}

	delete(set, h)
	if len(set) > 0 {
		return true, nil
	}

	//that was the last handler for the event name: drop the event entry and,
	//if the chaincode has no events left, drop the chaincode entry too
	delete(byEvent, reg.EventName)
	if len(byEvent) == 0 {
		delete(hl.handlers, reg.ChaincodeId)
	}
	return true, nil
}
//foreach calls action for every handler registered for the chaincode event
//carried in e: first the handlers registered under the event's own name,
//then, when that name is non-empty, the handlers registered under the empty
//name (those that want all events from the chaincode). Events without a
//chaincode event or with an empty chaincode ID are ignored. The list lock
//is held for the whole call.
func (hl *chaincodeHandlerList) foreach(e *pb.Event, action func(h *handler)) {
	hl.Lock()
	defer hl.Unlock()

	ccEvent := e.GetChaincodeEvent()
	//if there's no chaincode event in the event... nothing to do
	if ccEvent == nil || ccEvent.ChaincodeId == "" {
		return
	}

	//get the event map for the chaincode
	byEvent := hl.handlers[ccEvent.ChaincodeId]
	if byEvent == nil {
		return
	}

	//ranging over a missing (nil) handler set is a no-op
	dispatch := func(eventName string) {
		for h := range byEvent[eventName] {
			action(h)
		}
	}

	dispatch(ccEvent.EventName)
	//send to handlers who want all events from the chaincode, but only if
	//EventName is not already "" (chaincode should NOT send nameless events though)
	if ccEvent.EventName != "" {
		dispatch("")
	}
}
//add registers h in the flat handler set. ie is not consulted. It returns
//(false, error) if h is already registered and (true, nil) otherwise.
func (hl *genericHandlerList) add(ie *pb.Interest, h *handler) (bool, error) {
	hl.Lock()
	defer hl.Unlock()

	if _, dup := hl.handlers[h]; dup {
		return false, fmt.Errorf("handler exists for event type")
	}
	hl.handlers[h] = true
	return true, nil
}
//del removes h from the flat handler set. ie is not consulted. It returns
//(false, error) if h is not registered and (true, nil) otherwise.
func (hl *genericHandlerList) del(ie *pb.Interest, h *handler) (bool, error) {
	hl.Lock()
	defer hl.Unlock()

	if _, present := hl.handlers[h]; !present {
		return false, fmt.Errorf("handler does not exist for event type")
	}
	delete(hl.handlers, h)
	return true, nil
}
//foreach calls action once for every registered handler. e is not consulted.
//The list lock is held for the whole call and released with defer, so a
//panic inside action cannot leave the list locked (this matches
//chaincodeHandlerList.foreach).
func (hl *genericHandlerList) foreach(e *pb.Event, action func(h *handler)) {
	hl.Lock()
	defer hl.Unlock()

	for h := range hl.handlers {
		action(h)
	}
}
//eventProcessor has a map of event type to handlers interested in that
//event type. start() kicks off the event processor where it waits for Events
//from producers. We could easily generalize the one event handling loop to one
//per handlerMap if necessary.
//
type eventProcessor struct {
//guards eventConsumers
sync.RWMutex
//handler list for each registered event type
eventConsumers map[pb.EventType]handlerList
//we could generalize this with multiple channels each with its own size
eventChannel chan *pb.Event
//timeout duration for producer to send an event.
//if < 0 and the buffer is full, Send returns immediately without sending
//if 0 and the buffer is full, Send blocks until the event is queued
//if > 0 and the buffer is full, Send blocks until queued or the timeout expires
timeout time.Duration
}
//global eventProcessor singleton created by initializeEvents. Producers
//hand events to it through the package-level Send function, which does
//nothing while this is still nil.
var gEventProcessor *eventProcessor
//start runs the dispatch loop and never returns. Each iteration receives
//one event from eventChannel, looks up the handler list for the event's
//type under the processor lock, and then asks that list to deliver the
//event to its handlers. Events whose type has no handler list are logged
//and dropped.
func (ep *eventProcessor) start() {
	logger.Info("Event processor started")
	for {
		//wait for event
		evt := <-ep.eventChannel
		evtType := getMessageType(evt)

		//look up the handler list under the processor lock
		ep.Lock()
		consumers := ep.eventConsumers[evtType]
		if consumers == nil {
			logger.Errorf("Event of type %s does not exist", evtType)
			ep.Unlock()
			continue
		}
		ep.Unlock()

		//the handler list takes its own lock while iterating
		deliver := func(h *handler) {
			if evt.Event != nil {
				h.SendMessage(evt)
			}
		}
		consumers.foreach(evt, deliver)
	}
}
//initializeEvents creates the global event processor with an event channel
//buffered to bufferSize and a producer send timeout of tout, registers the
//internal event types, and starts the dispatch loop in its own goroutine.
//It panics if the global event processor already exists.
func initializeEvents(bufferSize uint, tout time.Duration) {
	if gEventProcessor != nil {
		panic("should not be called twice")
	}

	gEventProcessor = &eventProcessor{
		eventConsumers: make(map[pb.EventType]handlerList),
		eventChannel:   make(chan *pb.Event, bufferSize),
		timeout:        tout,
	}

	addInternalEventTypes()

	//start the event processor
	go gEventProcessor.start()
}
//AddEventType registers eventType with the global event processor so that
//handlers can subscribe to it. BLOCK and REJECTION events get a
//genericHandlerList, CHAINCODE events get a chaincodeHandlerList. It returns
//an error if a handler list already exists for eventType.
//NOTE(review): any other event type returns nil without installing a
//handler list, so a later registerHandler for it reports that the type does
//not exist - confirm this is intended.
func AddEventType(eventType pb.EventType) error {
	gEventProcessor.Lock()
	defer gEventProcessor.Unlock()

	typeName := pb.EventType_name[int32(eventType)]
	logger.Debugf("Registering %s", typeName)

	consumers := gEventProcessor.eventConsumers
	if _, exists := consumers[eventType]; exists {
		return fmt.Errorf("event type exists %s", typeName)
	}

	switch eventType {
	case pb.EventType_BLOCK, pb.EventType_REJECTION:
		consumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
	case pb.EventType_CHAINCODE:
		consumers[eventType] = &chaincodeHandlerList{handlers: make(map[string]map[string]map[*handler]bool)}
	}
	return nil
}
//registerHandler adds h to the handler list for ie.EventType while holding
//the global event processor lock. It returns an error if no handler list
//exists for that event type or if the list rejects the registration.
func registerHandler(ie *pb.Interest, h *handler) error {
	logger.Debugf("registerHandler %s", ie.EventType)

	gEventProcessor.Lock()
	defer gEventProcessor.Unlock()

	consumers, known := gEventProcessor.eventConsumers[ie.EventType]
	if !known {
		return fmt.Errorf("event type %s does not exist", ie.EventType)
	}
	if _, err := consumers.add(ie, h); err != nil {
		return fmt.Errorf("error registering handler for %s: %s", ie.EventType, err)
	}
	return nil
}
//deRegisterHandler removes h from the handler list for ie.EventType while
//holding the global event processor lock. It returns an error if no handler
//list exists for that event type or if the list rejects the removal.
func deRegisterHandler(ie *pb.Interest, h *handler) error {
	logger.Debugf("deRegisterHandler %s", ie.EventType)

	gEventProcessor.Lock()
	defer gEventProcessor.Unlock()

	consumers, known := gEventProcessor.eventConsumers[ie.EventType]
	if !known {
		return fmt.Errorf("event type %s does not exist", ie.EventType)
	}
	if _, err := consumers.del(ie, h); err != nil {
		return fmt.Errorf("error deregistering handler for %s: %s", ie.EventType, err)
	}
	return nil
}
//------------- producer APIs -------------------------------
//Send queues the event e on the global event processor's channel for
//delivery to interested consumers.
//
//It returns an error if e is nil or carries no event payload, or if the
//event could not be queued within the configured timeout. It returns nil
//without doing anything if the event processor has not been initialized.
//
//How a full channel buffer is handled depends on the processor's timeout:
//if < 0, Send gives up immediately and returns an error
//if 0, Send blocks until the event is queued
//if > 0, Send blocks until the event is queued or the timeout expires
func Send(e *pb.Event) error {
	logger.Debugf("Entry")
	defer logger.Debugf("Exit")

	//a nil event is treated like an event without payload instead of panicking
	if e == nil || e.Event == nil {
		logger.Error("event not set")
		return fmt.Errorf("event not set")
	}

	if gEventProcessor == nil {
		logger.Debugf("Event processor is nil")
		return nil
	}

	timeout := gEventProcessor.timeout
	switch {
	case timeout < 0:
		logger.Debugf("Event processor timeout < 0")
		select {
		case gEventProcessor.eventChannel <- e:
		default:
			return fmt.Errorf("could not send the blocking event")
		}
	case timeout == 0:
		logger.Debugf("Event processor timeout = 0")
		gEventProcessor.eventChannel <- e
	default:
		logger.Debugf("Event processor timeout > 0")
		//use an explicit timer so it can be released as soon as the send
		//completes; a time.After timer would linger until it fires
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		select {
		case gEventProcessor.eventChannel <- e:
		case <-timer.C:
			return fmt.Errorf("could not send the blocking event")
		}
	}

	logger.Debugf("Event sent successfully")
	return nil
}