-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
aggregation.response.go
143 lines (117 loc) · 3.09 KB
/
aggregation.response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package opensearch_query
import (
"encoding/json"
"github.com/influxdata/telegraf"
)
// AggregationResponse is the decoded form of a query response as far as this
// file needs it: the hit summary and the aggregation results.
type AggregationResponse struct {
// Hits is decoded from the "hits" member of the response.
Hits *SearchHits `json:"hits"`
// Aggregations is decoded from the "aggregations" member. GetMetrics treats
// a nil value as "no aggregations" and reports only the hit count.
Aggregations *Aggregation `json:"aggregations"`
}
// SearchHits is the "hits" object of a response; only the total is decoded.
type SearchHits struct {
// TotalHits is decoded from the "total" member and is left out when
// encoding if it is nil (omitempty).
TotalHits *TotalHits `json:"total,omitempty"`
}
// TotalHits is the "total" object found inside "hits".
type TotalHits struct {
// Relation is decoded from "relation".
// NOTE(review): presumably tells whether Value is exact or a lower bound -
// confirm against the server documentation; this file never reads it.
Relation string `json:"relation"`
// Value is decoded from "value"; AggregationResponse.GetMetrics reports it
// as the "doc_count" field.
Value int64 `json:"value"`
}
// MetricAggregation holds the decoded members of a metric aggregation result,
// keyed by member name. Aggregation.GetMetrics turns each entry into a field
// and flattens values that are themselves JSON objects into one field per key.
type MetricAggregation map[string]interface{}
// AggregateValue is the result of one named aggregation. Both fields are
// unexported, so they are only filled in by AggregateValue.UnmarshalJSON,
// which sets buckets when the JSON object has a "buckets" member and metrics
// otherwise.
type AggregateValue struct {
// metrics holds the members of a metric aggregation result.
metrics MetricAggregation
// buckets holds the buckets of a bucket aggregation result; a non-nil value
// is what IsAggregation reports on.
buckets []BucketData
}
// Aggregation maps an aggregation name to its result. GetMetrics uses the name
// as the tag key for bucket aggregations and as the field-name prefix for
// metric aggregations.
type Aggregation map[string]AggregateValue
// BucketData is one bucket of a bucket aggregation. It is decoded by
// BucketData.UnmarshalJSON, which reads "doc_count" and "key" by hand and
// treats every other member of the bucket object as a sub-aggregation.
type BucketData struct {
// DocumentCount is decoded from "doc_count" and passed down as the document
// count when the bucket's sub-aggregations are reported.
DocumentCount int64 `json:"doc_count"`
// Key is decoded from "key" and becomes the tag value for this bucket.
Key string `json:"key"`
// subaggregation holds the aggregations nested inside this bucket, keyed by
// their names.
subaggregation Aggregation
}
// GetMetrics reports the response on acc as points of the given measurement.
//
// When the response carries no aggregations, a single point without tags and
// with one field, "doc_count" (the total hit count), is added. Otherwise the
// work is delegated to Aggregation.GetMetrics, seeded with the total hit count
// and an empty tag set.
//
// An error is returned when the response has no "hits" or no "hits.total"
// object, because the document count cannot be determined in that case.
func (a *AggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement string) error {
	// Hits and TotalHits are pointers and stay nil when the response omits
	// them; check before dereferencing so such a response cannot panic.
	if a.Hits == nil || a.Hits.TotalHits == nil {
		return missingTotalHitsError{}
	}
	total := a.Hits.TotalHits.Value

	// Simple case (no aggregations): only the hit count is reported.
	if a.Aggregations == nil {
		fields := map[string]interface{}{
			"doc_count": total,
		}
		acc.AddFields(measurement, fields, make(map[string]string))
		return nil
	}

	return a.Aggregations.GetMetrics(acc, measurement, total, map[string]string{})
}

// missingTotalHitsError is returned by AggregationResponse.GetMetrics when the
// response lacks the total hit count.
type missingTotalHitsError struct{}

// Error implements the error interface.
func (missingTotalHitsError) Error() string {
	return "search response contains no total hit count (hits.total)"
}
// GetMetrics walks the aggregation results at this level and reports them on
// acc as points of the given measurement.
//
// Bucket aggregations are descended into: for every bucket, the nested
// aggregations are processed recursively with the bucket's document count and
// with the tags extended by {aggregation name: bucket key}.
//
// Metric aggregations are gathered into a single point for this level. A
// metric member becomes the field "<aggregation>_<member>"; when the member's
// value is itself an object, each of its entries becomes
// "<aggregation>_<member>_<key>". The point also carries "doc_count" set to
// docCount and is tagged with tags.
//
// A level that consists only of bucket aggregations adds no point of its own.
// Every entry of the map is processed, so the result does not depend on map
// iteration order when several aggregations share a level.
func (a *Aggregation) GetMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error {
	fields := make(map[string]interface{})
	sawBuckets := false

	for name, agg := range *a {
		if agg.IsAggregation() {
			sawBuckets = true
			for _, bucket := range agg.buckets {
				bucketTags := make(map[string]string, len(tags)+1)
				bucketTags[name] = bucket.Key
				// Inherited tags are copied last, so they take precedence
				// if an ancestor aggregation has the same name.
				for k, v := range tags {
					bucketTags[k] = v
				}

				if err := bucket.subaggregation.GetMetrics(acc, measurement, bucket.DocumentCount, bucketTags); err != nil {
					return err
				}
			}
			// Keep going: sibling aggregations must not be dropped.
			continue
		}

		for metric, value := range agg.metrics {
			switch typed := value.(type) {
			case map[string]interface{}:
				for k, v := range typed {
					fields[name+"_"+metric+"_"+k] = v
				}
			default:
				fields[name+"_"+metric] = typed
			}
		}
	}

	// A pure bucket level is represented by the points of its buckets only.
	if sawBuckets && len(fields) == 0 {
		return nil
	}

	fields["doc_count"] = docCount
	acc.AddFields(measurement, fields, tags)
	return nil
}
// UnmarshalJSON decodes a single aggregation result.
//
// If the JSON object has a "buckets" member, that member alone is decoded into
// the bucket list and the metrics are left untouched. Otherwise the whole
// object is decoded as the metric set.
func (a *AggregateValue) UnmarshalJSON(bytes []byte) error {
	// First pass: split the object into its members without decoding them.
	var members map[string]json.RawMessage
	if err := json.Unmarshal(bytes, &members); err != nil {
		return err
	}

	rawBuckets, isBucketResult := members["buckets"]
	if !isBucketResult {
		// No buckets: every member of the object is a metric.
		return json.Unmarshal(bytes, &a.metrics)
	}

	return json.Unmarshal(rawBuckets, &a.buckets)
}
// IsAggregation reports whether this value holds buckets, i.e. whether its
// bucket list is non-nil. An empty but non-nil list still counts.
func (a *AggregateValue) IsAggregation() bool {
	return a.buckets != nil
}
// UnmarshalJSON decodes one bucket of a bucket aggregation.
//
// "doc_count" is stored in DocumentCount and "key" in Key. A key that is a
// JSON number rather than a JSON string (for instance when bucketing on a
// numeric field) is accepted and kept in its literal textual form. Every other
// member of the bucket object is decoded as a sub-aggregation and stored under
// its member name.
//
// An error is returned when the input is not a JSON object, when "doc_count"
// or "key" is missing or has an unsupported type, or when a remaining member
// cannot be decoded as an aggregation result.
func (b *BucketData) UnmarshalJSON(bytes []byte) error {
	var partial map[string]json.RawMessage
	if err := json.Unmarshal(bytes, &partial); err != nil {
		return err
	}

	if err := json.Unmarshal(partial["doc_count"], &b.DocumentCount); err != nil {
		return err
	}
	delete(partial, "doc_count")

	if err := json.Unmarshal(partial["key"], &b.Key); err != nil {
		// Not a JSON string. Fall back to a JSON number and keep its literal
		// text; anything else is reported with the original error.
		var numeric json.Number
		if json.Unmarshal(partial["key"], &numeric) != nil {
			return err
		}
		b.Key = numeric.String()
	}
	delete(partial, "key")

	if b.subaggregation == nil {
		b.subaggregation = make(Aggregation, len(partial))
	}

	// Whatever is left after removing the two known members is treated as a
	// nested aggregation result.
	for name, message := range partial {
		var nested AggregateValue
		if err := json.Unmarshal(message, &nested); err != nil {
			return err
		}
		b.subaggregation[name] = nested
	}

	return nil
}