// invocation.go

package saga
import (
"context"
"database/sql"
"time"
"github.com/wework/grabbit/gbus"
)
// Compile-time checks that *sagaInvocation satisfies the gbus interfaces it decorates.
var (
	_ gbus.Invocation     = (*sagaInvocation)(nil)
	_ gbus.SagaInvocation = (*sagaInvocation)(nil)
)
// sagaInvocation decorates a gbus.Invocation and a gbus.Messaging so that
// messages sent, published or replied through it are stamped with the ids of
// the saga instance handling the inbound message.
type sagaInvocation struct {
	*gbus.Glogged
	// decoratedBus is the messaging implementation that Send, Publish and RPC delegate to.
	decoratedBus gbus.Messaging
	// decoratedInvocation is the wrapped invocation; Reply, Tx, Routing,
	// DeliveryInfo and InvokingSvc are forwarded to it.
	decoratedInvocation gbus.Invocation
	// inboundMsg is the message being handled; its ID and SagaID are used as
	// the correlation ids of replies.
	inboundMsg *gbus.BusMessage
	// sagaID is the id stamped on every outgoing message as its SagaID.
	sagaID string
	// ctx is the context returned by Ctx.
	ctx context.Context
	// hostingSvc is the service that is executing the saga instance.
	hostingSvc string
	// startedBy is the service that sent the command/event that triggered the
	// creation of the saga.
	startedBy string
	// startedBySaga holds the saga id of the originating saga instance in case
	// the command/event that triggered the creation of this saga was itself
	// sent from a saga.
	startedBySaga string
	// startedByMessageID is the message-id of the message that created the saga.
	startedByMessageID string
	// startedByRPCID is the rpc id of the message that created the saga.
	startedByRPCID string
}
// setCorrelationIDs stamps message with the saga-related ids appropriate for
// the given semantics.
//
// The message's SagaID is always set to this invocation's saga id. In addition:
//   - gbus.REPLY: CorrelationID and SagaCorrelationID are taken from the
//     inbound message's ID and SagaID.
//   - gbus.CMD: when targetService is the hosting service, SagaCorrelationID
//     is set to this saga's id.
//
// Any other semantics leaves the correlation fields untouched.
func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, targetService string, semantics gbus.Semantics) {
	message.SagaID = si.sagaID

	switch semantics {
	case gbus.REPLY:
		inbound := si.inboundMsg
		message.CorrelationID = inbound.ID
		message.SagaCorrelationID = inbound.SagaID
	case gbus.CMD:
		// The saga may be sending a command to itself; make the
		// SagaCorrelationID point back at this instance in that case.
		// https://github.com/wework/grabbit/issues/64
		if targetService == si.hostingSvc {
			message.SagaCorrelationID = si.sagaID
		}
	}
}
// HostingSvc returns the name of the service that is executing the saga instance.
func (si *sagaInvocation) HostingSvc() string {
	return si.hostingSvc
}
// InvokingSvc returns the invoking service as reported by the decorated invocation.
func (si *sagaInvocation) InvokingSvc() string {
	inner := si.decoratedInvocation
	return inner.InvokingSvc()
}
// Reply stamps message with this saga's id and with correlation ids taken from
// the inbound message, then replies through the decorated invocation.
// It returns whatever error the decorated Reply returns.
func (si *sagaInvocation) Reply(ctx context.Context, message *gbus.BusMessage) error {
	inner := si.decoratedInvocation
	_, routingKey := inner.Routing()
	si.setCorrelationIDs(message, routingKey, gbus.REPLY)
	return inner.Reply(ctx, message)
}
// ReplyToInitiator sends message to the service that started the saga.
//
// The message is first stamped as a regular reply, after which its
// correlation ids are overridden with those of the message that created the
// saga (message id, rpc id and originating saga id). The message is sent via
// the bus of the decorated invocation, and that Send's error is returned.
func (si *sagaInvocation) ReplyToInitiator(ctx context.Context, message *gbus.BusMessage) error {
	initiator := si.startedBy
	si.setCorrelationIDs(message, initiator, gbus.REPLY)

	// Replace the ids set above with those of the message that created the saga.
	message.CorrelationID = si.startedByMessageID
	message.RPCID = si.startedByRPCID
	message.SagaCorrelationID = si.startedBySaga

	bus := si.decoratedInvocation.Bus()
	return bus.Send(ctx, initiator, message)
}
// Bus returns the invocation itself as the gbus.Messaging to use, so that
// calls made through it go through this type's Send, Publish and RPC.
func (si *sagaInvocation) Bus() gbus.Messaging {
	return si
}
// Tx returns the transaction exposed by the decorated invocation.
func (si *sagaInvocation) Tx() *sql.Tx {
	inner := si.decoratedInvocation
	return inner.Tx()
}
// Ctx returns the context held by this saga invocation.
func (si *sagaInvocation) Ctx() context.Context {
	return si.ctx
}
// Send stamps command with this saga's ids using command semantics and then
// sends it to toService through the decorated bus, forwarding any policies.
// It returns the error reported by the decorated bus.
func (si *sagaInvocation) Send(ctx context.Context, toService string, command *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
	si.setCorrelationIDs(command, toService, gbus.CMD)

	bus := si.decoratedBus
	return bus.Send(ctx, toService, command, policies...)
}
// Publish stamps event with this saga's id using event semantics (no target
// service) and then publishes it to exchange/topic through the decorated bus,
// forwarding any policies. It returns the error reported by the decorated bus.
func (si *sagaInvocation) Publish(ctx context.Context, exchange, topic string, event *gbus.BusMessage, policies ...gbus.MessagePolicy) error {
	si.setCorrelationIDs(event, "", gbus.EVT)

	bus := si.decoratedBus
	return bus.Publish(ctx, exchange, topic, event, policies...)
}
// RPC forwards the call unchanged to the decorated bus; unlike Send and
// Publish it does not stamp any saga ids on the request.
func (si *sagaInvocation) RPC(ctx context.Context, service string, request, reply *gbus.BusMessage, timeout time.Duration) (*gbus.BusMessage, error) {
	bus := si.decoratedBus
	return bus.RPC(ctx, service, request, reply, timeout)
}
// Routing returns the exchange and routing key reported by the decorated invocation.
func (si *sagaInvocation) Routing() (exchange, routingKey string) {
	exchange, routingKey = si.decoratedInvocation.Routing()
	return exchange, routingKey
}
// DeliveryInfo returns the delivery information reported by the decorated invocation.
func (si *sagaInvocation) DeliveryInfo() gbus.DeliveryInfo {
	inner := si.decoratedInvocation
	return inner.DeliveryInfo()
}
// SagaID returns the id of the saga instance this invocation belongs to.
func (si *sagaInvocation) SagaID() string {
	return si.sagaID
}