Skip to content

Commit

Permalink
feat: add indices health collector
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Paz committed Jul 4, 2022
1 parent 23c72f7 commit 9e6f06b
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ Further Information
| elasticsearch_indices_translog_size_in_bytes | counter | 1 | Total translog size in bytes
| elasticsearch_indices_warmer_time_seconds_total | counter | 1 | Total warmer time in seconds
| elasticsearch_indices_warmer_total | counter | 1 | Total warmer count
| elasticsearch_indices_health_up | gauge | 0 | Was the last scrape of the Elasticsearch cat indices endpoint successful
| elasticsearch_indices_health_total_scrapes | counter | 0 | Current total Elasticsearch cat indices scrapes
| elasticsearch_indices_health_json_parse_failures | counter | 0 | Number of errors while parsing JSON
| elasticsearch_indices_health_health | gauge | 3 | Whether all primary and replica index shards are allocated.
| elasticsearch_jvm_gc_collection_seconds_count | counter | 2 | Count of JVM GC runs
| elasticsearch_jvm_gc_collection_seconds_sum | counter | 2 | GC run time in seconds
| elasticsearch_jvm_memory_committed_bytes | gauge | 2 | JVM memory currently committed by area
Expand Down
236 changes: 236 additions & 0 deletions collector/indices_health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"github.com/prometheus/client_golang/prometheus"
)

var (
indexColors = []string{"green", "yellow", "red"}
)

type indicesHealthLabels struct {
keys func(...string) []string
values func(*clusterinfo.Response, ...string) []string
}

type indexHealthMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexHealth indexHealthResponse, color string) float64
Labels indicesHealthLabels
}

// IndiceHealth type defines the collector struct
type IndicesHealth struct {
logger log.Logger
client *http.Client
url *url.URL
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response

up prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter

indexesHealthMetrics []*indexHealthMetric
}

// NewIndicesHealth defines IndicesHealth metrics
func NewIndicesHealth(logger log.Logger, client *http.Client, url *url.URL) *IndicesHealth {
subsystem := "indices_health"

indexLabels := indicesHealthLabels{
keys: func(...string) []string {
return []string{"index", "color", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
if lastClusterinfo != nil {
return append(s, lastClusterinfo.ClusterName)
}
// this shouldn't happen, as the clusterinfo Retriever has a blocking
// Run method. It blocks until the first clusterinfo call has succeeded
return append(s, "unknown_cluster")
},
}

indicesHealth := &IndicesHealth{
logger: logger,
client: client,
url: url,
clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
},

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "up"),
Help: "Was the last scrape of the Elasticsearch cat indices endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "total_scrapes"),
Help: "Current total Elasticsearch cat indices scrapes.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),

indexesHealthMetrics: []*indexHealthMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "health"),
"Whether all primary and replica index shards are allocated.",
indexLabels.keys(), nil,
),
Value: func(indexHealth indexHealthResponse, color string) float64 {
if indexHealth.Health == color {
return 1
}
return 0
},
Labels: indexLabels,
},
},
}

// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
go func() {
_ = level.Debug(logger).Log("msg", "starting cluster info receive loop")
for ci := range indicesHealth.clusterInfoCh {
if ci != nil {
_ = level.Debug(logger).Log("msg", "received cluster info update", "cluster", ci.ClusterName)
indicesHealth.lastClusterInfo = ci
}
}
_ = level.Debug(logger).Log("msg", "exiting cluster info receive loop")
}()

return indicesHealth
}

// Describe add IndicesHealth metrics descriptions
func (ih *IndicesHealth) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range ih.indexesHealthMetrics {
ch <- metric.Desc
}
ch <- ih.up.Desc()
ch <- ih.totalScrapes.Desc()
ch <- ih.jsonParseFailures.Desc()
}

// ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the
// (not exported) clusterinfo.consumer interface
func (ih *IndicesHealth) ClusterLabelUpdates() *chan *clusterinfo.Response {
return &ih.clusterInfoCh
}

// String implements the stringer interface. It is part of the clusterinfo.consumer interface
func (ih *IndicesHealth) String() string {
return namespace + "indiceshealth"
}

func (ih *IndicesHealth) queryURL(u *url.URL) ([]byte, error) {
res, err := ih.client.Get(u.String())
if err != nil {
return []byte{}, fmt.Errorf("failed to get resource from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

defer func() {
err = res.Body.Close()
if err != nil {
_ = level.Warn(ih.logger).Log(
"msg", "failed to close http.Client",
"err", err,
)
}
}()

if res.StatusCode != http.StatusOK {
return []byte{}, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

bts, err := ioutil.ReadAll(res.Body)
if err != nil {
return []byte{}, err
}

return bts, nil
}

func (ih *IndicesHealth) fetchAndDecodeIndicesHealth() (CatIndicesResponse, error) {
var isr CatIndicesResponse

u := *ih.url
u.Path = path.Join(u.Path, "/_cat/indices")
u.RawQuery = "format=json&h=health,index"

bts, err := ih.queryURL(&u)
if err != nil {
return isr, err
}

if err := json.Unmarshal(bts, &isr); err != nil {
ih.jsonParseFailures.Inc()
return isr, err
}

return isr, nil
}

// Collect gets indices health metric values
func (ih *IndicesHealth) Collect(ch chan<- prometheus.Metric) {
ih.totalScrapes.Inc()
defer func() {
ch <- ih.up
ch <- ih.totalScrapes
ch <- ih.jsonParseFailures
}()

catIndicesResponse, err := ih.fetchAndDecodeIndicesHealth()
if err != nil {
ih.up.Set(0)
_ = level.Warn(ih.logger).Log(
"msg", "failed to fetch and decode cat indices",
"err", err,
)
return
}
ih.up.Set(1)

for _, metric := range ih.indexesHealthMetrics {
for _, indexHealth := range catIndicesResponse {
for _, color := range indexColors {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(indexHealth, color),
metric.Labels.values(ih.lastClusterInfo, indexHealth.Index, color)...,
)
}
}
}
}
21 changes: 21 additions & 0 deletions collector/indices_health_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

type indexHealthResponse struct {
Health string `json:"health"`
Index string `json:"index"`
}

type CatIndicesResponse []indexHealthResponse
61 changes: 61 additions & 0 deletions collector/indices_health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/go-kit/log"
)

func TestIndicesHealth(t *testing.T) {
// Testcases created using:
// docker run -d -p 9200:9200 elasticsearch:VERSION
// curl -XPUT http://localhost:9200/twitter
// curl http://localhost:9200/_cat/indices?format=json&h=health,index
tcs := map[string]string{
"1.7.6": `[{"health":"yellow","index":"twitter"}]`,
"2.4.5": `[{"health":"yellow","index":"twitter"}]`,
"5.4.2": `[{"health":"yellow","index":"twitter"}]`,
"5.5.2": `[{"health":"yellow","index":"twitter"}]`,
"8.2.3": `[{"health":"yellow","index":"twitter"}]`,
}
for ver, out := range tcs {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, out)
}))
defer ts.Close()

u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
c := NewIndicesHealth(log.NewNopLogger(), http.DefaultClient, u)
ihr, err := c.fetchAndDecodeIndicesHealth()
if err != nil {
t.Fatalf("Failed to fetch or decode cluster health: %s", err)
}
t.Logf("[%s] Cluster Health Response: %+v", ver, ihr)
if ihr[0].Index != "twitter" {
t.Errorf("is not twitter")
}
if ihr[0].Health != "yellow" {
t.Errorf("twitter is not yellow")
}
}
}
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ func main() {
_ = level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
os.Exit(1)
}
iHC := collector.NewIndicesHealth(logger, httpClient, esURL)
prometheus.MustRegister(iHC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iHC); registerErr != nil {
_ = level.Error(logger).Log("msg", "failed to register indices health collector in cluster info")
os.Exit(1)
}
}

if *esExportSnapshots {
Expand Down

0 comments on commit 9e6f06b

Please sign in to comment.