Skip to content

Commit

Permalink
feat: Add remotes and replications to telemetry (#23456)
Browse files Browse the repository at this point in the history
* feat: start work on remotes/replications phone home data

* feat: add remotes/replications phone home data (no tests

* refactor: use erroring binary conversions

* style: gofmt

* refactor: improve some error handling

* style: cleanup

* feat: add tests

* refactor: just list remotes/replications rather than decrement

* chore: linting fix

Co-authored-by: DStrand1 <dstrandboge@influxdata.com>
  • Loading branch information
jeffreyssmith2nd and DStrand1 committed Jun 16, 2022
1 parent 8bd4fc5 commit 090f681
Show file tree
Hide file tree
Showing 15 changed files with 568 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -32,6 +32,7 @@ test.key
/fluxd
/transpilerd
/bin
/internal/cmd/kvmigrate/kvmigrate

# Project tools that you might install with go build.
/editorconfig-checker
Expand Down
2 changes: 2 additions & 0 deletions bolt/bbolt.go
Expand Up @@ -102,6 +102,8 @@ func (c *Client) initialize(ctx context.Context) error {
scraperBucket,
telegrafBucket,
telegrafPluginsBucket,
remoteBucket,
replicationBucket,
userBucket,
}
for _, bktName := range bkts {
Expand Down
29 changes: 29 additions & 0 deletions bolt/metrics.go
Expand Up @@ -21,6 +21,8 @@ var (
scraperBucket = []byte("scraperv2")
telegrafBucket = []byte("telegrafv1")
telegrafPluginsBucket = []byte("telegrafPluginsv1")
remoteBucket = []byte("remotesv2")
replicationBucket = []byte("replicationsv2")
userBucket = []byte("usersv1")
)

Expand Down Expand Up @@ -65,6 +67,16 @@ var (
"Number of individual telegraf plugins configured",
[]string{"plugin"}, nil)

remoteDesc = prometheus.NewDesc(
"influxdb_remotes_total",
"Number of total remote connections configured on the server",
nil, nil)

replicationDesc = prometheus.NewDesc(
"influxdb_replications_total",
"Number of total replication configurations on the server",
nil, nil)

boltWritesDesc = prometheus.NewDesc(
"boltdb_writes_total",
"Total number of boltdb writes",
Expand All @@ -85,6 +97,8 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) {
ch <- dashboardsDesc
ch <- scrapersDesc
ch <- telegrafsDesc
ch <- remoteDesc
ch <- replicationDesc
ch <- boltWritesDesc
ch <- boltReadsDesc

Expand Down Expand Up @@ -209,12 +223,15 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {

orgs, buckets, users, tokens := 0, 0, 0, 0
dashboards, scrapers, telegrafs := 0, 0, 0
remotes, replications := 0, 0
_ = c.db.View(func(tx *bolt.Tx) error {
buckets = tx.Bucket(bucketBucket).Stats().KeyN
dashboards = tx.Bucket(dashboardBucket).Stats().KeyN
orgs = tx.Bucket(organizationBucket).Stats().KeyN
scrapers = tx.Bucket(scraperBucket).Stats().KeyN
telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN
remotes = tx.Bucket(remoteBucket).Stats().KeyN
replications = tx.Bucket(replicationBucket).Stats().KeyN
tokens = tx.Bucket(authorizationBucket).Stats().KeyN
users = tx.Bucket(userBucket).Stats().KeyN
return nil
Expand Down Expand Up @@ -262,5 +279,17 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
float64(telegrafs),
)

ch <- prometheus.MustNewConstMetric(
remoteDesc,
prometheus.CounterValue,
float64(remotes),
)

ch <- prometheus.MustNewConstMetric(
replicationDesc,
prometheus.CounterValue,
float64(replications),
)

c.pluginsCollector.Collect(ch)
}
2 changes: 2 additions & 0 deletions bolt/metrics_test.go
Expand Up @@ -38,6 +38,8 @@ func TestInitialMetrics(t *testing.T) {
"influxdb_users_total": 0,
"influxdb_tokens_total": 0,
"influxdb_dashboards_total": 0,
"influxdb_remotes_total": 0,
"influxdb_replications_total": 0,
"boltdb_reads_total": 0,
}
for name, count := range metrics {
Expand Down
4 changes: 2 additions & 2 deletions cmd/influxd/launcher/launcher.go
Expand Up @@ -363,11 +363,11 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {

remotesSvc := remotes.NewService(m.sqlStore)
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
m.log.With(zap.String("handler", "remotes")), m.reg, m.kvStore, remotesSvc)

replicationSvc, replicationsMetrics := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath, opts.InstanceID)
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
m.log.With(zap.String("handler", "replications")), m.reg, m.kvStore, replicationSvc)
ts.BucketService = replications.NewBucketService(
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)

Expand Down
14 changes: 14 additions & 0 deletions kv/migration/all/0020_add_remotes_replications_metrics_buckets.go
@@ -0,0 +1,14 @@
package all

import "github.com/influxdata/influxdb/v2/kv/migration"

var (
remoteMetricsBucket = []byte("remotesv2")
replicationsMetricsBucket = []byte("replicationsv2")
)

var Migration0020_Add_remotes_replications_metrics_buckets = migration.CreateBuckets(
"create remotes and replications metrics buckets",
remoteMetricsBucket,
replicationsMetricsBucket,
)
2 changes: 2 additions & 0 deletions kv/migration/all/all.go
Expand Up @@ -45,5 +45,7 @@ var Migrations = [...]migration.Spec{
Migration0018_RepairMissingShardGroupDurations,
// add remotes and replications resource types to operator and all-access tokens
Migration0019_AddRemotesReplicationsToTokens,
// add_remotes_replications_metrics_buckets
Migration0020_Add_remotes_replications_metrics_buckets,
// {{ do_not_edit . }}
}
2 changes: 1 addition & 1 deletion prometheus/filter.go
Expand Up @@ -35,7 +35,7 @@ func NewMatcher() Matcher {
return Matcher{}
}

// Family helps constuct match by adding a metric family to match to.
// Family helps construct match by adding a metric family to match to.
func (m Matcher) Family(name string, lps ...*dto.LabelPair) Matcher {
// prometheus metrics labels are sorted by label name.
sort.Slice(lps, func(i, j int) bool {
Expand Down
6 changes: 5 additions & 1 deletion remotes/transport/http.go
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"net/http"

"github.com/influxdata/influxdb/v2/kv"

"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/influxdata/influxdb/v2"
Expand Down Expand Up @@ -56,7 +58,9 @@ type RemoteConnectionHandler struct {
remotesService RemoteConnectionService
}

func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, svc RemoteConnectionService) *RemoteConnectionHandler {
func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, kv kv.Store, svc RemoteConnectionService) *RemoteConnectionHandler {
// Collect telemetry.
svc = newTelemetryCollectingService(kv, svc)
// Collect metrics.
svc = newMetricCollectingService(reg, svc)
// Wrap logging.
Expand Down
99 changes: 99 additions & 0 deletions remotes/transport/middleware_kv.go
@@ -0,0 +1,99 @@
package transport

import (
"bytes"
"context"
"encoding/binary"
"fmt"

"github.com/influxdata/influxdb/v2/kit/platform"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)

var remotesBucket = []byte("remotesv2")

func newTelemetryCollectingService(kv kv.Store, underlying RemoteConnectionService) *telemetryService {
return &telemetryService{
kv: kv,
underlying: underlying,
}
}

type telemetryService struct {
kv kv.Store
underlying RemoteConnectionService
}

func (t telemetryService) ListRemoteConnections(ctx context.Context, filter influxdb.RemoteConnectionListFilter) (*influxdb.RemoteConnections, error) {
return t.underlying.ListRemoteConnections(ctx, filter)
}

func (t telemetryService) GetRemoteConnection(ctx context.Context, id platform.ID) (*influxdb.RemoteConnection, error) {
return t.underlying.GetRemoteConnection(ctx, id)
}

func (t telemetryService) UpdateRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
return t.underlying.UpdateRemoteConnection(ctx, id, request)
}

func (t telemetryService) CreateRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
conn, err := t.underlying.CreateRemoteConnection(ctx, request)
if err != nil {
return conn, err
}
err = t.storeRemoteMetrics(ctx, request.OrgID)
return conn, err
}

func (t telemetryService) DeleteRemoteConnection(ctx context.Context, id platform.ID) error {
rc, err := t.underlying.GetRemoteConnection(ctx, id)
if err != nil {
return err
}

err = t.underlying.DeleteRemoteConnection(ctx, id)
if err != nil {
return err
}

return t.storeRemoteMetrics(ctx, rc.OrgID)
}

func (t telemetryService) storeRemoteMetrics(ctx context.Context, orgID platform.ID) error {
if err := t.kv.Update(ctx, func(tx kv.Tx) error {
encodedID, err := orgID.Encode()
if err != nil {
return platform.ErrInvalidID
}
bucket, err := tx.Bucket(remotesBucket)
if err != nil {
return err
}
count, err := t.countRemotes(ctx, orgID)
if err != nil {
return err
}
return bucket.Put(encodedID, count)
}); err != nil {
return fmt.Errorf("updating telemetry failed: %v", err)
}

return nil
}

func (t telemetryService) countRemotes(ctx context.Context, orgID platform.ID) ([]byte, error) {
req := influxdb.RemoteConnectionListFilter{
OrgID: orgID,
}
list, err := t.underlying.ListRemoteConnections(ctx, req)
if err != nil {
return nil, err
}

b := make([]byte, 0, 8)
buf := bytes.NewBuffer(b)
err = binary.Write(buf, binary.BigEndian, int64(len(list.Remotes)))
return buf.Bytes(), err
}

0 comments on commit 090f681

Please sign in to comment.