Skip to content

Commit

Permalink
Add a gRPC "import" server (#442)
Browse files Browse the repository at this point in the history
* Add a gRPC "import" server

This adds a gRPC "import" server, which receives metrics and outputs
them to a set of generic `MetricIngesters".  This should be used to
receive metrics forwarded from another Veneur.

The importsrv package defines a gRPC server that implements the
forwardrpc.Forward service.  It receives batches of metrics, and
sends them on to a set of sinks based on their key.  To facilitate easy
testing of this RPC, I made the server accept an interface
MetricIngester, which is what is used to forward the metrics.  Worker
will later implement this interface, allowing for them to be passed
directly into this server.

* importsrv: reduce allocations from hashing metrics

From profiling global Veneurs in a production environment, we see that
the garbage collector is generally dominating the CPU usage.  Digging in
a little more, it looks like the process to hash a metric to a
destination worker in the "importsrv" package is actually doing a ton of
allocations.

To optimize this, I pulled in an external fnv hash implementation that
handles strings with zero allocations.  I also removed usages of
"MetricKey", as it does a lot of string manipulations that perform a ton
of allocations, in favor of just writing strings directly into the hash.

The benchmarks show nice improvements in the number of allocations, see
https://gist.github.com/noahgoldman/034fb2a52aadd60c383321fe9d3a4ba3 for
a comparison of benchmark results. There is a 2x reduction for a small
number (10 metrics) and a 6x reduction for 100 metrics, which is closer
to our average of around 180 currently.

* importsrv: group static tags into a single var block

* importsrv: Fix a->an in 'New' comment

* proxysrv: increase handler count test timeouts to 3s
  • Loading branch information
noahgoldman authored and sdboyer-stripe committed Apr 13, 2018
1 parent 68706c1 commit 5b34b8e
Show file tree
Hide file tree
Showing 24 changed files with 1,059 additions and 3 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Expand Up @@ -109,3 +109,7 @@
[[constraint]]
branch = "master"
name = "github.com/araddon/dateparse"

[[constraint]]
branch = "master"
name = "github.com/segmentio/fasthash"
11 changes: 11 additions & 0 deletions importsrv/options.go
@@ -0,0 +1,11 @@
package importsrv

import "github.com/stripe/veneur/trace"

// WithTraceClient sets the trace client for the server. Otherwise it uses
// trace.DefaultClient.
func WithTraceClient(c *trace.Client) Option {
return func(opts *options) {
opts.traceClient = c
}
}
147 changes: 147 additions & 0 deletions importsrv/server.go
@@ -0,0 +1,147 @@
// Package importsrv receives metrics over gRPC and sends them to workers.
//
// The Server wraps a grpc.Server, and implements the forwardrpc.Forward
// service. It receives batches of metrics, then hashes them to a specific
// "MetricIngester" and forwards them on.
package importsrv

import (
"fmt"
"net"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/segmentio/fasthash/fnv1a"
"golang.org/x/net/context" // This can be replace with "context" after Go 1.8 support is dropped
"google.golang.org/grpc"

"github.com/stripe/veneur/forwardrpc"
"github.com/stripe/veneur/samplers/metricpb"
"github.com/stripe/veneur/ssf"
"github.com/stripe/veneur/trace"
)

const (
responseDurationMetric = "import.response_duration_ns"
)

// MetricIngester reads metrics from protobufs
type MetricIngester interface {
IngestMetrics([]*metricpb.Metric)
}

// Server wraps a gRPC server and implements the forwardrpc.Forward service.
// It reads a list of metrics, and based on the provided key chooses a
// MetricIngester to send it to. A unique metric (name, tags, and type)
// should always be routed to the same MetricIngester.
type Server struct {
*grpc.Server
metricOuts []MetricIngester
opts *options
}

type options struct {
traceClient *trace.Client
}

// Option is returned by functions that serve as options to New, like
// "With..."
type Option func(*options)

// New creates an unstarted Server with the input MetricIngester's to send
// output to.
func New(metricOuts []MetricIngester, opts ...Option) *Server {
res := &Server{
Server: grpc.NewServer(),
metricOuts: metricOuts,
opts: &options{},
}

for _, opt := range opts {
opt(res.opts)
}

if res.opts.traceClient == nil {
res.opts.traceClient = trace.DefaultClient
}

forwardrpc.RegisterForwardServer(res.Server, res)

return res
}

// Serve starts a gRPC listener on the specified address and blocks while
// listening for requests. If listening is interrupted by some means other
// than Stop or GracefulStop being called, it returns a non-nil error.
func (s *Server) Serve(addr string) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("failed to bind the import server to '%s': %v",
addr, err)
}

return s.Server.Serve(ln)
}

// Static maps of tags used in the SendMetrics handler
var (
grpcTags = map[string]string{"protocol": "grpc"}
responseGroupTags = map[string]string{
"protocol": "grpc",
"part": "group",
}
responseSendTags = map[string]string{
"protocol": "grpc",
"part": "send",
}
)

// SendMetrics takes a list of metrics and hashes each one (based on the
// metric key) to a specific metric ingester.
func (s *Server) SendMetrics(ctx context.Context, mlist *forwardrpc.MetricList) (*empty.Empty, error) {
span, _ := trace.StartSpanFromContext(ctx, "veneur.opentracing.importsrv.handle_send_metrics")
span.SetTag("protocol", "grpc")
defer span.ClientFinish(s.opts.traceClient)

dests := make([][]*metricpb.Metric, len(s.metricOuts))

// group metrics by their destination
groupStart := time.Now()
for _, m := range mlist.Metrics {
workerIdx := s.hashMetric(m) % uint32(len(dests))
dests[workerIdx] = append(dests[workerIdx], m)
}
span.Add(ssf.Timing(responseDurationMetric, time.Since(groupStart), time.Nanosecond, responseGroupTags))

// send each set of metrics to its destination. Since this is typically
// implemented with channels, batching the metrics together avoids
// repeated channel send operations
sendStart := time.Now()
for i, ms := range dests {
if len(ms) > 0 {
s.metricOuts[i].IngestMetrics(ms)
}
}

span.Add(
ssf.Timing(responseDurationMetric, time.Since(sendStart), time.Nanosecond, responseSendTags),
ssf.Count("import.metrics_total", float32(len(mlist.Metrics)), grpcTags),
)

return &empty.Empty{}, nil
}

// hashMetric returns a 32-bit hash from the input metric based on its name,
// type, and tags.
//
// The fnv1a package is used as opposed to fnv from the standard library, as
// it avoids allocations by not using the hash.Hash interface and by avoiding
// string to []byte conversions.
func (s *Server) hashMetric(m *metricpb.Metric) uint32 {
h := fnv1a.HashString32(m.Name)
h = fnv1a.AddString32(h, m.Type.String())
for _, tag := range m.Tags {
h = fnv1a.AddString32(h, tag)
}
return h
}
137 changes: 137 additions & 0 deletions importsrv/server_test.go
@@ -0,0 +1,137 @@
package importsrv

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stripe/veneur/forwardrpc"
"github.com/stripe/veneur/samplers/metricpb"
metrictest "github.com/stripe/veneur/samplers/metricpb/testutils"
"github.com/stripe/veneur/trace"
)

type testMetricIngester struct {
metrics []*metricpb.Metric
}

func (mi *testMetricIngester) IngestMetrics(ms []*metricpb.Metric) {
mi.metrics = append(mi.metrics, ms...)
}

func (mi *testMetricIngester) clear() {
mi.metrics = mi.metrics[:0]
}

// Test that sending the same metric to a Veneur results in it being hashed
// to the same worker every time
func TestSendMetrics_ConsistentHash(t *testing.T) {
ingesters := []*testMetricIngester{&testMetricIngester{}, &testMetricIngester{}}

casted := make([]MetricIngester, len(ingesters))
for i, ingester := range ingesters {
casted[i] = ingester
}
s := New(casted)

inputs := []*metricpb.Metric{
&metricpb.Metric{Name: "test.counter", Type: metricpb.Type_Counter, Tags: []string{"tag:1"}},
&metricpb.Metric{Name: "test.gauge", Type: metricpb.Type_Gauge},
&metricpb.Metric{Name: "test.histogram", Type: metricpb.Type_Histogram, Tags: []string{"type:histogram"}},
&metricpb.Metric{Name: "test.set", Type: metricpb.Type_Set},
&metricpb.Metric{Name: "test.gauge3", Type: metricpb.Type_Gauge},
}

// Send the same inputs many times
for i := 0; i < 10; i++ {
s.SendMetrics(context.Background(), &forwardrpc.MetricList{inputs})

assert.Equal(t, []*metricpb.Metric{inputs[0], inputs[4]},
ingesters[0].metrics, "Ingester 0 has the wrong metrics")
assert.Equal(t, []*metricpb.Metric{inputs[1], inputs[2], inputs[3]},
ingesters[1].metrics, "Ingester 1 has the wrong metrics")

for _, ingester := range ingesters {
ingester.clear()
}
}
}

func TestSendMetrics_Empty(t *testing.T) {
ingester := &testMetricIngester{}
s := New([]MetricIngester{ingester})
s.SendMetrics(context.Background(), &forwardrpc.MetricList{})

assert.Empty(t, ingester.metrics, "The server shouldn't have submitted "+
"any metrics")
}

func TestOptions_WithTraceClient(t *testing.T) {
c, err := trace.NewClient(trace.DefaultVeneurAddress)
if err != nil {
t.Fatalf("failed to initialize a trace client: %v", err)
}

s := New([]MetricIngester{}, WithTraceClient(c))
assert.Equal(t, c, s.opts.traceClient, "WithTraceClient didn't correctly "+
"set the trace client")
}

type noopChannelMetricIngester struct {
in chan []*metricpb.Metric
quit chan struct{}
}

func newNoopChannelMetricIngester() *noopChannelMetricIngester {
return &noopChannelMetricIngester{
in: make(chan []*metricpb.Metric),
quit: make(chan struct{}),
}
}

func (mi *noopChannelMetricIngester) start() {
go func() {
for {
select {
case <-mi.in:
case <-mi.quit:
return
}
}
}()
}

func (mi *noopChannelMetricIngester) stop() {
mi.quit <- struct{}{}
}

func (mi *noopChannelMetricIngester) IngestMetrics(ms []*metricpb.Metric) {
mi.in <- ms
}

func BenchmarkImportServerSendMetrics(b *testing.B) {
rand.Seed(time.Now().Unix())

metrics := metrictest.RandomForwardMetrics(10000)
for _, inputSize := range []int{10, 100, 1000, 10000} {
ingesters := make([]MetricIngester, 100)
for i := range ingesters {
ingester := newNoopChannelMetricIngester()
ingester.start()
defer ingester.stop()
ingesters[i] = ingester
}
s := New(ingesters)
ctx := context.Background()
input := &forwardrpc.MetricList{Metrics: metrics[:inputSize]}

b.Run(fmt.Sprintf("InputSize=%d", inputSize), func(b *testing.B) {
for i := 0; i < b.N; i++ {
s.SendMetrics(ctx, input)
}
})
}
}
4 changes: 2 additions & 2 deletions proxysrv/server_test.go
Expand Up @@ -187,7 +187,7 @@ func TestCountActiveHandlers(t *testing.T) {
tick := time.NewTicker(10 * time.Nanosecond)
defer tick.Stop()

timeout := time.NewTicker(100 * time.Millisecond)
timeout := time.NewTicker(3 * time.Second)
defer timeout.Stop()
for int64(n) != atomic.LoadInt64(s.activeProxyHandlers) {
select {
Expand All @@ -204,7 +204,7 @@ func TestCountActiveHandlers(t *testing.T) {

// Stop all of the servers and check that the counter goes to zero
close(done)
timeout = time.NewTicker(100 * time.Millisecond)
timeout = time.NewTicker(3 * time.Second)
defer timeout.Stop()
for atomic.LoadInt64(s.activeProxyHandlers) != 0 {
select {
Expand Down
12 changes: 12 additions & 0 deletions vendor/github.com/segmentio/fasthash/.circleci/config.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions vendor/github.com/segmentio/fasthash/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/segmentio/fasthash/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5b34b8e

Please sign in to comment.