// Copyright 2018 MSolution.IO
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package anomalies
import (
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
"sort"
"github.com/trackit/jsonlog"
"github.com/trackit/trackit-server/aws"
"github.com/trackit/trackit-server/aws/usageReports"
"github.com/trackit/trackit-server/config"
"github.com/trackit/trackit-server/es"
)
type (
	// esProductAnomalyCost contains the cost data of a single anomaly point:
	// the observed value and the upper band the model expected.
	esProductAnomalyCost struct {
		Value float64 `json:"value"`
		MaxExpected float64 `json:"maxExpected"`
	}
	// esProductAnomaly is the per-(account, date, product) document
	// ingested in ElasticSearch by productSaveAnomaliesData.
	esProductAnomaly struct {
		Account string `json:"account"`
		Date string `json:"date"`
		Product string `json:"product"`
		Abnormal bool `json:"abnormal"`
		Recurrent bool `json:"recurrent"`
		Cost esProductAnomalyCost `json:"cost"`
	}
	// esProductAnomalies is used to get anomalies from ElasticSearch.
	esProductAnomalies []esProductAnomaly
	// esProductDatesBucket is used to store the raw ElasticSearch response:
	// one date bucket with its aggregated cost.
	esProductDatesBucket struct {
		Key string `json:"key_as_string"`
		Cost struct {
			Value float64 `json:"value"`
		} `json:"cost"`
	}
	// esProductTypedResult is used to store the raw ElasticSearch response:
	// product buckets, each holding its date buckets. The Products field is
	// unmarshaled directly from the "products" aggregation, hence no tag.
	esProductTypedResult struct {
		Products struct {
			Buckets []struct {
				Key string `json:"key"`
				Dates struct {
					Buckets []esProductDatesBucket `json:"buckets"`
				} `json:"dates"`
			} `json:"buckets"`
		}
	}
	// costWithProduct is used when a cost has to be wrapped by a product name.
	costWithProduct struct {
		product string
		cost float64
	}
	// totalCostByDay is a named type for total cost for each day, keyed by date string.
	totalCostByDay map[string]float64
	// highestSpendersByDay contains the more costly product podium for each day, keyed by date string.
	highestSpendersByDay map[string][]string
)
// runAnomaliesDetectionForProducts will get data from ElasticSearch,
// compute anomalies and ingest the result in ElasticSearch.
// It stops at the first failing step and returns its error.
func runAnomaliesDetectionForProducts(parsedParams AnomalyEsQueryParams, account aws.AwsAccount, ctx context.Context) (err error) {
	var res AnalyzedCosts
	if res, err = productGetAnomaliesData(ctx, parsedParams); err != nil {
		return
	}
	if err = productSaveAnomaliesData(ctx, res, account); err != nil {
		return
	}
	err = removeRecurrence(ctx, parsedParams, account)
	return
}
// productSaveAnomaliesData will save anomalies in ElasticSearch.
// If the index doesn't exist, it will be created.
// Anomalies are unique and will replace the existing ones if
// they changed (cost or upper band), since the document id only
// depends on (account, date, product).
func productSaveAnomaliesData(ctx context.Context, aCosts AnalyzedCosts, account aws.AwsAccount) error {
	logger := jsonlog.LoggerFromContextOrDefault(ctx)
	logger.Info("Updating anomalies for AWS account.", map[string]interface{}{
		"awsAccount": account,
	})
	index := es.IndexNameForUserId(account.UserId, IndexPrefixAnomaliesDetection)
	bp, err := utils.GetBulkProcessor(ctx)
	if err != nil {
		logger.Error("Failed to get bulk processor.", err.Error())
		return err
	}
	for _, aCost := range aCosts {
		doc := esProductAnomaly{
			Account: account.AwsIdentity,
			Date: aCost.Meta.Date,
			Product: aCost.Meta.AdditionalMeta.(AnalyzedCostProductMeta).Product,
			Abnormal: aCost.Anomaly,
			Recurrent: false,
			Cost: esProductAnomalyCost{
				Value: aCost.Cost,
				MaxExpected: aCost.UpperBand,
			},
		}
		id, err := productGenerateElasticSearchDocumentId(doc)
		if err != nil {
			logger.Error("Error when marshaling anomalies var", err.Error())
			return err
		}
		bp = addDocToBulkProcessor(bp, doc, TypeProductAnomaliesDetection, index, id)
	}
	// Flush can fail too; the original code silently discarded this error.
	if err = bp.Flush(); err != nil {
		logger.Error("Failed to flush anomalies bulk processor.", err.Error())
		return err
	}
	if err = bp.Close(); err != nil {
		logger.Error("Failed when putting anomalies in ES", err.Error())
		return err
	}
	logger.Info("Anomalies put in ES", nil)
	return nil
}
// productGenerateElasticSearchDocumentId is used to generate the document id
// ingested in ElasticSearch: the base64 URL encoding of the MD5 of the
// JSON-serialized (account, date, product) triple.
// The document id is not dependent on cost or upper band: if one of them
// changes, it will update the document in ElasticSearch instead of
// recreating one.
func productGenerateElasticSearchDocumentId(doc esProductAnomaly) (id string, err error) {
	key := struct {
		Account string `json:"account"`
		Date    string `json:"date"`
		Product string `json:"product"`
	}{
		Account: doc.Account,
		Date:    doc.Date,
		Product: doc.Product,
	}
	serialized, err := json.Marshal(key)
	if err != nil {
		return "", err
	}
	sum := md5.Sum(serialized)
	return base64.URLEncoding.EncodeToString(sum[:]), nil
}
// productClearDisturbances clears fake alerts with thresholds in config.
// An anomaly survives only if its increase over the upper band is a large
// enough share of that day's total bill, its absolute cost is high enough,
// and its product is in that day's highest-spenders podium.
func productClearDisturbances(aCosts AnalyzedCosts, totalCostByDay totalCostByDay, highestSpendersByDay highestSpendersByDay) AnalyzedCosts {
	for i := range aCosts {
		if !aCosts[i].Anomaly {
			continue
		}
		day := aCosts[i].Meta.Date
		increase := aCosts[i].Cost - aCosts[i].UpperBand
		minIncrease := totalCostByDay[day] * config.AnomalyDetectionDisturbanceCleaningMinPercentOfDailyBill / 100
		if increase < minIncrease || aCosts[i].Cost < config.AnomalyDetectionDisturbanceCleaningMinAbsoluteCost {
			aCosts[i].Anomaly = false
			continue
		}
		product := aCosts[i].Meta.AdditionalMeta.(AnalyzedCostProductMeta).Product
		inPodium := false
		for _, spender := range highestSpendersByDay[day] {
			if spender == product {
				inPodium = true
				break
			}
		}
		aCosts[i].Anomaly = inPodium
	}
	return aCosts
}
// productAddCostToCosts is a tool used by productGetHighestSpendersByDay.
// It accumulates cost under the given product name: an existing entry has
// its cost increased, otherwise a new entry is appended. The (possibly
// grown) slice is returned.
func productAddCostToCosts(product string, cost float64, costs []costWithProduct) []costWithProduct {
	for i, entry := range costs {
		if entry.product == product {
			costs[i].cost += cost
			return costs
		}
	}
	return append(costs, costWithProduct{product: product, cost: cost})
}
// productGetHighestSpendersByDay gets a podium of the highest spenders:
// for each day, the names of the top-spending products, sorted by
// descending cost and capped by the configured podium size.
func productGetHighestSpendersByDay(typedDocument esProductTypedResult) highestSpendersByDay {
	perDayCosts := map[string][]costWithProduct{}
	for _, product := range typedDocument.Products.Buckets {
		for _, day := range product.Dates.Buckets {
			perDayCosts[day.Key] = productAddCostToCosts(product.Key, day.Cost.Value, perDayCosts[day.Key])
		}
	}
	spenders := make(highestSpendersByDay)
	for day, costs := range perDayCosts {
		sort.Slice(costs, func(a, b int) bool {
			return costs[a].cost > costs[b].cost
		})
		podiumSize := config.AnomalyDetectionDisturbanceCleaningHighestSpendingMinRank
		if len(costs) < podiumSize {
			podiumSize = len(costs)
		}
		for _, entry := range costs[:podiumSize] {
			spenders[day] = append(spenders[day], entry.product)
		}
	}
	return spenders
}
// productGetTotalCostByDay gets the total cost for each day, summed
// across every product bucket of the ElasticSearch response.
func productGetTotalCostByDay(typedDocument esProductTypedResult) totalCostByDay {
	totals := make(totalCostByDay)
	for _, product := range typedDocument.Products.Buckets {
		for _, day := range product.Dates.Buckets {
			totals[day.Key] += day.Cost.Value
		}
	}
	return totals
}
// productGetAnomaliesData returns product anomalies based on query params.
// It queries ElasticSearch, runs anomaly detection per product, strips the
// warm-up offset before DateBegin, and finally clears disturbances using
// the per-day totals and highest-spenders podium.
func productGetAnomaliesData(ctx context.Context, params AnomalyEsQueryParams) (AnalyzedCosts, error) {
	logger := jsonlog.LoggerFromContextOrDefault(ctx)
	sr, err := makeElasticSearchRequest(ctx, getProductElasticSearchParams, params)
	if err != nil {
		return nil, err
	}
	var typedDocument esProductTypedResult
	if err := json.Unmarshal(*sr.Aggregations["products"], &typedDocument.Products); err != nil {
		logger.Error("Failed to parse elasticsearch document.", err.Error())
		return nil, err
	}
	totalAnalyzedCosts := make(AnalyzedCosts, 0)
	totalCostsByDay := productGetTotalCostByDay(typedDocument)
	highestSpendersByDay := productGetHighestSpendersByDay(typedDocument)
	for _, product := range typedDocument.Products.Buckets {
		aCosts := make(AnalyzedCosts, 0, len(product.Dates.Buckets))
		for _, date := range product.Dates.Buckets {
			aCosts = append(aCosts, AnalyzedCost{
				Meta: AnalyzedCostEssentialMeta{
					AdditionalMeta: AnalyzedCostProductMeta{
						Product: product.Key,
					},
					Date: date.Key,
				},
				Cost: date.Cost.Value,
				Anomaly: false,
			})
		}
		aCosts = computeAnomalies(ctx, aCosts, params.DateBegin)
		aCosts = deleteOffset(aCosts, params.DateBegin)
		totalAnalyzedCosts = append(totalAnalyzedCosts, aCosts...)
	}
	totalAnalyzedCosts = productClearDisturbances(totalAnalyzedCosts, totalCostsByDay, highestSpendersByDay)
	// NOTE: the original code re-checked err here, but err could not have
	// changed since the unmarshal check above; the dead branch was removed.
	return totalAnalyzedCosts, nil
}