/
def.go
91 lines (75 loc) · 2.21 KB
/
def.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package saga
import (
"reflect"
"sync"
"github.com/wework/grabbit/gbus/metrics"
"github.com/wework/grabbit/gbus"
)
var _ gbus.HandlerRegister = &Def{}
//MsgToFuncPair helper struct
type MsgToFuncPair struct {
Filter *gbus.MessageFilter
SagaFuncName string
}
//Def defines a saga type
type Def struct {
glue *Glue
sagaType reflect.Type
startedBy []string
lock *sync.Mutex
sagaConfFns []gbus.SagaConfFn
msgToFunc []*MsgToFuncPair
}
//HandleMessage implements HandlerRegister interface
func (sd *Def) HandleMessage(message gbus.Message, handler gbus.MessageHandler) error {
sd.addMsgToHandlerMapping("", sd.glue.svcName, message, handler)
return sd.glue.registerMessage(message)
}
//HandleEvent implements HandlerRegister interface
func (sd *Def) HandleEvent(exchange, topic string, event gbus.Message, handler gbus.MessageHandler) error {
sd.addMsgToHandlerMapping(exchange, topic, event, handler)
return sd.glue.registerEvent(exchange, topic, event)
}
func (sd *Def) getHandledMessages() []string {
messages := make([]string, 0)
for _, pair := range sd.msgToFunc {
if pair.Filter.MsgName != "" {
messages = append(messages, pair.Filter.MsgName)
}
}
return messages
}
func (sd *Def) addMsgToHandlerMapping(exchange, routingKey string, message gbus.Message, handler gbus.MessageHandler) {
handlerName := handler.Name()
metrics.AddHandlerMetrics(handlerName)
msgToFunc := &MsgToFuncPair{
Filter: gbus.NewMessageFilter(exchange, routingKey, message),
SagaFuncName: handlerName}
sd.msgToFunc = append(sd.msgToFunc, msgToFunc)
}
func (sd *Def) newInstance() *Instance {
instance := NewInstance(sd.sagaType, sd.msgToFunc)
return sd.configureSaga(instance)
}
func (sd *Def) configureSaga(instance *Instance) *Instance {
saga := instance.UnderlyingInstance
for _, conf := range sd.sagaConfFns {
saga = conf(saga)
}
return instance
}
func (sd *Def) shouldStartNewSaga(message *gbus.BusMessage) bool {
fqn := message.PayloadFQN
for _, starts := range sd.startedBy {
if fqn == starts {
return true
}
}
return false
}
// func (sd *Def) canTimeout() bool {
// sd.sagaType.Implements(u reflect.Type)
// }
func (sd *Def) String() string {
return gbus.GetTypeFQN(sd.sagaType)
}