/
serviceEmailManager.go
201 lines (178 loc) · 6.81 KB
/
serviceEmailManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package notifications
import (
	"context"
	"errors"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"

	"github.com/shreyb/managed-tokens/internal/db"
	"github.com/shreyb/managed-tokens/internal/metrics"
)
// Metrics
var (
	// serviceErrorNotificationAttemptTimestamp records, per service and per
	// success/failure outcome, the wall-clock time of the most recent attempt
	// to send a service error notification email. Labels: "service" is the
	// managed-tokens service name, "success" is "true"/"false".
	serviceErrorNotificationAttemptTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "managed_tokens",
		Name:      "service_error_email_last_sent_timestamp",
		Help:      "Last time managed tokens service attempted to send an service error notification",
	},
		[]string{
			"service",
			"success",
		},
	)
	// serviceErrorNotificationSendDuration observes how long a successful
	// service error email send took, in seconds. Failed sends are not observed
	// (see sendServiceEmailIfErrors).
	serviceErrorNotificationSendDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "managed_tokens",
		Name:      "service_error_email_send_duration_seconds",
		Help:      "Time in seconds it took to successfully send an service error email",
	})
)
// init registers this file's Prometheus collectors with the shared
// managed-tokens metrics registry so they are exported alongside the
// rest of the application's metrics.
func init() {
	metrics.MetricsRegistry.MustRegister(
		serviceErrorNotificationAttemptTimestamp,
		serviceErrorNotificationSendDuration,
	)
}
// ServiceEmailManager contains all the information needed to receive Notifications for services and ensure they get sent in the
// correct email
type ServiceEmailManager struct {
	// ReceiveChan is the channel callers send Notifications on. Closing it
	// triggers the final save-and-email step in runServiceNotificationHandler.
	ReceiveChan chan Notification
	// Service is the managed-tokens service this manager collects errors for.
	Service string
	// Email is the email configuration used to send the aggregated error report.
	Email *email
	// Database, if set (via a ServiceEmailManagerOption), is used to load and
	// persist per-service error counts; if nil, all notifications are sent.
	Database *db.ManagedTokensDatabase
	// NotificationMinimum is the error-count threshold below which individual
	// notifications are suppressed (only applies when counts are tracked).
	NotificationMinimum int
	// wg is the caller-owned WaitGroup used to signal when the notification
	// handler goroutine has finished (it calls wg.Done on exit).
	wg *sync.WaitGroup
}

// ServiceEmailManagerOption is a functional option applied to a
// ServiceEmailManager during construction in NewServiceEmailManager.
type ServiceEmailManagerOption func(*ServiceEmailManager) error
// NewServiceEmailManager returns an EmailManager channel for callers to send Notifications on. It will collect messages and sort them according
// to the underlying type of the Notification, and when EmailManager is closed, will send emails. Set up the ManagedTokensDatabase and
// the NotificationMinimum via EmailManagerOptions passed in. If a ManagedTokensDatabase is not passed in via an EmailManagerOption,
// then the EmailManager will send all notifications.
//
// The caller is expected to have added 1 to wg; the handler goroutine calls
// wg.Done when it exits.
func NewServiceEmailManager(ctx context.Context, wg *sync.WaitGroup, service string, e *email, opts ...ServiceEmailManagerOption) *ServiceEmailManager {
	funcLogger := log.WithFields(log.Fields{
		"caller":  "notifications.NewServiceEmailManager",
		"service": service,
	})
	em := &ServiceEmailManager{
		Service:     service,
		ReceiveChan: make(chan Notification),
		// BUGFIX: previously e was accepted but never stored, leaving em.Email
		// nil unless an option happened to set it, and sendServiceEmailIfErrors
		// reads em.Email when sending the final report.
		Email: e,
		wg:    wg,
	}
	// Apply functional options best-effort: a failing option is logged (with
	// its error) but does not abort construction.
	for _, opt := range opts {
		if err := opt(em); err != nil {
			funcLogger.WithError(err).Error("Error running functional option")
		}
	}
	// Load this service's prior error counts so notifications can be throttled
	// against NotificationMinimum. On failure, fall back to sending everything.
	shouldTrackErrorCounts := true
	ec, err := setErrorCountsByService(ctx, em.Service, em.Database) // Get our previous error information for this service
	if err != nil {
		funcLogger.WithError(err).Error("Error setting error counts. Will not track errors.")
		shouldTrackErrorCounts = false
	}
	adminChan := make(chan Notification)
	startAdminErrorAdder(adminChan)
	runServiceNotificationHandler(ctx, em, adminChan, ec, shouldTrackErrorCounts)
	return em
}
// runServiceNotificationHandler concurrently handles the routing and counting of errors that result from a Notification being sent
// on the ServiceEmailManager's ReceiveChan.
//
// The spawned goroutine runs until either ctx is done (counts are NOT saved in
// that case) or em.ReceiveChan is closed (counts are saved and the summary
// email is sent). On exit it closes adminChan and calls em.wg.Done.
func runServiceNotificationHandler(ctx context.Context, em *ServiceEmailManager, adminChan chan<- Notification, ec *serviceErrorCounts, shouldTrackErrorCounts bool) {
	funcLogger := log.WithFields(log.Fields{
		"caller":  "notifications.runServiceNotificationHandler",
		"service": em.Service,
	})
	// Start listening for new notifications
	go func() {
		// node name --> error message, aggregated for the final service email.
		serviceErrorsTable := make(map[string]string)
		defer em.wg.Done()
		defer close(adminChan)
		for {
			select {
			case <-ctx.Done():
				if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
					funcLogger.Error("Timeout exceeded in notification Manager")
				} else {
					funcLogger.Error(err)
				}
				return
			case n, chanOpen := <-em.ReceiveChan:
				// Channel is closed --> save errors to database and send notifications
				if !chanOpen {
					if shouldTrackErrorCounts {
						if err := saveErrorCountsInDatabase(ctx, em.Service, em.Database, ec); err != nil {
							funcLogger.WithError(err).Error("Error saving new error counts in database. Please investigate")
						}
					}
					sendServiceEmailIfErrors(ctx, serviceErrorsTable, em)
					return
				}
				// Channel is open: direct the message as needed
				funcLogger.WithField("message", n.GetMessage()).Debug("Received notification message")
				// When tracking counts, suppress notifications whose error
				// count has not yet reached em.NotificationMinimum.
				if shouldTrackErrorCounts && !adjustErrorCountsByServiceAndDirectNotification(n, ec, em.NotificationMinimum) {
					funcLogger.WithField("service", n.GetService()).Debug("Error count less than error limit. Not sending notification")
					continue
				}
				addPushErrorNotificationToServiceErrorsTable(n, serviceErrorsTable)
				adminChan <- n
			}
		}
	}()
}
// addPushErrorNotificationToServiceErrorsTable records n's message in
// serviceErrorsTable (keyed by node) when n is a *pushError; any other
// Notification type is only logged, not recorded.
func addPushErrorNotificationToServiceErrorsTable(n Notification, serviceErrorsTable map[string]string) {
	// Note that we ONLY send push errors to the stakeholders. Only admins will get all Notifications.
	funcLogger := log.WithFields(log.Fields{
		"caller":  "notifications.addPushErrorNotificationToServiceErrorsTable",
		"service": n.GetService(),
	})
	msg := "Error counts either not tracked or exceeded error limit. Sending notification"
	pe, isPushError := n.(*pushError)
	if !isPushError {
		// Not a push error: nothing to store for stakeholders.
		funcLogger.Debug(msg)
		return
	}
	serviceErrorsTable[pe.node] = n.GetMessage()
	funcLogger.WithField("node", pe.node).Debug(msg)
}
// sendServiceEmailIfErrors aggregates serviceErrorsTable into an email body and
// sends it using em.Email. If the table is empty, nothing is sent. Every send
// attempt (including a failed template preparation) updates the
// serviceErrorNotificationAttemptTimestamp metric with a success label; only
// successful sends are observed in serviceErrorNotificationSendDuration.
func sendServiceEmailIfErrors(ctx context.Context, serviceErrorsTable map[string]string, em *ServiceEmailManager) {
	funcLogger := log.WithFields(log.Fields{
		"caller":  "notifications.sendServiceEmailIfErrors",
		"service": em.Service,
	})
	if len(serviceErrorsTable) == 0 {
		funcLogger.Debug("No errors to send for service")
		return
	}
	tableString := aggregateServicePushErrors(serviceErrorsTable)
	msg, err := prepareServiceEmail(ctx, tableString, em.Email)
	if err != nil {
		// BUGFIX: previously we logged this error but still attempted to send
		// the zero-value message. Record the failed attempt and bail out.
		funcLogger.WithError(err).Error("Error preparing service email for sending")
		serviceErrorNotificationAttemptTimestamp.WithLabelValues(em.Service, strconv.FormatBool(false)).SetToCurrentTime()
		return
	}
	start := time.Now()
	var success bool
	err = SendMessage(ctx, em.Email, msg)
	dur := time.Since(start).Seconds()
	if err != nil {
		funcLogger.WithError(err).Error("Error sending email")
	} else {
		serviceErrorNotificationSendDuration.Observe(dur)
		success = true
	}
	serviceErrorNotificationAttemptTimestamp.WithLabelValues(em.Service, strconv.FormatBool(success)).SetToCurrentTime()
}
// prepareServiceEmail renders the service-error email body from the package's
// serviceErrorsTemplate, filling in the current timestamp (RFC822 format) and
// the passed-in errorTable string, and returns the rendered text.
//
// NOTE(review): ctx and e are currently unused by this function — an earlier
// version of this comment claimed the email object's templateStruct field was
// set, but the code only builds a local anonymous struct. Confirm whether e
// (e.g. its template path) should actually be consulted here.
func prepareServiceEmail(ctx context.Context, errorTable string, e *email) (string, error) {
	timestamp := time.Now().Format(time.RFC822)
	// Anonymous struct matching the fields serviceErrorsTemplate expects.
	templateStruct := struct {
		Timestamp  string
		ErrorTable string
	}{
		Timestamp:  timestamp,
		ErrorTable: errorTable,
	}
	return prepareMessageFromTemplate(strings.NewReader(serviceErrorsTemplate), templateStruct)
}