Skip to content

Commit

Permalink
refactor(orc8r): Split Metricsd to protected and southbound
Browse files Browse the repository at this point in the history
Signed-off-by: Christine Wang <christinewang@fb.com>
  • Loading branch information
christinewang5 committed Mar 9, 2022
1 parent d783c82 commit ed69bf6
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 0 deletions.
119 changes: 119 additions & 0 deletions orc8r/cloud/go/services/metricsd/servicers/protected/servicer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package protected

import (
"context"
"time"

"github.com/golang/glog"
prom_proto "github.com/prometheus/client_model/go"

"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/exporters"
servicers "magma/orc8r/cloud/go/services/metricsd/servicers/southbound"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

// CloudMetricsControllerServer implements a handler to the gRPC server run by the
// Metrics Controller. It can register instances of the Exporter interface for
// writing to storage
type CloudMetricsControllerServer struct {
exporters []exporters.Exporter
}

func NewCloudMetricsControllerServer() *CloudMetricsControllerServer {
return &CloudMetricsControllerServer{}
}

func (srv *CloudMetricsControllerServer) Push(ctx context.Context, in *protos.PushedMetricsContainer) (*protos.Void, error) {
if in.Metrics == nil || len(in.Metrics) == 0 {
return new(protos.Void), nil
}

metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
return &protos.Void{}, err
}
for _, e := range metricsExporters {
metricsToSubmit := pushedMetricsToMetricsAndContext(in)
err := e.Submit(metricsToSubmit)
if err != nil {
glog.Error(err)
}
}
return new(protos.Void), nil
}

// ConsumeCloudMetrics pulls metrics off the given input channel and sends
// them to all exporters after some preprocessing.
// Returns only when inputChan closed, which should never happen.
func (srv *CloudMetricsControllerServer) ConsumeCloudMetrics(inputChan chan *prom_proto.MetricFamily, hostName string) {
for family := range inputChan {
metricsToSubmit := preprocessCloudMetrics(family, hostName)
metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
glog.Error(err)
continue
}
for _, e := range metricsExporters {
err := e.Submit([]exporters.MetricAndContext{metricsToSubmit})
if err != nil {
glog.Error(err)
}
}
}
glog.Error("Consume cloud metrics channel unexpectedly closed")
}

func preprocessCloudMetrics(family *prom_proto.MetricFamily, hostName string) exporters.MetricAndContext {
ctx := exporters.MetricContext{
MetricName: protos.GetDecodedName(family),
AdditionalContext: &exporters.CloudMetricContext{
CloudHost: hostName,
},
}
for _, metric := range family.Metric {
metric.Label = protos.GetDecodedLabel(metric)
servicers.AddLabel(metric, metrics.CloudHostLabelName, hostName)
}
return exporters.MetricAndContext{Family: family, Context: ctx}
}

func pushedMetricsToMetricsAndContext(in *protos.PushedMetricsContainer) []exporters.MetricAndContext {
ret := make([]exporters.MetricAndContext, 0, len(in.Metrics))
for _, metric := range in.Metrics {
ctx := exporters.MetricContext{
MetricName: metric.MetricName,
AdditionalContext: &exporters.PushedMetricContext{
NetworkID: in.NetworkId,
},
}

ts := metric.TimestampMS
if ts == 0 {
ts = time.Now().Unix() * 1000
}

prometheusLabels := make([]*prom_proto.LabelPair, 0, len(metric.Labels))
for _, label := range metric.Labels {
prometheusLabels = append(prometheusLabels, &prom_proto.LabelPair{Name: &label.Name, Value: &label.Value})
}
promoMetric := &prom_proto.Metric{
Label: prometheusLabels,
Gauge: &prom_proto.Gauge{
Value: &metric.Value,
},
TimestampMs: &ts,
}
servicers.AddLabel(promoMetric, metrics.NetworkLabelName, in.NetworkId)

gaugeType := prom_proto.MetricType_GAUGE
fam := &prom_proto.MetricFamily{
Name: &metric.MetricName,
Type: &gaugeType,
Metric: []*prom_proto.Metric{promoMetric},
}
ret = append(ret, exporters.MetricAndContext{Family: fam, Context: ctx})
}
return ret
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package protected

import (
"testing"

prometheusProto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"

"magma/orc8r/cloud/go/services/metricsd/exporters"
tests "magma/orc8r/cloud/go/services/metricsd/test_common"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

var (
testLabels = []*prometheusProto.LabelPair{{Name: tests.MakeStrPtr("labelName"), Value: tests.MakeStrPtr("labelValue")}}
)

func TestPushedMetricsToMetricsAndContext(t *testing.T) {
container := protos.PushedMetricsContainer{
NetworkId: "testNetwork",
Metrics: []*protos.PushedMetric{{
MetricName: "metricA",
Value: 10,
TimestampMS: 1234,
Labels: []*protos.LabelPair{{Name: "labelName", Value: "labelValue"}},
},
},
}

metricAndContext := pushedMetricsToMetricsAndContext(&container)

assert.Equal(t, 1, len(metricAndContext))
ctx := metricAndContext[0].Context
family := metricAndContext[0].Family
assert.NotNil(t, ctx.AdditionalContext)
assert.Equal(t, "testNetwork", ctx.AdditionalContext.(*exporters.PushedMetricContext).NetworkID)

labels := family.GetMetric()[0].Label
assert.Equal(t, 2, len(labels))
assert.True(t, tests.HasLabel(labels, metrics.NetworkLabelName, "testNetwork"))
assert.True(t, tests.HasLabel(labels, "labelName", "labelValue"))
}

func TestPreprocessCloudMetrics(t *testing.T) {
testFamily := tests.MakeTestMetricFamily(prometheusProto.MetricType_GAUGE, 1, testLabels)
metricAndContext := preprocessCloudMetrics(testFamily, "hostA")

assert.NotNil(t, metricAndContext.Context.AdditionalContext)
assert.Equal(t, "hostA", metricAndContext.Context.AdditionalContext.(*exporters.CloudMetricContext).CloudHost)

labels := metricAndContext.Family.GetMetric()[0].Label
assert.Equal(t, 2, len(labels))
assert.True(t, tests.HasLabel(labels, "cloudHost", "hostA"))
assert.True(t, tests.HasLabel(labels, testLabels[0].GetName(), testLabels[0].GetValue()))
}
112 changes: 112 additions & 0 deletions orc8r/cloud/go/services/metricsd/servicers/southbound/servicer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package southbound

import (
"context"

"github.com/golang/glog"
"github.com/pkg/errors"
prom_proto "github.com/prometheus/client_model/go"

"magma/orc8r/cloud/go/serdes"
"magma/orc8r/cloud/go/services/configurator"
"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/exporters"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

// MetricsControllerServer implements a handler to the gRPC server run by the
// Metrics Controller. It can register instances of the Exporter interface for
// writing to storage
type MetricsControllerServer struct {
exporters []exporters.Exporter
}

func NewMetricsControllerServer() *MetricsControllerServer {
return &MetricsControllerServer{}
}

func (srv *MetricsControllerServer) Collect(ctx context.Context, in *protos.MetricsContainer) (*protos.Void, error) {
if in.Family == nil || len(in.Family) == 0 {
return new(protos.Void), nil
}

hardwareID := in.GetGatewayId()
checkID, err := protos.GetGatewayIdentity(ctx)
if err != nil {
return new(protos.Void), err
}
if len(checkID.HardwareId) > 0 && checkID.HardwareId != hardwareID {
glog.Errorf("Expected %s, but found %s as Hardware ID", checkID.HardwareId, hardwareID)
hardwareID = checkID.HardwareId
}
networkID, gatewayID, err := getNetworkAndEntityIDForPhysicalID(ctx, hardwareID)
if err != nil {
return new(protos.Void), err
}
glog.V(2).Infof("collecting %v metrics from gateway %v\n", len(in.Family), in.GatewayId)

metricsToSubmit := metricsContainerToMetricAndContexts(in, networkID, gatewayID)
metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
return &protos.Void{}, err
}
for _, e := range metricsExporters {
err := e.Submit(metricsToSubmit)
if err != nil {
glog.Error(err)
}
}
return new(protos.Void), nil
}
func getNetworkAndEntityIDForPhysicalID(ctx context.Context, physicalID string) (string, string, error) {
if len(physicalID) == 0 {
return "", "", errors.New("Empty Hardware ID")
}
entity, err := configurator.LoadEntityForPhysicalID(ctx, physicalID, configurator.EntityLoadCriteria{}, serdes.Entity)
if err != nil {
return "", "", err
}
return entity.NetworkID, entity.Key, nil
}
func metricsContainerToMetricAndContexts(
in *protos.MetricsContainer,
networkID, gatewayID string,
) []exporters.MetricAndContext {
ret := make([]exporters.MetricAndContext, 0, len(in.Family))
for _, fam := range in.Family {
ctx := exporters.MetricContext{
MetricName: protos.GetDecodedName(fam),
AdditionalContext: &exporters.GatewayMetricContext{
NetworkID: networkID,
GatewayID: gatewayID,
},
}
for _, metric := range fam.Metric {
metric.Label = protos.GetDecodedLabel(metric)
AddLabel(metric, metrics.NetworkLabelName, networkID)
AddLabel(metric, metrics.GatewayLabelName, gatewayID)
}
ret = append(ret, exporters.MetricAndContext{Family: fam, Context: ctx})
}
return ret
}

// AddLabel ensures that the desired name-value pairing is present in the
// metric's labels.
func AddLabel(metric *prom_proto.Metric, labelName, labelValue string) {
labelAdded := false
for _, label := range metric.Label {
if label.GetName() == labelName {
label.Value = &labelValue
labelAdded = true
}
}
if !labelAdded {
metric.Label = append(metric.Label, &prom_proto.LabelPair{Name: strPtr(labelName), Value: &labelValue})
}
}

func strPtr(s string) *string {
return &s
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package southbound

import (
"testing"

prometheusProto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"

"magma/orc8r/cloud/go/services/metricsd/exporters"
tests "magma/orc8r/cloud/go/services/metricsd/test_common"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

var (
testLabels = []*prometheusProto.LabelPair{{Name: tests.MakeStrPtr("labelName"), Value: tests.MakeStrPtr("labelValue")}}
)

func TestMetricsContainerToMetricAndContexts(t *testing.T) {
testFamily := tests.MakeTestMetricFamily(prometheusProto.MetricType_GAUGE, 1, testLabels)
container := protos.MetricsContainer{
GatewayId: "gw1",
Family: []*prometheusProto.MetricFamily{testFamily},
}

metricAndContext := metricsContainerToMetricAndContexts(&container, "testNetwork", "gw1")

assert.Equal(t, 1, len(metricAndContext))
ctx := metricAndContext[0].Context
family := metricAndContext[0].Family
assert.NotNil(t, ctx.AdditionalContext)
assert.Equal(t, "gw1", ctx.AdditionalContext.(*exporters.GatewayMetricContext).GatewayID)
assert.Equal(t, "testNetwork", ctx.AdditionalContext.(*exporters.GatewayMetricContext).NetworkID)

labels := family.GetMetric()[0].Label
assert.Equal(t, 3, len(labels))
assert.True(t, tests.HasLabel(labels, metrics.NetworkLabelName, "testNetwork"))
assert.True(t, tests.HasLabel(labels, metrics.GatewayLabelName, "gw1"))
assert.True(t, tests.HasLabel(labels, testLabels[0].GetName(), testLabels[0].GetValue()))
}

0 comments on commit ed69bf6

Please sign in to comment.