5 changes: 0 additions & 5 deletions collector/indices.go
@@ -432,11 +432,6 @@ var (
 	)
 )

-type labels struct {
-    keys   func(...string) []string
-    values func(*clusterinfo.Response, ...string) []string
-}
-
 // Indices information struct
 type Indices struct {
     logger *slog.Logger
164 changes: 93 additions & 71 deletions collector/shards.go
@@ -26,12 +26,34 @@ import (
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
)

var (
shardsTotalLabels = []string{"node", "cluster"}
shardsStateLabels = []string{"node", "cluster", "index", "shard"}

shardsTotal = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "node_shards", "total"),
"Total shards per node",
shardsTotalLabels, nil,
)
jsonParseFailures = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"),
"Number of errors while parsing JSON.",
nil, nil,
)
shardsState = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "node_shards", "state"),
"Shard state allocated per node by index and shard (0=unassigned, 10=primary started, 11=primary initializing, 12=primary relocating, 20=replica started, 21=replica initializing, 22=replica relocating).",
shardsStateLabels, nil,
)
)

// ShardResponse has shard's node and index info
type ShardResponse struct {
Index string `json:"index"`
Shard string `json:"shard"`
State string `json:"state"`
Node string `json:"node"`
Index string `json:"index"`
Shard string `json:"shard"`
State string `json:"state"`
Node *string `json:"node,omitempty"`
Prirep string `json:"prirep"`
}

// Shards information struct
@@ -42,8 +64,7 @@ type Shards struct {
     clusterInfoCh   chan *clusterinfo.Response
     lastClusterInfo *clusterinfo.Response

-    nodeShardMetrics  []*nodeShardMetric
-    jsonParseFailures prometheus.Counter
+    jsonParseFailures float64
 }

 // ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the
@@ -57,15 +78,8 @@ func (s *Shards) String() string {
     return namespace + "shards"
 }

-type nodeShardMetric struct {
-    Type   prometheus.ValueType
-    Desc   *prometheus.Desc
-    Value  func(shards float64) float64
-    Labels labels
-}
-
-// fetchClusterNameOnce performs a single request to the root endpoint to obtain the cluster name.
-func fetchClusterNameOnce(s *Shards) string {
+// getClusterName performs a single request to the root endpoint to obtain the cluster name.
+func (s *Shards) getClusterName() string {
     if s.lastClusterInfo != nil && s.lastClusterInfo.ClusterName != "unknown_cluster" {
         return s.lastClusterInfo.ClusterName
     }
@@ -89,25 +103,7 @@ func fetchClusterNameOnce(s *Shards) string {

 // NewShards defines Shards Prometheus metrics
 func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards {
-    var shardPtr *Shards
-    nodeLabels := labels{
-        keys: func(...string) []string {
-            return []string{"node", "cluster"}
-        },
-        values: func(lastClusterinfo *clusterinfo.Response, base ...string) []string {
-            if lastClusterinfo != nil {
-                return append(base, lastClusterinfo.ClusterName)
-            }
-            if shardPtr != nil {
-                return append(base, fetchClusterNameOnce(shardPtr))
-            }
-            return append(base, "unknown_cluster")
-        },
-    }
-
     shards := &Shards{
-        // will assign later
-
         logger: logger,
         client: client,
         url:    url,
@@ -116,26 +112,6 @@ func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards {
         lastClusterInfo: &clusterinfo.Response{
             ClusterName: "unknown_cluster",
         },
-
-        nodeShardMetrics: []*nodeShardMetric{
-            {
-                Type: prometheus.GaugeValue,
-                Desc: prometheus.NewDesc(
-                    prometheus.BuildFQName(namespace, "node_shards", "total"),
-                    "Total shards per node",
-                    nodeLabels.keys(), nil,
-                ),
-                Value: func(shards float64) float64 {
-                    return shards
-                },
-                Labels: nodeLabels,
-            },
-        },
-
-        jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
-            Name: prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"),
-            Help: "Number of errors while parsing JSON.",
-        }),
     }

     // start go routine to fetch clusterinfo updates and save them to lastClusterinfo
@@ -150,17 +126,14 @@ func NewShards(logger *slog.Logger, client *http.Client, url *url.URL) *Shards {
         logger.Debug("exiting cluster info receive loop")
     }()

-    shardPtr = shards
     return shards
 }

 // Describe Shards
 func (s *Shards) Describe(ch chan<- *prometheus.Desc) {
-    ch <- s.jsonParseFailures.Desc()
-
-    for _, metric := range s.nodeShardMetrics {
-        ch <- metric.Desc
-    }
+    ch <- jsonParseFailures
+    ch <- shardsTotal
+    ch <- shardsState
 }

 func (s *Shards) getAndParseURL(u *url.URL) ([]ShardResponse, error) {
@@ -185,7 +158,7 @@ func (s *Shards) getAndParseURL(u *url.URL) ([]ShardResponse, error) {
     }
     var sfr []ShardResponse
     if err := json.NewDecoder(res.Body).Decode(&sfr); err != nil {
-        s.jsonParseFailures.Inc()
+        s.jsonParseFailures++
         return nil, err
     }
     return sfr, nil
@@ -207,34 +180,83 @@ func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {
 // Collect number of shards on each node
 func (s *Shards) Collect(ch chan<- prometheus.Metric) {
     defer func() {
-        ch <- s.jsonParseFailures
+        ch <- prometheus.MustNewConstMetric(
+            jsonParseFailures,
+            prometheus.CounterValue,
+            s.jsonParseFailures,
+        )
     }()

     sr, err := s.fetchAndDecodeShards()
     if err != nil {
         s.logger.Warn(
-            "failed to fetch and decode node shards stats",
+            "failed to fetch and decode shards",
             "err", err,
         )
         return
     }

+    clusterName := s.getClusterName()
+
     nodeShards := make(map[string]float64)

     for _, shard := range sr {
+        node := "-"
+        if shard.Node != nil {
+            node = *shard.Node
+        }
         if shard.State == "STARTED" {
-            nodeShards[shard.Node]++
+            nodeShards[node]++
         }
+
+        ch <- prometheus.MustNewConstMetric(
+            shardsState,
+            prometheus.GaugeValue,
+            s.encodeState(shard),
+            node,
+            clusterName,
+            shard.Index,
+            shard.Shard,
+        )
     }

     for node, shards := range nodeShards {
-        for _, metric := range s.nodeShardMetrics {
-            ch <- prometheus.MustNewConstMetric(
-                metric.Desc,
-                metric.Type,
-                metric.Value(shards),
-                metric.Labels.values(s.lastClusterInfo, node)...,
-            )
-        }
+        ch <- prometheus.MustNewConstMetric(
+            shardsTotal,
+            prometheus.GaugeValue,
+            shards,
+            node,
+            clusterName,
+        )
     }
 }
+
+func (s *Shards) encodeState(shard ShardResponse) float64 {
+    if shard.Node == nil || shard.State == "UNASSIGNED" {
+        return 0
+    }
+
+    var state float64
+    switch shard.Prirep {
+    case "p":
+        state = 10
+    case "r":
+        state = 20
+    default:
+        s.logger.Warn("unknown shard type", "type", shard.Prirep)
+        return 0
+    }
+
+    switch shard.State {
+    case "STARTED":
+        return state
+    case "INITIALIZING":
+        state += 1
+    case "RELOCATING":
+        state += 2
+    default:
+        s.logger.Warn("unknown shard state", "state", shard.State)
+        return 0
+    }
+    return state
+}
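
A note on the counter change above: the PR replaces the embedded `prometheus.Counter` with a bare `float64` that is stamped into a const metric inside the `defer` in `Collect`. The sketch below reproduces that pattern in isolation so it can be run without the exporter; the `demo_json_parse_failures` name and the registry wiring are illustrative stand-ins, not part of the PR.

```go
package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// failureCounter mirrors the diff's pattern: keep a plain float64 in the
// collector and convert it into a counter sample at scrape time with
// MustNewConstMetric, instead of embedding a prometheus.Counter.
type failureCounter struct {
    desc     *prometheus.Desc
    failures float64
}

func (c *failureCounter) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *failureCounter) Collect(ch chan<- prometheus.Metric) {
    // Equivalent to the defer block in the PR's Collect: the current value
    // is emitted as a CounterValue sample on every scrape.
    ch <- prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, c.failures)
}

func main() {
    c := &failureCounter{
        // Hypothetical metric name, chosen for this demo only.
        desc: prometheus.NewDesc("demo_json_parse_failures",
            "Number of errors while parsing JSON.", nil, nil),
    }
    c.failures++ // what s.jsonParseFailures++ does on a decode error

    reg := prometheus.NewRegistry()
    reg.MustRegister(c)
    mfs, err := reg.Gather()
    if err != nil {
        panic(err)
    }
    for _, mf := range mfs {
        fmt.Printf("%s = %v\n", mf.GetName(), mf.GetMetric()[0].GetCounter().GetValue())
    }
}
```

One trade-off worth noting: unlike `prometheus.Counter`, a plain `float64` has no internal synchronization, so this pattern relies on `Collect` not being invoked concurrently for the same collector.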
90 changes: 83 additions & 7 deletions collector/shards_test.go
@@ -42,13 +42,20 @@ func TestShards(t *testing.T) {
         {
             name: "7.15.0",
             file: "7.15.0.json",
-            want: `# HELP elasticsearch_node_shards_json_parse_failures Number of errors while parsing JSON.
-# TYPE elasticsearch_node_shards_json_parse_failures counter
-elasticsearch_node_shards_json_parse_failures 0
-# HELP elasticsearch_node_shards_total Total shards per node
-# TYPE elasticsearch_node_shards_total gauge
-elasticsearch_node_shards_total{cluster="unknown_cluster",node="35dfca79831a"} 3
-`,
+            want: `# HELP elasticsearch_node_shards_state Shard state allocated per node by index and shard (0=unassigned, 10=primary started, 11=primary initializing, 12=primary relocating, 20=replica started, 21=replica initializing, 22=replica relocating).
+# TYPE elasticsearch_node_shards_state gauge
+elasticsearch_node_shards_state{cluster="unknown_cluster",index=".geoip_databases",node="35dfca79831a",shard="0"} 10
+elasticsearch_node_shards_state{cluster="unknown_cluster",index="otherindex",node="35dfca79831a",shard="0"} 10
+elasticsearch_node_shards_state{cluster="unknown_cluster",index="testindex",node="35dfca79831a",shard="0"} 10
+elasticsearch_node_shards_state{cluster="unknown_cluster",index="otherindex",node="-",shard="0"} 0
+elasticsearch_node_shards_state{cluster="unknown_cluster",index="testindex",node="-",shard="0"} 0
+# HELP elasticsearch_node_shards_json_parse_failures Number of errors while parsing JSON.
+# TYPE elasticsearch_node_shards_json_parse_failures counter
+elasticsearch_node_shards_json_parse_failures 0
+# HELP elasticsearch_node_shards_total Total shards per node
+# TYPE elasticsearch_node_shards_total gauge
+elasticsearch_node_shards_total{cluster="unknown_cluster",node="35dfca79831a"} 3
+`,
         },
     }

@@ -81,3 +88,72 @@ func TestShards(t *testing.T) {
         })
     }
 }
+
+func TestShards_encodeState(t *testing.T) {
+    node := "test-node"
+    tests := []struct {
+        name  string
+        shard ShardResponse
+        want  float64
+    }{
+        {
+            name:  "unassigned_nil_node",
+            shard: ShardResponse{Node: nil, State: "STARTED", Prirep: "p"},
+            want:  0,
+        },
+        {
+            name:  "unassigned_state",
+            shard: ShardResponse{Node: nil, State: "UNASSIGNED", Prirep: "p"},
+            want:  0,
+        },
+        {
+            name:  "primary_started",
+            shard: ShardResponse{Node: &node, State: "STARTED", Prirep: "p"},
+            want:  10,
+        },
+        {
+            name:  "primary_initializing",
+            shard: ShardResponse{Node: &node, State: "INITIALIZING", Prirep: "p"},
+            want:  11,
+        },
+        {
+            name:  "primary_relocating",
+            shard: ShardResponse{Node: &node, State: "RELOCATING", Prirep: "p"},
+            want:  12,
+        },
+        {
+            name:  "replica_started",
+            shard: ShardResponse{Node: &node, State: "STARTED", Prirep: "r"},
+            want:  20,
+        },
+        {
+            name:  "replica_initializing",
+            shard: ShardResponse{Node: &node, State: "INITIALIZING", Prirep: "r"},
+            want:  21,
+        },
+        {
+            name:  "replica_relocating",
+            shard: ShardResponse{Node: &node, State: "RELOCATING", Prirep: "r"},
+            want:  22,
+        },
+        {
+            name:  "unknown_prirep",
+            shard: ShardResponse{Node: &node, State: "STARTED", Prirep: "x"},
+            want:  0,
+        },
+        {
+            name:  "unknown_state",
+            shard: ShardResponse{Node: &node, State: "UNKNOWN", Prirep: "p"},
+            want:  0,
+        },
+    }
+
+    s := &Shards{logger: promslog.NewNopLogger()}
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            if got := s.encodeState(tt.shard); got != tt.want {
+                t.Errorf("encodeState() = %v, want %v", got, tt.want)
+            }
+        })
+    }
+}
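
The table above covers the encoding exhaustively, but none of the cases exercise JSON decoding itself, which is presumably what motivated changing `Node` to `*string`: `_cat/shards` reports unassigned shards with a `null` node. Below is a minimal standalone sketch of that behavior, using an illustrative payload (made-up node names, not the PR's 7.15.0.json fixture):

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Same shape as the PR's ShardResponse.
type ShardResponse struct {
    Index  string  `json:"index"`
    Shard  string  `json:"shard"`
    State  string  `json:"state"`
    Node   *string `json:"node,omitempty"`
    Prirep string  `json:"prirep"`
}

func main() {
    // Illustrative _cat/shards?format=json payload.
    raw := `[
        {"index":"testindex","shard":"0","prirep":"p","state":"STARTED","node":"node-1"},
        {"index":"testindex","shard":"0","prirep":"r","state":"UNASSIGNED","node":null}
    ]`
    var shards []ShardResponse
    if err := json.Unmarshal([]byte(raw), &shards); err != nil {
        panic(err)
    }
    for _, sh := range shards {
        node := "-" // same fallback label value the collector uses
        if sh.Node != nil {
            node = *sh.Node
        }
        fmt.Printf("index=%s shard=%s prirep=%s state=%s node=%s\n",
            sh.Index, sh.Shard, sh.Prirep, sh.State, node)
    }
}
```

With a plain `string` field the `null` would silently decode to `""`, indistinguishable from an empty node name; the pointer preserves the distinction that both `encodeState` and the `-` node label depend on.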
2 changes: 2 additions & 0 deletions metrics.md
@@ -89,6 +89,8 @@
 | elasticsearch_indices_settings_replicas | gauge | | Index setting value for index.replicas |
 | elasticsearch_indices_shards_docs | gauge | 3 | Count of documents on this shard |
 | elasticsearch_indices_shards_docs_deleted | gauge | 3 | Count of deleted documents on each shard |
+| elasticsearch_node_shards_total | gauge | 2 | Total shards per node |
+| elasticsearch_node_shards_state | gauge | 4 | Shard state allocated per node by index and shard (0=unassigned, 10=primary started, 11=primary initializing, 12=primary relocating, 20=replica started, 21=replica initializing, 22=replica relocating). |
 | elasticsearch_indices_store_size_bytes | gauge | 1 | Current size of stored index data in bytes |
 | elasticsearch_indices_store_size_bytes_primary | gauge | | Current size of stored index data in bytes with only primary shards on all nodes |
 | elasticsearch_indices_store_size_bytes_total | gauge | | Current size of stored index data in bytes with all shards on all nodes |