From 2a3f356b396ec707220ce3bc4fbcd502d64795d5 Mon Sep 17 00:00:00 2001 From: Neemias Date: Fri, 21 Nov 2025 12:39:13 +0100 Subject: [PATCH] feat: add elasticsearch_node_shards_state metric Signed-off-by: Neemias Almeida --- collector/indices.go | 5 -- collector/shards.go | 164 ++++++++++++++++++++++----------------- collector/shards_test.go | 90 +++++++++++++++++++-- metrics.md | 2 + 4 files changed, 178 insertions(+), 83 deletions(-) diff --git a/collector/indices.go b/collector/indices.go index fdad33f5..d2c4e58a 100644 --- a/collector/indices.go +++ b/collector/indices.go @@ -432,11 +432,6 @@ var ( ) ) -type labels struct { - keys func(...string) []string - values func(*clusterinfo.Response, ...string) []string -} - // Indices information struct type Indices struct { logger *slog.Logger diff --git a/collector/shards.go b/collector/shards.go index 351680ca..eba0a633 100644 --- a/collector/shards.go +++ b/collector/shards.go @@ -26,12 +26,34 @@ import ( "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo" ) +var ( + shardsTotalLabels = []string{"node", "cluster"} + shardsStateLabels = []string{"node", "cluster", "index", "shard"} + + shardsTotal = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "node_shards", "total"), + "Total shards per node", + shardsTotalLabels, nil, + ) + jsonParseFailures = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"), + "Number of errors while parsing JSON.", + nil, nil, + ) + shardsState = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "node_shards", "state"), + "Shard state allocated per node by index and shard (0=unassigned, 10=primary started, 11=primary initializing, 12=primary relocating, 20=replica started, 21=replica initializing, 22=replica relocating).", + shardsStateLabels, nil, + ) +) + // ShardResponse has shard's node and index info type ShardResponse struct { - Index string `json:"index"` - Shard string `json:"shard"` - State string 
`json:"state"` - Node string `json:"node"` + Index string `json:"index"` + Shard string `json:"shard"` + State string `json:"state"` + Node *string `json:"node,omitempty"` + Prirep string `json:"prirep"` } // Shards information struct @@ -42,8 +64,7 @@ type Shards struct { clusterInfoCh chan *clusterinfo.Response lastClusterInfo *clusterinfo.Response - nodeShardMetrics []*nodeShardMetric - jsonParseFailures prometheus.Counter + jsonParseFailures float64 } // ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the @@ -57,15 +78,8 @@ func (s *Shards) String() string { return namespace + "shards" } -type nodeShardMetric struct { - Type prometheus.ValueType - Desc *prometheus.Desc - Value func(shards float64) float64 - Labels labels -} - -// fetchClusterNameOnce performs a single request to the root endpoint to obtain the cluster name. -func fetchClusterNameOnce(s *Shards) string { +// getClusterName performs a single request to the root endpoint to obtain the cluster name. 
+func (s *Shards) getClusterName() string { if s.lastClusterInfo != nil && s.lastClusterInfo.ClusterName != "unknown_cluster" { return s.lastClusterInfo.ClusterName } @@ -89,25 +103,7 @@ func fetchClusterNameOnce(s *Shards) string { // NewShards defines Shards Prometheus metrics func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards { - var shardPtr *Shards - nodeLabels := labels{ - keys: func(...string) []string { - return []string{"node", "cluster"} - }, - values: func(lastClusterinfo *clusterinfo.Response, base ...string) []string { - if lastClusterinfo != nil { - return append(base, lastClusterinfo.ClusterName) - } - if shardPtr != nil { - return append(base, fetchClusterNameOnce(shardPtr)) - } - return append(base, "unknown_cluster") - }, - } - shards := &Shards{ - // will assign later - logger: logger, client: client, url: url, @@ -116,26 +112,6 @@ func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards { lastClusterInfo: &clusterinfo.Response{ ClusterName: "unknown_cluster", }, - - nodeShardMetrics: []*nodeShardMetric{ - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "node_shards", "total"), - "Total shards per node", - nodeLabels.keys(), nil, - ), - Value: func(shards float64) float64 { - return shards - }, - Labels: nodeLabels, - }, - }, - - jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ - Name: prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"), - Help: "Number of errors while parsing JSON.", - }), } // start go routine to fetch clusterinfo updates and save them to lastClusterinfo @@ -150,17 +126,14 @@ func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards { logger.Debug("exiting cluster info receive loop") }() - shardPtr = shards return shards } // Describe Shards func (s *Shards) Describe(ch chan<- *prometheus.Desc) { - ch <- s.jsonParseFailures.Desc() - - for _, metric := range 
s.nodeShardMetrics { - ch <- metric.Desc - } + ch <- jsonParseFailures + ch <- shardsTotal + ch <- shardsState } func (s *Shards) getAndParseURL(u *url.URL) ([]ShardResponse, error) { @@ -185,7 +158,7 @@ func (s *Shards) getAndParseURL(u *url.URL) ([]ShardResponse, error) { } var sfr []ShardResponse if err := json.NewDecoder(res.Body).Decode(&sfr); err != nil { - s.jsonParseFailures.Inc() + s.jsonParseFailures++ return nil, err } return sfr, nil @@ -207,34 +180,83 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) { // Collect number of shards on each node func (s *Shards) Collect(ch chan<- prometheus.Metric) { defer func() { - ch <- s.jsonParseFailures + ch <- prometheus.MustNewConstMetric( + jsonParseFailures, + prometheus.CounterValue, + s.jsonParseFailures, + ) }() sr, err := s.fetchAndDecodeShards() if err != nil { s.logger.Warn( - "failed to fetch and decode node shards stats", + "failed to fetch and decode shards", "err", err, ) return } + clusterName := s.getClusterName() + nodeShards := make(map[string]float64) for _, shard := range sr { + node := "-" + if shard.Node != nil { + node = *shard.Node + } if shard.State == "STARTED" { - nodeShards[shard.Node]++ + nodeShards[node]++ } + + ch <- prometheus.MustNewConstMetric( + shardsState, + prometheus.GaugeValue, + s.encodeState(shard), + node, + clusterName, + shard.Index, + shard.Shard, + ) } for node, shards := range nodeShards { - for _, metric := range s.nodeShardMetrics { - ch <- prometheus.MustNewConstMetric( - metric.Desc, - metric.Type, - metric.Value(shards), - metric.Labels.values(s.lastClusterInfo, node)..., - ) - } + ch <- prometheus.MustNewConstMetric( + shardsTotal, + prometheus.GaugeValue, + shards, + node, + clusterName, + ) + } +} + +func (s *Shards) encodeState(shard ShardResponse) float64 { + if shard.Node == nil || shard.State == "UNASSIGNED" { + return 0 + } + + var state float64 + switch shard.Prirep { + case "p": + state = 10 + case "r": + state = 20 + default: + 
s.logger.Warn("unknown shard type", "type", shard.Prirep) + return 0 + } + + switch shard.State { + case "STARTED": + return state + case "INITIALIZING": + state += 1 + case "RELOCATING": + state += 2 + default: + s.logger.Warn("unknown shard state", "state", shard.State) + return 0 } + return state } diff --git a/collector/shards_test.go b/collector/shards_test.go index 12d7d9fc..0c7c7bf5 100644 --- a/collector/shards_test.go +++ b/collector/shards_test.go @@ -42,13 +42,20 @@ func TestShards(t *testing.T) { { name: "7.15.0", file: "7.15.0.json", - want: `# HELP elasticsearch_node_shards_json_parse_failures Number of errors while parsing JSON. - # TYPE elasticsearch_node_shards_json_parse_failures counter - elasticsearch_node_shards_json_parse_failures 0 - # HELP elasticsearch_node_shards_total Total shards per node - # TYPE elasticsearch_node_shards_total gauge - elasticsearch_node_shards_total{cluster="unknown_cluster",node="35dfca79831a"} 3 - `, + want: `# HELP elasticsearch_node_shards_state Shard state allocated per node by index and shard (0=unassigned, 10=primary started, 11=primary initializing, 12=primary relocating, 20=replica started, 21=replica initializing, 22=replica relocating). + # TYPE elasticsearch_node_shards_state gauge + elasticsearch_node_shards_state{cluster="unknown_cluster",index=".geoip_databases",node="35dfca79831a",shard="0"} 10 + elasticsearch_node_shards_state{cluster="unknown_cluster",index="otherindex",node="35dfca79831a",shard="0"} 10 + elasticsearch_node_shards_state{cluster="unknown_cluster",index="testindex",node="35dfca79831a",shard="0"} 10 + elasticsearch_node_shards_state{cluster="unknown_cluster",index="otherindex",node="-",shard="0"} 0 + elasticsearch_node_shards_state{cluster="unknown_cluster",index="testindex",node="-",shard="0"} 0 + # HELP elasticsearch_node_shards_json_parse_failures Number of errors while parsing JSON. 
+ # TYPE elasticsearch_node_shards_json_parse_failures counter + elasticsearch_node_shards_json_parse_failures 0 + # HELP elasticsearch_node_shards_total Total shards per node + # TYPE elasticsearch_node_shards_total gauge + elasticsearch_node_shards_total{cluster="unknown_cluster",node="35dfca79831a"} 3 +`, }, } @@ -81,3 +88,72 @@ func TestShards(t *testing.T) { }) } } + +func TestShards_encodeState(t *testing.T) { + node := "test-node" + tests := []struct { + name string + shard ShardResponse + want float64 + }{ + { + name: "unassigned_nil_node", + shard: ShardResponse{Node: nil, State: "STARTED", Prirep: "p"}, + want: 0, + }, + { + name: "unassigned_state", + shard: ShardResponse{Node: nil, State: "UNASSIGNED", Prirep: "p"}, + want: 0, + }, + { + name: "primary_started", + shard: ShardResponse{Node: &node, State: "STARTED", Prirep: "p"}, + want: 10, + }, + { + name: "primary_initializing", + shard: ShardResponse{Node: &node, State: "INITIALIZING", Prirep: "p"}, + want: 11, + }, + { + name: "primary_relocating", + shard: ShardResponse{Node: &node, State: "RELOCATING", Prirep: "p"}, + want: 12, + }, + { + name: "replica_started", + shard: ShardResponse{Node: &node, State: "STARTED", Prirep: "r"}, + want: 20, + }, + { + name: "replica_initializing", + shard: ShardResponse{Node: &node, State: "INITIALIZING", Prirep: "r"}, + want: 21, + }, + { + name: "replica_relocating", + shard: ShardResponse{Node: &node, State: "RELOCATING", Prirep: "r"}, + want: 22, + }, + { + name: "unknown_prirep", + shard: ShardResponse{Node: &node, State: "STARTED", Prirep: "x"}, + want: 0, + }, + { + name: "unknown_state", + shard: ShardResponse{Node: &node, State: "UNKNOWN", Prirep: "p"}, + want: 0, + }, + } + + s := &Shards{logger: promslog.NewNopLogger()} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := s.encodeState(tt.shard); got != tt.want { + t.Errorf("encodeState() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/metrics.md b/metrics.md index 
d1fce9ef..2f4403f5 100644 --- a/metrics.md +++ b/metrics.md @@ -89,6 +89,8 @@ | elasticsearch_indices_settings_replicas | gauge | | Index setting value for index.replicas | | elasticsearch_indices_shards_docs | gauge | 3 | Count of documents on this shard | | elasticsearch_indices_shards_docs_deleted | gauge | 3 | Count of deleted documents on each shard | +| elasticsearch_node_shards_total | gauge | 2 | Total shards per node | +| elasticsearch_node_shards_state | gauge | 4 | Shard state allocated per node by index and shard (0=unassigned, 10=primary started, 11=primary initializing, 12=primary relocating, 20=replica started, 21=replica initializing, 22=replica relocating). | | elasticsearch_indices_store_size_bytes | gauge | 1 | Current size of stored index data in bytes | | elasticsearch_indices_store_size_bytes_primary | gauge | | Current size of stored index data in bytes with only primary shards on all nodes | | elasticsearch_indices_store_size_bytes_total | gauge | | Current size of stored index data in bytes with all shards on all nodes |