Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge branch 'wyndhblb-dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rafrombrc committed Feb 4, 2015
2 parents 0f9a4d8 + 4a5f671 commit 12bef7b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 68 deletions.
6 changes: 5 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ Features

* Added ability for HttpListenInput to capture specified HTTP request headers
and write them as message fields.


* Added ability for Statsd input handler to treat a malformed stat line
one at time without skipping all the "good" stats in a multi-line input
(more like statsd itself).

* Added `message_interval` setting support to heka-flood test configuration.

0.8.3 (2015-01-08)
Expand Down
106 changes: 58 additions & 48 deletions plugins/statsd/statsd_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012-2014
# Portions created by the Initial Developer are Copyright (C) 2012-2015
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
Expand Down Expand Up @@ -125,56 +125,67 @@ func (s *StatsdInput) Stop() {
// Parses received raw statsd bytes data and converts it into a StatPacket
// object that can be passed to the StatMonitor.
func (s *StatsdInput) handleMessage(message []byte) {
stats, err := parseMessage(message)
if err != nil {
s.ir.LogError(fmt.Errorf("Can not parse message: %s", message))
return
stats, badLines := parseMessage(message)
for _, line := range badLines {
s.ir.LogError(fmt.Errorf("can't parse message: %s", string(line)))
}

for _, stat := range stats {
if !s.statAccum.DropStat(stat) {
s.ir.LogError(fmt.Errorf("Undelivered stat: %v", stat))
s.ir.LogError(fmt.Errorf("undelivered stat: %+v", stat))
}
}
}

func parseMessage(message []byte) ([]Stat, error) {
func parseMessage(message []byte) ([]Stat, [][]byte) {
message = bytes.Trim(message, " \t\n")

stats := make([]Stat, 0, int(math.Max(1, float64(bytes.Count(message, []byte("\n"))))))

errFmt := "Invalid statsd message %s"

var lines [][]byte
if bytes.IndexByte(message, '\n') > -1 {
lines = bytes.Split(message, []byte("\n"))
} else {
lines = [][]byte{message}
}

for _, line := range lines {
badLines := make([][]byte, 0, 2)

for _, s_line := range lines {
//trim white space
line := bytes.Trim(s_line, " \t\n")

// skip blank lines
if len(line) == 0 {
continue
}

colonPos := bytes.IndexByte(line, ':')
if colonPos == -1 {
return nil, fmt.Errorf(errFmt, line)
if colonPos == -1 || len(line) < colonPos+3 {
badLines = append(badLines, line)
continue
}

pipePos := bytes.IndexByte(line, '|')
if pipePos == -1 {
return nil, fmt.Errorf(errFmt, line)
pipePos := bytes.IndexByte(line[colonPos+1:], '|') + colonPos + 1
if pipePos == -1 || len(line) < pipePos+2 {
badLines = append(badLines, line)
continue
}

bucket := line[:colonPos]
value := line[colonPos+1 : pipePos]
modifier, err := extractModifier(line, pipePos+1)

modifier, sampleMaybe, err := extractModifier(line[pipePos+1:])
if err != nil {
return nil, err
badLines = append(badLines, line)
continue
}

sampleRate := float32(1)
sampleRate, err = extractSampleRate(line)
if err != nil {
return nil, err
if sampleMaybe {
sampleRate, err = extractSampleRate(line[pipePos+2:])
if err != nil {
badLines = append(badLines, line)
continue
}
}

var stat Stat
Expand All @@ -186,48 +197,47 @@ func parseMessage(message []byte) ([]Stat, error) {
stats = append(stats, stat)
}

return stats, nil
return stats, badLines
}

func extractModifier(message []byte, startAt int) ([]byte, error) {
modifier := message[startAt:]

l := len(modifier)

if l == 1 {
func extractModifier(message []byte) ([]byte, bool, error) {
l := len(message)
switch {
case l == 1:
for _, m := range []byte{'g', 'h', 'm', 'c'} {
if modifier[0] == m {
return modifier, nil
if message[0] == m {
return message, false, nil
}
}
}

if l >= 2 {
if bytes.HasPrefix(modifier, []byte("ms")) {
return []byte("ms"), nil
case l == 2:
if bytes.Equal(message, []byte("ms")) {
return []byte("ms"), false, nil
}

if modifier[0] == 'c' {
return []byte("c"), nil
case l > 2:
if message[0] == 'c' {
return []byte("c"), true, nil
}
if bytes.HasPrefix(message, []byte("ms")) {
return []byte("ms"), true, nil
}
}

return []byte{}, fmt.Errorf("Can not find modifier in message %s", message)
return []byte{}, false, fmt.Errorf("invalid modifier in message %s", message)
}

func extractSampleRate(message []byte) (float32, error) {
atPos := bytes.IndexByte(message, '@')

// no sample rate
if atPos == -1 {
l := len(message)
if l > 0 && message[0] == 's' {
// Leftover "s" from "ms" modifier.
l = l - 1
message = message[1:]
}
if l < 3 || !bytes.HasPrefix(message, []byte("|@")) {
return 1, nil
}

sampleRate, err := strconv.ParseFloat(string(message[atPos+1:]), 32)
sampleRate, err := strconv.ParseFloat(string(message[2:]), 32)
if err != nil {
return 1, err
}

return float32(sampleRate), nil
}

Expand Down
36 changes: 17 additions & 19 deletions plugins/statsd/statsd_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,30 @@ func TestParseMessage(t *testing.T) {
}

for msg, expected := range testData {
obtained, err := parseMessage([]byte(msg + "\n"))

if err != nil {
t.Fatalf("error should be nil, got %s", err)
stats, badLines := parseMessage([]byte(msg + "\n"))
if len(badLines) != 0 {
t.Fatalf("valid statsd stat failed to parse: %s", badLines[0])
}

if len(obtained) != len(expected) {
t.Fatalf("expected %d Stat objects, got %d", len(expected), len(obtained))
if len(stats) != len(expected) {
t.Fatalf("expected %d Stat objects, got %d", len(expected), len(stats))
}

for index, stat := range obtained {
for index, stat := range stats {
if stat.Bucket != expected[index].Bucket {
t.Fatalf("expected %s at index %d, got %s", expected[index].Bucket, index, stat.Bucket)
t.Fatalf("expected %s at index %d, got %s",
expected[index].Bucket, index, stat.Bucket)
}

if stat.Value != expected[index].Value {
t.Fatalf("expected %s at index %d, got %s", expected[index].Value, index, stat.Value)
t.Fatalf("expected %s at index %d, got %s",
expected[index].Value, index, stat.Value)
}

if stat.Modifier != expected[index].Modifier {
t.Fatalf("expected %s at index %d, got %s", expected[index].Modifier, index, stat.Modifier)
t.Fatalf("expected %s at index %d, got %s",
expected[index].Modifier, index, stat.Modifier)
}

if stat.Sampling != expected[index].Sampling {
t.Fatalf("expected %f at index %d, got %f", expected[index].Sampling, index, stat.Sampling)
t.Fatalf("expected %f at index %d, got %f",
expected[index].Sampling, index, stat.Sampling)
}
}
}
Expand All @@ -204,10 +203,9 @@ func TestParseMessageInvalid(t *testing.T) {
}

for _, m := range messages {
_, err := parseMessage([]byte(m + "\n"))

if err == nil {
t.Fatalf("err should not be nil, got : %s", err.Error())
stats, badLines := parseMessage([]byte(m + "\n"))
if len(stats) > 0 || len(badLines) != 1 || string(badLines[0]) != m {
t.Fatalf("bad statsd stat successfully parsed: %s", m)
}
}
}
Expand Down

0 comments on commit 12bef7b

Please sign in to comment.