-
Notifications
You must be signed in to change notification settings - Fork 9
/
aggregationManager.go
216 lines (185 loc) · 6.94 KB
/
aggregationManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package main
import (
"sync"
"time"
gometrics "github.com/rcrowley/go-metrics"
uuid "github.com/satori/go.uuid"
)
// Package-level gauges registered by AggregationManager.RegisterMetrics:
// gaugeNumberOfPlans tracks how many plans have aggregation settings loaded;
// gaugeAggregatedPlansTotal tracks how many plans are currently aggregating.
var (
	gaugeNumberOfPlans        = gometrics.NewGauge()
	gaugeAggregatedPlansTotal = gometrics.NewGauge()
)
// PlanAggregation holds the per-plan aggregation settings fetched from Iris
// plus the mutable counters/timestamps that drive the aggregation state machine.
type PlanAggregation struct {
	planName          string        // plan identifier; also the key in planAggregationMap
	maxIncidentCount  int           // incidents within collectionWindow that trigger aggregation
	collectionWindow  time.Duration // window in which incidents are counted toward the threshold
	aggregationWindow time.Duration // how long aggregation stays active once triggered
	resetWindow       time.Duration // from resp.AggregationReset; NOTE(review): not read anywhere in this file — confirm intended use
	aggregationStatus bool          // true while incidents for this plan are being aggregated
	incidentCount     int           // incidents seen in the current collection/aggregation window
	collectionTime    time.Time     // when the current collection window started
	aggregationTime   time.Time     // when aggregation was last triggered
	batchID           uuid.UUID     // batch identifier for the current aggregation run
}
// AggregationManager periodically pulls plan aggregation settings from Iris
// and tracks, per plan, whether incoming incidents should be batched.
// planAggregationMap is guarded by aggregationMutex.
type AggregationManager struct {
	Quit               chan int // send signal to shutdown
	config             *Config
	logger             *Logger
	mainWg             *sync.WaitGroup
	irisClient         *IrisClient
	planAggregationMap map[string]PlanAggregation // keyed by plan name
	aggregationMutex   sync.RWMutex               // protects planAggregationMap
}
// RegisterMetrics attaches the manager's internal gauges to the given registry.
func (a *AggregationManager) RegisterMetrics(reg gometrics.Registry) {
	gauges := map[string]gometrics.Gauge{
		"aggregationManager.internal.plan_aggregation_settings_total": gaugeNumberOfPlans,
		"aggregationManager.internal.plans_under_aggregation_total":   gaugeAggregatedPlansTotal,
	}
	for name, gauge := range gauges {
		reg.Register(name, gauge)
	}
}
// NewAggregationManager creates a new AggregationManager wired to the given
// Iris client, configuration, logger, shutdown channel and wait group.
// When metrics are enabled (dry-run off) and a registry is supplied, the
// manager's internal gauges are registered on it.
func NewAggregationManager(irisClient *IrisClient, config *Config, logger *Logger, quit chan int, wg *sync.WaitGroup, metricsRegistry *gometrics.Registry) *AggregationManager {
	aggregationManager := AggregationManager{
		Quit:               quit, // send signal to shutdown
		config:             config,
		irisClient:         irisClient,
		logger:             logger,
		mainWg:             wg,
		planAggregationMap: make(map[string]PlanAggregation),
	}
	// Consult the injected config rather than the package-level appConfig so
	// the constructor honors the configuration it was actually given.
	if !config.MetricsDryRun && metricsRegistry != nil {
		aggregationManager.RegisterMetrics(*metricsRegistry)
	}
	return &aggregationManager
}
// checkAggregation reports whether messages for this plan are currently being
// aggregated, returning the batch ID of the active aggregation when they are.
func (a *AggregationManager) checkAggregation(plan string) (uuid.UUID, bool) {
	a.aggregationMutex.RLock()
	defer a.aggregationMutex.RUnlock()
	entry, known := a.planAggregationMap[plan]
	if !known {
		// No settings loaded for this plan yet — do not aggregate.
		return uuid.UUID{}, false
	}
	return entry.batchID, entry.aggregationStatus
}
// countIncident records one incident for the plan and reports (via the bool)
// whether this incident pushed the plan over its threshold and triggered a
// brand-new aggregation. The returned UUID is the plan's current batch ID.
func (a *AggregationManager) countIncident(planName string) (uuid.UUID, bool) {
	a.aggregationMutex.Lock()
	defer a.aggregationMutex.Unlock()
	planAgg, known := a.planAggregationMap[planName]
	if !known {
		// plan does not exist in the map, do not aggregate
		return uuid.UUID{}, false
	}
	planAgg.incidentCount++
	switch {
	case planAgg.aggregationStatus:
		// Already aggregating — just keep counting.
	case planAgg.incidentCount == 1:
		// First incident opens a fresh collection window.
		planAgg.collectionTime = time.Now()
	case planAgg.incidentCount > planAgg.maxIncidentCount:
		// Threshold exceeded: open the aggregation window under a new batch ID.
		planAgg.aggregationStatus = true
		planAgg.aggregationTime = time.Now()
		planAgg.batchID = uuid.NewV4()
		a.planAggregationMap[planName] = planAgg
		a.logger.Infof("Started aggregation for plan %s", planAgg.planName)
		// Signal the caller that a new aggregation has been triggered.
		return planAgg.batchID, true
	}
	// No new aggregation was triggered; persist the updated counter.
	a.planAggregationMap[planName] = planAgg
	return planAgg.batchID, false
}
// resetAggregation walks every tracked plan and expires stale state:
// an active aggregation ends once its aggregationWindow has elapsed (we never
// aggregate forever, even if messages keep arriving), and an idle plan's
// incident count is cleared once its collectionWindow has elapsed.
// It also refreshes the plans-under-aggregation gauge.
// TODO: change iris-api UI to reflect this aggregation behavior change.
func (a *AggregationManager) resetAggregation() {
	a.aggregationMutex.Lock()
	defer a.aggregationMutex.Unlock()
	var activePlanCount int64
	for name, planAgg := range a.planAggregationMap {
		if planAgg.aggregationStatus {
			activePlanCount++
			if time.Since(planAgg.aggregationTime) > planAgg.aggregationWindow {
				// Aggregation window has run out — stop aggregating this plan.
				planAgg.incidentCount = 0
				planAgg.aggregationStatus = false
				a.planAggregationMap[name] = planAgg
				a.logger.Infof("Stopped aggregation for plan %s", planAgg.planName)
			}
			continue
		}
		// Collection window elapsed without hitting the threshold — reset the count.
		if planAgg.incidentCount > 0 && time.Since(planAgg.collectionTime) > planAgg.collectionWindow {
			planAgg.incidentCount = 0
			a.planAggregationMap[name] = planAgg
		}
	}
	gaugeAggregatedPlansTotal.Update(activePlanCount)
}
// updateSettings reconciles the in-memory aggregation map with the settings
// fetched from Iris: new plans are added, plans whose settings changed are
// replaced (which also resets their in-flight counters/state), and plans no
// longer present in the response are removed.
func (a *AggregationManager) updateSettings(aggSettingResp []PlanAggregationResp) {
	a.aggregationMutex.Lock()
	defer a.aggregationMutex.Unlock()
	gaugeNumberOfPlans.Update(int64(len(aggSettingResp)))
	seen := make(map[string]bool, len(aggSettingResp))
	for _, resp := range aggSettingResp {
		name := resp.PlanName
		seen[name] = true
		// Convert the integer second counts from the API into durations.
		fresh := PlanAggregation{
			planName:          name,
			maxIncidentCount:  resp.ThresholdCount,
			collectionWindow:  time.Duration(resp.ThresholdWindow) * time.Second,
			aggregationWindow: time.Duration(resp.AggregationWindow) * time.Second,
			resetWindow:       time.Duration(resp.AggregationReset) * time.Second,
			aggregationStatus: false,
			incidentCount:     0,
			collectionTime:    time.Now(),
		}
		current, known := a.planAggregationMap[name]
		if !known {
			// First time we see this plan — initialize its settings.
			a.planAggregationMap[name] = fresh
			continue
		}
		settingsChanged := fresh.maxIncidentCount != current.maxIncidentCount ||
			fresh.collectionWindow != current.collectionWindow ||
			fresh.aggregationWindow != current.aggregationWindow ||
			fresh.resetWindow != current.resetWindow
		if settingsChanged {
			a.planAggregationMap[name] = fresh
			a.logger.Infof("Updated aggregation settings for plan %s", name)
		}
	}
	// Drop plans that no longer exist upstream.
	for name := range a.planAggregationMap {
		if !seen[name] {
			delete(a.planAggregationMap, name)
		}
	}
}
// Run periodically fetches plan aggregation settings from Iris and manages
// existing aggregations until a shutdown signal arrives on Quit. The first
// cycle runs immediately; subsequent cycles run every RunLoopDuration seconds.
// It decrements mainWg on exit.
func (a *AggregationManager) Run() {
	defer a.mainWg.Done()
	interval := time.NewTicker(a.config.RunLoopDuration * time.Second)
	defer interval.Stop()
	for {
		// get aggregation settings from iris-api and update our aggregation map
		aggSettingsResp, err := a.irisClient.GetPlanAggregationSettings()
		if err != nil {
			a.logger.Errorf("Failed to fetch aggregation settings, skipping aggregation settings update...")
		} else {
			a.updateSettings(aggSettingsResp)
		}
		// check if any aggregations need to be reset
		a.resetAggregation()
		a.logger.Infof("Updated aggregation information...")
		// Block on both the shutdown channel and the ticker so a Quit signal
		// is honored during the wait instead of only after the next tick
		// (the old non-blocking check could delay shutdown by a full cycle).
		select {
		case <-a.Quit:
			a.logger.Infof("stopped aggregationManager...")
			return
		case <-interval.C:
		}
	}
}