Skip to content

Commit

Permalink
Fix merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: sowmiyamuthuraman <sowmiyamuthuraman@gmail.com>
  • Loading branch information
sowmiyamuthuraman committed Oct 19, 2019
2 parents 0b54ba7 + 06d440a commit 7993c6b
Showing 1 changed file with 76 additions and 47 deletions.
123 changes: 76 additions & 47 deletions consul_exporter.go
Expand Up @@ -23,7 +23,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -107,7 +106,7 @@ type promHTTPLogger struct {
}

func (l promHTTPLogger) Println(v ...interface{}) {
level.Error(l.logger).Log("msg", fmt.Sprintf("%v", v))
level.Error(l.logger).Log("msg", fmt.Sprint(v...))
}

// Exporter collects Consul stats from the given server and exports them using
Expand Down Expand Up @@ -200,27 +199,42 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
// Collect fetches the stats from configured Consul location and delivers them
// as Prometheus metrics. It implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
// How many peers are in the Consul cluster?
peers, err := e.client.Status().Peers()
if err != nil {
ok := e.collectPeersMetric(ch)
ok = e.collectLeaderMetric(ch) && ok
ok = e.collectNodesMetric(ch) && ok
ok = e.collectMembersMetric(ch) && ok
ok = e.collectServicesMetric(ch) && ok
ok = e.collectHealthStateMetric(ch) && ok
ok = e.collectKeyValues(ch) && ok

if ok {
ch <- prometheus.MustNewConstMetric(
up, prometheus.GaugeValue, 0,
up, prometheus.GaugeValue, 1.0,
)
} else {
ch <- prometheus.MustNewConstMetric(
up, prometheus.GaugeValue, 0.0,
)
level.Error(e.logger).Log("msg", "Can't query consul", "err", err)
return
}
}

// We'll use peers to decide that we're up.
ch <- prometheus.MustNewConstMetric(
up, prometheus.GaugeValue, 1,
)
func (e *Exporter) collectPeersMetric(ch chan<- prometheus.Metric) bool {
peers, err := e.client.Status().Peers()
if err != nil {
level.Error(e.logger).Log("msg", "Can't query consul", "err", err)
return false
}
ch <- prometheus.MustNewConstMetric(
clusterServers, prometheus.GaugeValue, float64(len(peers)),
)
return true
}

func (e *Exporter) collectLeaderMetric(ch chan<- prometheus.Metric) bool {
leader, err := e.client.Status().Leader()
if err != nil {
level.Error(e.logger).Log("msg", "Can't query consul", "err", err)
return false
}
if len(leader) == 0 {
ch <- prometheus.MustNewConstMetric(
Expand All @@ -231,48 +245,58 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
clusterLeader, prometheus.GaugeValue, 1,
)
}
return true
}

// How many nodes are registered?
func (e *Exporter) collectNodesMetric(ch chan<- prometheus.Metric) bool {
nodes, _, err := e.client.Catalog().Nodes(&queryOptions)
if err != nil {
// FIXME: How should we handle a partial failure like this?
} else {
ch <- prometheus.MustNewConstMetric(
nodeCount, prometheus.GaugeValue, float64(len(nodes)),
)
level.Error(e.logger).Log("msg", "Failed to query catalog for nodes", "err", err)
return false
}
// Query for member status.
ch <- prometheus.MustNewConstMetric(
nodeCount, prometheus.GaugeValue, float64(len(nodes)),
)
return true
}

func (e *Exporter) collectMembersMetric(ch chan<- prometheus.Metric) bool {
members, err := e.client.Agent().Members(false)
if err != nil {
// FIXME: How should we handle a partial failure like this?
} else {
for _, entry := range members {
ch <- prometheus.MustNewConstMetric(
memberStatus, prometheus.GaugeValue, float64(entry.Status), entry.Name,
)
}
level.Error(e.logger).Log("msg", "Failed to query member status", "err", err)
return false
}
for _, entry := range members {
ch <- prometheus.MustNewConstMetric(
memberStatus, prometheus.GaugeValue, float64(entry.Status), entry.Name,
)
}
return true
}

// Query for the full list of services.
func (e *Exporter) collectServicesMetric(ch chan<- prometheus.Metric) bool {
serviceNames, _, err := e.client.Catalog().Services(&queryOptions)
if err != nil {
// FIXME: How should we handle a partial failure like this?
return
level.Error(e.logger).Log("msg", "Failed to query for services", "err", err)
return false
}
ch <- prometheus.MustNewConstMetric(
serviceCount, prometheus.GaugeValue, float64(len(serviceNames)),
)

if e.healthSummary {
e.collectHealthSummary(ch, serviceNames)
if ok := e.collectHealthSummary(ch, serviceNames); !ok {
return false
}
}
return true
}

func (e *Exporter) collectHealthStateMetric(ch chan<- prometheus.Metric) bool {
checks, _, err := e.client.Health().State("any", &queryOptions)
if err != nil {
level.Error(e.logger).Log("msg", "Failed to query service health", "err", err)
return
return false
}

for _, hc := range checks {
var passing, warning, critical, maintenance float64

Expand Down Expand Up @@ -315,38 +339,41 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
)
}
}

e.collectKeyValues(ch)
return true
}

// collectHealthSummary collects health information about every node+service
// combination. It will cause one lookup query per service.
func (e *Exporter) collectHealthSummary(ch chan<- prometheus.Metric, serviceNames map[string][]string) {
var wg sync.WaitGroup
func (e *Exporter) collectHealthSummary(ch chan<- prometheus.Metric, serviceNames map[string][]string) bool {
ok := make(chan bool)

for s := range serviceNames {
wg.Add(1)
go func(s string) {
defer wg.Done()
e.collectOneHealthSummary(ch, s)
ok <- e.collectOneHealthSummary(ch, s)
}(s)
}

wg.Wait()
allOK := true
for range serviceNames {
allOK = <-ok && allOK
}
close(ok)

return allOK
}

func (e *Exporter) collectOneHealthSummary(ch chan<- prometheus.Metric, serviceName string) {
func (e *Exporter) collectOneHealthSummary(ch chan<- prometheus.Metric, serviceName string) bool {
// See https://github.com/hashicorp/consul/issues/1096.
if strings.HasPrefix(serviceName, "/") {
level.Warn(e.logger).Log("msg", "Skipping service because it starts with a slash", "service_name", serviceName)
return
return true
}
level.Debug(e.logger).Log("msg", "Fetching health summary", "serviceName", serviceName)

service, _, err := e.client.Health().Service(serviceName, "", false, &queryOptions)
if err != nil {
level.Error(e.logger).Log("msg", "Failed to query service health", "err", err)
return
return false
}

for _, entry := range service {
Expand All @@ -372,18 +399,19 @@ func (e *Exporter) collectOneHealthSummary(ch chan<- prometheus.Metric, serviceN
tags[tag] = struct{}{}
}
}
return true
}

func (e *Exporter) collectKeyValues(ch chan<- prometheus.Metric) {
func (e *Exporter) collectKeyValues(ch chan<- prometheus.Metric) bool {
if e.kvPrefix == "" {
return
return true
}

kv := e.client.KV()
pairs, _, err := kv.List(e.kvPrefix, &queryOptions)
if err != nil {
level.Error(e.logger).Log("msg", "Error fetching key/values", "err", err)
return
return false
}

for _, pair := range pairs {
Expand All @@ -396,6 +424,7 @@ func (e *Exporter) collectKeyValues(ch chan<- prometheus.Metric) {
}
}
}
return true
}

func init() {
Expand Down

0 comments on commit 7993c6b

Please sign in to comment.