Skip to content

Commit

Permalink
refactor(orc8r): Split Metricsd to protected and southbound
Browse files Browse the repository at this point in the history
Signed-off-by: Christine Wang <christinewang@fb.com>
  • Loading branch information
christinewang5 committed Mar 28, 2022
1 parent 8d9987f commit 2bfd484
Show file tree
Hide file tree
Showing 9 changed files with 464 additions and 468 deletions.
24 changes: 24 additions & 0 deletions orc8r/cloud/go/services/metricsd/exporters/remote_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,27 @@ func (r *remoteExporter) getExporterClient() (protos.MetricsExporterClient, erro
}
return protos.NewMetricsExporterClient(conn), nil
}

type TestMetricExporter struct {
Queue []Sample

// error to return
RetErr error
}

func (e *TestMetricExporter) Submit(metrics []MetricAndContext) error {
for _, metricAndContext := range metrics {
family := metricAndContext.Family
for _, metric := range family.GetMetric() {
convertedMetricAndContext := MakeProtoMetric(metricAndContext)
e.Queue = append(
e.Queue,
GetSamplesForMetrics(convertedMetricAndContext, metric)...,
)
}
}

return e.RetErr
}

func (e *TestMetricExporter) Start() {}
6 changes: 3 additions & 3 deletions orc8r/cloud/go/services/metricsd/metricsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package main

import (
"magma/orc8r/cloud/go/services/metricsd/servicers/protected"
"time"

"github.com/golang/glog"
Expand All @@ -28,7 +29,6 @@ import (
"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/collection"
"magma/orc8r/cloud/go/services/metricsd/obsidian/handlers"
"magma/orc8r/cloud/go/services/metricsd/servicers"
"magma/orc8r/lib/go/protos"
)

Expand All @@ -47,8 +47,8 @@ func main() {
glog.Fatalf("Error creating orc8r service for metricsd: %s", err)
}

controllerServicer := servicers.NewMetricsControllerServer()
protos.RegisterMetricsControllerServer(srv.GrpcServer, controllerServicer)
controllerServicer := protected.NewCloudMetricsControllerServer()
protos.RegisterCloudMetricsControllerServer(srv.GrpcServer, controllerServicer)

swagger_protos.RegisterSwaggerSpecServer(srv.GrpcServer, swagger_servicers.NewSpecServicerFromFile(metricsd.ServiceName))

Expand Down
119 changes: 119 additions & 0 deletions orc8r/cloud/go/services/metricsd/servicers/protected/servicer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package protected

import (
"context"
"time"

"github.com/golang/glog"
prom_proto "github.com/prometheus/client_model/go"

"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/exporters"
servicers "magma/orc8r/cloud/go/services/metricsd/servicers/southbound"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

// CloudMetricsControllerServer implements a handler to the gRPC server run by the
// Metrics Controller. It can register instances of the Exporter interface for
// writing to storage
type CloudMetricsControllerServer struct {
exporters []exporters.Exporter
}

func NewCloudMetricsControllerServer() *CloudMetricsControllerServer {
return &CloudMetricsControllerServer{}
}

func (srv *CloudMetricsControllerServer) Push(ctx context.Context, in *protos.PushedMetricsContainer) (*protos.Void, error) {
if in.Metrics == nil || len(in.Metrics) == 0 {
return new(protos.Void), nil
}

metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
return &protos.Void{}, err
}
for _, e := range metricsExporters {
metricsToSubmit := pushedMetricsToMetricsAndContext(in)
err := e.Submit(metricsToSubmit)
if err != nil {
glog.Error(err)
}
}
return new(protos.Void), nil
}

// ConsumeCloudMetrics pulls metrics off the given input channel and sends
// them to all exporters after some preprocessing.
// Returns only when inputChan closed, which should never happen.
func (srv *CloudMetricsControllerServer) ConsumeCloudMetrics(inputChan chan *prom_proto.MetricFamily, hostName string) {
for family := range inputChan {
metricsToSubmit := preprocessCloudMetrics(family, hostName)
metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
glog.Error(err)
continue
}
for _, e := range metricsExporters {
err := e.Submit([]exporters.MetricAndContext{metricsToSubmit})
if err != nil {
glog.Error(err)
}
}
}
glog.Error("Consume cloud metrics channel unexpectedly closed")
}

func preprocessCloudMetrics(family *prom_proto.MetricFamily, hostName string) exporters.MetricAndContext {
ctx := exporters.MetricContext{
MetricName: protos.GetDecodedName(family),
AdditionalContext: &exporters.CloudMetricContext{
CloudHost: hostName,
},
}
for _, metric := range family.Metric {
metric.Label = protos.GetDecodedLabel(metric)
servicers.AddLabel(metric, metrics.CloudHostLabelName, hostName)
}
return exporters.MetricAndContext{Family: family, Context: ctx}
}

func pushedMetricsToMetricsAndContext(in *protos.PushedMetricsContainer) []exporters.MetricAndContext {
ret := make([]exporters.MetricAndContext, 0, len(in.Metrics))
for _, metric := range in.Metrics {
ctx := exporters.MetricContext{
MetricName: metric.MetricName,
AdditionalContext: &exporters.PushedMetricContext{
NetworkID: in.NetworkId,
},
}

ts := metric.TimestampMS
if ts == 0 {
ts = time.Now().Unix() * 1000
}

prometheusLabels := make([]*prom_proto.LabelPair, 0, len(metric.Labels))
for _, label := range metric.Labels {
prometheusLabels = append(prometheusLabels, &prom_proto.LabelPair{Name: &label.Name, Value: &label.Value})
}
promoMetric := &prom_proto.Metric{
Label: prometheusLabels,
Gauge: &prom_proto.Gauge{
Value: &metric.Value,
},
TimestampMs: &ts,
}
servicers.AddLabel(promoMetric, metrics.NetworkLabelName, in.NetworkId)

gaugeType := prom_proto.MetricType_GAUGE
fam := &prom_proto.MetricFamily{
Name: &metric.MetricName,
Type: &gaugeType,
Metric: []*prom_proto.Metric{promoMetric},
}
ret = append(ret, exporters.MetricAndContext{Family: fam, Context: ctx})
}
return ret
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package protected

import (
"context"
"flag"
dto "github.com/prometheus/client_model/go"
prometheusProto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
configurator_test_init "magma/orc8r/cloud/go/services/configurator/test_init"
configurator_test_utils "magma/orc8r/cloud/go/services/configurator/test_utils"
device_test_init "magma/orc8r/cloud/go/services/device/test_init"
"magma/orc8r/cloud/go/services/metricsd/test_init"
"strconv"
"testing"
"time"

"magma/orc8r/cloud/go/services/metricsd/exporters"
tests "magma/orc8r/cloud/go/services/metricsd/test_common"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

var (
testLabels = []*prometheusProto.LabelPair{{Name: tests.MakeStrPtr("labelName"), Value: tests.MakeStrPtr("labelValue")}}
)

// Set verbosity so we can capture exporter error logging
var _ = flag.Set("vmodule", "*=2")

func TestPreprocessCloudMetrics(t *testing.T) {
testFamily := tests.MakeTestMetricFamily(prometheusProto.MetricType_GAUGE, 1, testLabels)
metricAndContext := preprocessCloudMetrics(testFamily, "hostA")

assert.NotNil(t, metricAndContext.Context.AdditionalContext)
assert.Equal(t, "hostA", metricAndContext.Context.AdditionalContext.(*exporters.CloudMetricContext).CloudHost)

labels := metricAndContext.Family.GetMetric()[0].Label
assert.Equal(t, 2, len(labels))
assert.True(t, tests.HasLabel(labels, "cloudHost", "hostA"))
assert.True(t, tests.HasLabel(labels, testLabels[0].GetName(), testLabels[0].GetValue()))
}

func TestPushedMetricsToMetricsAndContext(t *testing.T) {
container := protos.PushedMetricsContainer{
NetworkId: "testNetwork",
Metrics: []*protos.PushedMetric{{
MetricName: "metricA",
Value: 10,
TimestampMS: 1234,
Labels: []*protos.LabelPair{{Name: "labelName", Value: "labelValue"}},
},
},
}

metricAndContext := pushedMetricsToMetricsAndContext(&container)

assert.Equal(t, 1, len(metricAndContext))
ctx := metricAndContext[0].Context
family := metricAndContext[0].Family
assert.NotNil(t, ctx.AdditionalContext)
assert.Equal(t, "testNetwork", ctx.AdditionalContext.(*exporters.PushedMetricContext).NetworkID)

labels := family.GetMetric()[0].Label
assert.Equal(t, 2, len(labels))
assert.True(t, tests.HasLabel(labels, metrics.NetworkLabelName, "testNetwork"))
assert.True(t, tests.HasLabel(labels, "labelName", "labelValue"))
}

func TestConsume(t *testing.T) {
metricsChan := make(chan *dto.MetricFamily)
e := &exporters.TestMetricExporter{}

test_init.StartNewTestExporter(t, e)
srv := NewCloudMetricsControllerServer()

go srv.ConsumeCloudMetrics(metricsChan, "Host_name_place_holder")
fam1 := "test1"
fam2 := "test2"
go func() {
metricsChan <- &dto.MetricFamily{Name: &fam1, Metric: []*dto.Metric{{}}}
metricsChan <- &dto.MetricFamily{Name: &fam2, Metric: []*dto.Metric{{}}}
}()
time.Sleep(time.Second)
assert.Equal(t, 2, len(e.Queue))
}

func TestPush(t *testing.T) {
device_test_init.StartTestService(t)
configurator_test_init.StartTestService(t)

e := &exporters.TestMetricExporter{}
test_init.StartNewTestExporter(t, e)
srv := NewCloudMetricsControllerServer()

// Create test network
networkID := "metricsd_servicer_test_network"
configurator_test_utils.RegisterNetwork(t, networkID, "Test Network Name")

metricName := "test_metric"
value := 8.2
testLabel := &protos.LabelPair{Name: "labelName", Value: "labelValue"}
timestamp := int64(123456)

protoMet := protos.PushedMetric{
MetricName: metricName,
Value: value,
TimestampMS: timestamp,
Labels: []*protos.LabelPair{testLabel},
}
pushedMetrics := protos.PushedMetricsContainer{
NetworkId: networkID,
Metrics: []*protos.PushedMetric{&protoMet},
}

_, err := srv.Push(context.Background(), &pushedMetrics)
assert.NoError(t, err)
assert.Equal(t, 1, len(e.Queue))
assert.Equal(t, metricName, e.Queue[0].Name())
assert.Equal(t, 2, len(e.Queue[0].Labels()))
assert.Equal(t, testLabel.Name, *e.Queue[0].Labels()[0].Name)
assert.Equal(t, testLabel.Value, *e.Queue[0].Labels()[0].Value)
assert.Equal(t, metrics.NetworkLabelName, *e.Queue[0].Labels()[1].Name)
assert.Equal(t, networkID, *e.Queue[0].Labels()[1].Value)
assert.Equal(t, timestamp, e.Queue[0].TimestampMs())
assert.Equal(t, strconv.FormatFloat(value, 'f', -1, 64), e.Queue[0].Value())
}

0 comments on commit 2bfd484

Please sign in to comment.