/
products_recurrence.go
141 lines (127 loc) · 4.84 KB
/
products_recurrence.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package anomalies
import (
"context"
"encoding/json"
"time"
"github.com/trackit/jsonlog"
"github.com/trackit/trackit-server/aws"
"github.com/trackit/trackit-server/aws/usageReports"
"github.com/trackit/trackit-server/config"
"github.com/trackit/trackit-server/es"
)
type (
	// esProductRecurrentAnomaly is a partial document for ElasticSearch.
	// Only the "recurrent" flag is serialized, so a bulk update touches
	// nothing else in the stored anomaly document.
	esProductRecurrentAnomaly struct {
		Recurrent bool `json:"recurrent"`
	}
	// esProductAnomalyWithId is used to get anomalies from ElasticSearch.
	// It pairs a decoded anomaly document with its ES document id so the
	// document can later be updated in place.
	// NOTE(review): the json tags here are "source"/"id", not the ES
	// "_source"/"_id" — getAnomaliesFromEs fills the fields manually, so
	// these tags appear unused for ES decoding; confirm before relying on them.
	esProductAnomalyWithId struct {
		Source esProductAnomaly `json:"source"`
		Id     string           `json:"id"`
	}
	// esProductAnomaliesWithId is used to get anomalies from ElasticSearch.
	esProductAnomaliesWithId []esProductAnomalyWithId
	// anomaliesByDate is used to get an anomaly with its date more easily.
	anomaliesByDate map[time.Time]esProductAnomalyWithId
	// anomaliesByProduct is used to get an anomaly with its product more easily.
	anomaliesByProduct map[string]anomaliesByDate
)
// removeRecurrence gets all anomalies from ElasticSearch and removes recurrent anomalies.
func removeRecurrence(ctx context.Context, params AnomalyEsQueryParams, account aws.AwsAccount) error {
logger := jsonlog.LoggerFromContextOrDefault(ctx)
logger.Info("Removing recurrent anomalies", nil)
params.Index = es.IndexNameForUserId(account.UserId, IndexPrefixAnomaliesDetection)
if raw, err := getAnomaliesFromEs(ctx, params); err != nil {
return err
} else {
res := transformAnomaliesToMap(raw)
var recurrentAnomalies esProductAnomaliesWithId
for product := range res {
recurrentAnomalies = append(recurrentAnomalies, detectRecurrence(res[product])...)
}
err := applyRecurrentAnomaliesToEs(ctx, account, recurrentAnomalies)
return err
}
}
// applyRecurrentAnomaliesToEs will save in ElasticSearch all recurrent anomalies
// by setting recurrent field to true.
func applyRecurrentAnomaliesToEs(ctx context.Context, account aws.AwsAccount, recurrentAnomalies esProductAnomaliesWithId) error {
logger := jsonlog.LoggerFromContextOrDefault(ctx)
logger.Info("Updating recurrent anomalies.", map[string]interface{}{
"awsAccount": account,
})
index := es.IndexNameForUserId(account.UserId, IndexPrefixAnomaliesDetection)
bp, err := utils.GetBulkProcessor(ctx)
if err != nil {
logger.Error("Failed to get bulk processor.", err.Error())
return err
}
for _, recurrentAnomaly := range recurrentAnomalies {
recurrentAnomaly.Source.Recurrent = true
if err != nil {
logger.Error("Error when marshaling recurrent anomalies var", err.Error())
return err
}
bp = addDocToBulkProcessor(bp, recurrentAnomaly.Source, TypeProductAnomaliesDetection, index, recurrentAnomaly.Id)
}
bp.Flush()
err = bp.Close()
if err != nil {
logger.Error("Failed when putting recurrent anomalies in ES", err.Error())
return err
}
logger.Info("Recurrent anomalies put in ES", nil)
return nil
}
// approximateCostComparison compares two float64 with
// an approximation of config.AnomalyDetectionRecurrenceCleaningThreshold.
// For example +/- 10% if it is set to 0.1.
// approximateCostComparison reports whether b falls strictly inside a
// tolerance band centered on a. The half-width of the band is a fraction
// of a given by config.AnomalyDetectionRecurrenceCleaningThreshold
// (for example, 0.1 means within +/- 10% of a).
func approximateCostComparison(a, b float64) bool {
	threshold := config.AnomalyDetectionRecurrenceCleaningThreshold
	delta := a * threshold
	return b < a+delta && b > a-delta
}
// detectRecurrence detects recurrent anomalies.
func detectRecurrence(an anomaliesByDate) (res esProductAnomaliesWithId) {
for date := range an {
prev := date.AddDate(0, -1, 0)
if an[prev].Source.Abnormal && approximateCostComparison(an[date].Source.Cost.Value, an[prev].Source.Cost.Value) {
res = append(res, an[date])
}
}
return res
}
// transformAnomaliesToMap transform a raw slice of anomalies in a parsed map.
func transformAnomaliesToMap(raw esProductAnomaliesWithId) anomaliesByProduct {
res := make(anomaliesByProduct)
for _, r := range raw {
if res[r.Source.Product] == nil {
res[r.Source.Product] = make(anomaliesByDate)
}
if date, err := time.Parse("2006-01-02T15:04:05Z", r.Source.Date); err == nil {
res[r.Source.Product][date] = r
}
}
return res
}
// getAnomaliesFromEs returns product anomalies in ElasticSearch
func getAnomaliesFromEs(ctx context.Context, params AnomalyEsQueryParams) (esProductAnomaliesWithId, error) {
logger := jsonlog.LoggerFromContextOrDefault(ctx)
sr, err := getAnomalyElasticSearchParams(params.Account, params.DateBegin, params.DateEnd, es.Client, params.Index, TypeProductAnomaliesDetection).Do(ctx)
if err != nil {
return nil, err
}
logger.Info("Found anomalies", map[string]interface{}{
"begin": params.DateBegin.String(),
"end": params.DateEnd.String(),
"account": params.Account,
"amount": sr.Hits.TotalHits,
})
typedDocuments := make(esProductAnomaliesWithId, sr.Hits.TotalHits)
for i, h := range sr.Hits.Hits {
typedDocuments[i].Id = h.Id
if b, err := h.Source.MarshalJSON(); err != nil {
} else if err := json.Unmarshal(b, &typedDocuments[i].Source); err != nil {
}
}
return typedDocuments, nil
}