Skip to content

Commit

Permalink
feat: connz: monitor pending bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
caarlos0 committed May 17, 2019
1 parent 15da6ff commit 7de7814
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 10 deletions.
3 changes: 3 additions & 0 deletions collector/collector.go
Expand Up @@ -208,6 +208,9 @@ func NewCollector(endpoint string, servers []*CollectedServer) prometheus.Collec
if isStreamingEndpoint(endpoint) {
return newStreamingCollector(endpoint, servers)
}
if isConnzEndpoint(endpoint) {
return newConnzCollector(servers)
}

// TODO: Potentially add TLS config in the transport.
tr := &http.Transport{}
Expand Down
4 changes: 3 additions & 1 deletion collector/collector_test.go
Expand Up @@ -19,7 +19,7 @@ import (
"testing"
"time"

"github.com/nats-io/go-nats-streaming"
stan "github.com/nats-io/go-nats-streaming"
pet "github.com/nats-io/prometheus-nats-exporter/test"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -164,6 +164,7 @@ func TestConnz(t *testing.T) {

cases := map[string]float64{
"gnatsd_connz_total_connections": 0,
"gnatsd_connz_pending_bytes": 0,
"gnatsd_varz_connections": 0,
}

Expand All @@ -173,6 +174,7 @@ func TestConnz(t *testing.T) {

cases = map[string]float64{
"gnatsd_connz_total_connections": 1,
"gnatsd_connz_pending_bytes": 0,
"gnatsd_varz_connections": 1,
}
nc := pet.CreateClientConnSubscribeAndPublish(t)
Expand Down
108 changes: 108 additions & 0 deletions collector/connz.go
@@ -0,0 +1,108 @@
package collector

import (
"net/http"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

func isConnzEndpoint(endpoint string) bool {
return endpoint == "connz"
}

type connzCollector struct {
sync.Mutex

httpClient *http.Client
servers []*CollectedServer

numConnections *prometheus.Desc
total *prometheus.Desc
offset *prometheus.Desc
limit *prometheus.Desc
pendingBytes *prometheus.Desc
}

func newConnzCollector(servers []*CollectedServer) prometheus.Collector {
nc := &connzCollector{
httpClient: http.DefaultClient,
numConnections: prometheus.NewDesc(
prometheus.BuildFQName("gnatsd", "connz", "num_connections"),
"num_connections",
[]string{"server_id"},
nil,
),
offset: prometheus.NewDesc(
prometheus.BuildFQName("gnatsd", "connz", "offset"),
"offset",
[]string{"server_id"},
nil,
),
total: prometheus.NewDesc(
prometheus.BuildFQName("gnatsd", "connz", "total"),
"total",
[]string{"server_id"},
nil,
),
limit: prometheus.NewDesc(
prometheus.BuildFQName("gnatsd", "connz", "limit"),
"limit",
[]string{"server_id"},
nil,
),
pendingBytes: prometheus.NewDesc(
prometheus.BuildFQName("gnatsd", "connz", "pending_bytes"),
"pending_bytes",
[]string{"server_id"},
nil,
),
}

nc.servers = make([]*CollectedServer, len(servers))
for i, s := range servers {
nc.servers[i] = &CollectedServer{
ID: s.ID,
URL: s.URL + "/connz",
}
}

return nc
}

func (nc *connzCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- nc.limit
}

// Collect gathers the streaming server serverz metrics.
func (nc *connzCollector) Collect(ch chan<- prometheus.Metric) {
for _, server := range nc.servers {
var resp Connz
if err := getMetricURL(nc.httpClient, server.URL, &resp); err != nil {
Debugf("ignoring server %s: %v", server.ID, err)
continue
}

var pendingBytes = 0
for _, conn := range resp.Connections {
pendingBytes += conn.PendingBytes
}

ch <- prometheus.MustNewConstMetric(nc.numConnections, prometheus.GaugeValue, float64(resp.NumConnections), server.ID)
ch <- prometheus.MustNewConstMetric(nc.total, prometheus.GaugeValue, float64(resp.Total), server.ID)
ch <- prometheus.MustNewConstMetric(nc.offset, prometheus.GaugeValue, float64(resp.Offset), server.ID)
ch <- prometheus.MustNewConstMetric(nc.limit, prometheus.GaugeValue, float64(resp.Limit), server.ID)
ch <- prometheus.MustNewConstMetric(nc.pendingBytes, prometheus.GaugeValue, float64(pendingBytes), server.ID)
}
}

// Connz output
type Connz struct {
NumConnections int `json:"num_connections"`
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
Connections []struct {
PendingBytes int `json:"pending_bytes"`
} `json:"connections"`
}
16 changes: 7 additions & 9 deletions collector/streaming.go
Expand Up @@ -45,37 +45,37 @@ func newServerzCollector(servers []*CollectedServer) prometheus.Collector {
bytesTotal: prometheus.NewDesc(
prometheus.BuildFQName("nss", "server", "bytes_total"),
"Total of bytes",
[]string{"server"},
[]string{"server_id"},
nil,
),
msgsTotal: prometheus.NewDesc(
prometheus.BuildFQName("nss", "server", "msgs_total"),
"Total of messages",
[]string{"server"},
[]string{"server_id"},
nil,
),
channels: prometheus.NewDesc(
prometheus.BuildFQName("nss", "server", "channels"),
"Total channels",
[]string{"server"},
[]string{"server_id"},
nil,
),
subs: prometheus.NewDesc(
prometheus.BuildFQName("nss", "server", "subscriptions"),
"Total subscriptions",
[]string{"server"},
[]string{"server_id"},
nil,
),
clients: prometheus.NewDesc(
prometheus.BuildFQName("nss", "server", "clients"),
"Total clients",
[]string{"server"},
[]string{"server_id"},
nil,
),
info: prometheus.NewDesc(
prometheus.BuildFQName("nss", "server", "info"),
"Info",
[]string{"server", "cluster_id", "server_id", "version", "go_version", "state", "role", "start_time"},
[]string{"server_id", "cluster_id", "version", "go_version", "state", "role", "start_time"},
nil,
),
}
Expand Down Expand Up @@ -130,8 +130,7 @@ func (nc *serverzCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(nc.channels, prometheus.CounterValue, float64(resp.Channels), server.ID)
ch <- prometheus.MustNewConstMetric(nc.subs, prometheus.CounterValue, float64(resp.Subscriptions), server.ID)
ch <- prometheus.MustNewConstMetric(nc.clients, prometheus.CounterValue, float64(resp.Clients), server.ID)
ch <- prometheus.MustNewConstMetric(nc.info, prometheus.GaugeValue, 1, server.ID, resp.ClusterID, resp.ServerID,
resp.Version, resp.GoVersion, resp.State, resp.Role, resp.StartTime)
ch <- prometheus.MustNewConstMetric(nc.info, prometheus.GaugeValue, 1, server.ID, resp.ClusterID, resp.Version, resp.GoVersion, resp.State, resp.Role, resp.StartTime)
}
}

Expand Down Expand Up @@ -240,7 +239,6 @@ func (nc *channelsCollector) Collect(ch chan<- prometheus.Metric) {
// Channelsz lists the name of all NATS Streaming Channelsz
type Channelsz struct {
ClusterID string `json:"cluster_id"`
ServerID string `json:"server_id"`
Now time.Time `json:"now"`
Offset int `json:"offset"`
Limit int `json:"limit"`
Expand Down

0 comments on commit 7de7814

Please sign in to comment.