diff --git a/monitor/prometheus.go b/monitor/prometheus.go new file mode 100644 index 0000000..4272684 --- /dev/null +++ b/monitor/prometheus.go @@ -0,0 +1,114 @@ +package monitor + +import ( + "bytes" + "errors" + "strconv" +) + +// Metric represents a single Prometheus metric line, including its +// labels and timestamp. +type Metric struct { + Name []byte + Labels LabelPairs + Value int + Milliseconds int64 +} + +// LabelPairs contains the set of labels for a metric. +type LabelPairs []LabelPair + +// LabelPair contains a label name and value. +type LabelPair struct { + Name []byte + Value []byte +} + +// ParseMetric parses a single Promethesus metric line. +// +// Note: The implementation currently only supports integer values and +// also doesn't handle escaped characters nor multiple sequential +// whitespace characters. This is OK for now because this only needs +// to support metrics generated by the various influx-spout +// components. +func ParseMetric(s []byte) (*Metric, error) { + if len(s) < 3 { + return nil, errors.New("invalid metric") + } + + out := new(Metric) + var err error + + i := bytes.IndexAny(s, " {") + if i == -1 { + return nil, errors.New("no value") + } + out.Name = s[:i] + + if s[i] == '{' { + i++ + labels, n, err := parseLabels(s[i:]) + if err != nil { + return nil, err + } + out.Labels = labels + i += n + } + + i++ + j := bytes.IndexByte(s[i:], ' ') + if j == -1 { + j = len(s[i:]) // No timestamp + } + out.Value, err = strconv.Atoi(string(s[i : i+j])) + if err != nil { + return nil, errors.New("invalid value") + } + + i += j + if i < len(s) { + out.Milliseconds, err = strconv.ParseInt(string(s[i+1:]), 10, 64) + if err != nil { + return nil, errors.New("invalid timestamp") + } + } + + return out, nil +} + +func parseLabels(s []byte) (LabelPairs, int, error) { + if s[0] == '}' { + return nil, 1, nil + } + + i := 0 + out := make(LabelPairs, 0, 1) + for { + var label LabelPair + + j := bytes.Index(s[i:], []byte(`="`)) + if j == -1 { + return nil, i, errors.New("invalid label") + } + label.Name = s[i : i+j] + i = i + j + 2 + + j = bytes.IndexByte(s[i:], '"') + if j == -1 { + return nil, i, errors.New("missing label closing quote") + } + label.Value = s[i : i+j] + i = i + j + 1 + + out = append(out, label) + + switch s[i] { + case '}': + return out, i + 1, nil + case ',': + i++ + default: + return nil, i, errors.New("invalid label separator") + } + } +} diff --git a/monitor/prometheus_test.go b/monitor/prometheus_test.go new file mode 100644 index 0000000..bff35d0 --- /dev/null +++ b/monitor/prometheus_test.go @@ -0,0 +1,166 @@ +package monitor_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jumptrading/influx-spout/monitor" +) + +func TestBasic(t *testing.T) { + m, err := monitor.ParseMetric([]byte("foo 42")) + require.NoError(t, err) + assert.Equal(t, &monitor.Metric{ + Name: []byte("foo"), + Value: 42, + }, m) +} + +func TestEmpty(t *testing.T) { + m, err := monitor.ParseMetric([]byte("")) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid metric") +} + +func TestNoValue(t *testing.T) { + m, err := monitor.ParseMetric([]byte("what")) + assert.Nil(t, m) + assert.EqualError(t, err, "no value") +} + +func TestEmptyValue(t *testing.T) { + m, err := monitor.ParseMetric([]byte("what ")) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid value") +} + +func TestFloatValue(t *testing.T) { + m, err := monitor.ParseMetric([]byte("foo 12.32")) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid value") +} + +func TestStringValue(t *testing.T) { + m, err := monitor.ParseMetric([]byte("foo bar")) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid value") +} + +func TestLabel(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method="post"} 2`)) + require.NoError(t, err) + assert.Equal(t, &monitor.Metric{ + Name: []byte("foo"), + Labels: monitor.LabelPairs{{ + Name: []byte("method"), + Value: []byte("post"), + }}, + Value: 2, + }, m) +} + +func TestEmptyLabels(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{} 2`)) + require.NoError(t, err) + assert.Equal(t, &monitor.Metric{ + Name: []byte("foo"), + Value: 2, + }, m) +} + +func TestMultipleLabels(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method="post",code="200"} 2`)) + require.NoError(t, err) + assert.Equal(t, &monitor.Metric{ + Name: []byte("foo"), + Labels: monitor.LabelPairs{ + { + Name: []byte("method"), + Value: []byte("post"), + }, + { + Name: []byte("code"), + Value: []byte("200"), + }, + }, + Value: 2, + }, m) +} + +func TestBadLabelSep(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method="post"/code="200"} 2`)) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid label separator") +} + +func TestNoLabelValue(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method} 2`)) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid label") +} + +func TestMissingClosingBrace(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method="post" 2`)) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid label separator") +} + +func TestMissingLabelOpeningQuote(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method=post} 2`)) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid label") +} + +func TestMissingLabelClosingQuotes(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{method="post} 2`)) + assert.Nil(t, m) + assert.EqualError(t, err, "missing label closing quote") +} + +func TestTimestamp(t *testing.T) { + m, err := monitor.ParseMetric([]byte("foo 42 1234567")) + require.NoError(t, err) + assert.Equal(t, &monitor.Metric{ + Name: []byte("foo"), + Value: 42, + Milliseconds: 1234567, + }, m) +} + +func TestInvalidTimestamp(t *testing.T) { + m, err := monitor.ParseMetric([]byte("foo 42 abc")) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid timestamp") +} + +func TestTrailingSpace(t *testing.T) { + m, err := monitor.ParseMetric([]byte("foo 42 ")) + assert.Nil(t, m) + assert.EqualError(t, err, "invalid timestamp") +} + +func TestLabelsAndTimestamp(t *testing.T) { + m, err := monitor.ParseMetric([]byte(`foo{host="nyc01",bar="definitely",thing="forgot"} 42 123456789`)) + require.NoError(t, err) + assert.Equal(t, &monitor.Metric{ + Name: []byte("foo"), + Labels: monitor.LabelPairs{ + { + Name: []byte("host"), + Value: []byte("nyc01"), + }, + { + Name: []byte("bar"), + Value: []byte("definitely"), + }, + { + Name: []byte("thing"), + Value: []byte("forgot"), + }, + }, + Value: 42, + Milliseconds: 123456789, + }, m) +}