Switch to fasthash library for consistent hashing (#443)
Switch to the faster fasthash package for consistent hashing, per #442. This significantly reduces allocations in two functions that sit on hot code paths.

#443
ChimeraCoder committed Apr 16, 2018
1 parent 5b34b8e commit 74ddf76
Showing 4 changed files with 126 additions and 17 deletions.
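
The change applies the same pattern at every call site: drop the heap-allocated hash.Hash32 from hash/fnv (and its []byte conversions) and thread a plain uint32 through fnv1a instead. The micro-benchmark pair below is a minimal sketch of that difference, not part of this commit; the package name hashbench and the benchmark names are made up. Running it with go test -bench . -benchmem should report per-op allocations only for the stdlib variant.

package hashbench

import (
	"hash/fnv"
	"testing"

	"github.com/segmentio/fasthash/fnv1a"
)

var sink uint32 // keep results live so the compiler cannot discard the hashing

// Old style: fnv.New32a boxes the hasher in a hash.Hash32 interface and the
// string arguments must be converted to []byte, so each iteration allocates.
func BenchmarkStdlibFNV1a(b *testing.B) {
	for i := 0; i < b.N; i++ {
		h := fnv.New32a()
		h.Write([]byte("my.test.metric"))
		h.Write([]byte("gauge"))
		sink = h.Sum32()
	}
}

// New style: fnv1a carries the running hash as a plain uint32 and hashes
// strings directly, so the loop body performs no allocations.
func BenchmarkFasthashFNV1a(b *testing.B) {
	for i := 0; i < b.N; i++ {
		h := fnv1a.Init32
		h = fnv1a.AddString32(h, "my.test.metric")
		h = fnv1a.AddString32(h, "gauge")
		sink = h
	}
}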
12 changes: 6 additions & 6 deletions http.go
@@ -1,7 +1,6 @@
package veneur

import (
"hash/fnv"
"net/http"
"net/http/pprof"
"sort"
@@ -12,6 +11,7 @@ import (
"github.com/stripe/veneur/trace"
"github.com/stripe/veneur/trace/metrics"

"github.com/segmentio/fasthash/fnv1a"
"goji.io"
"goji.io/pat"
"golang.org/x/net/context"
@@ -82,11 +82,11 @@ func newSortableJSONMetrics(metrics []samplers.JSONMetric, numWorkers int) *sort
workerIndices: make([]uint32, 0, len(metrics)),
}
for _, j := range metrics {
- h := fnv.New32a()
- h.Write([]byte(j.Name))
- h.Write([]byte(j.Type))
- h.Write([]byte(j.JoinedTags))
- ret.workerIndices = append(ret.workerIndices, h.Sum32()%uint32(numWorkers))
+ h := fnv1a.Init32
+ h = fnv1a.AddString32(h, j.Name)
+ h = fnv1a.AddString32(h, j.Type)
+ h = fnv1a.AddString32(h, j.JoinedTags)
+ ret.workerIndices = append(ret.workerIndices, h%uint32(numWorkers))
}
return &ret
}
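
The digest computed above only feeds h%uint32(numWorkers), so the property being preserved is deterministic sharding: a metric with the same name, type, and joined tags always maps to the same worker index, which keeps aggregation for a given series on a single worker. A standalone sketch of that property (illustrative only; workerFor is an invented helper, not code from this diff):

package main

import (
	"fmt"

	"github.com/segmentio/fasthash/fnv1a"
)

// workerFor mirrors the hunk above: hash the metric identity with FNV-1a and
// map the result onto one of numWorkers workers.
func workerFor(name, metricType, joinedTags string, numWorkers int) uint32 {
	h := fnv1a.Init32
	h = fnv1a.AddString32(h, name)
	h = fnv1a.AddString32(h, metricType)
	h = fnv1a.AddString32(h, joinedTags)
	return h % uint32(numWorkers)
}

func main() {
	// The same metric identity always lands on the same worker index.
	first := workerFor("my.test.metric", "gauge", "keats:false,yeats:false", 100)
	second := workerFor("my.test.metric", "gauge", "keats:false,yeats:false", 100)
	fmt.Println(first == second, first) // true, plus some index in [0, 100)
}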
26 changes: 26 additions & 0 deletions http_test.go
@@ -3,6 +3,7 @@ package veneur
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
@@ -16,6 +17,8 @@ import (
"testing"
"time"

"github.com/stripe/veneur/trace"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stripe/veneur/samplers"
@@ -374,3 +377,26 @@ func testServerImportHelper(t *testing.T, data interface{}) {

assert.Equal(t, http.StatusBadRequest, w.Code, "Test server returned wrong HTTP response code")
}

func BenchmarkNewSortableJSONMetrics(b *testing.B) {
const numWorkers = 100
filename := filepath.Join("fixtures", "import.deflate")
contentEncoding := "deflate"

f, err := os.Open(filename)
assert.NoError(b, err, "Error reading response fixture")
defer f.Close()

r := httptest.NewRequest(http.MethodPost, "/import", f)
r.Header.Set("Content-Encoding", contentEncoding)

w := httptest.NewRecorder()

_, jsonMetrics, err := unmarshalMetricsFromHTTP(context.Background(), trace.DefaultClient, w, r)
assert.NoError(b, err)

b.ResetTimer()
for i := 0; i < b.N; i++ {
newSortableJSONMetrics(jsonMetrics, numWorkers)
}
}
22 changes: 11 additions & 11 deletions samplers/parser.go
@@ -4,13 +4,13 @@ import (
"bytes"
"errors"
"fmt"
"hash/fnv"
"math"
"sort"
"strconv"
"strings"
"time"

"github.com/segmentio/fasthash/fnv1a"
"github.com/stripe/veneur/protocol/dogstatsd"
"github.com/stripe/veneur/samplers/metricpb"
"github.com/stripe/veneur/ssf"
@@ -176,8 +176,8 @@ func ParseMetricSSF(metric *ssf.SSFSample) (UDPMetric, error) {
ret := UDPMetric{
SampleRate: 1.0,
}
- h := fnv.New32a()
- h.Write([]byte(metric.Name))
+ h := fnv1a.Init32
+ h = fnv1a.AddString32(h, metric.Name)
ret.Name = metric.Name
switch metric.Metric {
case ssf.SSFSample_COUNTER:
@@ -191,7 +191,7 @@ func ParseMetricSSF(metric *ssf.SSFSample) (UDPMetric, error) {
default:
return UDPMetric{}, invalidMetricTypeError
}
- h.Write([]byte(ret.Type))
+ h = fnv1a.AddString32(h, ret.Type)
if metric.Metric == ssf.SSFSample_SET {
ret.Value = metric.Message
} else {
@@ -213,8 +213,8 @@ func ParseMetricSSF(metric *ssf.SSFSample) (UDPMetric, error) {
sort.Strings(tempTags)
ret.Tags = tempTags
ret.JoinedTags = strings.Join(tempTags, ",")
- h.Write([]byte(ret.JoinedTags))
- ret.Digest = h.Sum32()
+ h = fnv1a.AddString32(h, ret.JoinedTags)
+ ret.Digest = h
return ret, nil
}

@@ -247,8 +247,8 @@ func ParseMetric(packet []byte) (*UDPMetric, error) {
return nil, errors.New("Invalid metric packet, metric type not specified")
}

- h := fnv.New32a()
- h.Write(nameChunk)
+ h := fnv1a.Init32
+ h = fnv1a.AddString32(h, string(nameChunk))
ret.Name = string(nameChunk)

// Decide on a type
@@ -267,7 +267,7 @@ func ParseMetric(packet []byte) (*UDPMetric, error) {
return nil, invalidMetricTypeError
}
// Add the type to the digest
- h.Write([]byte(ret.Type))
+ h = fnv1a.AddString32(h, ret.Type)

// Now convert the metric's value
if ret.Type == "set" {
@@ -331,14 +331,14 @@ func ParseMetric(packet []byte) (*UDPMetric, error) {
// we specifically need the sorted version here so that hashing over
// tags behaves deterministically
ret.JoinedTags = strings.Join(tags, ",")
- h.Write([]byte(ret.JoinedTags))
+ h = fnv1a.AddString32(h, ret.JoinedTags)

default:
return nil, fmt.Errorf("Invalid metric packet, contains unknown section %q", pipeSplitter.Chunk())
}
}

- ret.Digest = h.Sum32()
+ ret.Digest = h

return ret, nil
}
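
Both hash/fnv's New32a and fasthash's fnv1a implement 32-bit FNV-1a, so hashing the same bytes should produce the same digest, and the library swap is therefore not expected to change existing digests or how metrics are routed. A quick standalone check (illustrative only, not part of the commit):

package main

import (
	"fmt"
	"hash/fnv"

	"github.com/segmentio/fasthash/fnv1a"
)

func main() {
	name, metricType, tags := "my.test.metric", "gauge", "keats:false,wilde:true,yeats:false"

	// Digest as computed before this commit, via the standard library.
	old := fnv.New32a()
	old.Write([]byte(name))
	old.Write([]byte(metricType))
	old.Write([]byte(tags))

	// Digest as computed after this commit, via fasthash.
	h := fnv1a.Init32
	h = fnv1a.AddString32(h, name)
	h = fnv1a.AddString32(h, metricType)
	h = fnv1a.AddString32(h, tags)

	fmt.Printf("stdlib=%#x fasthash=%#x equal=%v\n", old.Sum32(), h, old.Sum32() == h)
}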
83 changes: 83 additions & 0 deletions samplers/samplers_test.go
@@ -8,8 +8,11 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stripe/veneur/ssf"
)

const ε = .01

func TestRouting(t *testing.T) {
tests := []struct {
name string
@@ -418,3 +421,83 @@ func TestMetricKeyEquality(t *testing.T) {
assert.Equal(t, ce1.MetricKey.String(), ce2.MetricKey.String())
assert.NotEqual(t, ce1.MetricKey.String(), ce3.MetricKey.String())
}

func TestParseMetricSSF(t *testing.T) {
sample := ssf.SSFSample{
Metric: ssf.SSFSample_GAUGE,

Name: "my.test.metric",
Value: rand.Float32(),
Timestamp: time.Now().Unix(),
Message: "arbitrary test message",
Status: ssf.SSFSample_WARNING,
SampleRate: rand.Float32(),
Tags: map[string]string{
"keats": "false",
"yeats": "false",
"wilde": "true",
"veneurglobalonly": "true",
},
Unit: "frobs per second",
}

expected := UDPMetric{
MetricKey: MetricKey{
Name: "my.test.metric",
Type: "gauge",
JoinedTags: "keats:false,wilde:true,yeats:false",
},
Digest: 0x7ae783ad,
Value: sample.Value,
SampleRate: sample.SampleRate,
Tags: []string{
"keats:false",
"wilde:true",
"yeats:false",
},
Scope: 2,
}

udpMetric, err := ParseMetricSSF(&sample)
assert.NoError(t, err)
assert.Equal(t, udpMetric.MetricKey, expected.MetricKey)
assert.Equal(t, udpMetric.Type, expected.Type)
assert.Equal(t, udpMetric.Digest, expected.Digest)
assert.InEpsilon(t, udpMetric.Value, expected.Value, ε)
assert.InEpsilon(t, udpMetric.SampleRate, expected.SampleRate, ε)
assert.Equal(t, udpMetric.JoinedTags, expected.JoinedTags)
assert.Equal(t, udpMetric.Tags, expected.Tags)
assert.Equal(t, udpMetric.Scope, expected.Scope)
}

func BenchmarkParseMetricSSF(b *testing.B) {

const LEN = 10000

samples := make([]*ssf.SSFSample, LEN)

for i, _ := range samples {
p := make([]byte, 10)
_, err := rand.Read(p)
if err != nil {
b.Fatalf("Error generating data: %s", err)
}
sample := ssf.SSFSample{
Name: "my.test.metric",
Value: rand.Float32(),
Timestamp: time.Now().Unix(),
SampleRate: rand.Float32(),
Tags: map[string]string{
"keats": "false",
"yeats": "false",
"wilde": "true",
string(p[:5]): string(p[5:]),
},
}
samples[i] = &sample
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ParseMetricSSF(samples[i%LEN])
}
}
