diff --git a/receiver/prometheusreceiver/internal/metadata.go b/receiver/prometheusreceiver/internal/metadata.go index 0898f252a76..739c60ff54a 100644 --- a/receiver/prometheusreceiver/internal/metadata.go +++ b/receiver/prometheusreceiver/internal/metadata.go @@ -55,18 +55,22 @@ func (s *metadataService) Get(job, instance string) (MetadataCache, error) { return nil, errAlreadyStopped } - targetGroup, ok := s.sm.TargetsAll()[job] - if !ok { - return nil, errors.New("unable to find a target group with job=" + job) + // Lookup against unaltered job label + if targetGroup, ok := s.sm.TargetsAll()[job]; ok { + for _, target := range targetGroup { + if target.Labels().Get(model.InstanceLabel) == instance { + return &mCache{target}, nil + } + } } - - // from the same targetGroup, instance is not going to be duplicated - for _, target := range targetGroup { - if target.Labels().Get(model.InstanceLabel) == instance { - return &mCache{target}, nil + // Fallback to lookup through all target groups for cases where job was relabeled + for _, targetGroup := range s.sm.TargetsAll() { + for _, target := range targetGroup { + if target.Labels().Get(model.InstanceLabel) == instance && target.Labels().Get(model.JobLabel) == job { + return &mCache{target}, nil + } } } - return nil, errors.New("unable to find a target with job=" + job + ", and instance=" + instance) } diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go index 91979170b1b..31535e88ef1 100644 --- a/receiver/prometheusreceiver/internal/transaction_test.go +++ b/receiver/prometheusreceiver/internal/transaction_test.go @@ -54,6 +54,10 @@ func Test_transaction(t *testing.T) { Name: model.InstanceLabel, Value: "localhost:8080", }, + labels.Label{ + Name: model.JobLabel, + Value: "testJobOverridden", + }, ) ms := &metadataService{ @@ -138,6 +142,43 @@ func Test_transaction(t *testing.T) { // assert.Len(t, ocmds[0].Metrics, 1) }) + goodLabelsJobOverridden := labels.Labels([]labels.Label{{Name: "instance", Value: "localhost:8080"}, + {Name: "job", Value: "testJobOverridden"}, + {Name: "__name__", Value: "foo"}}) + t.Run("Job label can be overridden", func(t *testing.T) { + sink := new(consumertest.MetricsSink) + tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger) + if _, got := tr.Append(0, goodLabelsJobOverridden, time.Now().Unix()*1000, 1.0); got != nil { + t.Errorf("expecting error == nil from Add() but got: %v\n", got) + } + tr.metricBuilder.startTime = 1.0 // set to a non-zero value + if got := tr.Commit(); got != nil { + t.Errorf("expecting nil from Commit() but got err %v", got) + } + expectedNode, expectedResource := createNodeAndResource("testJobOverridden", "localhost:8080", "http") + mds := sink.AllMetrics() + if len(mds) != 1 { + t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics()) + } + var ocmds []*agentmetricspb.ExportMetricsServiceRequest + rms := mds[0].ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + ocmd := &agentmetricspb.ExportMetricsServiceRequest{} + ocmd.Node, ocmd.Resource, ocmd.Metrics = internaldata.ResourceMetricsToOC(rms.At(i)) + ocmds = append(ocmds, ocmd) + } + require.Len(t, ocmds, 1) + if !proto.Equal(ocmds[0].Node, expectedNode) { + t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expectedNode) + } + if !proto.Equal(ocmds[0].Resource, expectedResource) { + t.Errorf("generated resource %v and expected resource %v is different\n", ocmds[0].Resource, expectedResource) + } + + // TODO: re-enable this when handle unspecified OC type + // assert.Len(t, ocmds[0].Metrics, 1) + }) + t.Run("Error when start time is zero", func(t *testing.T) { sink := new(consumertest.MetricsSink) tr := newTransaction(context.Background(), nil, true, "", rID, ms, sink, nil, testLogger)