forked from iotaledger/hive.go
/
event.go
144 lines (117 loc) · 4.51 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package event
import (
"go.uber.org/atomic"
"github.com/izuc/zipp.foundation/core/debug"
"github.com/izuc/zipp.foundation/core/generics/orderedmap"
"github.com/izuc/zipp.foundation/core/workerpool"
)
// Event is a generic notification primitive: callbacks registered on it are invoked with a payload of type T
// whenever the Event is triggered.
type Event[T any] struct {
	// Synchronously executed callback queues, keyed by Closure ID. Trigger walks them in the order
	// beforeHooks, hooks, afterHooks.
	beforeHooks, hooks, afterHooks *orderedmap.OrderedMap[uint64, func(T)]

	// asyncHandlers holds, keyed by Closure ID, the handlers whose callbacks are submitted to a worker pool
	// after the synchronous queues have been processed.
	asyncHandlers *orderedmap.OrderedMap[uint64, *handler[T]]
}
// New returns an Event of payload type T whose callback queues are all initialized and empty.
func New[T any]() (newEvent *Event[T]) {
	newEvent = new(Event[T])

	newEvent.beforeHooks = orderedmap.New[uint64, func(T)]()
	newEvent.hooks = orderedmap.New[uint64, func(T)]()
	newEvent.afterHooks = orderedmap.New[uint64, func(T)]()
	newEvent.asyncHandlers = orderedmap.New[uint64, *handler[T]]()

	return newEvent
}
// Attach registers the given Closure as an asynchronous handler of the Event. A nil Closure is ignored.
// If a 'triggerMaxCount' >0 is given, the Closure is detached automatically once it has been triggered that many times.
func (e *Event[T]) Attach(closure *Closure[T], triggerMaxCount ...uint64) {
	if closure == nil {
		return
	}

	// No worker pool was specified, so the handler is bound to the global one (Loop).
	callback := e.callbackFromClosure(closure, triggerMaxCount...)
	e.asyncHandlers.Set(closure.ID, newHandler[T](callback, Loop))
}
// AttachWithWorkerPool registers the given Closure as an asynchronous handler of the Event whose callback is submitted
// to the worker pool 'wp'. A nil Closure is ignored.
// If a 'triggerMaxCount' >0 is given, the Closure is detached automatically once it has been triggered that many times.
func (e *Event[T]) AttachWithWorkerPool(closure *Closure[T], wp *workerpool.UnboundedWorkerPool, triggerMaxCount ...uint64) {
	if closure == nil {
		return
	}

	callback := e.callbackFromClosure(closure, triggerMaxCount...)
	e.asyncHandlers.Set(closure.ID, newHandler[T](callback, wp))
}
// HookBefore registers the given Closure in the queue that Trigger executes first, ahead of the regular hooks.
// A nil Closure is ignored.
// If a 'triggerMaxCount' >0 is given, the Closure is detached automatically once it has been triggered that many times.
func (e *Event[T]) HookBefore(closure *Closure[T], triggerMaxCount ...uint64) {
	if closure == nil {
		return
	}

	callback := e.callbackFromClosure(closure, triggerMaxCount...)
	e.beforeHooks.Set(closure.ID, callback)
}
// Hook registers the given Closure in the regular hook queue, which Trigger executes synchronously.
// A nil Closure is ignored.
// If a 'triggerMaxCount' >0 is given, the Closure is detached automatically once it has been triggered that many times.
func (e *Event[T]) Hook(closure *Closure[T], triggerMaxCount ...uint64) {
	if closure == nil {
		return
	}

	callback := e.callbackFromClosure(closure, triggerMaxCount...)
	e.hooks.Set(closure.ID, callback)
}
// HookAfter registers the given Closure in the queue that Trigger executes after the regular hooks.
// A nil Closure is ignored.
// If a 'triggerMaxCount' >0 is given, the Closure is detached automatically once it has been triggered that many times.
func (e *Event[T]) HookAfter(closure *Closure[T], triggerMaxCount ...uint64) {
	if closure == nil {
		return
	}

	callback := e.callbackFromClosure(closure, triggerMaxCount...)
	e.afterHooks.Set(closure.ID, callback)
}
// Detach unregisters a previously registered Closure from every callback queue of the Event.
// A nil Closure is ignored.
func (e *Event[T]) Detach(closure *Closure[T]) {
	if closure != nil {
		e.detachID(closure.ID)
	}
}
// DetachAll empties every callback queue of the Event: the three hook queues first, then the asynchronous handlers.
func (e *Event[T]) DetachAll() {
	for _, queue := range []*orderedmap.OrderedMap[uint64, func(T)]{e.beforeHooks, e.hooks, e.afterHooks} {
		queue.Clear()
	}

	e.asyncHandlers.Clear()
}
// Trigger invokes the registered callbacks with the given event: first the before-hooks, then the hooks, then the
// after-hooks (all executed directly by the caller), and finally it hands the asynchronous handlers to their
// worker pools.
func (e *Event[T]) Trigger(event T) {
	// The same consumer serves all three hook queues: run the callback and keep iterating.
	invoke := func(_ uint64, callback func(T)) bool {
		callback(event)

		return true
	}

	e.beforeHooks.ForEach(invoke)
	e.hooks.ForEach(invoke)
	e.afterHooks.ForEach(invoke)

	e.triggerEventHandlers(event)
}
// triggerEventHandlers submits, for every asynchronous handler, a task that calls the handler's callback with the
// given event to that handler's worker pool.
func (e *Event[T]) triggerEventHandlers(event T) {
	e.asyncHandlers.ForEach(func(_ uint64, asyncHandler *handler[T]) bool {
		// The stack trace is only determined in debug mode; otherwise an empty string is passed along.
		stackTrace := ""
		if debug.GetEnabled() {
			stackTrace = debug.ClosureStackTrace(asyncHandler.callback)
		}

		task := func() { asyncHandler.callback(event) }
		asyncHandler.wp.Submit(task, stackTrace)

		return true
	})
}
// callbackFromClosure returns the function that is stored in the callback queues for the given Closure.
//
// Without a positive 'triggerMaxCount' this is the Closure's own function. Otherwise the function is wrapped so that
// the Closure runs at most triggerMaxCount[0] times and is detached from the Event once that limit is reached.
func (e *Event[T]) callbackFromClosure(closure *Closure[T], triggerMaxCount ...uint64) func(T) {
	if len(triggerMaxCount) == 0 || triggerMaxCount[0] == 0 {
		return closure.Function
	}

	// Copy the limit so that later modifications of the caller's variadic slice cannot change it.
	maxCount := triggerMaxCount[0]
	triggerCount := atomic.NewUint64(0)

	return func(event T) {
		// Claim an invocation slot before running the Closure. An asynchronous handler may already be queued (or be
		// running concurrently) several times before the detach below takes effect; such surplus invocations must
		// not execute the Closure beyond its limit.
		count := triggerCount.Inc()
		if count > maxCount {
			return
		}

		closure.Function(event)

		// Only the invocation that reaches the limit detaches the Closure.
		if count == maxCount {
			e.detachID(closure.ID)
		}
	}
}
// detachID deletes the entry stored under the given Closure ID from every callback queue: the three hook queues
// first, then the asynchronous handlers.
func (e *Event[T]) detachID(closureID uint64) {
	for _, queue := range []*orderedmap.OrderedMap[uint64, func(T)]{e.beforeHooks, e.hooks, e.afterHooks} {
		queue.Delete(closureID)
	}

	e.asyncHandlers.Delete(closureID)
}