forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
data.go
112 lines (102 loc) · 2.58 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package node_stats
import (
"encoding/json"
"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/mb"
)
var (
schema = s.Schema{
"jvm": c.Dict("jvm", s.Schema{
"mem": c.Dict("mem", s.Schema{
"pools": c.Dict("pools", s.Schema{
"young": c.Dict("young", poolSchema),
"survivor": c.Dict("survivor", poolSchema),
"old": c.Dict("old", poolSchema),
}),
}),
"gc": c.Dict("gc", s.Schema{
"collectors": c.Dict("collectors", s.Schema{
"young": c.Dict("young", collectorSchema),
"old": c.Dict("old", collectorSchema),
}),
}),
}),
"indices": c.Dict("indices", s.Schema{
"docs": c.Dict("docs", s.Schema{
"count": c.Int("count"),
"deleted": c.Int("deleted"),
}),
"store": c.Dict("store", s.Schema{
"size": s.Object{
"bytes": c.Int("size_in_bytes"),
},
}),
"segments": c.Dict("segments", s.Schema{
"count": c.Int("count"),
"memory": s.Object{
"bytes": c.Int("memory_in_bytes"),
},
}),
}),
"fs": c.Dict("fs", s.Schema{
"summary": c.Dict("total", s.Schema{
"total": s.Object{
"bytes": c.Int("total_in_bytes"),
},
"free": s.Object{
"bytes": c.Int("free_in_bytes"),
},
"available": s.Object{
"bytes": c.Int("available_in_bytes"),
},
}),
}),
}
poolSchema = s.Schema{
"used": s.Object{
"bytes": c.Int("used_in_bytes"),
},
"max": s.Object{
"bytes": c.Int("max_in_bytes"),
},
"peak": s.Object{
"bytes": c.Int("peak_used_in_bytes"),
},
"peak_max": s.Object{
"bytes": c.Int("peak_max_in_bytes"),
},
}
collectorSchema = s.Schema{
"collection": s.Object{
"count": c.Int("collection_count"),
"ms": c.Int("collection_time_in_millis"),
},
}
)
func eventsMapping(content []byte) ([]common.MapStr, error) {
nodesStruct := struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]map[string]interface{} `json:"nodes"`
}{}
json.Unmarshal(content, &nodesStruct)
var events []common.MapStr
errors := s.NewErrors()
for name, node := range nodesStruct.Nodes {
event, errs := schema.Apply(node)
// Write name here as full name only available as key
event[mb.ModuleDataKey] = common.MapStr{
"node": common.MapStr{
"name": name,
},
"cluster": common.MapStr{
"name": nodesStruct.ClusterName,
},
}
event[mb.NamespaceKey] = "node.stats"
events = append(events, event)
errors.AddErrors(errs)
}
return events, errors
}