Skip to content

Commit

Permalink
Add x-pack data for Elasticsearch shard metricset (elastic#7097)
Browse files Browse the repository at this point in the history
* Introduces GetNodeInfo method to fetch additional info about the node. This should become obsolete in the future.
* Refactor shard metricset to use module level hostParser and metricset.

The xpack feature works but will need further testing with new builds of Elasticsearch. The plan is to test all xpack metricsets together when they are all done and do further tweaks.
  • Loading branch information
ruflin authored and stevea78 committed May 20, 2018
1 parent ca731a4 commit b75ece5
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 29 deletions.
39 changes: 34 additions & 5 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ type Info struct {
ClusterID string `json:"cluster_uuid"`
}

// NodeInfo struct cotains data about the node
type NodeInfo struct {
Host string `json:"host"`
TransportAddress string `json:"transport_address"`
IP string `json:"ip"`
Name string `json:"name"`
}

// GetClusterID fetches cluster id for given nodeID
func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) {
// Check if cluster id already cached. If yes, return it.
if clusterID, ok := clusterIDCache[nodeID]; ok {
return clusterID, nil
}

// Makes sure the http uri is reset to its inital value
defer http.SetURI(uri)

info, err := GetInfo(http, uri)
if err != nil {
return "", err
Expand All @@ -44,8 +49,6 @@ func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error)
//
// The two names are compared
func IsMaster(http *helper.HTTP, uri string) (bool, error) {
// Makes sure the http uri is reset to its inital value
defer http.SetURI(uri)

node, err := getNodeName(http, uri)
if err != nil {
Expand Down Expand Up @@ -97,6 +100,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {

// GetInfo returns the data for the Elasticsearch / endpoint
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
Expand All @@ -116,6 +120,8 @@ func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
}

func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
u.Path = path
Expand All @@ -124,3 +130,26 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
http.SetURI(u.String())
return http.FetchContent()
}

// GetNodeInfo returns the node information
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
if err != nil {
return nil, err
}

nodesStruct := struct {
Nodes map[string]*NodeInfo `json:"nodes"`
}{}

json.Unmarshal(content, &nodesStruct)

// _local will only fetch one node info. First entry is node name
for k, v := range nodesStruct.Nodes {
if k == nodeID {
return v, nil
}
}
return nil, fmt.Errorf("no node matched id %s", nodeID)
}
12 changes: 7 additions & 5 deletions metricbeat/module/elasticsearch/shard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (

var (
schema = s.Schema{
"state": c.Str("state"),
"primary": c.Bool("primary"),
"node": c.Str("node"),
"index": c.Str("index"),
"shard": c.Int("number"),
"state": c.Str("state"),
"primary": c.Bool("primary"),
"node": c.Str("node"),
"index": c.Str("index"),
"shard": c.Int("number"),
"relocating_node": c.Str("relocating_node"),
}
)

type stateStruct struct {
ClusterName string `json:"cluster_name"`
StateID string `json:"state_uuid"`
MasterNode string `json:"master_node"`
RoutingTable struct {
Indices map[string]struct {
Shards map[string][]map[string]interface{} `json:"shards"`
Expand Down
71 changes: 71 additions & 0 deletions metricbeat/module/elasticsearch/shard/data_xpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package shard

import (
"encoding/json"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
stateData := &stateStruct{}
err := json.Unmarshal(content, stateData)
if err != nil {
r.Error(err)
return
}

nodeInfo, err := elasticsearch.GetNodeInfo(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
r.Error(err)
return
}

// TODO: This is currently needed because the cluser_uuid is `na` in stateData in case not the full state is requested.
// Will be fixed in: https://github.com/elastic/elasticsearch/pull/30656
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
r.Error(err)
return
}

sourceNode := common.MapStr{
"uuid": stateData.MasterNode,
"host": nodeInfo.Host,
"transport_address": nodeInfo.TransportAddress,
"ip": nodeInfo.IP,
// This seems to be in the x-pack data a subset of the cluster_uuid not the name?
"name": stateData.ClusterName,
"timestamp": common.Time(time.Now()),
}

for _, index := range stateData.RoutingTable.Indices {
for _, shards := range index.Shards {
for _, shard := range shards {
event := mb.Event{}
fields, _ := schema.Apply(shard)

fields["shard"] = fields["number"]
delete(fields, "number")

event.RootFields = common.MapStr{}

event.RootFields = common.MapStr{
"timestamp": time.Now(),
"cluster_uuid": clusterID,
"interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000,
"type": "shards",
"source_node": sourceNode,
"shard": fields,
"state_uuid": stateData.StateID,
}
event.Index = ".monitoring-es-6-mb"

r.Event(event)

}
}
}
}
34 changes: 15 additions & 19 deletions metricbeat/module/elasticsearch/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,51 @@ package shard

import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func init() {
mb.Registry.MustAddMetricSet("elasticsearch", "shard", New,
mb.WithHostParser(hostParser),
mb.WithHostParser(elasticsearch.HostParser),
mb.DefaultMetricSet(),
mb.WithNamespace("elasticsearch.shard"),
)
}

var (
hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
PathConfigKey: "path",
// Get the stats from the local node
DefaultPath: "_cluster/state/version,master_node,routing_table",
}.Build()
const (
// Get the stats from the local node
statePath = "/_cluster/state/version,master_node,routing_table"
)

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
*elasticsearch.MetricSet
}

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch shard metricset is beta")

http, err := helper.NewHTTP(base)
// Get the stats from the local node
ms, err := elasticsearch.NewMetricSet(base, statePath)
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
http: http,
}, nil
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

eventsMapping(r, content)
if m.XPack {
eventsMappingXPack(r, m, content)
} else {
eventsMapping(r, content)
}
}

0 comments on commit b75ece5

Please sign in to comment.