Skip to content

Commit

Permalink
change labelset comparison in promql engine to avoid false positive d…
Browse files Browse the repository at this point in the history
…uring detection of duplicates (#7058)

* Use go1.14 new hash/maphash to hash both RHS and LHS instead of XOR'ing
which has been resulting in hash collisions.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Refactor engine labelset signature generation, just use labels.Labels
instead of hashes.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address review comments; function comments + store result of
lhs.String+rhs.String as key.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Replace all signatureFunc usage with signatureFuncString.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Make optimizations to labels String function and generation of rhs+lhs
as string in resultMetric.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Use separate string functions that don't use strconv just for engine
maps.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Use a byte invalid separator instead of quoting and have a buffer
attached to EvalNodeHelper instead of using a global pool in the labels
package.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address more review comments, labels has a function that now builds a
byte slice without turning it into a string.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Use two different non-ascii hex codes as byte separators between labels
and between sets of labels when building bytes of a Labels struct.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* We only need the 2nd byte invalid sep. at the beginning of a
labels.Bytes

Signed-off-by: Callum Styan <callumstyan@gmail.com>
  • Loading branch information
cstyan committed May 12, 2020
1 parent da217cb commit 5bb7f00
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 54 deletions.
60 changes: 57 additions & 3 deletions pkg/labels/labels.go
Expand Up @@ -22,14 +22,15 @@ import (
"github.com/cespare/xxhash"
)

const sep = '\xff'

// Well-known label names used by Prometheus components.
const (
MetricName = "__name__"
AlertName = "alertname"
BucketLabel = "le"
InstanceName = "instance"

// sep is the byte Labels.Bytes writes between a label's name and its value,
// and between consecutive labels.
sep = '\xff'
// labelSep is the byte Labels.Bytes writes once at the start of its output,
// before the first label.
labelSep = '\xfe'
)

// Label is a key/value pair of strings.
Expand Down Expand Up @@ -59,10 +60,25 @@ func (ls Labels) String() string {
b.WriteString(strconv.Quote(l.Value))
}
b.WriteByte('}')

return b.String()
}

// Bytes serializes ls into buf and returns the resulting byte slice.
//
// Any previous contents of buf are discarded; its backing array is reused when
// it has enough capacity, so callers can pass the returned slice back in on the
// next call. The output is labelSep followed by every label's name and value,
// with sep between each name, value, and subsequent label. Both separators are
// non-printable bytes, so the result is intended for use as a comparison or
// map key and should not be used for printing.
func (ls Labels) Bytes(buf []byte) []byte {
	out := append(buf[:0], labelSep)
	for idx, lbl := range ls {
		// Separate this label from the previous one.
		if idx > 0 {
			out = append(out, sep)
		}
		out = append(out, lbl.Name...)
		out = append(out, sep)
		out = append(out, lbl.Value...)
	}
	return out
}

// MarshalJSON implements json.Marshaler.
func (ls Labels) MarshalJSON() ([]byte, error) {
return json.Marshal(ls.Map())
Expand Down Expand Up @@ -172,6 +188,44 @@ func (ls Labels) HashWithoutLabels(b []byte, names ...string) (uint64, []byte) {
return xxhash.Sum64(b), b
}

// WithLabels returns a new Labels holding only those labels of ls whose name
// appears in names. The result is never nil, even when nothing matches.
//
// Both inputs are walked once in lockstep, so names must be sorted in
// ascending order and ls is expected to be ordered by label name as well.
func (ls Labels) WithLabels(names ...string) Labels {
	kept := make([]Label, 0, len(ls))

	for li, ni := 0, 0; li < len(ls) && ni < len(names); {
		cur := ls[li]
		switch {
		case names[ni] < cur.Name:
			// Requested name is not present in ls; try the next one.
			ni++
		case cur.Name < names[ni]:
			// Label was not requested; skip it.
			li++
		default:
			kept = append(kept, cur)
			li++
			ni++
		}
	}
	return kept
}

// WithoutLabels returns a new Labels holding the labels of ls whose name does
// not appear in names. The MetricName label is always dropped, whether or not
// it is listed in names. The result is never nil.
//
// names must be sorted in ascending order; it is scanned forward only while
// ls is iterated, so ls is expected to be ordered by label name as well.
func (ls Labels) WithoutLabels(names ...string) Labels {
	kept := make([]Label, 0, len(ls))

	next := 0
	for _, lbl := range ls {
		// Advance past every excluded name that sorts before this label.
		for next < len(names) && names[next] < lbl.Name {
			next++
		}
		excluded := next < len(names) && names[next] == lbl.Name
		if lbl.Name != MetricName && !excluded {
			kept = append(kept, lbl)
		}
	}
	return kept
}

// Copy returns a copy of the labels.
func (ls Labels) Copy() Labels {
res := make(Labels, len(ls))
Expand Down
98 changes: 52 additions & 46 deletions promql/engine.go
Expand Up @@ -14,6 +14,7 @@
package promql

import (
"bytes"
"container/heap"
"context"
"fmt"
Expand Down Expand Up @@ -831,16 +832,20 @@ type EvalNodeHelper struct {
// dropMetricName and label_*.
dmn map[uint64]labels.Labels
// signatureFunc.
sigf map[uint64]uint64
sigf map[string]string
// funcHistogramQuantile.
signatureToMetricWithBuckets map[uint64]*metricWithBuckets
signatureToMetricWithBuckets map[string]*metricWithBuckets
// label_replace.
regex *regexp.Regexp

lb *labels.Builder
lblBuf []byte
lblResultBuf []byte

// For binary vector matching.
rightSigs map[uint64]Sample
matchedSigs map[uint64]map[uint64]struct{}
resultMetric map[uint64]labels.Labels
rightSigs map[string]Sample
matchedSigs map[string]map[uint64]struct{}
resultMetric map[string]labels.Labels
}

// dropMetricName is a cached version of dropMetricName.
Expand All @@ -858,20 +863,19 @@ func (enh *EvalNodeHelper) dropMetricName(l labels.Labels) labels.Labels {
return ret
}

// signatureFunc is a cached version of signatureFunc.
func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) string {
if enh.sigf == nil {
enh.sigf = make(map[uint64]uint64, len(enh.out))
enh.sigf = make(map[string]string, len(enh.out))
}
f := signatureFunc(on, names...)
return func(l labels.Labels) uint64 {
h := l.Hash()
ret, ok := enh.sigf[h]
f := signatureFunc(on, enh.lblBuf, names...)
return func(l labels.Labels) string {
enh.lblBuf = l.Bytes(enh.lblBuf)
ret, ok := enh.sigf[string(enh.lblBuf)]
if ok {
return ret
}
ret = f(l)
enh.sigf[h] = ret
enh.sigf[string(enh.lblBuf)] = ret
return ret
}
}
Expand Down Expand Up @@ -1527,7 +1531,7 @@ func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching,
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)

// The set of signatures for the right-hand side Vector.
rightSigs := map[uint64]struct{}{}
rightSigs := map[string]struct{}{}
// Add all rhs samples to a map so we can easily find matches later.
for _, rs := range rhs {
rightSigs[sigf(rs.Metric)] = struct{}{}
Expand All @@ -1548,7 +1552,7 @@ func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching,
}
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)

leftSigs := map[uint64]struct{}{}
leftSigs := map[string]struct{}{}
// Add everything from the left-hand-side Vector.
for _, ls := range lhs {
leftSigs[sigf(ls.Metric)] = struct{}{}
Expand All @@ -1569,7 +1573,7 @@ func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatchi
}
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)

rightSigs := map[uint64]struct{}{}
rightSigs := map[string]struct{}{}
for _, rs := range rhs {
rightSigs[sigf(rs.Metric)] = struct{}{}
}
Expand Down Expand Up @@ -1598,7 +1602,7 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *

// All samples from the rhs hashed by the matching label/values.
if enh.rightSigs == nil {
enh.rightSigs = make(map[uint64]Sample, len(enh.out))
enh.rightSigs = make(map[string]Sample, len(enh.out))
} else {
for k := range enh.rightSigs {
delete(enh.rightSigs, k)
Expand Down Expand Up @@ -1628,7 +1632,7 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
// Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one
// the value is a set of signatures to detect duplicated result elements.
if enh.matchedSigs == nil {
enh.matchedSigs = make(map[uint64]map[uint64]struct{}, len(rightSigs))
enh.matchedSigs = make(map[string]map[uint64]struct{}, len(rightSigs))
} else {
for k := range enh.matchedSigs {
delete(enh.matchedSigs, k)
Expand Down Expand Up @@ -1662,7 +1666,6 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
continue
}
metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh)

insertedSigs, exists := matchedSigs[sig]
if matching.Card == parser.CardOneToOne {
if exists {
Expand Down Expand Up @@ -1692,42 +1695,45 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
return enh.out
}

// signatureFunc returns a function that calculates the signature for a metric
// ignoring the provided labels. If on, then only the given labels are used instead.
func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
func signatureFunc(on bool, b []byte, names ...string) func(labels.Labels) string {
sort.Strings(names)
if on {
return func(lset labels.Labels) uint64 {
h, _ := lset.HashForLabels(make([]byte, 0, 1024), names...)
return h
return func(lset labels.Labels) string {
return string(lset.WithLabels(names...).Bytes(b))
}
}
return func(lset labels.Labels) uint64 {
h, _ := lset.HashWithoutLabels(make([]byte, 0, 1024), names...)
return h
return func(lset labels.Labels) string {
return string(lset.WithoutLabels(names...).Bytes(b))
}
}

// resultMetric returns the metric for the given sample(s) based on the Vector
// binary operation and the matching options.
func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.VectorMatching, enh *EvalNodeHelper) labels.Labels {
if enh.resultMetric == nil {
enh.resultMetric = make(map[uint64]labels.Labels, len(enh.out))
}
// op and matching are always the same for a given node, so
// there's no need to include them in the hash key.
// If the lhs and rhs are the same then the xor would be 0,
// so add in one side to protect against that.
lh := lhs.Hash()
h := (lh ^ rhs.Hash()) + lh
if ret, ok := enh.resultMetric[h]; ok {
return ret
enh.resultMetric = make(map[string]labels.Labels, len(enh.out))
}

lb := labels.NewBuilder(lhs)
if enh.lb == nil {
enh.lb = labels.NewBuilder(lhs)
} else {
enh.lb.Reset(lhs)
}

buf := bytes.NewBuffer(enh.lblResultBuf[:0])
enh.lblBuf = lhs.Bytes(enh.lblBuf)
buf.Write(enh.lblBuf)
enh.lblBuf = rhs.Bytes(enh.lblBuf)
buf.Write(enh.lblBuf)
enh.lblResultBuf = buf.Bytes()

if ret, ok := enh.resultMetric[string(enh.lblResultBuf)]; ok {
return ret
}
str := string(enh.lblResultBuf)

if shouldDropMetricName(op) {
lb.Del(labels.MetricName)
enh.lb.Del(labels.MetricName)
}

if matching.Card == parser.CardOneToOne {
Expand All @@ -1739,23 +1745,23 @@ func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.V
continue Outer
}
}
lb.Del(l.Name)
enh.lb.Del(l.Name)
}
} else {
lb.Del(matching.MatchingLabels...)
enh.lb.Del(matching.MatchingLabels...)
}
}
for _, ln := range matching.Include {
// Included labels from the `group_x` modifier are taken from the "one"-side.
if v := rhs.Get(ln); v != "" {
lb.Set(ln, v)
enh.lb.Set(ln, v)
} else {
lb.Del(ln)
enh.lb.Del(ln)
}
}

ret := lb.Labels()
enh.resultMetric[h] = ret
ret := enh.lb.Labels()
enh.resultMetric[str] = ret
return ret
}

Expand Down
10 changes: 5 additions & 5 deletions promql/functions.go
Expand Up @@ -598,10 +598,10 @@ func funcPredictLinear(vals []parser.Value, args parser.Expressions, enh *EvalNo
func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
q := vals[0].(Vector)[0].V
inVec := vals[1].(Vector)
sigf := enh.signatureFunc(false, excludedLabels...)
sigf := signatureFunc(false, enh.lblBuf, excludedLabels...)

if enh.signatureToMetricWithBuckets == nil {
enh.signatureToMetricWithBuckets = map[uint64]*metricWithBuckets{}
enh.signatureToMetricWithBuckets = map[string]*metricWithBuckets{}
} else {
for _, v := range enh.signatureToMetricWithBuckets {
v.buckets = v.buckets[:0]
Expand All @@ -616,16 +616,16 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
// TODO(beorn7): Issue a warning somehow.
continue
}
hash := sigf(el.Metric)
l := sigf(el.Metric)

mb, ok := enh.signatureToMetricWithBuckets[hash]
mb, ok := enh.signatureToMetricWithBuckets[l]
if !ok {
el.Metric = labels.NewBuilder(el.Metric).
Del(labels.BucketLabel, labels.MetricName).
Labels()

mb = &metricWithBuckets{el.Metric, nil}
enh.signatureToMetricWithBuckets[hash] = mb
enh.signatureToMetricWithBuckets[l] = mb
}
mb.buckets = append(mb.buckets, bucket{upperBound, el.V})
}
Expand Down
11 changes: 11 additions & 0 deletions promql/testdata/collision.test
@@ -0,0 +1,11 @@

load 1s
node_namespace_pod:kube_pod_info:{namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
node_cpu_seconds_total{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449
node_cpu_seconds_total{cpu="35",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449
node_cpu_seconds_total{cpu="89",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449

eval instant at 4s count by(namespace, pod, cpu) (node_cpu_seconds_total{cpu=~".*",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v"}) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{namespace="observability",pod="node-exporter-l454v"}
{cpu="10",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
{cpu="35",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
{cpu="89",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1

0 comments on commit 5bb7f00

Please sign in to comment.