Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Merge pull request #6 from m3db/xichen-encode-chunked-metric
Browse files Browse the repository at this point in the history
Support encoding metrics with chunked IDs efficiently
  • Loading branch information
xichen2020 committed Jan 10, 2017
2 parents 92d021a + 4099134 commit ffa77a9
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 12 deletions.
8 changes: 4 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import:
- pool
- time
- package: gopkg.in/vmihailenco/msgpack.v2
version: 94af6ea04d2da6a09d6914ca79b8fb94f3acbba4
version: a1382b1ce0c749733b814157c245e02cc1f41076
testImport:
- package: github.com/stretchr/testify
version: d77da356e56a7428ad25149ca77381849a6a5232
Expand Down
15 changes: 14 additions & 1 deletion metric/aggregated/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ import (

// Metric is a metric, which is essentially a named value at certain time.
type Metric struct {
ID metric.ID
metric.ID
Timestamp time.Time
Value float64
}

// ChunkedMetric is a metric with a chunked ID
type ChunkedMetric struct {
metric.ChunkedID
Timestamp time.Time
Value float64
}
Expand Down Expand Up @@ -62,6 +69,12 @@ type MetricWithPolicy struct {
policy.Policy
}

// ChunkedMetricWithPolicy is a chunked metric with applicable policy
type ChunkedMetricWithPolicy struct {
ChunkedMetric
policy.Policy
}

// RawMetricWithPolicy is a raw metric with applicable policy
type RawMetricWithPolicy struct {
RawMetric
Expand Down
14 changes: 14 additions & 0 deletions metric/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,24 @@

package metric

import "fmt"

// ID is the metric id
// TODO(xichen): make ID a union of numeric ID and bytes-backed IDs
// so we can compress IDs on a per-connection basis
type ID []byte

// String is the string representation of an id
func (id ID) String() string { return string(id) }

// ChunkedID is a three-part id
type ChunkedID struct {
Prefix []byte
Data []byte
Suffix []byte
}

// String is the string representation of the chunked id
func (cid ChunkedID) String() string {
return fmt.Sprintf("%s%s%s", cid.Prefix, cid.Data, cid.Suffix)
}
30 changes: 28 additions & 2 deletions protocol/msgpack/aggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type encodeRawMetricWithPolicyFn func(data []byte, p policy.Policy)
type encodeRawMetricFn func(data []byte)
type encodeMetricAsRawFn func(m aggregated.Metric) []byte
type encodeChunkedMetricAsRawFn func(m aggregated.ChunkedMetric) []byte

// aggregatedEncoder uses MessagePack for encoding aggregated metrics.
// It is not thread-safe.
Expand All @@ -39,6 +40,7 @@ type aggregatedEncoder struct {
encodeRawMetricWithPolicyFn encodeRawMetricWithPolicyFn // raw metric with policy encoding function
encodeRawMetricFn encodeRawMetricFn // raw metric encoding function
encodeMetricAsRawFn encodeMetricAsRawFn // metric to raw metric conversion function
encodeChunkedMetricAsRawFn encodeChunkedMetricAsRawFn // chunked metric to raw metric conversion function
}

// NewAggregatedEncoder creates an aggregated encoder
Expand All @@ -52,6 +54,7 @@ func NewAggregatedEncoder(encoder BufferedEncoder) AggregatedEncoder {
enc.encodeRawMetricWithPolicyFn = enc.encodeRawMetricWithPolicy
enc.encodeRawMetricFn = enc.encodeRawMetric
enc.encodeMetricAsRawFn = enc.encodeMetricAsRaw
enc.encodeChunkedMetricAsRawFn = enc.encodeChunkedMetricAsRaw

return enc
}
Expand All @@ -71,6 +74,16 @@ func (enc *aggregatedEncoder) EncodeMetricWithPolicy(mp aggregated.MetricWithPol
return enc.err()
}

func (enc *aggregatedEncoder) EncodeChunkedMetricWithPolicy(cmp aggregated.ChunkedMetricWithPolicy) error {
if err := enc.err(); err != nil {
return err
}
enc.encodeRootObjectFn(rawMetricWithPolicyType)
data := enc.encodeChunkedMetricAsRawFn(cmp.ChunkedMetric)
enc.encodeRawMetricWithPolicyFn(data, cmp.Policy)
return enc.err()
}

func (enc *aggregatedEncoder) EncodeRawMetricWithPolicy(rp aggregated.RawMetricWithPolicy) error {
if err := enc.err(); err != nil {
return err
Expand All @@ -88,14 +101,27 @@ func (enc *aggregatedEncoder) encodeRootObject(objType objectType) {

func (enc *aggregatedEncoder) encodeMetricAsRaw(m aggregated.Metric) []byte {
enc.buf.resetData()
enc.buf.encodeVersion(metricVersion)
enc.buf.encodeNumObjectFields(numFieldsForType(metricType))
enc.encodeMetricProlog()
enc.buf.encodeID(m.ID)
enc.buf.encodeTime(m.Timestamp)
enc.buf.encodeFloat64(m.Value)
return enc.buf.encoder().Bytes()
}

func (enc *aggregatedEncoder) encodeChunkedMetricAsRaw(m aggregated.ChunkedMetric) []byte {
enc.buf.resetData()
enc.encodeMetricProlog()
enc.buf.encodeChunkedID(m.ChunkedID)
enc.buf.encodeTime(m.Timestamp)
enc.buf.encodeFloat64(m.Value)
return enc.buf.encoder().Bytes()
}

func (enc *aggregatedEncoder) encodeMetricProlog() {
enc.buf.encodeVersion(metricVersion)
enc.buf.encodeNumObjectFields(numFieldsForType(metricType))
}

func (enc *aggregatedEncoder) encodeRawMetricWithPolicy(data []byte, p policy.Policy) {
enc.encodeNumObjectFields(numFieldsForType(rawMetricWithPolicyType))
enc.encodeRawMetricFn(data)
Expand Down
10 changes: 10 additions & 0 deletions protocol/msgpack/aggregated_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func expectedResultsForAggregatedMetricWithPolicy(t *testing.T, m interface{}, p
case aggregated.Metric:
rm := toRawMetric(t, m)
results = append(results, expectedResultsForRawMetricWithPolicy(t, rm, p)...)
case aggregated.ChunkedMetric:
rm := toRawMetric(t, m)
results = append(results, expectedResultsForRawMetricWithPolicy(t, rm, p)...)
case aggregated.RawMetric:
results = append(results, expectedResultsForRawMetricWithPolicy(t, m, p)...)
default:
Expand Down Expand Up @@ -83,6 +86,13 @@ func TestAggregatedEncodeMetricWithPolicy(t *testing.T) {
require.Equal(t, expected, *results)
}

func TestAggregatedEncodeChunkedMetricWithPolicy(t *testing.T) {
encoder, results := testCapturingAggregatedEncoder(t)
require.NoError(t, testAggregatedEncode(t, encoder, testChunkedMetric, testPolicy))
expected := expectedResultsForAggregatedMetricWithPolicy(t, testChunkedMetric, testPolicy)
require.Equal(t, expected, *results)
}

func TestAggregatedEncodeRawMetricWithPolicy(t *testing.T) {
encoder, results := testCapturingAggregatedEncoder(t)
rawMetric := toRawMetric(t, testMetric)
Expand Down
55 changes: 51 additions & 4 deletions protocol/msgpack/aggregated_roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,20 @@ var (
Timestamp: time.Now(),
Value: 123.45,
}
testChunkedMetric = aggregated.ChunkedMetric{
ChunkedID: metric.ChunkedID{
Prefix: []byte("foo."),
Data: []byte("bar"),
Suffix: []byte(".baz"),
},
Timestamp: time.Now(),
Value: 123.45,
}
testMetric2 = aggregated.Metric{
ID: metric.ID("bar"),
Timestamp: time.Now(),
Value: 678.90,
}

testPolicy = policy.Policy{
Resolution: policy.Resolution{Window: time.Second, Precision: xtime.Second},
Retention: policy.Retention(time.Hour),
Expand All @@ -73,6 +81,11 @@ func testAggregatedEncode(t *testing.T, encoder AggregatedEncoder, m interface{}
Metric: m,
Policy: p,
})
case aggregated.ChunkedMetric:
return encoder.EncodeChunkedMetricWithPolicy(aggregated.ChunkedMetricWithPolicy{
ChunkedMetric: m,
Policy: p,
})
case aggregated.RawMetric:
return encoder.EncodeRawMetricWithPolicy(aggregated.RawMetricWithPolicy{
RawMetric: m,
Expand All @@ -83,9 +96,17 @@ func testAggregatedEncode(t *testing.T, encoder AggregatedEncoder, m interface{}
}
}

func toRawMetric(t *testing.T, m aggregated.Metric) aggregated.RawMetric {
func toRawMetric(t *testing.T, m interface{}) aggregated.RawMetric {
encoder := NewAggregatedEncoder(newBufferedEncoder()).(*aggregatedEncoder)
data := encoder.encodeMetricAsRaw(m)
var data []byte
switch m := m.(type) {
case aggregated.Metric:
data = encoder.encodeMetricAsRaw(m)
case aggregated.ChunkedMetric:
data = encoder.encodeChunkedMetricAsRaw(m)
default:
require.Fail(t, "unrecognized metric type %T", m)
}
require.NoError(t, encoder.err())
return NewRawMetric(data)
}
Expand Down Expand Up @@ -117,6 +138,20 @@ func validateAggregatedRoundtripWithEncoderAndIterator(
policy: input.policy,
})
require.NoError(t, testAggregatedEncode(t, encoder, inputMetric, input.policy))
case aggregated.ChunkedMetric:
var id metric.ID
id = append(id, inputMetric.ChunkedID.Prefix...)
id = append(id, inputMetric.ChunkedID.Data...)
id = append(id, inputMetric.ChunkedID.Suffix...)
expected = append(expected, metricWithPolicy{
metric: aggregated.Metric{
ID: id,
Timestamp: inputMetric.Timestamp,
Value: inputMetric.Value,
},
policy: input.policy,
})
require.NoError(t, testAggregatedEncode(t, encoder, inputMetric, input.policy))
case aggregated.RawMetric:
m, err := inputMetric.Metric()
require.NoError(t, err)
Expand Down Expand Up @@ -155,6 +190,13 @@ func TestAggregatedEncodeDecodeMetricWithPolicy(t *testing.T) {
})
}

func TestAggregatedEncodeDecodeChunkedMetricWithPolicy(t *testing.T) {
validateAggregatedRoundtrip(t, metricWithPolicy{
metric: testChunkedMetric,
policy: testPolicy,
})
}

func TestAggregatedEncodeDecodeRawMetricWithPolicy(t *testing.T) {
validateAggregatedRoundtrip(t, metricWithPolicy{
metric: toRawMetric(t, testMetric),
Expand All @@ -173,11 +215,16 @@ func TestAggregatedEncodeDecodeStress(t *testing.T) {
for i := 0; i < numIter; i++ {
var inputs []metricWithPolicy
for j := 0; j < numMetrics; j++ {
if j%2 == 0 {
if j%3 == 0 {
inputs = append(inputs, metricWithPolicy{
metric: testMetric,
policy: testPolicy,
})
} else if j%3 == 1 {
inputs = append(inputs, metricWithPolicy{
metric: testChunkedMetric,
policy: testPolicy,
})
} else {
inputs = append(inputs, metricWithPolicy{
metric: toRawMetric(t, testMetric2),
Expand Down
25 changes: 25 additions & 0 deletions protocol/msgpack/base_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type encodeTimeFn func(t time.Time)
type encodeVarintFn func(value int64)
type encodeFloat64Fn func(value float64)
type encodeBytesFn func(value []byte)
type encodeBytesLenFn func(value int)
type encodeArrayLenFn func(value int)

// baseEncoder is the base encoder that provides common encoding APIs
Expand All @@ -43,6 +44,7 @@ type baseEncoder struct {
encodeVarintFn encodeVarintFn // varint encoding function
encodeFloat64Fn encodeFloat64Fn // float64 encoding function
encodeBytesFn encodeBytesFn // byte slice encoding function
encodeBytesLenFn encodeBytesLenFn // byte slice length encoding function
encodeArrayLenFn encodeArrayLenFn // array length encoding function
}

Expand All @@ -54,6 +56,7 @@ func newBaseEncoder(encoder BufferedEncoder) encoderBase {
enc.encodeVarintFn = enc.encodeVarintInternal
enc.encodeFloat64Fn = enc.encodeFloat64Internal
enc.encodeBytesFn = enc.encodeBytesInternal
enc.encodeBytesLenFn = enc.encodeBytesLenInternal
enc.encodeArrayLenFn = enc.encodeArrayLenInternal

return enc
Expand All @@ -71,13 +74,21 @@ func (enc *baseEncoder) encodeTime(t time.Time) { enc.encodeTimeFn(
func (enc *baseEncoder) encodeVarint(value int64) { enc.encodeVarintFn(value) }
func (enc *baseEncoder) encodeFloat64(value float64) { enc.encodeFloat64Fn(value) }
func (enc *baseEncoder) encodeBytes(value []byte) { enc.encodeBytesFn(value) }
func (enc *baseEncoder) encodeBytesLen(value int) { enc.encodeBytesLenFn(value) }
func (enc *baseEncoder) encodeArrayLen(value int) { enc.encodeArrayLenFn(value) }

func (enc *baseEncoder) reset(encoder BufferedEncoder) {
enc.bufEncoder = encoder
enc.encodeErr = nil
}

func (enc *baseEncoder) encodeChunkedID(id metric.ChunkedID) {
enc.encodeBytesLen(len(id.Prefix) + len(id.Data) + len(id.Suffix))
enc.writeRaw(id.Prefix)
enc.writeRaw(id.Data)
enc.writeRaw(id.Suffix)
}

func (enc *baseEncoder) encodePolicyInternal(p policy.Policy) {
enc.encodeNumObjectFields(numFieldsForType(policyType))
enc.encodeResolution(p.Resolution)
Expand Down Expand Up @@ -152,9 +163,23 @@ func (enc *baseEncoder) encodeBytesInternal(value []byte) {
enc.encodeErr = enc.bufEncoder.EncodeBytes(value)
}

func (enc *baseEncoder) encodeBytesLenInternal(value int) {
if enc.encodeErr != nil {
return
}
enc.encodeErr = enc.bufEncoder.EncodeBytesLen(value)
}

func (enc *baseEncoder) encodeArrayLenInternal(value int) {
if enc.encodeErr != nil {
return
}
enc.encodeErr = enc.bufEncoder.EncodeArrayLen(value)
}

func (enc *baseEncoder) writeRaw(buf []byte) {
if enc.encodeErr != nil {
return
}
_, enc.encodeErr = enc.bufEncoder.Buffer.Write(buf)
}
9 changes: 9 additions & 0 deletions protocol/msgpack/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type encoderBase interface {
// encodeID encodes an ID
encodeID(id metric.ID)

// encodeChunkedID encodes a chunked ID
encodeChunkedID(id metric.ChunkedID)

// encodeTime encodes a time
encodeTime(t time.Time)

Expand All @@ -100,6 +103,9 @@ type encoderBase interface {
// encodeBytes encodes a byte slice
encodeBytes(value []byte)

// encodeBytesLen encodes the length of a byte slice
encodeBytesLen(value int)

// encodeArrayLen encodes the length of an array
encodeArrayLen(value int)
}
Expand Down Expand Up @@ -245,6 +251,9 @@ type AggregatedEncoder interface {
// EncodeMetricWithPolicy encodes a metric with an applicable policy
EncodeMetricWithPolicy(mp aggregated.MetricWithPolicy) error

// EncodeChunkedMetricWithPolicy encodes a chunked metric with an applicable policy
EncodeChunkedMetricWithPolicy(cmp aggregated.ChunkedMetricWithPolicy) error

// EncodeRawMetricWithPolicy encodes a raw metric with an applicable policy
EncodeRawMetricWithPolicy(rp aggregated.RawMetricWithPolicy) error

Expand Down
Loading

0 comments on commit ffa77a9

Please sign in to comment.