SignalFX converts service checks into a gauge (#431)
aubrey-stripe committed Apr 12, 2018
1 parent 24a8c8b commit 68706c1
Showing 12 changed files with 383 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## Improvements
* Receiving SSF in UDP packets now happens on `num_readers` goroutines. Thanks, [antifuchs](https://github.com/antifuchs)
* Updated [SignalFx library](https://github.com/signalfx/golib) dependency so that compression is enabled by default, saving significant time on large metric bodies. Thanks, [gphat](https://github.com/gphat)
* The SignalFx sink can now handle SSF service checks, converting them to gauges (a usage sketch follows this diff). Thanks, [stealthcode](https://github.com/stealthcode)!
* Decreased logging output of veneur-proxy. Thanks, [gphat](https://github.com/gphat)!
* Better warnings when invalid flag combinations are passed to `veneur-emit`. Thanks, [sdboyer](https://github.com/sdboyer)!
* Revamped how sinks handle DogStatsD's events and service checks. Thanks, [gphat](https://github.com/gphat)
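For orientation, here is a minimal sketch (not part of this commit) of the kind of SSF service-check sample the entry above refers to. `ssf.Status` and its three-argument form are taken from their use later in this diff; the `ssf.SSFSample_CRITICAL` constant name, metric name, and tags are assumptions for illustration.

package main

import (
	"fmt"

	"github.com/stripe/veneur/ssf"
)

func main() {
	// Construct a service-check style SSF sample; the SignalFx sink now converts
	// its status into a gauge at flush time. SSFSample_CRITICAL is an assumed
	// constant name from the SSF status enum.
	tags := map[string]string{"host": "web-01"}
	check := ssf.Status("my_service.check", ssf.SSFSample_CRITICAL, tags)
	fmt.Printf("name=%s status=%v tags=%v\n", check.Name, check.Status, check.Tags)
}
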
11 changes: 11 additions & 0 deletions samplers/derived.go
@@ -0,0 +1,11 @@
package samplers

import (
	"github.com/stripe/veneur/ssf"
)

// DerivedMetricsProcessor processes any metric created from events or service checks into
// the worker channels for flushing
type DerivedMetricsProcessor interface {
	SendSample(sample *ssf.SSFSample) error
}
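
The interface above is how the SignalFx sink hands derived gauges back for flushing. As an illustration only (not part of this commit), a minimal implementation could push samples onto a buffered channel for a flush worker to drain; the channel-based design and all names below are assumptions.

package main

import (
	"fmt"

	"github.com/stripe/veneur/ssf"
)

// channelProcessor is a hypothetical DerivedMetricsProcessor: it forwards derived
// samples to whatever drains the channel, e.g. a flushing worker.
type channelProcessor struct {
	out chan *ssf.SSFSample
}

func (c *channelProcessor) SendSample(sample *ssf.SSFSample) error {
	select {
	case c.out <- sample:
		return nil
	default:
		return fmt.Errorf("derived-metric channel full, dropping %q", sample.Name)
	}
}

func main() {
	p := &channelProcessor{out: make(chan *ssf.SSFSample, 16)}
	_ = p.SendSample(&ssf.SSFSample{Name: "my_service.check"})
	fmt.Println(len(p.out), "sample queued")
}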
4 changes: 2 additions & 2 deletions samplers/metrictype_string.go

Some generated files are not rendered by default.

68 changes: 68 additions & 0 deletions samplers/samplers.go
@@ -21,6 +21,8 @@ const (
	CounterMetric MetricType = iota
	// GaugeMetric is a gauge
	GaugeMetric
	// StatusMetric is a status (synonymous with a service check)
	StatusMetric
)

// RouteInformation is a key-only map indicating sink names that are
@@ -253,6 +255,72 @@ func NewGauge(Name string, Tags []string) *Gauge {
	return &Gauge{Name: Name, Tags: Tags}
}

// StatusCheck retains whatever the last value was.
type StatusCheck struct {
	Name  string
	Tags  []string
	value float64
}

// Sample takes on whatever value is passed in as a sample.
func (g *StatusCheck) Sample(sample float64, sampleRate float32) {
	g.value = sample
}

// Flush generates an InterMetric from the current state of this status check.
func (g *StatusCheck) Flush() []InterMetric {
	tags := make([]string, len(g.Tags))
	copy(tags, g.Tags)
	return []InterMetric{{
		Name:      g.Name,
		Timestamp: time.Now().Unix(),
		Value:     float64(g.value),
		Tags:      tags,
		Type:      StatusMetric,
		Sinks:     routeInfo(tags),
	}}
}

// Export converts a StatusCheck into a JSONMetric.
func (g *StatusCheck) Export() (JSONMetric, error) {
	var buf bytes.Buffer

	err := binary.Write(&buf, binary.LittleEndian, g.value)
	if err != nil {
		return JSONMetric{}, err
	}

	return JSONMetric{
		MetricKey: MetricKey{
			Name:       g.Name,
			Type:       "status",
			JoinedTags: strings.Join(g.Tags, ","),
		},
		Tags:  g.Tags,
		Value: buf.Bytes(),
	}, nil
}

// Combine is pretty naïve for StatusChecks, as it just overwrites the value.
func (g *StatusCheck) Combine(other []byte) error {
	var otherValue float64
	buf := bytes.NewReader(other)
	err := binary.Read(buf, binary.LittleEndian, &otherValue)

	if err != nil {
		return err
	}

	g.value = otherValue

	return nil
}

// NewStatusCheck creates a new StatusCheck with the given name and tags.
func NewStatusCheck(Name string, Tags []string) *StatusCheck {
	return &StatusCheck{Name: Name, Tags: Tags}
}

// Set is a list of unique values seen.
type Set struct {
	Name string
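To tie the new sampler together, here is a test-style sketch (not part of this commit) of the intended round trip: Sample records the latest status, Export serializes it, Combine adopts it on another instance (last writer wins), and Flush emits a StatusMetric. The test name and the value 2 standing in for a CRITICAL status are assumptions.

package samplers

import "testing"

// TestStatusCheckRoundTrip is a hypothetical example, not part of this commit.
func TestStatusCheckRoundTrip(t *testing.T) {
	check := NewStatusCheck("my_service.check", []string{"host:web-01"})
	check.Sample(2, 1.0) // e.g. a CRITICAL status encoded as a float

	jm, err := check.Export()
	if err != nil {
		t.Fatal(err)
	}

	// Combine is last-writer-wins: the receiving check simply adopts the value.
	remote := NewStatusCheck("my_service.check", []string{"host:web-01"})
	if err := remote.Combine(jm.Value); err != nil {
		t.Fatal(err)
	}

	metrics := remote.Flush()
	if len(metrics) != 1 || metrics[0].Type != StatusMetric || metrics[0].Value != 2 {
		t.Fatalf("unexpected flush result: %+v", metrics)
	}
}
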
2 changes: 1 addition & 1 deletion server.go
@@ -308,7 +308,7 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) {
		for _, perTag := range conf.SignalfxPerTagAPIKeys {
			byTagClients[perTag.Name] = signalfx.NewClient(conf.SignalfxEndpointBase, perTag.APIKey, *ret.HTTPClient)
		}
-		sfxSink, err := signalfx.NewSignalFxSink(conf.SignalfxHostnameTag, conf.Hostname, ret.TagsAsMap, log, fallback, conf.SignalfxVaryKeyBy, byTagClients)
+		sfxSink, err := signalfx.NewSignalFxSink(conf.SignalfxHostnameTag, conf.Hostname, ret.TagsAsMap, log, fallback, conf.SignalfxVaryKeyBy, byTagClients, metricSink)
		if err != nil {
			return ret, err
		}
2 changes: 2 additions & 0 deletions sinks/datadog/datadog.go
@@ -138,6 +138,8 @@ func (dd *DatadogMetricSink) Flush(ctx context.Context, interMetrics []samplers.
	return nil
}

// FlushOtherSamples serializes Events or Service Checks directly to datadog.
// May make 2 external calls to the datadog client.
func (dd *DatadogMetricSink) FlushOtherSamples(ctx context.Context, samples []ssf.SSFSample) {

	events := []DDEvent{}
180 changes: 123 additions & 57 deletions sinks/signalfx/signalfx.go
@@ -88,6 +88,7 @@ type SignalFxSink struct {
	log            *logrus.Logger
	traceClient    *trace.Client
	excludedTags   map[string]struct{}
	derivedMetrics samplers.DerivedMetricsProcessor
}

// A DPClient is a client that can be used to submit signalfx data
@@ -106,7 +107,7 @@ func NewClient(endpoint, apiKey string, client http.Client) DPClient {
}

// NewSignalFxSink creates a new SignalFx sink for metrics.
-func NewSignalFxSink(hostnameTag string, hostname string, commonDimensions map[string]string, log *logrus.Logger, client DPClient, varyBy string, perTagClients map[string]DPClient) (*SignalFxSink, error) {
+func NewSignalFxSink(hostnameTag string, hostname string, commonDimensions map[string]string, log *logrus.Logger, client DPClient, varyBy string, perTagClients map[string]DPClient, derivedMetrics samplers.DerivedMetricsProcessor) (*SignalFxSink, error) {
	return &SignalFxSink{
		defaultClient:     client,
		clientsByTagValue: perTagClients,
@@ -115,6 +116,7 @@ func NewSignalFxSink(hostnameTag string, hostname string, commonDimensions map[s
		commonDimensions: commonDimensions,
		log:              log,
		varyBy:           varyBy,
		derivedMetrics:   derivedMetrics,
	}, nil
}
@@ -188,13 +190,17 @@ func (sfx *SignalFxSink) Flush(ctx context.Context, interMetrics []samplers.Inte
		for k := range sfx.excludedTags {
			delete(dims, k)
		}
+		delete(dims, "veneursinkonly")

		var point *datapoint.Datapoint
-		if metric.Type == samplers.GaugeMetric {
+		switch metric.Type {
+		case samplers.GaugeMetric:
			point = sfxclient.GaugeF(metric.Name, dims, metric.Value)
-		} else if metric.Type == samplers.CounterMetric {
+		case samplers.CounterMetric:
			// TODO I am not certain if this should be a Counter or a Cumulative
			point = sfxclient.Counter(metric.Name, dims, int64(metric.Value))
+		case samplers.StatusMetric:
+			point = sfxclient.GaugeF(metric.Name, dims, metric.Value)
		}
		coll.addPoint(metricKey, point)
		numPoints++
@@ -211,69 +217,43 @@ func (sfx *SignalFxSink) Flush(ctx context.Context, interMetrics []samplers.Inte
	return err
}

-// FlushEventsChecks sends events to SignalFx. It does not support checks.
+var successSpanTags = map[string]string{"sink": "signalfx", "results": "success"}
+var failureSpanTags = map[string]string{"sink": "signalfx", "results": "failure"}

+// FlushOtherSamples sends events to SignalFx. Event type samples will be serialized as SFX
+// Events directly. Service check type samples will be enqueued as a gauge metric during
+// metric sink flushing.
func (sfx *SignalFxSink) FlushOtherSamples(ctx context.Context, samples []ssf.SSFSample) {
	span, _ := trace.StartSpanFromContext(ctx, "")
	defer span.ClientFinish(sfx.traceClient)

+	var countFailed = 0
+	var countSuccess = 0
	for _, sample := range samples {
-
-		if _, ok := sample.Tags[dogstatsd.EventIdentifierKey]; !ok {
-			// This isn't an event, just continue
-			continue
-		}
-
-		dims := map[string]string{}
-
-		// Copy common dimensions in
-		for k, v := range sfx.commonDimensions {
-			dims[k] = v
-		}
-		// And hostname
-		dims[sfx.hostnameTag] = sfx.hostname
-
-		for k, v := range sample.Tags {
-			if k == dogstatsd.EventIdentifierKey {
-				// Don't copy this tag
-				continue
+		switch ddOtherSampleKind(sample.Tags) {
+		case ddSampleEvent:
+			sfx.reportEvent(ctx, &sample)
+		case ddSampleServiceCheck:
+			err := sfx.reportServiceCheck(&sample)
+			if err != nil {
+				countFailed++
+			} else {
+				countSuccess++
			}
-			dims[k] = v
-		}
-
-		for k := range sfx.excludedTags {
-			delete(dims, k)
-		}
-		name := sample.Name
-		if len(name) > EventNameMaxLength {
-			name = name[0:EventNameMaxLength]
-		}
-		message := sample.Message
-		if len(message) > EventDescriptionMaxLength {
-			message = message[0:EventDescriptionMaxLength]
-		}
-		// Datadog requires some weird chars to do markdown… SignalFx does not so
-		// we'll chop those out
-		// https://docs.datadoghq.com/graphing/event_stream/#markdown-events
-		message = strings.Replace(message, "%%% \n", "", 1)
-		message = strings.Replace(message, "\n %%%", "", 1)
-		// Sometimes there are leading and trailing spaces
-		message = strings.TrimSpace(message)
-
-		ev := event.Event{
-			EventType:  name,
-			Category:   event.USERDEFINED,
-			Dimensions: dims,
-			Timestamp:  time.Unix(sample.Timestamp, 0),
-			Properties: map[string]interface{}{
-				"description": message,
-			},
+			// report svchk
+		default:
+			// currently only supports a service check and an event; other types are discarded
+			continue
		}
-		// TODO: Split events out the same way as points
-		sfx.defaultClient.AddEvents(ctx, []*event.Event{&ev})
	}
+	if countSuccess > 0 {
+		span.Add(ssf.Count(sinks.ServiceCheckConversionCount, float32(countSuccess), successSpanTags))
+	}
+	if countFailed > 0 {
+		span.Add(ssf.Count(sinks.ServiceCheckConversionCount, float32(countFailed), failureSpanTags))
+	}
}

-// SetTagExcludes sets the excluded tag names. Any tags with the
+// SetExcludedTags sets the excluded tag names. Any tags with the
// provided key (name) will be excluded.
func (sfx *SignalFxSink) SetExcludedTags(excludes []string) {

@@ -283,3 +263,89 @@ func (sfx *SignalFxSink) SetExcludedTags(excludes []string) {
	}
	sfx.excludedTags = tagsSet
}

type ddSampleKind int

const (
	ddSampleUnknown ddSampleKind = iota
	ddSampleEvent
	ddSampleServiceCheck
)

func ddOtherSampleKind(tags map[string]string) ddSampleKind {
	if _, ok := tags[dogstatsd.CheckIdentifierKey]; ok {
		return ddSampleServiceCheck
	}
	if _, ok := tags[dogstatsd.EventIdentifierKey]; ok {
		return ddSampleEvent
	}
	return ddSampleUnknown
}

func (sfx *SignalFxSink) reportServiceCheck(sample *ssf.SSFSample) error {
	name := sample.Name
	tags := map[string]string{}
	for k, v := range sample.Tags {
		if k != dogstatsd.CheckIdentifierKey {
			tags[k] = v
		}
	}
	tags[sfx.hostnameTag] = sfx.hostname
	tags["veneursinkonly"] = sfx.Name()
	for k, v := range sfx.commonDimensions {
		tags[k] = v
	}
	statusCheckSample := ssf.Status(name, sample.Status, tags)
	return sfx.derivedMetrics.SendSample(statusCheckSample)
}

func (sfx *SignalFxSink) reportEvent(ctx context.Context, sample *ssf.SSFSample) {
	// Copy common dimensions in
	dims := map[string]string{}
	for k, v := range sfx.commonDimensions {
		dims[k] = v
	}
	// And hostname
	dims[sfx.hostnameTag] = sfx.hostname

	for k, v := range sample.Tags {
		if k == dogstatsd.EventIdentifierKey {
			// Don't copy this tag
			continue
		}
		dims[k] = v
	}

	for k := range sfx.excludedTags {
		delete(dims, k)
	}
	name := sample.Name
	if len(name) > EventNameMaxLength {
		name = name[0:EventNameMaxLength]
	}
	message := sample.Message
	if len(message) > EventDescriptionMaxLength {
		message = message[0:EventDescriptionMaxLength]
	}
	// Datadog requires some weird chars to do markdown… SignalFx does not so
	// we'll chop those out
	// https://docs.datadoghq.com/graphing/event_stream/#markdown-events
	message = strings.Replace(message, "%%% \n", "", 1)
	message = strings.Replace(message, "\n %%%", "", 1)
	// Sometimes there are leading and trailing spaces
	message = strings.TrimSpace(message)

	ev := event.Event{
		EventType:  name,
		Category:   event.USERDEFINED,
		Dimensions: dims,
		Timestamp:  time.Unix(sample.Timestamp, 0),
		Properties: map[string]interface{}{
			"description": message,
		},
	}

	// TODO: Split events out the same way as points
	// report evt
	sfx.defaultClient.AddEvents(ctx, []*event.Event{&ev})
}
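
Downstream of reportServiceCheck, the derived StatusMetric is flushed like any other gauge (the new StatusMetric case in Flush above). Here is a minimal sketch (not part of this commit) of that final translation, using the same sfxclient helper the sink already calls; the metric name, dimensions, and the DogStatsD-convention value 2 for CRITICAL are illustrative, and the golib import path is assumed from the dependency named in the changelog.

package main

import (
	"fmt"

	"github.com/signalfx/golib/sfxclient"
)

func main() {
	// A CRITICAL service check (2 in the DogStatsD convention) surfaces as a plain
	// SignalFx gauge datapoint once the derived StatusMetric reaches the sink's Flush.
	dims := map[string]string{"host": "web-01"}
	point := sfxclient.GaugeF("my_service.check", dims, 2)
	fmt.Println(point.Metric, point.Value)
}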
