/
manager.go
156 lines (133 loc) · 4.12 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package notification
import (
"context"
"fmt"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/target/goalert/util/log"
"go.opencensus.io/trace"
)
// Manager is used as an intermediary between Senders and Receivers.
// It should be contstructed first (with NewManager()) and passed to
// Senders and Receivers that require it.
type Manager struct {
providers map[string]*namedSender
searchOrder []*namedSender
Receiver
mx *sync.RWMutex
stubNotifiers bool
}
var _ Sender = &Manager{}
// NewManager initializes a new Manager.
func NewManager() *Manager {
return &Manager{
mx: new(sync.RWMutex),
providers: make(map[string]*namedSender),
}
}
// SetStubNotifiers will cause all notifications senders to be stubbed out.
//
// This causes all notifications to be marked as delivered, but not actually sent.
func (mgr *Manager) SetStubNotifiers() {
mgr.stubNotifiers = true
}
// Status will return the current status of a message.
func (mgr *Manager) Status(ctx context.Context, messageID, providerMsgID string) (*MessageStatus, error) {
parts := strings.SplitN(providerMsgID, ":", 2)
if len(parts) != 2 {
return nil, errors.Errorf("invalid provider message ID '%s'", providerMsgID)
}
provider := mgr.providers[parts[0]]
if provider == nil {
return nil, errors.Errorf("unknown provider ID '%s'", parts[0])
}
checker, ok := provider.Sender.(StatusChecker)
if !ok {
return nil, ErrStatusUnsupported
}
ctx, sp := trace.StartSpan(ctx, "NotificationManager.Status")
sp.AddAttributes(
trace.StringAttribute("provider.id", parts[0]),
trace.StringAttribute("provider.message.id", parts[1]),
)
defer sp.End()
stat, err := checker.Status(ctx, messageID, parts[1])
if stat != nil {
stat = stat.wrap(ctx, provider)
}
return stat, err
}
// RegisterSender will register a sender under a given DestType and name.
// A sender for the same name and type will replace an existing one, if any.
func (mgr *Manager) RegisterSender(t DestType, name string, s Sender) {
mgr.mx.Lock()
defer mgr.mx.Unlock()
_, ok := mgr.providers[name]
if ok {
panic("name already taken")
}
if mgr.stubNotifiers {
// disable notification sending
s = stubSender{}
}
n := &namedSender{name: name, Sender: s, destType: t}
mgr.providers[name] = n
mgr.searchOrder = append(mgr.searchOrder, n)
if rs, ok := s.(ReceiverSetter); ok {
rs.SetReceiver(&namedReceiver{ns: n, Receiver: mgr})
}
}
// RegisterReceiver will set the given Receiver as the target for all Receive() calls.
// It will panic if called multiple times.
func (mgr *Manager) RegisterReceiver(r Receiver) {
if mgr.Receiver != nil {
panic("tried to register a second Receiver")
}
mgr.Receiver = r
}
// Send implements the Sender interface by trying all registered senders for the type given
// in Notification. An error is returned if there are no registered senders for the type
// or if an error is returned from all of them.
func (mgr *Manager) Send(ctx context.Context, msg Message) (*MessageStatus, error) {
mgr.mx.RLock()
defer mgr.mx.RUnlock()
destType := msg.Destination().Type
ctx = log.WithFields(ctx, log.Fields{
"ProviderType": destType,
"CallbackID": msg.ID(),
})
if a, ok := msg.(Alert); ok {
ctx = log.WithField(ctx, "AlertID", a.AlertID)
}
var tried bool
for _, s := range mgr.searchOrder {
if s.destType != destType {
continue
}
tried = true
sendCtx := log.WithField(ctx, "ProviderName", s.name)
sendCtx, sp := trace.StartSpan(sendCtx, "NotificationManager.Send")
sp.AddAttributes(
trace.StringAttribute("provider.id", s.name),
trace.StringAttribute("message.type", msg.Type().String()),
trace.StringAttribute("message.id", msg.ID()),
)
status, err := s.Send(sendCtx, msg)
sp.End()
if err != nil {
log.Log(sendCtx, errors.Wrap(err, "send notification"))
continue
}
log.Logf(sendCtx, "notification sent")
metricSentTotal.
WithLabelValues(msg.Destination().Type.String(), msg.Type().String()).
Inc()
// status already wrapped via namedSender
return status, nil
}
if !tried {
return nil, fmt.Errorf("no senders registered for type '%s'", destType)
}
return nil, errors.New("all notification senders failed")
}