Detect duplicates across files in textfile collector #274

Closed
wants to merge 1 commit into from
156 changes: 127 additions & 29 deletions collector/textfile.go
@@ -1,3 +1,5 @@
// Below code originally copied from prometheus/node_exporter/collector/textfile.go:
//
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,11 +13,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !notextfile

package collector

import (
"bytes"
"fmt"
"io"
"io/ioutil"
@@ -29,7 +30,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

var (
@@ -64,7 +65,7 @@ func NewTextFileCollector() (Collector, error) {
}, nil
}

func convertMetricFamily(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) {
func convertMetricFamily(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric, seen map[uint64]string, path string) {
var valType prometheus.ValueType
var val float64

@@ -105,6 +106,14 @@ func convertMetricFamily(metricFamily *dto.MetricFamily, ch chan<- prometheus.Me
}
}

h := hash(metricFamily, metric)
if seenIn, ok := seen[h]; ok {
repr := friendlyString(*metricFamily.Name, names, values)
log.Warnf("Metric %s was read from %s, but has already been collected from file %s, skipping", repr, path, seenIn)
continue
Collaborator:
This is not a good idea; you're now randomly dropping metrics depending on the iteration order. We've had issues with this type of approach in the past.

Collaborator Author:
Yeah, it's not great, I agree. However, failing the scrape completely isn't that good either. Do you see some middle road or a better approach?
Also, I just realized I've forgotten to set the error flag when this happens, so right now it isn't alertable. That needs to be fixed.

Collaborator:
The node exporter fails, as this is an invalid setup. It's better to hard fail than silently return partial data.

@SuperQ FYI

Collaborator Author:
Should we be failing the entire scrape, though? Skipping the textfile data completely seems okay (although I'll note that the metrics aren't randomly dropped: iteration order is deterministic here, so which metrics are dropped is consistent across scrapes given consistent input), but any individual breakage from any collector resulting in up=0 seems a bit harsh?

Collaborator:
You should approach it however you approach any individual collector failing. If you permit that, then you should have metrics indicating which collectors did/didn't work.

Collaborator Author:
Right, that would be my preference. It will require a bit of gymnastics, though, since promhttp bails on the entire scrape if we allow the duplicate data to get there.
I guess I'll do some sort of buffer to be able to detect failure before passing the metrics through the channel (see the sketch below). Would this be something that should be done in node_exporter as well?

Collaborator:
That'd be a question for @SuperQ; I've heard no plans in that direction.
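
For illustration, a minimal sketch of the buffering idea discussed above, written against the real client_golang API; the hashedMetric type and sendIfClean function are hypothetical names, not part of this PR:

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// hashedMetric pairs a converted metric with its dedup hash (as computed
// by the hash() helper added in this PR).
type hashedMetric struct {
	hash   uint64
	metric prometheus.Metric
}

// sendIfClean forwards the buffered metrics to ch only when no dedup hash
// repeats; otherwise it sends nothing and returns an error, which the
// collector could translate into its scrape-error gauge.
func sendIfClean(buf []hashedMetric, ch chan<- prometheus.Metric) error {
	seen := make(map[uint64]bool, len(buf))
	for _, hm := range buf {
		if seen[hm.hash] {
			return errors.New("duplicate metric in textfile collector")
		}
		seen[hm.hash] = true
	}
	for _, hm := range buf {
		ch <- hm.metric
	}
	return nil
}

func main() {
	desc := prometheus.NewDesc("example_total", "An example metric.", nil, nil)
	m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, 1)
	if err != nil {
		panic(err)
	}
	// Two entries with the same hash: nothing is sent and an error comes back.
	ch := make(chan prometheus.Metric, 2)
	fmt.Println(sendIfClean([]hashedMetric{{1, m}, {1, m}}, ch))
}

Buffering trades a little per-scrape memory for an all-or-nothing guarantee: either the complete textfile data is exported, or none of it is, together with an error that a collector-status metric can report.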

}
seen[h] = path

metricType := metricFamily.GetType()
switch metricType {
case dto.MetricType_COUNTER:
@@ -186,34 +195,11 @@ func (c *textFileCollector) exportMTimes(mtimes map[string]time.Time, ch chan<-
}
}

type carriageReturnFilteringReader struct {
r io.Reader
}

// Read returns data from the underlying io.Reader, but with \r filtered out
func (cr carriageReturnFilteringReader) Read(p []byte) (int, error) {
buf := make([]byte, len(p))
n, err := cr.r.Read(buf)

if err != nil && err != io.EOF {
return n, err
}

pi := 0
for i := 0; i < n; i++ {
if buf[i] != '\r' {
p[pi] = buf[i]
pi++
}
}

return pi, err
}

// Collect implements the Collector interface.
func (c *textFileCollector) Collect(ch chan<- prometheus.Metric) error {
error := 0.0
mtimes := map[string]time.Time{}
seenMetrics := make(map[uint64]string)

// Iterate over files and accumulate their metrics.
files, err := ioutil.ReadDir(c.path)
@@ -262,7 +248,7 @@ fileLoop:
mtimes[f.Name()] = f.ModTime()

for _, mf := range parsedFamilies {
convertMetricFamily(mf, ch)
convertMetricFamily(mf, ch, seenMetrics, path)
}
}

@@ -279,3 +265,115 @@ fileLoop:
)
return nil
}

//
// End of code copied from prometheus/node_exporter/collector/textfile.go
//

// Below code copied from prometheus/client_golang/prometheus/fnv.go:
//
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

const (
offset64 = 14695981039346656037
prime64 = 1099511628211
separatorByte byte = 255
)

// hashNew initializes a new fnv64a hash value.
func hashNew() uint64 {
return offset64
}

// hashAdd adds a string to a fnv64a hash value, returning the updated hash.
func hashAdd(h uint64, s string) uint64 {
for i := 0; i < len(s); i++ {
h ^= uint64(s[i])
h *= prime64
}
return h
}

// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash.
func hashAddByte(h uint64, b byte) uint64 {
h ^= uint64(b)
h *= prime64
return h
}

func hash(mf *dto.MetricFamily, m *dto.Metric) uint64 {
h := hashNew()
h = hashAdd(h, mf.GetName())
h = hashAddByte(h, separatorByte)
// Make sure label pairs are sorted. We depend on it for the consistency
// check.
sort.Sort(prometheus.LabelPairSorter(m.Label))
for _, lp := range m.Label {
h = hashAdd(h, lp.GetValue())
h = hashAddByte(h, separatorByte)
}

return h
}

//
// End of code copied from prometheus/client_golang/prometheus/fnv.go
//

type carriageReturnFilteringReader struct {
r io.Reader
}

// Read returns data from the underlying io.Reader, but with \r filtered out
func (cr carriageReturnFilteringReader) Read(p []byte) (int, error) {
buf := make([]byte, len(p))
n, err := cr.r.Read(buf)

if err != nil && err != io.EOF {
return n, err
}

pi := 0
for i := 0; i < n; i++ {
if buf[i] != '\r' {
p[pi] = buf[i]
pi++
}
}

return pi, err
}

func friendlyString(name string, labelNames, labelValues []string) string {
	// Sort label pairs by name, keeping each name attached to its value.
	idxs := make([]int, len(labelNames))
	for i := range idxs {
		idxs[i] = i
	}
	sort.Slice(idxs, func(a, b int) bool { return labelNames[idxs[a]] < labelNames[idxs[b]] })

	var bs bytes.Buffer
	bs.WriteString(name)
	bs.WriteRune('{')
	for n, i := range idxs {
		if n > 0 {
			bs.WriteRune(',')
		}
		bs.WriteString(fmt.Sprintf(`%s="%s"`, labelNames[i], labelValues[i]))
	}
	bs.WriteRune('}')
	return bs.String()
}
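
A hypothetical in-package test, not part of this PR, illustrating friendlyString's output format (label pairs are rendered sorted by label name):

package collector

import "testing"

func TestFriendlyString(t *testing.T) {
	// Labels supplied as b="2", a="1" come out name-sorted.
	got := friendlyString("my_metric", []string{"b", "a"}, []string{"2", "1"})
	if want := `my_metric{a="1",b="2"}`; got != want {
		t.Errorf("got %s, want %s", got, want)
	}
}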
8 changes: 4 additions & 4 deletions collector/textfile_test.go
@@ -1,14 +1,14 @@
package collector

import (
"testing"
"strings"
"io/ioutil"
"strings"
"testing"
)

func TestCRFilter(t *testing.T) {
sr := strings.NewReader("line 1\r\nline 2")
cr := carriageReturnFilteringReader{ r: sr }
cr := carriageReturnFilteringReader{r: sr}
b, err := ioutil.ReadAll(cr)
if err != nil {
t.Error(err)
@@ -17,4 +17,4 @@ func TestCRFilter(t *testing.T) {
if string(b) != "line 1\nline 2" {
t.Errorf("Unexpected output %q", b)
}
}
}
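
To make the new behavior concrete, a hypothetical example (file names and values invented, paths abbreviated): if two files in the textfile directory, a.prom and b.prom, both contain the line

my_metric{label="v"} 1

then the copy read second is skipped and a warning in the format used by convertMetricFamily is logged, roughly:

Metric my_metric{label="v"} was read from b.prom, but has already been collected from file a.prom, skipping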