-
Notifications
You must be signed in to change notification settings - Fork 12
/
events.go
123 lines (101 loc) · 2.71 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package actor
import (
"reflect"
"sync"
"github.com/wwj31/dogactor/actor/actorerr"
"github.com/wwj31/dogactor/log"
)
// Handler is an event callback supplied to OnEvent. It is typed as an empty
// interface and inspected via reflection; OnEvent only accepts a func that
// takes exactly one non-pointer parameter.
type Handler interface{}
// fnMeta is the reflected form of a registered Handler.
type fnMeta struct {
// CallValue is reflect.ValueOf(handler); it is invoked through Call on dispatch.
CallValue reflect.Value
// ArgType is the type of the handler's single parameter.
ArgType reflect.Type
}
// listener indexes registered handlers first by the event type's String()
// name and then by the id of the actor that registered the handler.
type listener map[string]map[Id]fnMeta // map[evType][actorId]fnMeta
// evDispatcher keeps the table of event handlers registered per actor and
// forwards dispatched events to them through the owning System.
type evDispatcher struct {
// The embedded RWMutex guards listeners: registration and cancellation take
// the write lock, DispatchEvent takes the read lock.
sync.RWMutex
// sys is used by DispatchEvent to Send the handler invocation to each actor.
sys *System
// listeners holds the registered handlers, keyed by event type name then actor id.
listeners listener
}
// newEvent builds an evDispatcher bound to s with an empty, ready-to-use
// listener table.
func newEvent(s *System) evDispatcher {
	return evDispatcher{
		sys:       s,
		listeners: listener{},
	}
}
// OnEvent registers callback as actorId's handler for one event type.
//
// callback must be a non-nil func taking exactly one parameter, and that
// parameter must not be a pointer; the parameter's type (by its String() name)
// selects which events the handler receives. Invalid callbacks are logged and
// ignored. Registering again for the same actor and event type replaces the
// previous handler.
func (ed *evDispatcher) OnEvent(actorId Id, callback Handler) {
	argType, n := argInfo(callback)
	if n != 1 {
		log.SysLog.Errorw("OnEvent Handler param len !=1",
			"n", n, "actorId", actorId)
		return
	}

	if argType.Kind() == reflect.Ptr {
		log.SysLog.Errorw("OnEvent Handler param must be a value instead of the point",
			"kind", argType.Kind(), "n", n, "actorId", actorId)
		return
	}

	// n == 1 means argInfo saw a func type, so IsNil is valid here. A typed-nil
	// func would otherwise be stored and panic inside Call on the first dispatch.
	callValue := reflect.ValueOf(callback)
	if callValue.IsNil() {
		log.SysLog.Errorw("OnEvent Handler must not be nil",
			"actorId", actorId)
		return
	}

	typ := argType.String()

	ed.Lock()
	defer ed.Unlock()

	actors := ed.listeners[typ]
	if actors == nil {
		// Key type matches the listener declaration (map[string]map[Id]fnMeta).
		actors = make(map[Id]fnMeta)
		ed.listeners[typ] = actors
	}
	actors[actorId] = fnMeta{
		CallValue: callValue,
		ArgType:   argType,
	}
}
// CancelEvent removes actorId's handlers for the types of the given events.
//
// Each element of events is a sample value whose dynamic type names the event
// to unsubscribe from. All events are validated before anything is removed: if
// any of them is nil or a pointer, the problem is logged and no handler is
// removed.
func (ed *evDispatcher) CancelEvent(actorId Id, events ...interface{}) {
	typeNames := make([]string, 0, len(events))
	for _, event := range events {
		rtype := reflect.TypeOf(event)
		if rtype == nil {
			// reflect.TypeOf(nil) yields a nil Type; calling Kind on it would panic.
			log.SysLog.Errorw("CancelEvent failed, event is nil", "actorId", actorId)
			return
		}
		if rtype.Kind() == reflect.Ptr {
			log.SysLog.Errorw("CancelEvent failed", "err", actorerr.CancelEventErr, "actorId", actorId, "event", event)
			return
		}
		typeNames = append(typeNames, rtype.String())
	}

	ed.Lock()
	defer ed.Unlock()

	for _, name := range typeNames {
		// delete on a missing (nil) inner map is a no-op.
		delete(ed.listeners[name], actorId)
	}
}
// CancelAll drops every handler registered by actorId, across all event types.
func (ed *evDispatcher) CancelAll(actorId Id) {
	ed.Lock()
	defer ed.Unlock()

	for evType := range ed.listeners {
		delete(ed.listeners[evType], actorId)
	}
}
// DispatchEvent forwards event to every actor that registered a handler for
// the event's dynamic type.
//
// For each registered actor a closure is handed to sys.Send with sourceId as
// the sender; the closure invokes the actor's handler with event. Send errors
// are logged per actor and do not stop delivery to the remaining actors. A nil
// event is logged and dropped. The read lock is held while the sends are issued.
func (ed *evDispatcher) DispatchEvent(sourceId Id, event interface{}) {
	rType := reflect.TypeOf(event)
	if rType == nil {
		// reflect.TypeOf(nil) yields a nil Type; calling String on it would panic.
		log.SysLog.Errorw("DispatchEvent event is nil", "sourceId", sourceId)
		return
	}

	ed.RLock()
	defer ed.RUnlock()

	// Ranging over a missing (nil) inner map simply performs no iterations.
	for k, v := range ed.listeners[rType.String()] {
		// Per-iteration copies so the closure is safe with pre-Go 1.22 loop semantics.
		actorId, meta := k, v
		err := ed.sys.Send(sourceId, actorId, "", func() {
			if meta.ArgType.Kind() != rType.Kind() {
				log.SysLog.Errorw("param type kind not equal", "fnMeta.ArgType", meta.ArgType.Kind(), "rType", rType.Kind())
				return
			}
			// Listeners are keyed by Type.String(), which is not guaranteed unique
			// across packages; Call panics if the argument is not assignable.
			if !rType.AssignableTo(meta.ArgType) {
				log.SysLog.Errorw("param type not assignable", "fnMeta.ArgType", meta.ArgType.String(), "rType", rType.String())
				return
			}
			meta.CallValue.Call([]reflect.Value{reflect.ValueOf(event)})
		})
		if err != nil {
			log.SysLog.Errorw("DispatchEvent send to actor", "actorId", actorId, "err", err)
		}
	}
}
// argInfo dissects the signature of cb.
//
// It returns the type of cb's last parameter together with the number of
// parameters. If cb is nil or not a func, it logs an error and returns
// (nil, 0); a func with no parameters also yields (nil, 0), without logging.
func argInfo(cb Handler) (reflect.Type, int) {
	cbType := reflect.TypeOf(cb)
	// reflect.TypeOf(nil) returns a nil Type, on which Kind would panic.
	if cbType == nil || cbType.Kind() != reflect.Func {
		log.SysLog.Errorw("nats: Handler needs to be a func")
		return nil, 0
	}

	numArgs := cbType.NumIn()
	if numArgs == 0 {
		return nil, numArgs
	}
	return cbType.In(numArgs - 1), numArgs
}