Merge branch 'master' into nickt.go-mod
* master:
  Add benchmark for protocol.ValidTrace (stripe#692)
  Report objective metric for spans (stripe#690)
  Don't log X-Ray write errors at Error level (stripe#689)
  Ensure SSF packets contain a span or metric before passing to sinks (stripe#688)
  Add OT trace information on outgoing HTTP reqs (stripe#685)
  Fix sfx submissions reporting a timeout when one encounters an error (stripe#684)
  Open CHANGELOG for v12.0.0 (stripe#683)
  Update CHANGELOG (stripe#682)
  Update base images for public Docker images to Go 1.11.4 (stripe#681)
  Allow configuring SFX flush batch size (stripe#680)
  Update team members on contributing page (stripe#679)
  Support tag filters in Datadog (stripe#595)
  Update gRPC, x/sys and protobuf (stripe#589)
  Build public docker images with 1.11.3 and alpine 3.8 (stripe#593)
  Document trace API (stripe#581)
  Add support for Datadog distribution metric type (stripe#590)
  Open CHANGELOG for v11.0.0 (stripe#592)
  Update CHANGELOG for v10.0.0 release (stripe#591)
prudhvi committed Feb 15, 2019
2 parents d92a1ee + 5edc4c9 commit df2294e
Showing 382 changed files with 89,528 additions and 22,346 deletions.
30 changes: 26 additions & 4 deletions CHANGELOG.md
@@ -1,13 +1,35 @@
# 10.0.0, in progress
# 12.0.0, in progress

## Added

* The OpenTracing implementation's `Tracer.Inject` in the `trace` package now sets HTTP headers in a way that tools like [Envoy](https://www.envoyproxy.io/) can propagate on traces. Thanks, [antifuchs](https://github.com/antifuchs)!
* SSF packets are now validated to ensure they contain either a valid span or at least one metric. The metric `veneur.worker.ssf.empty_total` tracks the number of empty SSF packets encountered, which indicates a client error. Thanks, [tummychow](https://github.com/tummychow) and [aditya](https://github.com/chimeracoder)!
* SSF indicator spans can now report an additional "objective" metric, tagged with their service and name. Thanks, [tummychow](https://github.com/tummychow)!
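The empty-packet validation described above can be sketched as follows. This is a minimal, illustrative stand-in — the `span` struct and function names here are simplified substitutes for veneur's real `ssf.SSFSpan` and `protocol.ValidTrace`, not the actual implementation:

```go
package main

import "fmt"

// span is a stand-in for ssf.SSFSpan with only the fields the check needs.
type span struct {
	Id, TraceId                  int64
	Name                         string
	StartTimestamp, EndTimestamp int64
	Metrics                      []string // stand-in for []*ssf.SSFSample
}

// validSpan mirrors the validity rules in this commit: a span needs ids,
// timestamps, and a non-empty name.
func validSpan(s *span) bool {
	return s.Id != 0 && s.TraceId != 0 &&
		s.StartTimestamp != 0 && s.EndTimestamp != 0 &&
		s.Name != ""
}

// empty reports whether an SSF packet carries neither a valid span nor any
// metrics — the client-error condition that veneur.worker.ssf.empty_total
// counts before the packet would reach any sink.
func empty(s *span) bool {
	return !validSpan(s) && len(s.Metrics) == 0
}

func main() {
	fmt.Println(empty(&span{}))                       // true: nothing usable
	fmt.Println(empty(&span{Metrics: []string{"m"}})) // false: carries a metric
}
```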

## Updated
* The metric `veneur.sink.spans_dropped_total` now includes packets that were skipped due to UDP write errors. Thanks, [aditya](https://github.com/chimeracoder)!

## Bugfixes
* The signalfx client no longer reports a timeout when submission to the datapoint API endpoint encounters an error. Thanks, [antifuchs](https://github.com/antifuchs)!
* SSF packets without a name are no longer considered valid for `protocol.ValidTrace`. Thanks, [tummychow](https://github.com/tummychow)!

# 11.0.0, 2019-01-22

## Added
* Datadog's [distribution](https://docs.datadoghq.com/developers/metrics/distributions/) type for DogStatsD is now supported and treated as a plain histogram for compatibility. Thanks, [gphat](https://github.com/gphat)!
* Add support for `tags_exclude` to the DataDog metrics sink. Thanks, [mhamrah](https://github.com/mhamrah)!
* The `github.com/stripe/veneur/trace` package has brand new and much more extensive [documentation](https://godoc.org/github.com/stripe/veneur/trace)! Thanks, [antifuchs](https://github.com/antifuchs)!
* New configuration setting `signalfx_flush_max_per_body` that allows limiting the payload of HTTP POST bodies containing data points destined for SignalFx. Thanks, [antifuchs](https://github.com/antifuchs)!

# 10.0.0, 2018-12-19

## Added
* The new X-Ray sink provides support for [AWS X-Ray](https://aws.amazon.com/xray/) as a tracing backend. Thanks, [gphat](https://github.com/gphat) and [aditya](https://github.com/chimeracoder)!
* A new package `github.com/stripe/veneur/trace/testbackend` contains two trace client backends that can be used to test the trace data emitted by applications. Thanks, [antifuchs](https://github.com/antifuchs)!

## Updated
* Updated the vendored version of x/net, which picks up a package rename that
can lead issues when integrating veneur into other codebases. Thanks
[nicktrav](https://github.com/nicktrav)!
* Updated the vendored version of x/net, which picks up a package rename that can lead to issues when integrating veneur into other codebases. Thanks, [nicktrav](https://github.com/nicktrav)!
* Updated the vendored versions of x/sys, protobuf, and gRPC. Thanks, [nicktrav](https://github.com/nicktrav)!

# 9.0.0, 2018-11-08

9 changes: 5 additions & 4 deletions CONTRIBUTING.md
@@ -15,15 +15,16 @@ All pull requests will be reviewed by someone on Stripe's Observability team. At

* aditya-stripe (aka chimeracoder)
* asf-stripe
* cory-stripe (aka gphat)
* joshu-stripe
* krisreeves-stripe
* aubrey-stripe
* sjung-stripe
* sdboyer-stripe

There's no need to pick a reviewer; if you submit a pull request, we'll see it and figure out who should review it.

If your pull request involves large or sensitive changes, it will probably need to be reviewed by either Aditya Mukerjee (chimeracoder || aditya-stripe) or Cory Watson (cory-stripe || gphat).
If your pull request involves large or sensitive changes, it will probably need to be reviewed by Aditya Mukerjee (chimeracoder || aditya-stripe).

For larger changes, there may be a delay in reviewing/merging if Aditya or Cory is unavailable, or if we need to complete some other related work first as a prerequisite. If you have any questions along the way, though, feel free to ask on the thread, and we'll be happy to help out.
For larger changes, there may be a delay in reviewing/merging if Aditya is unavailable, or if we need to complete some other related work first as a prerequisite. If you have any questions along the way, though, feel free to ask on the thread, and we'll be happy to help out.

## Help us help you!

6 changes: 5 additions & 1 deletion README.md
@@ -58,7 +58,7 @@ More generically, Veneur is a convenient sink for various observability primitiv

Once you cross a threshold into dozens, hundreds or (gasp!) thousands of machines emitting metric data for an application, you've moved into that world where data about individual hosts is uninteresting except in aggregate form. Instead of paying to store tons of data points and then aggregating them later at read-time, Veneur can calculate global aggregates, like percentiles, and forward those along to your time series database, etc.

Veneur is also a StatsD or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) protocol transport, fowarding the locally collected metrics over more reliable TCP
Veneur is also a StatsD or [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) protocol transport, forwarding the locally collected metrics over more reliable TCP
implementations.

Here are some examples of why Stripe and other companies are using Veneur today:
@@ -121,6 +121,10 @@ Because Veneur is built to handle lots and lots of data, it uses approximate his

Datadog's DogStatsD — and StatsD — uses an exact histogram which retains all samples and is reset every flush period. This means that there is a loss of precision when using Veneur, but the resulting percentile values are meant to be more representative of a global view.
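A toy illustration of why a global view matters: averaging per-host percentiles is not the same as taking the percentile of the merged samples. This sketch uses a naive nearest-rank percentile, not veneur's actual approximate-histogram implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// percentile returns the nearest-rank value at quantile q (0..1) of xs.
// It copies its input so the caller's slice order is preserved.
func percentile(xs []float64, q float64) float64 {
	s := append([]float64(nil), xs...)
	sort.Float64s(s)
	idx := int(q * float64(len(s)))
	if idx >= len(s) {
		idx = len(s) - 1
	}
	return s[idx]
}

func main() {
	// Two hosts with very different latency distributions.
	hostA := []float64{1, 1, 2, 2, 3, 3, 4, 4, 5, 100}
	hostB := []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}

	// Averaging per-host p90s (naive host-level aggregation)...
	avg := (percentile(hostA, 0.9) + percentile(hostB, 0.9)) / 2
	// ...differs sharply from the p90 of the merged samples (the global view).
	merged := append(append([]float64{}, hostA...), hostB...)
	global := percentile(merged, 0.9)

	fmt.Println(avg, global) // the averaged figure is wildly inflated by hostA's outlier
}
```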

### Datadog Distributions

Because Veneur already handles "global" histograms, any DogStatsD packets received with type `d` — [Datadog's distribution type](https://docs.datadoghq.com/developers/metrics/distributions/) — will be considered a histogram and therefore compatible with all sinks. Veneur does **not** send any metrics to Datadog typed as a Datadog-native distribution.

## Approximate Sets

Veneur uses [HyperLogLogs](https://github.com/clarkduvall/hyperloglog) for approximate unique sets. These are a very efficient unique counter with fixed memory consumption.
2 changes: 2 additions & 0 deletions config.go
@@ -55,12 +55,14 @@ type Config struct {
NumReaders int `yaml:"num_readers"`
NumSpanWorkers int `yaml:"num_span_workers"`
NumWorkers int `yaml:"num_workers"`
ObjectiveSpanTimerName string `yaml:"objective_span_timer_name"`
OmitEmptyHostname bool `yaml:"omit_empty_hostname"`
Percentiles []float64 `yaml:"percentiles"`
ReadBufferSizeBytes int `yaml:"read_buffer_size_bytes"`
SentryDsn string `yaml:"sentry_dsn"`
SignalfxAPIKey string `yaml:"signalfx_api_key"`
SignalfxEndpointBase string `yaml:"signalfx_endpoint_base"`
SignalfxFlushMaxPerBody int `yaml:"signalfx_flush_max_per_body"`
SignalfxHostnameTag string `yaml:"signalfx_hostname_tag"`
SignalfxMetricNamePrefixDrops []string `yaml:"signalfx_metric_name_prefix_drops"`
SignalfxMetricTagPrefixDrops []string `yaml:"signalfx_metric_tag_prefix_drops"`
15 changes: 14 additions & 1 deletion example.yaml
@@ -68,7 +68,12 @@ grpc_address: "0.0.0.0:8128"
# The name of timer metrics that "indicator" spans should be tracked
# under. If this is unset, veneur doesn't report an additional timer
# metric for indicator spans.
indicator_span_timer_name: "indicator_span.duration_ms"
indicator_span_timer_name: "indicator_span.duration_ns"

# The name of timer metrics that objectives, derived from indicator
# spans, should be tracked under. If this is unset, veneur doesn't
# report the additional objective timer metric.
objective_span_timer_name: "objective_span.duration_ns"

# == METRICS CONFIGURATION ==

@@ -272,6 +277,14 @@ signalfx_metric_name_prefix_drops:
signalfx_metric_tag_prefix_drops:
- ""

# The maximum number of datapoints in a single HTTP request to
# signalfx. At flush time, if veneur would flush more than the number
# configured here, it breaks the flushes apart into batches of this
# configured max size and submits them in parallel HTTP requests. If
# set to zero (the default), veneur makes a single HTTP request per
# signalfx flush endpoint.
signalfx_flush_max_per_body: 0
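The batching behavior this setting controls can be sketched as below. This is a hedged illustration of the splitting logic only — the function name and `[]int` payload are placeholders, not the signalfx sink's actual code, and the real sink submits each batch in a parallel HTTP request:

```go
package main

import "fmt"

// batches splits points into chunks of at most maxPerBody each. A max of
// zero or less (the default) means everything goes in a single batch,
// matching the documented behavior of signalfx_flush_max_per_body.
func batches(points []int, maxPerBody int) [][]int {
	if maxPerBody <= 0 {
		return [][]int{points}
	}
	var out [][]int
	for len(points) > maxPerBody {
		out = append(out, points[:maxPerBody])
		points = points[maxPerBody:]
	}
	return append(out, points)
}

func main() {
	pts := make([]int, 25)
	fmt.Println(len(batches(pts, 10))) // 3 batches: 10 + 10 + 5 datapoints
	fmt.Println(len(batches(pts, 0)))  // 1 batch: the whole flush at once
}
```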

# == AWS X-Ray ==
# X-Ray can be a sink for trace spans.

1 change: 1 addition & 0 deletions forward_test.go
@@ -111,6 +111,7 @@ func TestE2EForwardingIndicatorMetrics(t *testing.T) {
span := &ssf.SSFSpan{
Id: 5,
TraceId: 5,
Name: "foo",
Service: "indicator_testing",
StartTimestamp: start.UnixNano(),
EndTimestamp: end.UnixNano(),
3 changes: 3 additions & 0 deletions http/http.go
@@ -131,6 +131,9 @@ func (tripper *TraceRoundTripper) RoundTrip(req *http.Request) (*http.Response,
span.SetTag("action", tripper.prefix)
defer span.ClientFinish(tripper.tc)

// Add OpenTracing headers to the request, so downstream reqs can be identified:
trace.GlobalTracer.InjectRequest(span.Trace, req)

hct := newHTTPClientTracer(span.Attach(req.Context()), tripper.tc, tripper.prefix)
req = req.WithContext(httptrace.WithClientTrace(req.Context(), hct.getClientTrace()))
defer hct.finishSpan()
103 changes: 99 additions & 4 deletions parser_test.go
@@ -52,6 +52,7 @@ func TestValidTrace(t *testing.T) {

trace.Id = 1
trace.TraceId = 1
trace.Name = "foo"
trace.StartTimestamp = 1
trace.EndTimestamp = 5
assert.True(t, protocol.ValidTrace(trace))
@@ -80,6 +81,7 @@ func TestParseSSFValidTraceInvalidMetric(t *testing.T) {
trace := &ssf.SSFSpan{}
trace.Id = 1
trace.TraceId = 1
trace.Name = "foo"
trace.StartTimestamp = 1
trace.EndTimestamp = 5

@@ -189,7 +191,7 @@ func TestParseSSFIndicatorSpan(t *testing.T) {
require.NotNil(t, inSpan)
require.NoError(t, protocol.ValidateTrace(span))

metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "timer_name")
metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "timer_name", "")
assert.NoError(t, err)
if assert.Equal(t, 1, len(metrics)) {
m := metrics[0]
@@ -231,7 +233,7 @@ func TestParseSSFIndicatorSpanWithError(t *testing.T) {
require.NotNil(t, span)
require.NoError(t, protocol.ValidateTrace(span))

metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "timer_name")
metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "timer_name", "")
assert.NoError(t, err)
if assert.Equal(t, 1, len(metrics)) {
m := metrics[0]
@@ -248,6 +250,91 @@ func TestParseSSFIndicatorSpanWithError(t *testing.T) {
}
}

func TestParseSSFIndicatorObjective(t *testing.T) {
duration := 5 * time.Second
start := time.Now()
end := start.Add(duration)

span := &ssf.SSFSpan{}
span.Id = 1
span.TraceId = 5
span.Name = "foo"
span.StartTimestamp = start.UnixNano()
span.EndTimestamp = end.UnixNano()
span.Indicator = true
span.Service = "bar-srv"
span.Tags = map[string]string{
"this-tag": "definitely gets ignored",
"this-other-tag": "also gets dropped",
}
span.Metrics = make([]*ssf.SSFSample, 0)
buff, err := proto.Marshal(span)
assert.Nil(t, err)
inSpan, err := protocol.ParseSSF(buff)
assert.NoError(t, err)
require.NotNil(t, inSpan)
require.NoError(t, protocol.ValidateTrace(span))

metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "", "timer_name")
assert.NoError(t, err)
if assert.Equal(t, 1, len(metrics)) {
m := metrics[0]
assert.Equal(t, "timer_name", m.Name)
assert.Equal(t, "histogram", m.Type)
assert.InEpsilon(t, float32(duration/time.Nanosecond), m.Value, 0.001)
if assert.Equal(t, 3, len(m.Tags)) {
var tags sort.StringSlice = m.Tags
sort.Sort(tags)
assert.Equal(t, "error:false", tags[0])
assert.Equal(t, "objective:foo", tags[1])
assert.Equal(t, fmt.Sprintf("service:%s", span.Service), tags[2])
}
}
}

func TestParseSSFIndicatorObjectiveTag(t *testing.T) {
duration := 5 * time.Second
start := time.Now()
end := start.Add(duration)

span := &ssf.SSFSpan{}
span.Id = 1
span.TraceId = 5
span.Name = "foo"
span.StartTimestamp = start.UnixNano()
span.EndTimestamp = end.UnixNano()
span.Indicator = true
span.Service = "bar-srv"
span.Tags = map[string]string{
"this-tag": "definitely gets ignored",
"this-other-tag": "also gets dropped",
"ssf_objective": "bar",
}
span.Metrics = make([]*ssf.SSFSample, 0)
buff, err := proto.Marshal(span)
assert.Nil(t, err)
inSpan, err := protocol.ParseSSF(buff)
assert.NoError(t, err)
require.NotNil(t, inSpan)
require.NoError(t, protocol.ValidateTrace(span))

metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "", "timer_name")
assert.NoError(t, err)
if assert.Equal(t, 1, len(metrics)) {
m := metrics[0]
assert.Equal(t, "timer_name", m.Name)
assert.Equal(t, "histogram", m.Type)
assert.InEpsilon(t, float32(duration/time.Nanosecond), m.Value, 0.001)
if assert.Equal(t, 3, len(m.Tags)) {
var tags sort.StringSlice = m.Tags
sort.Sort(tags)
assert.Equal(t, "error:false", tags[0])
assert.Equal(t, "objective:bar", tags[1])
assert.Equal(t, fmt.Sprintf("service:%s", span.Service), tags[2])
}
}
}

func TestParseSSFIndicatorSpanNotNamed(t *testing.T) {
duration := 5 * time.Second
start := time.Now()
@@ -273,7 +360,7 @@ func TestParseSSFIndicatorSpanNotNamed(t *testing.T) {
require.NotNil(t, inSpan)
require.NoError(t, protocol.ValidateTrace(inSpan))

metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "")
metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "", "")
assert.NoError(t, err)
assert.Equal(t, 0, len(metrics))
}
@@ -304,7 +391,7 @@ func TestParseSSFNonIndicatorSpan(t *testing.T) {
require.NoError(t, err)
require.NoError(t, protocol.ValidateTrace(inSpan))

metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "timer_name")
metrics, err := samplers.ConvertIndicatorMetrics(inSpan, "timer_name", "objective_name")
assert.NoError(t, err)
assert.Equal(t, 0, len(metrics))
}
@@ -448,6 +535,14 @@ func TestParserTimer(t *testing.T) {
assert.Equal(t, "timer", m.Type, "Type")
}

func TestParserDistribution(t *testing.T) {
m, _ := samplers.ParseMetric([]byte("a.b.c:0.1716441474854946|d|#filter:flatulent"))
assert.NotNil(t, m, "Got nil metric!")
assert.Equal(t, "a.b.c", m.Name, "Name")
assert.Equal(t, float64(0.1716441474854946), m.Value, "Value")
assert.Equal(t, "histogram", m.Type, "Type")
}

func TestParserTimerFloat(t *testing.T) {
m, _ := samplers.ParseMetric([]byte("a.b.c:1.234|ms"))
assert.NotNil(t, m, "Got nil metric!")
21 changes: 10 additions & 11 deletions protocol/wire.go
@@ -77,20 +77,19 @@ func (e *InvalidTrace) Error() string {
return fmt.Sprintf("not a valid trace span: %#v", e.span)
}

// ValidTrace takes in an SSF span and determines if it is valid or not.
// It also makes sure the Tags is non-nil, since we use it later.
// ValidTrace returns true if an SSFSpan contains all data necessary
// to synthesize a span that can be used as part of a trace.
func ValidTrace(span *ssf.SSFSpan) bool {
ret := true
ret = ret && span.Id != 0
ret = ret && span.TraceId != 0
ret = ret && span.StartTimestamp != 0
ret = ret && span.EndTimestamp != 0
return ret
return span.Id != 0 &&
span.TraceId != 0 &&
span.StartTimestamp != 0 &&
span.EndTimestamp != 0 &&
span.Name != ""
}

// ValidateTrace takes in an SSF span and determines if it is valid or
// not. It also makes sure the Tags is non-nil, since we use it
// later. If the span is not valid, it returns an error.
// ValidateTrace is identical to ValidTrace, except instead of returning
// a boolean, it returns a non-nil error if the SSFSpan cannot be interpreted
// as a span, and nil otherwise.
func ValidateTrace(span *ssf.SSFSpan) error {
if !ValidTrace(span) {
return &InvalidTrace{span}
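The rewritten `ValidTrace`/`ValidateTrace` pair above can be exercised with a short standalone sketch. The `span` struct here is a stripped-down stand-in for `ssf.SSFSpan`, and the functions mirror the diff's logic, including the new requirement that a span be named:

```go
package main

import "fmt"

// span stands in for ssf.SSFSpan with the fields ValidTrace inspects.
type span struct {
	Id, TraceId                  int64
	Name                         string
	StartTimestamp, EndTimestamp int64
}

// invalidTrace mirrors protocol.InvalidTrace: it wraps the rejected span.
type invalidTrace struct{ s *span }

func (e *invalidTrace) Error() string {
	return fmt.Sprintf("not a valid trace span: %#v", e.s)
}

// validTrace mirrors the boolean check in the diff above.
func validTrace(s *span) bool {
	return s.Id != 0 && s.TraceId != 0 &&
		s.StartTimestamp != 0 && s.EndTimestamp != 0 &&
		s.Name != ""
}

// validateTrace is the error-returning twin of validTrace.
func validateTrace(s *span) error {
	if !validTrace(s) {
		return &invalidTrace{s}
	}
	return nil
}

func main() {
	nameless := &span{Id: 1, TraceId: 1, StartTimestamp: 1, EndTimestamp: 5}
	fmt.Println(validTrace(nameless)) // false: names are now required
	nameless.Name = "foo"
	fmt.Println(validateTrace(nameless) == nil) // true: now a complete span
}
```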
45 changes: 45 additions & 0 deletions protocol/wire_test.go
@@ -132,6 +132,51 @@ func TestReadSSFStreamBad(t *testing.T) {
}
}

func BenchmarkValidTrace(b *testing.B) {
const Len = 1000
input := make([]*ssf.SSFSpan, Len)
for i := range input {
p := make([]byte, 10)
_, err := rand.Read(p)
if err != nil {
b.Fatalf("Error generating data: %s", err)
}
msg := &ssf.SSFSpan{
Version: 1,
TraceId: 1,
Id: 2,
ParentId: 3,
StartTimestamp: time.Now().Unix(),
EndTimestamp: time.Now().Add(5 * time.Second).Unix(),
Tags: map[string]string{
string(p[:4]): string(p[5:]),
string(p[3:7]): string(p[1:3]),
},
}

switch r := i % 5; r {
case 1:
msg.Id = 0
case 2:
msg.TraceId = 0
case 3:
msg.StartTimestamp = 0
case 4:
msg.EndTimestamp = 0
default:
// do nothing
}

input[i] = msg
}
b.ResetTimer()

for i := 0; i < b.N; i++ {
_ = ValidTrace(input[i%Len])
}

}

func BenchmarkParseSSF(b *testing.B) {
const Len = 1000
input := make([][]byte, Len)
2 changes: 1 addition & 1 deletion public-docker-images/Dockerfile-alpine
@@ -1,4 +1,4 @@
FROM golang:1.11.0-alpine3.6 AS base
FROM golang:1.11.4-alpine3.8 AS base
MAINTAINER The Stripe Observability Team <support@stripe.com>


2 changes: 1 addition & 1 deletion public-docker-images/Dockerfile-debian-sid
@@ -1,4 +1,4 @@
FROM golang:1.11.0 AS base
FROM golang:1.11.4 AS base
MAINTAINER The Stripe Observability Team <support@stripe.com>


