Skip to content

Commit

Permalink
refactor(orc8r): Split Metricsd to protected and southbound
Browse files Browse the repository at this point in the history
Signed-off-by: Christine Wang <christinewang@fb.com>
  • Loading branch information
christinewang5 committed Mar 11, 2022
1 parent 8d9987f commit 885e43f
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 266 deletions.
119 changes: 119 additions & 0 deletions orc8r/cloud/go/services/metricsd/servicers/protected/servicer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package protected

import (
"context"
"time"

"github.com/golang/glog"
prom_proto "github.com/prometheus/client_model/go"

"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/exporters"
servicers "magma/orc8r/cloud/go/services/metricsd/servicers/southbound"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

// CloudMetricsControllerServer implements a handler to the gRPC server run by the
// Metrics Controller. It can register instances of the Exporter interface for
// writing to storage
type CloudMetricsControllerServer struct {
exporters []exporters.Exporter
}

func NewCloudMetricsControllerServer() *CloudMetricsControllerServer {
return &CloudMetricsControllerServer{}
}

func (srv *CloudMetricsControllerServer) Push(ctx context.Context, in *protos.PushedMetricsContainer) (*protos.Void, error) {
if in.Metrics == nil || len(in.Metrics) == 0 {
return new(protos.Void), nil
}

metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
return &protos.Void{}, err
}
for _, e := range metricsExporters {
metricsToSubmit := pushedMetricsToMetricsAndContext(in)
err := e.Submit(metricsToSubmit)
if err != nil {
glog.Error(err)
}
}
return new(protos.Void), nil
}

// ConsumeCloudMetrics pulls metrics off the given input channel and sends
// them to all exporters after some preprocessing.
// Returns only when inputChan closed, which should never happen.
func (srv *CloudMetricsControllerServer) ConsumeCloudMetrics(inputChan chan *prom_proto.MetricFamily, hostName string) {
for family := range inputChan {
metricsToSubmit := preprocessCloudMetrics(family, hostName)
metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
glog.Error(err)
continue
}
for _, e := range metricsExporters {
err := e.Submit([]exporters.MetricAndContext{metricsToSubmit})
if err != nil {
glog.Error(err)
}
}
}
glog.Error("Consume cloud metrics channel unexpectedly closed")
}

func preprocessCloudMetrics(family *prom_proto.MetricFamily, hostName string) exporters.MetricAndContext {
ctx := exporters.MetricContext{
MetricName: protos.GetDecodedName(family),
AdditionalContext: &exporters.CloudMetricContext{
CloudHost: hostName,
},
}
for _, metric := range family.Metric {
metric.Label = protos.GetDecodedLabel(metric)
servicers.AddLabel(metric, metrics.CloudHostLabelName, hostName)
}
return exporters.MetricAndContext{Family: family, Context: ctx}
}

func pushedMetricsToMetricsAndContext(in *protos.PushedMetricsContainer) []exporters.MetricAndContext {
ret := make([]exporters.MetricAndContext, 0, len(in.Metrics))
for _, metric := range in.Metrics {
ctx := exporters.MetricContext{
MetricName: metric.MetricName,
AdditionalContext: &exporters.PushedMetricContext{
NetworkID: in.NetworkId,
},
}

ts := metric.TimestampMS
if ts == 0 {
ts = time.Now().Unix() * 1000
}

prometheusLabels := make([]*prom_proto.LabelPair, 0, len(metric.Labels))
for _, label := range metric.Labels {
prometheusLabels = append(prometheusLabels, &prom_proto.LabelPair{Name: &label.Name, Value: &label.Value})
}
promoMetric := &prom_proto.Metric{
Label: prometheusLabels,
Gauge: &prom_proto.Gauge{
Value: &metric.Value,
},
TimestampMs: &ts,
}
servicers.AddLabel(promoMetric, metrics.NetworkLabelName, in.NetworkId)

gaugeType := prom_proto.MetricType_GAUGE
fam := &prom_proto.MetricFamily{
Name: &metric.MetricName,
Type: &gaugeType,
Metric: []*prom_proto.Metric{promoMetric},
}
ret = append(ret, exporters.MetricAndContext{Family: fam, Context: ctx})
}
return ret
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package servicers
package protected

import (
"testing"
Expand Down Expand Up @@ -29,29 +29,6 @@ func TestPreprocessCloudMetrics(t *testing.T) {
assert.True(t, tests.HasLabel(labels, testLabels[0].GetName(), testLabels[0].GetValue()))
}

func TestMetricsContainerToMetricAndContexts(t *testing.T) {
testFamily := tests.MakeTestMetricFamily(prometheusProto.MetricType_GAUGE, 1, testLabels)
container := protos.MetricsContainer{
GatewayId: "gw1",
Family: []*prometheusProto.MetricFamily{testFamily},
}

metricAndContext := metricsContainerToMetricAndContexts(&container, "testNetwork", "gw1")

assert.Equal(t, 1, len(metricAndContext))
ctx := metricAndContext[0].Context
family := metricAndContext[0].Family
assert.NotNil(t, ctx.AdditionalContext)
assert.Equal(t, "gw1", ctx.AdditionalContext.(*exporters.GatewayMetricContext).GatewayID)
assert.Equal(t, "testNetwork", ctx.AdditionalContext.(*exporters.GatewayMetricContext).NetworkID)

labels := family.GetMetric()[0].Label
assert.Equal(t, 3, len(labels))
assert.True(t, tests.HasLabel(labels, metrics.NetworkLabelName, "testNetwork"))
assert.True(t, tests.HasLabel(labels, metrics.GatewayLabelName, "gw1"))
assert.True(t, tests.HasLabel(labels, testLabels[0].GetName(), testLabels[0].GetValue()))
}

func TestPushedMetricsToMetricsAndContext(t *testing.T) {
container := protos.PushedMetricsContainer{
NetworkId: "testNetwork",
Expand Down
237 changes: 0 additions & 237 deletions orc8r/cloud/go/services/metricsd/servicers/servicer.go

This file was deleted.

0 comments on commit 885e43f

Please sign in to comment.