diff --git a/README.md b/README.md
index d5304f5c..e1bbd4b8 100644
--- a/README.md
+++ b/README.md
@@ -182,6 +182,10 @@ Further Information
 | elasticsearch_indices_translog_size_in_bytes | counter | 1 | Total translog size in bytes
 | elasticsearch_indices_warmer_time_seconds_total | counter | 1 | Total warmer time in seconds
 | elasticsearch_indices_warmer_total | counter | 1 | Total warmer count
+| elasticsearch_indices_health_up | gauge | 0 | Was the last scrape of the Elasticsearch cat indices endpoint successful
+| elasticsearch_indices_health_total_scrapes | counter | 0 | Current total Elasticsearch cat indices scrapes
+| elasticsearch_indices_health_json_parse_failures | counter | 0 | Number of errors while parsing JSON
+| elasticsearch_indices_health_health | gauge | 3 | Whether all primary and replica index shards are allocated
 | elasticsearch_jvm_gc_collection_seconds_count | counter | 2 | Count of JVM GC runs
 | elasticsearch_jvm_gc_collection_seconds_sum | counter | 2 | GC run time in seconds
 | elasticsearch_jvm_memory_committed_bytes | gauge | 2 | JVM memory currently committed by area
diff --git a/collector/indices_health.go b/collector/indices_health.go
new file mode 100644
index 00000000..124e7410
--- /dev/null
+++ b/collector/indices_health.go
@@ -0,0 +1,236 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// indexColors lists the health colors; Collect emits one sample per index and color.
+var indexColors = []string{"green", "yellow", "red"}
+
+// indicesHealthLabels builds the label keys and label values of an index health metric.
+type indicesHealthLabels struct {
+	keys   func(...string) []string
+	values func(*clusterinfo.Response, ...string) []string
+}
+
+// indexHealthMetric couples a metric description with the function computing its value.
+type indexHealthMetric struct {
+	Type   prometheus.ValueType
+	Desc   *prometheus.Desc
+	Value  func(indexHealth indexHealthResponse, color string) float64
+	Labels indicesHealthLabels
+}
+
+// IndicesHealth type defines the collector struct for per-index health metrics
+type IndicesHealth struct {
+	logger          log.Logger
+	client          *http.Client
+	url             *url.URL
+	clusterInfoCh   chan *clusterinfo.Response
+	lastClusterInfo *clusterinfo.Response
+
+	up                              prometheus.Gauge
+	totalScrapes, jsonParseFailures prometheus.Counter
+
+	indexesHealthMetrics []*indexHealthMetric
+}
+
+// NewIndicesHealth defines IndicesHealth metrics
+func NewIndicesHealth(logger log.Logger, client *http.Client, url *url.URL) *IndicesHealth {
+	subsystem := "indices_health"
+
+	indexLabels := indicesHealthLabels{
+		keys: func(...string) []string {
+			return []string{"index", "color", "cluster"}
+		},
+		values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
+			if lastClusterinfo != nil {
+				return append(s, lastClusterinfo.ClusterName)
+			}
+			// this shouldn't happen, as the clusterinfo Retriever has a blocking
+			// Run method. It blocks until the first clusterinfo call has succeeded
+			return append(s, "unknown_cluster")
+		},
+	}
+
+	indicesHealth := &IndicesHealth{
+		logger:        logger,
+		client:        client,
+		url:           url,
+		clusterInfoCh: make(chan *clusterinfo.Response),
+		lastClusterInfo: &clusterinfo.Response{
+			ClusterName: "unknown_cluster",
+		},
+
+		up: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: prometheus.BuildFQName(namespace, subsystem, "up"),
+			Help: "Was the last scrape of the Elasticsearch cat indices endpoint successful.",
+		}),
+		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: prometheus.BuildFQName(namespace, subsystem, "total_scrapes"),
+			Help: "Current total Elasticsearch cat indices scrapes.",
+		}),
+		jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: prometheus.BuildFQName(namespace, subsystem, "json_parse_failures"),
+			Help: "Number of errors while parsing JSON.",
+		}),
+
+		indexesHealthMetrics: []*indexHealthMetric{
+			{
+				Type: prometheus.GaugeValue,
+				Desc: prometheus.NewDesc(
+					prometheus.BuildFQName(namespace, subsystem, "health"),
+					"Whether all primary and replica index shards are allocated.",
+					indexLabels.keys(), nil,
+				),
+				Value: func(indexHealth indexHealthResponse, color string) float64 {
+					if indexHealth.Health == color {
+						return 1
+					}
+					return 0
+				},
+				Labels: indexLabels,
+			},
+		},
+	}
+
+	// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
+	// NOTE(review): lastClusterInfo is written here and read in Collect without a lock; confirm this is safe
+	go func() {
+		_ = level.Debug(logger).Log("msg", "starting cluster info receive loop")
+		for ci := range indicesHealth.clusterInfoCh {
+			if ci != nil {
+				_ = level.Debug(logger).Log("msg", "received cluster info update", "cluster", ci.ClusterName)
+				indicesHealth.lastClusterInfo = ci
+			}
+		}
+		_ = level.Debug(logger).Log("msg", "exiting cluster info receive loop")
+	}()
+
+	return indicesHealth
+}
+
+// Describe adds IndicesHealth metrics descriptions
+func (ih *IndicesHealth) Describe(ch chan<- *prometheus.Desc) {
+	for _, metric := range ih.indexesHealthMetrics {
+		ch <- metric.Desc
+	}
+	ch <- ih.up.Desc()
+	ch <- ih.totalScrapes.Desc()
+	ch <- ih.jsonParseFailures.Desc()
+}
+
+// ClusterLabelUpdates returns a pointer to a channel to receive cluster info updates. It implements the
+// (not exported) clusterinfo.consumer interface
+func (ih *IndicesHealth) ClusterLabelUpdates() *chan *clusterinfo.Response {
+	return &ih.clusterInfoCh
+}
+
+// String implements the stringer interface. It is part of the clusterinfo.consumer interface
+func (ih *IndicesHealth) String() string {
+	return namespace + "indiceshealth"
+}
+
+// queryURL performs a GET on u and returns the response body. It returns an error if
+// the request fails, the status code is not 200 OK, or the body cannot be read.
+func (ih *IndicesHealth) queryURL(u *url.URL) ([]byte, error) {
+	res, err := ih.client.Get(u.String())
+	if err != nil {
+		return nil, fmt.Errorf("failed to get resource from %s://%s:%s%s: %w",
+			u.Scheme, u.Hostname(), u.Port(), u.Path, err)
+	}
+
+	defer func() {
+		// use a closure-local err so the function's err variable is not overwritten
+		if err := res.Body.Close(); err != nil {
+			_ = level.Warn(ih.logger).Log("msg", "failed to close http.Client", "err", err)
+		}
+	}()
+
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
+	}
+
+	bts, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	return bts, nil
+}
+
+// fetchAndDecodeIndicesHealth requests /_cat/indices as JSON (health and index
+// columns only) and decodes it; JSON decode errors increment jsonParseFailures.
+func (ih *IndicesHealth) fetchAndDecodeIndicesHealth() (CatIndicesResponse, error) {
+	var isr CatIndicesResponse
+
+	u := *ih.url
+	u.Path = path.Join(u.Path, "/_cat/indices")
+	u.RawQuery = "format=json&h=health,index"
+
+	bts, err := ih.queryURL(&u)
+	if err != nil {
+		return isr, err
+	}
+
+	if err := json.Unmarshal(bts, &isr); err != nil {
+		ih.jsonParseFailures.Inc()
+		return isr, err
+	}
+
+	return isr, nil
+}
+
+// Collect gets indices health metric values
+func (ih *IndicesHealth) Collect(ch chan<- prometheus.Metric) {
+	ih.totalScrapes.Inc()
+	defer func() {
+		ch <- ih.up
+		ch <- ih.totalScrapes
+		ch <- ih.jsonParseFailures
+	}()
+
+	catIndicesResponse, err := ih.fetchAndDecodeIndicesHealth()
+	if err != nil {
+		ih.up.Set(0)
+		_ = level.Warn(ih.logger).Log("msg", "failed to fetch and decode cat indices", "err", err)
+		return
+	}
+	ih.up.Set(1)
+
+	for _, metric := range ih.indexesHealthMetrics {
+		for _, indexHealth := range catIndicesResponse {
+			for _, color := range indexColors {
+				ch <- prometheus.MustNewConstMetric(
+					metric.Desc,
+					metric.Type,
+					metric.Value(indexHealth, color),
+					metric.Labels.values(ih.lastClusterInfo, indexHealth.Index, color)...,
+				)
+			}
+		}
+	}
+}
diff --git a/collector/indices_health_response.go b/collector/indices_health_response.go
new file mode 100644
index 00000000..3d22f59f
--- /dev/null
+++ b/collector/indices_health_response.go
@@ -0,0 +1,23 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+// indexHealthResponse is one entry of the _cat/indices JSON reply.
+type indexHealthResponse struct {
+	Health string `json:"health"`
+	Index  string `json:"index"`
+}
+
+// CatIndicesResponse is the decoded JSON array returned by _cat/indices.
+type CatIndicesResponse []indexHealthResponse
diff --git a/collector/indices_health_test.go b/collector/indices_health_test.go
new file mode 100644
index 00000000..955bfcfd
--- /dev/null
+++ b/collector/indices_health_test.go
@@ -0,0 +1,61 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"testing"
+
+	"github.com/go-kit/log"
+)
+
+// TestIndicesHealth checks that recorded cat indices replies decode into index name and health.
+func TestIndicesHealth(t *testing.T) {
+	// Testcases created using:
+	// docker run -d -p 9200:9200 elasticsearch:VERSION
+	// curl -XPUT http://localhost:9200/twitter
+	// curl http://localhost:9200/_cat/indices?format=json&h=health,index
+	tcs := map[string]string{
+		"1.7.6": `[{"health":"yellow","index":"twitter"}]`,
+		"2.4.5": `[{"health":"yellow","index":"twitter"}]`,
+		"5.4.2": `[{"health":"yellow","index":"twitter"}]`,
+		"5.5.2": `[{"health":"yellow","index":"twitter"}]`,
+		"8.2.3": `[{"health":"yellow","index":"twitter"}]`,
+	}
+	for ver, out := range tcs {
+		// one subtest per version, so the deferred Close runs when that version is done
+		t.Run(ver, func(t *testing.T) {
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				fmt.Fprintln(w, out)
+			}))
+			defer ts.Close()
+			u, err := url.Parse(ts.URL)
+			if err != nil {
+				t.Fatalf("Failed to parse URL: %s", err)
+			}
+			c := NewIndicesHealth(log.NewNopLogger(), http.DefaultClient, u)
+			ihr, err := c.fetchAndDecodeIndicesHealth()
+			if err != nil {
+				t.Fatalf("Failed to fetch or decode indices health: %s", err)
+			}
+			t.Logf("[%s] Indices Health Response: %+v", ver, ihr)
+			if len(ihr) != 1 || ihr[0].Index != "twitter" || ihr[0].Health != "yellow" {
+				t.Errorf("got %+v, want exactly one index twitter with health yellow", ihr)
+			}
+		})
+	}
+}
diff --git a/main.go b/main.go
index 89236ceb..da7e390e 100644
--- a/main.go
+++ b/main.go
@@ -191,6 +191,12 @@ func main() {
 			_ = level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
 			os.Exit(1)
 		}
+		iHC := collector.NewIndicesHealth(logger, httpClient, esURL)
+		prometheus.MustRegister(iHC)
+		if registerErr := clusterInfoRetriever.RegisterConsumer(iHC); registerErr != nil {
+			_ = level.Error(logger).Log("msg", "failed to register indices health collector in cluster info")
+			os.Exit(1)
+		}
 	}
 
 	if *esExportSnapshots {