Skip to content

Commit

Permalink
Add replicator metrics (#96)
Browse files Browse the repository at this point in the history
Add replicator metrics

* Added implementation, tests, and parameters for replicator metrics
* Added system scoping to handle endpoint overlap
* Refactored prefixes

Signed-off-by: Colin Sullivan <colin@synadia.com>
  • Loading branch information
ColinSullivan1 committed Aug 19, 2019
1 parent 057940a commit 21f0e17
Show file tree
Hide file tree
Showing 22 changed files with 1,133 additions and 105 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ go:

install:
- go get github.com/nats-io/go-nats
- go get github.com/nats-io/gnatsd
- go get github.com/nats-io/nats-server
- go get github.com/nats-io/nats-replicator
- go get github.com/mattn/goveralls
- go get github.com/wadey/gocovmerge
- go get github.com/alecthomas/gometalinter
Expand Down
57 changes: 41 additions & 16 deletions collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2018 The NATS Authors
// Copyright 2017-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package collector has various collector utilities and implementations.
package collector

import (
Expand All @@ -24,7 +25,15 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var namespace = "gnatsd"
// System name variables. Each value names one kind of monitored system and
// serves as the default metric namespace when no custom prefix is supplied.
var (
// CoreSystem is the system name used for the core NATS server endpoints.
// Use gnatsd for backward compatibility. Changing would require users to
// change their dashboards or other applications that rely on the
// prometheus metric names.
CoreSystem = "gnatsd"
// StreamingSystem is the system name used for the streaming server
// endpoints (e.g. serverz, channelsz).
StreamingSystem = "nss"
// ReplicatorSystem is the system name used for the replicator endpoints.
ReplicatorSystem = "replicator"
)

// CollectedServer is a NATS server polled by this collector
type CollectedServer struct {
Expand All @@ -38,17 +47,19 @@ type NATSCollector struct {
Stats map[string]interface{}
httpClient *http.Client
endpoint string
system string
servers []*CollectedServer
}

// newPrometheusGaugeVec creates a custom GaugeVec
// Based on our current integration, we're going to treat all metrics as gauges.
// We are going to call the set message on the gauge when we receive an updated
// metrics pull.
func newPrometheusGaugeVec(subsystem string, name string, help string, prefix string) (metric *prometheus.GaugeVec) {
func newPrometheusGaugeVec(system, subsystem, name, help, prefix string) (metric *prometheus.GaugeVec) {
if help == "" {
help = name
}
namespace := system
if prefix != "" {
namespace = prefix
}
Expand Down Expand Up @@ -77,7 +88,7 @@ func getMetricURL(httpClient *http.Client, URL string, response interface{}) err
if err != nil {
return err
}

Tracef("Retrieved metric result:\n%s\n", string(body))
return json.Unmarshal(body, &response)
}

Expand Down Expand Up @@ -244,7 +255,7 @@ func (nc *NATSCollector) initMetricsFromServers(namespace string) {
i := response[k]
switch v := i.(type) {
case float64: // all json numbers are handled here.
nc.Stats[k] = newPrometheusGaugeVec(nc.endpoint, k, "", namespace)
nc.Stats[k] = newPrometheusGaugeVec(nc.system, nc.endpoint, k, "", namespace)
case string:
// do nothing
default:
Expand All @@ -255,21 +266,13 @@ func (nc *NATSCollector) initMetricsFromServers(namespace string) {
}
}

// NewCollector creates a new NATS Collector from a list of monitoring URLs.
// Each URL should be to a specific endpoint (e.g. varz, connz, subsz, or routez)
func NewCollector(endpoint string, servers []*CollectedServer, prefix string) prometheus.Collector {
if isStreamingEndpoint(endpoint) {
return newStreamingCollector(endpoint, servers, prefix)
}
if isConnzEndpoint(endpoint) {
return newConnzCollector(servers)
}

func newNatsCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
// TODO: Potentially add TLS config in the transport.
tr := &http.Transport{}
hc := &http.Client{Transport: tr}
nc := &NATSCollector{
httpClient: hc,
system: system,
endpoint: endpoint,
}

Expand All @@ -283,7 +286,29 @@ func NewCollector(endpoint string, servers []*CollectedServer, prefix string) pr
}
}

nc.initMetricsFromServers(namespace)
nc.initMetricsFromServers(system)

return nc
}

// getSystem picks the name used to scope metrics: a non-empty prefix takes
// precedence over the system name; otherwise the system name is returned
// unchanged.
func getSystem(system, prefix string) string {
	if prefix != "" {
		return prefix
	}
	return system
}

// NewCollector builds the prometheus.Collector matching the given system and
// endpoint, polling the supplied servers. Each server URL should point at a
// specific monitoring endpoint (e.g. varz, connz, subsz, or routez).
//
// The (system, endpoint) pair selects the implementation, checked in this
// order: streaming, connz, replicator, and finally the generic NATS
// collector. A non-empty prefix replaces the system name passed down to the
// selected collector.
func NewCollector(system, endpoint, prefix string, servers []*CollectedServer) prometheus.Collector {
	scope := getSystem(system, prefix)

	switch {
	case isStreamingEndpoint(system, endpoint):
		return newStreamingCollector(scope, endpoint, servers)
	case isConnzEndpoint(system, endpoint):
		return newConnzCollector(scope, endpoint, servers)
	case isReplicatorEndpoint(system, endpoint):
		// NOTE(review): the endpoint is fixed to "varz" here instead of
		// forwarding the caller's value — presumably the only endpoint the
		// replicator exposes; confirm against isReplicatorEndpoint.
		return newReplicatorCollector(scope, "varz", servers)
	default:
		return newNatsCollector(scope, endpoint, servers)
	}
}
85 changes: 60 additions & 25 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func parseDesc(desc string) string {
return strings.Split(desc, "\"")[1]
}

func verifyCollector(url string, endpoint string, cases map[string]float64, t *testing.T) {
func verifyCollector(system, url string, endpoint string, cases map[string]float64, t *testing.T) {
// create a new collector.
servers := make([]*CollectedServer, 1)
servers[0] = &CollectedServer{
ID: "id",
URL: url,
}
coll := NewCollector(endpoint, servers, "test")
coll := NewCollector(system, endpoint, "", servers)

// now collect the metrics
c := make(chan prometheus.Metric)
Expand Down Expand Up @@ -73,7 +73,7 @@ func verifyStreamingCollector(url string, endpoint string, prefix string, cases
ID: "id",
URL: url,
}
coll := NewCollector(endpoint, servers, prefix)
coll := NewCollector(StreamingSystem, endpoint, "", servers)

// now collect the metrics
c := make(chan prometheus.Metric)
Expand Down Expand Up @@ -103,7 +103,7 @@ func verifyStreamingCollector(url string, endpoint string, prefix string, cases

// To account for the metrics that share the same descriptor but differ in their variable label values,
// return a list of lists of label pairs for each of the supplied metric names.
func getLabelValues(url string, endpoint string, metricNames []string) (map[string][]map[string]string, error) {
func getLabelValues(system, url, endpoint string, metricNames []string) (map[string][]map[string]string, error) {
labelValues := make(map[string][]map[string]string)
namesMap := make(map[string]bool)
for _, metricName := range metricNames {
Expand Down Expand Up @@ -151,7 +151,7 @@ func getLabelValues(url string, endpoint string, metricNames []string) (map[stri
ID: "id",
URL: url,
}
coll := NewCollector(endpoint, servers, "test")
coll := NewCollector(system, endpoint, "", servers)
coll.Collect(metrics)
close(metrics)

Expand Down Expand Up @@ -197,7 +197,7 @@ func TestVarz(t *testing.T) {
"gnatsd_varz_subscriptions": 1,
}

verifyCollector(url, "varz", cases, t)
verifyCollector(CoreSystem, url, "varz", cases, t)
}

func TestConnz(t *testing.T) {
Expand All @@ -214,7 +214,7 @@ func TestConnz(t *testing.T) {
"gnatsd_varz_connections": 0,
}

verifyCollector(url, "connz", cases, t)
verifyCollector(CoreSystem, url, "connz", cases, t)

// Test with connections.

Expand All @@ -226,7 +226,7 @@ func TestConnz(t *testing.T) {
nc := pet.CreateClientConnSubscribeAndPublish(t)
defer nc.Close()

verifyCollector(url, "connz", cases, t)
verifyCollector(CoreSystem, url, "connz", cases, t)
}

func TestNoServer(t *testing.T) {
Expand All @@ -237,7 +237,7 @@ func TestNoServer(t *testing.T) {
"gnatsd_varz_connections": 0,
}

verifyCollector(url, "varz", cases, t)
verifyCollector(CoreSystem, url, "varz", cases, t)
}

func TestRegister(t *testing.T) {
Expand All @@ -248,10 +248,10 @@ func TestRegister(t *testing.T) {
// check duplicates do not panic
servers = append(servers, cs)

NewCollector("varz", servers, "test")
NewCollector("test", "varz", "", servers)

// test idempotency.
nc := NewCollector("varz", servers, "test")
nc := NewCollector("test", "varz", "", servers)

// test without a server (no error).
if err := prometheus.Register(nc); err == nil {
Expand All @@ -264,14 +264,14 @@ func TestRegister(t *testing.T) {
defer s.Shutdown()

// test collect with a server
nc = NewCollector("varz", servers, "test")
nc = NewCollector("test", "varz", "", servers)
if err := prometheus.Register(nc); err != nil {
t.Fatalf("Got unexpected error: %v", err)
}
prometheus.Unregister(nc)

// test collect with an invalid endpoint
nc = NewCollector("GARBAGE", servers, "test")
nc = NewCollector("test", "GARBAGE", "", servers)
if err := prometheus.Register(nc); err == nil {
t.Fatalf("Did not get expected error.")
defer prometheus.Unregister(nc)
Expand All @@ -292,22 +292,22 @@ func TestAllEndpoints(t *testing.T) {
cases := map[string]float64{
"gnatsd_varz_connections": 1,
}
verifyCollector(url, "varz", cases, t)
verifyCollector(CoreSystem, url, "varz", cases, t)

cases = map[string]float64{
"gnatsd_routez_num_routes": 0,
}
verifyCollector(url, "routez", cases, t)
verifyCollector(CoreSystem, url, "routez", cases, t)

cases = map[string]float64{
"gnatsd_subsz_num_subscriptions": 1,
}
verifyCollector(url, "subsz", cases, t)
verifyCollector(CoreSystem, url, "subsz", cases, t)

cases = map[string]float64{
"gnatsd_connz_total_connections": 1,
}
verifyCollector(url, "connz", cases, t)
verifyCollector(CoreSystem, url, "connz", cases, t)
}

const (
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestStreamingVarz(t *testing.T) {
"gnatsd_varz_subscriptions": 14,
}

verifyCollector(url, "varz", cases, t)
verifyCollector(CoreSystem, url, "varz", cases, t)
}

func TestStreamingMetrics(t *testing.T) {
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestStreamingMetrics(t *testing.T) {
"test_chan_subs_max_inflight": 1024,
}

verifyCollector(url, "channelsz", cases, t)
verifyCollector(StreamingSystem, url, "channelsz", cases, t)

cases = map[string]float64{
"test_server_bytes_total": 0,
Expand All @@ -400,7 +400,7 @@ func TestStreamingMetrics(t *testing.T) {
"test_server_active": 0,
}

verifyCollector(url, "serverz", cases, t)
verifyCollector(StreamingSystem, url, "serverz", cases, t)
}

func TestStreamingMetricsCustomPrefix(t *testing.T) {
Expand Down Expand Up @@ -459,8 +459,8 @@ func TestStreamingServerInfoMetricLabels(t *testing.T) {

url := fmt.Sprintf("http://localhost:%d/", pet.MonitorPort)

serverInfoMetric := "test_server_info"
labelValues, err := getLabelValues(url, "serverz", []string{serverInfoMetric})
serverInfoMetric := "nss_server_info"
labelValues, err := getLabelValues(StreamingSystem, url, "serverz", []string{serverInfoMetric})
if err != nil {
t.Fatalf("Unexpected error getting labels for nss_server_info metric: %v", err)
}
Expand Down Expand Up @@ -537,9 +537,9 @@ func TestStreamingSubscriptionsMetricLabels(t *testing.T) {

url := fmt.Sprintf("http://localhost:%d/", pet.MonitorPort)

streamingSunscriptionMetrics := []string{"test_chan_subs_last_sent",
"test_chan_subs_pending_count", "test_chan_subs_max_inflight"}
labelValues, err := getLabelValues(url, "channelsz", streamingSunscriptionMetrics)
streamingSunscriptionMetrics := []string{"nss_chan_subs_last_sent",
"nss_chan_subs_pending_count", "nss_chan_subs_max_inflight"}
labelValues, err := getLabelValues(StreamingSystem, url, "channelsz", streamingSunscriptionMetrics)
if err != nil {
t.Fatalf("Unexpected error getting labels for nss_server_info metric: %v", err)
}
Expand Down Expand Up @@ -587,3 +587,38 @@ func TestStreamingSubscriptionsMetricLabels(t *testing.T) {
}
}
}

// TestReplicatorMetrics runs two NATS servers with a replicator between them
// and checks that the replicator collector reports the expected connector
// metric values when polled through the replicator's varz endpoint.
func TestReplicatorMetrics(t *testing.T) {
	// Monitoring address of the test replicator started below (port 9922).
	const monitorURL = "http://127.0.0.1:9922"

	first := pet.RunServerWithPorts(pet.ClientPort, pet.MonitorPort)
	defer first.Shutdown()

	second := pet.RunServerWithPorts(pet.ClientPort+1, pet.MonitorPort+1)
	defer second.Shutdown()

	// Only plain NATS servers are used: running multiple streaming servers
	// in the same process produces protobuf errors.
	replicator, err := pet.RunTestReplicator(9922, pet.ClientPort, pet.ClientPort+1)
	if err != nil {
		t.Fatalf("couldn't start replicator, %s", err)
	}
	defer replicator.Stop()

	// Expected values for a connected replicator that has moved no messages.
	expected := map[string]float64{
		"replicator_connector_bytes_in":       0,
		"replicator_connector_bytes_out":      0,
		"replicator_connector_connected":      1,
		"replicator_connector_connects":       1,
		"replicator_connector_disconnects":    0,
		"replicator_connector_messages_in":    0,
		"replicator_connector_messages_out":   0,
		"replicator_connector_moving_average": 0,
		"replicator_connector_quintile_50":    -1,
		"replicator_connector_quintile_75":    -1,
		"replicator_connector_quintile_90":    -1,
		"replicator_connector_quintile_95":    -1,
		"replicator_connector_request_count":  0,
	}

	verifyCollector(ReplicatorSystem, monitorURL, "varz", expected, t)
}

0 comments on commit 21f0e17

Please sign in to comment.