Skip to content

Commit

Permalink
refactor(orc8r): Split Metricsd to protected and southbound
Browse files Browse the repository at this point in the history
Signed-off-by: Christine Wang <christinewang@fb.com>
  • Loading branch information
christinewang5 committed Mar 28, 2022
1 parent 8d9987f commit b0b967e
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 321 deletions.
6 changes: 6 additions & 0 deletions cwf/k8s/cwf_operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8
github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dhui/dktest v0.3.0/go.mod h1:cyzIUfGsBEbZ6BT7tnXqAShHSXCZhSNmFl70sZ7c1yc=
github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E=
github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE=
github.com/docker/cli v0.0.0-20200130152716-5d0cf8839492 h1:FwssHbCDJD025h+BchanCwE1Q8fyMgqDr2mOQAWOLGw=
github.com/docker/cli v0.0.0-20200130152716-5d0cf8839492/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/distribution v0.0.0-20191216044856-a8371794149d/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY=
Expand Down Expand Up @@ -299,6 +300,7 @@ github.com/facebookincubator/prometheus-edge-hub v1.1.0/go.mod h1:MXZSK377xnwne4
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc=
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.1.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
Expand Down Expand Up @@ -624,6 +626,7 @@ github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down Expand Up @@ -1318,6 +1321,7 @@ golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190617190820-da514acc4774/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190813034749-528a2984e271/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down Expand Up @@ -1475,6 +1479,8 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/gotestsum v1.7.0/go.mod h1:V1m4Jw3eBerhI/A6qCxUE07RnCg7ACkKj9BYcAm09V8=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
helm.sh/helm/v3 v3.2.4 h1:lz/0ZRkSgyIF+pCo6pjFzap1udCARB1IN6CRfqkpcOg=
helm.sh/helm/v3 v3.2.4/go.mod h1:ZaXz/vzktgwjyGGFbUWtIQkscfE7WYoRGP2szqAFHR0=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
24 changes: 24 additions & 0 deletions orc8r/cloud/go/services/metricsd/exporters/remote_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,27 @@ func (r *remoteExporter) getExporterClient() (protos.MetricsExporterClient, erro
}
return protos.NewMetricsExporterClient(conn), nil
}

type TestMetricExporter struct {
Queue []Sample

// error to return
RetErr error
}

func (e *TestMetricExporter) Submit(metrics []MetricAndContext) error {
for _, metricAndContext := range metrics {
family := metricAndContext.Family
for _, metric := range family.GetMetric() {
convertedMetricAndContext := MakeProtoMetric(metricAndContext)
e.Queue = append(
e.Queue,
GetSamplesForMetrics(convertedMetricAndContext, metric)...,
)
}
}

return e.RetErr
}

func (e *TestMetricExporter) Start() {}
6 changes: 3 additions & 3 deletions orc8r/cloud/go/services/metricsd/metricsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/collection"
"magma/orc8r/cloud/go/services/metricsd/obsidian/handlers"
"magma/orc8r/cloud/go/services/metricsd/servicers"
"magma/orc8r/cloud/go/services/metricsd/servicers/protected"
"magma/orc8r/lib/go/protos"
)

Expand All @@ -47,8 +47,8 @@ func main() {
glog.Fatalf("Error creating orc8r service for metricsd: %s", err)
}

controllerServicer := servicers.NewMetricsControllerServer()
protos.RegisterMetricsControllerServer(srv.GrpcServer, controllerServicer)
controllerServicer := protected.NewCloudMetricsControllerServer()
protos.RegisterCloudMetricsControllerServer(srv.GrpcServer, controllerServicer)

swagger_protos.RegisterSwaggerSpecServer(srv.GrpcServer, swagger_servicers.NewSpecServicerFromFile(metricsd.ServiceName))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package servicers
package protected

import (
"context"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
prom_proto "github.com/prometheus/client_model/go"

"magma/orc8r/cloud/go/serdes"
"magma/orc8r/cloud/go/services/configurator"
"magma/orc8r/cloud/go/services/metricsd"
"magma/orc8r/cloud/go/services/metricsd/exporters"
servicers "magma/orc8r/cloud/go/services/metricsd/servicers/southbound"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

// MetricsControllerServer implements a handler to the gRPC server run by the
// Metrics Controller. It can register instances of the Exporter interface for
// writing to storage
type MetricsControllerServer struct {
exporters []exporters.Exporter
}

func NewMetricsControllerServer() *MetricsControllerServer {
return &MetricsControllerServer{}
}

// CloudMetricsControllerServer implements a handler to the gRPC server run by the
// Metrics Controller. It can register instances of the Exporter interface for
// writing to storage
Expand Down Expand Up @@ -70,44 +57,10 @@ func (srv *CloudMetricsControllerServer) Push(ctx context.Context, in *protos.Pu
return new(protos.Void), nil
}

func (srv *MetricsControllerServer) Collect(ctx context.Context, in *protos.MetricsContainer) (*protos.Void, error) {
if in.Family == nil || len(in.Family) == 0 {
return new(protos.Void), nil
}

hardwareID := in.GetGatewayId()
checkID, err := protos.GetGatewayIdentity(ctx)
if err != nil {
return new(protos.Void), err
}
if len(checkID.HardwareId) > 0 && checkID.HardwareId != hardwareID {
glog.Errorf("Expected %s, but found %s as Hardware ID", checkID.HardwareId, hardwareID)
hardwareID = checkID.HardwareId
}
networkID, gatewayID, err := getNetworkAndEntityIDForPhysicalID(ctx, hardwareID)
if err != nil {
return new(protos.Void), err
}
glog.V(2).Infof("collecting %v metrics from gateway %v\n", len(in.Family), in.GatewayId)

metricsToSubmit := metricsContainerToMetricAndContexts(in, networkID, gatewayID)
metricsExporters, err := metricsd.GetMetricsExporters()
if err != nil {
return &protos.Void{}, err
}
for _, e := range metricsExporters {
err := e.Submit(metricsToSubmit)
if err != nil {
glog.Error(err)
}
}
return new(protos.Void), nil
}

// ConsumeCloudMetrics pulls metrics off the given input channel and sends
// them to all exporters after some preprocessing.
// Returns only when inputChan closed, which should never happen.
func (srv *MetricsControllerServer) ConsumeCloudMetrics(inputChan chan *prom_proto.MetricFamily, hostName string) {
func (srv *CloudMetricsControllerServer) ConsumeCloudMetrics(inputChan chan *prom_proto.MetricFamily, hostName string) {
for family := range inputChan {
metricsToSubmit := preprocessCloudMetrics(family, hostName)
metricsExporters, err := metricsd.GetMetricsExporters()
Expand All @@ -134,39 +87,11 @@ func preprocessCloudMetrics(family *prom_proto.MetricFamily, hostName string) ex
}
for _, metric := range family.Metric {
metric.Label = protos.GetDecodedLabel(metric)
addLabel(metric, metrics.CloudHostLabelName, hostName)
servicers.AddLabel(metric, metrics.CloudHostLabelName, hostName)
}
return exporters.MetricAndContext{Family: family, Context: ctx}
}

func (srv *MetricsControllerServer) RegisterExporter(e exporters.Exporter) []exporters.Exporter {
srv.exporters = append(srv.exporters, e)
return srv.exporters
}

func metricsContainerToMetricAndContexts(
in *protos.MetricsContainer,
networkID, gatewayID string,
) []exporters.MetricAndContext {
ret := make([]exporters.MetricAndContext, 0, len(in.Family))
for _, fam := range in.Family {
ctx := exporters.MetricContext{
MetricName: protos.GetDecodedName(fam),
AdditionalContext: &exporters.GatewayMetricContext{
NetworkID: networkID,
GatewayID: gatewayID,
},
}
for _, metric := range fam.Metric {
metric.Label = protos.GetDecodedLabel(metric)
addLabel(metric, metrics.NetworkLabelName, networkID)
addLabel(metric, metrics.GatewayLabelName, gatewayID)
}
ret = append(ret, exporters.MetricAndContext{Family: fam, Context: ctx})
}
return ret
}

func pushedMetricsToMetricsAndContext(in *protos.PushedMetricsContainer) []exporters.MetricAndContext {
ret := make([]exporters.MetricAndContext, 0, len(in.Metrics))
for _, metric := range in.Metrics {
Expand All @@ -193,7 +118,7 @@ func pushedMetricsToMetricsAndContext(in *protos.PushedMetricsContainer) []expor
},
TimestampMs: &ts,
}
addLabel(promoMetric, metrics.NetworkLabelName, in.NetworkId)
servicers.AddLabel(promoMetric, metrics.NetworkLabelName, in.NetworkId)

gaugeType := prom_proto.MetricType_GAUGE
fam := &prom_proto.MetricFamily{
Expand All @@ -205,33 +130,3 @@ func pushedMetricsToMetricsAndContext(in *protos.PushedMetricsContainer) []expor
}
return ret
}

// addLabel ensures that the desired name-value pairing is present in the
// metric's labels.
func addLabel(metric *prom_proto.Metric, labelName, labelValue string) {
labelAdded := false
for _, label := range metric.Label {
if label.GetName() == labelName {
label.Value = &labelValue
labelAdded = true
}
}
if !labelAdded {
metric.Label = append(metric.Label, &prom_proto.LabelPair{Name: strPtr(labelName), Value: &labelValue})
}
}

func strPtr(s string) *string {
return &s
}

func getNetworkAndEntityIDForPhysicalID(ctx context.Context, physicalID string) (string, string, error) {
if len(physicalID) == 0 {
return "", "", errors.New("Empty Hardware ID")
}
entity, err := configurator.LoadEntityForPhysicalID(ctx, physicalID, configurator.EntityLoadCriteria{}, serdes.Entity)
if err != nil {
return "", "", err
}
return entity.NetworkID, entity.Key, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package protected

import (
"context"
"flag"
"strconv"
"testing"
"time"

dto "github.com/prometheus/client_model/go"
prometheusProto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"

configurator_test_init "magma/orc8r/cloud/go/services/configurator/test_init"
configurator_test_utils "magma/orc8r/cloud/go/services/configurator/test_utils"
device_test_init "magma/orc8r/cloud/go/services/device/test_init"
"magma/orc8r/cloud/go/services/metricsd/exporters"
tests "magma/orc8r/cloud/go/services/metricsd/test_common"
"magma/orc8r/cloud/go/services/metricsd/test_init"
"magma/orc8r/lib/go/metrics"
"magma/orc8r/lib/go/protos"
)

var (
testLabels = []*prometheusProto.LabelPair{{Name: tests.MakeStrPtr("labelName"), Value: tests.MakeStrPtr("labelValue")}}
)

// Set verbosity so we can capture exporter error logging
var _ = flag.Set("vmodule", "*=2")

func TestPreprocessCloudMetrics(t *testing.T) {
testFamily := tests.MakeTestMetricFamily(prometheusProto.MetricType_GAUGE, 1, testLabels)
metricAndContext := preprocessCloudMetrics(testFamily, "hostA")

assert.NotNil(t, metricAndContext.Context.AdditionalContext)
assert.Equal(t, "hostA", metricAndContext.Context.AdditionalContext.(*exporters.CloudMetricContext).CloudHost)

labels := metricAndContext.Family.GetMetric()[0].Label
assert.Equal(t, 2, len(labels))
assert.True(t, tests.HasLabel(labels, "cloudHost", "hostA"))
assert.True(t, tests.HasLabel(labels, testLabels[0].GetName(), testLabels[0].GetValue()))
}

func TestPushedMetricsToMetricsAndContext(t *testing.T) {
container := protos.PushedMetricsContainer{
NetworkId: "testNetwork",
Metrics: []*protos.PushedMetric{{
MetricName: "metricA",
Value: 10,
TimestampMS: 1234,
Labels: []*protos.LabelPair{{Name: "labelName", Value: "labelValue"}},
},
},
}

metricAndContext := pushedMetricsToMetricsAndContext(&container)

assert.Equal(t, 1, len(metricAndContext))
ctx := metricAndContext[0].Context
family := metricAndContext[0].Family
assert.NotNil(t, ctx.AdditionalContext)
assert.Equal(t, "testNetwork", ctx.AdditionalContext.(*exporters.PushedMetricContext).NetworkID)

labels := family.GetMetric()[0].Label
assert.Equal(t, 2, len(labels))
assert.True(t, tests.HasLabel(labels, metrics.NetworkLabelName, "testNetwork"))
assert.True(t, tests.HasLabel(labels, "labelName", "labelValue"))
}

func TestConsume(t *testing.T) {
metricsChan := make(chan *dto.MetricFamily)
e := &exporters.TestMetricExporter{}

test_init.StartNewTestExporter(t, e)
srv := NewCloudMetricsControllerServer()

go srv.ConsumeCloudMetrics(metricsChan, "Host_name_place_holder")
fam1 := "test1"
fam2 := "test2"
go func() {
metricsChan <- &dto.MetricFamily{Name: &fam1, Metric: []*dto.Metric{{}}}
metricsChan <- &dto.MetricFamily{Name: &fam2, Metric: []*dto.Metric{{}}}
}()
time.Sleep(time.Second)
assert.Equal(t, 2, len(e.Queue))
}

func TestPush(t *testing.T) {
device_test_init.StartTestService(t)
configurator_test_init.StartTestService(t)

e := &exporters.TestMetricExporter{}
test_init.StartNewTestExporter(t, e)
srv := NewCloudMetricsControllerServer()

// Create test network
networkID := "metricsd_servicer_test_network"
configurator_test_utils.RegisterNetwork(t, networkID, "Test Network Name")

metricName := "test_metric"
value := 8.2
testLabel := &protos.LabelPair{Name: "labelName", Value: "labelValue"}
timestamp := int64(123456)

protoMet := protos.PushedMetric{
MetricName: metricName,
Value: value,
TimestampMS: timestamp,
Labels: []*protos.LabelPair{testLabel},
}
pushedMetrics := protos.PushedMetricsContainer{
NetworkId: networkID,
Metrics: []*protos.PushedMetric{&protoMet},
}

_, err := srv.Push(context.Background(), &pushedMetrics)
assert.NoError(t, err)
assert.Equal(t, 1, len(e.Queue))
assert.Equal(t, metricName, e.Queue[0].Name())
assert.Equal(t, 2, len(e.Queue[0].Labels()))
assert.Equal(t, testLabel.Name, *e.Queue[0].Labels()[0].Name)
assert.Equal(t, testLabel.Value, *e.Queue[0].Labels()[0].Value)
assert.Equal(t, metrics.NetworkLabelName, *e.Queue[0].Labels()[1].Name)
assert.Equal(t, networkID, *e.Queue[0].Labels()[1].Value)
assert.Equal(t, timestamp, e.Queue[0].TimestampMs())
assert.Equal(t, strconv.FormatFloat(value, 'f', -1, 64), e.Queue[0].Value())
}

0 comments on commit b0b967e

Please sign in to comment.