Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(1693): allow job label to be rewritten in prometheus receiver #3347

Closed
wants to merge 9 commits into from
22 changes: 13 additions & 9 deletions receiver/prometheusreceiver/internal/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,22 @@ type metadataService struct {
}

func (s *metadataService) Get(job, instance string) (MetadataCache, error) {
targetGroup, ok := s.sm.TargetsAll()[job]
if !ok {
return nil, errors.New("unable to find a target group with job=" + job)
// Lookup against unaltered job label
if targetGroup, ok := s.sm.TargetsAll()[job]; ok {
for _, target := range targetGroup {
if target.Labels().Get(model.InstanceLabel) == instance {
return &mCache{target}, nil
}
}
}

// from the same targetGroup, instance is not going to be duplicated
for _, target := range targetGroup {
if target.Labels().Get(model.InstanceLabel) == instance {
return &mCache{target}, nil
// Fallback to lookup through all target groups for cases where job was relabeled
for _, targetGroup := range s.sm.TargetsAll() {
for _, target := range targetGroup {
if target.Labels().Get(model.InstanceLabel) == instance && target.Labels().Get(model.JobLabel) == job {
return &mCache{target}, nil
}
}
}

return nil, errors.New("unable to find a target with job=" + job + ", and instance=" + instance)
}

Expand Down
41 changes: 41 additions & 0 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func Test_transaction(t *testing.T) {
Name: model.InstanceLabel,
Value: "localhost:8080",
},
labels.Label{
Name: model.JobLabel,
Value: "testJobOverridden",
},
)

ms := &metadataService{
Expand Down Expand Up @@ -138,6 +142,43 @@ func Test_transaction(t *testing.T) {
// assert.Len(t, ocmds[0].Metrics, 1)
})

goodLabelsJobOverridden := labels.Labels([]labels.Label{{Name: "instance", Value: "localhost:8080"},
{Name: "job", Value: "testJobOverridden"},
{Name: "__name__", Value: "foo"}})
t.Run("Job label can be overridden", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
if _, got := tr.Append(0, goodLabelsJobOverridden, time.Now().Unix()*1000, 1.0); got != nil {
t.Errorf("expecting error == nil from Add() but got: %v\n", got)
}
tr.metricBuilder.startTime = 1.0 // set to a non-zero value
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
expectedNode, expectedResource := createNodeAndResource("testJobOverridden", "localhost:8080", "http")
mds := sink.AllMetrics()
if len(mds) != 1 {
t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics())
}
var ocmds []*agentmetricspb.ExportMetricsServiceRequest
rms := mds[0].ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
ocmd := &agentmetricspb.ExportMetricsServiceRequest{}
ocmd.Node, ocmd.Resource, ocmd.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))
ocmds = append(ocmds, ocmd)
}
require.Len(t, ocmds, 1)
if !proto.Equal(ocmds[0].Node, expectedNode) {
t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expectedNode)
}
if !proto.Equal(ocmds[0].Resource, expectedResource) {
t.Errorf("generated resource %v and expected resource %v is different\n", ocmds[0].Resource, expectedResource)
}

// TODO: re-enable this when handle unspecified OC type
// assert.Len(t, ocmds[0].Metrics, 1)
})

t.Run("Error when start time is zero", func(t *testing.T) {
sink := new(consumertest.MetricsSink)
tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)
Expand Down