Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Encoder now supports custom resolutions and retentions
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Dec 20, 2016
1 parent 8b04c6f commit f817202
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 115 deletions.
6 changes: 3 additions & 3 deletions policy/resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
// ResolutionValue is the resolution value
type ResolutionValue int

// List of resolution values currently supported
// List of known resolution values
const (
UnknownResolution ResolutionValue = iota
UnknownResolutionValue ResolutionValue = iota
OneSecond
TenSeconds
OneMinute
Expand Down Expand Up @@ -63,7 +63,7 @@ func ValueFromResolution(resolution Resolution) (ResolutionValue, error) {
if exists {
return value, nil
}
return UnknownResolution, errUnknownResolution
return UnknownResolutionValue, errUnknownResolution
}

var (
Expand Down
2 changes: 1 addition & 1 deletion policy/resolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestValidResolutionValue(t *testing.T) {

func TestInvalidResolutionValue(t *testing.T) {
inputs := []ResolutionValue{
UnknownResolution,
UnknownResolutionValue,
ResolutionValue(100),
}
for _, value := range inputs {
Expand Down
6 changes: 3 additions & 3 deletions policy/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
// RetentionValue is the retention value
type RetentionValue int

// List of retention values currently supported
// List of known retention values
const (
UnknownRetention RetentionValue = iota
UnknownRetentionValue RetentionValue = iota
OneHour
SixHours
TwelveHours
Expand Down Expand Up @@ -65,7 +65,7 @@ func ValueFromRetention(retention Retention) (RetentionValue, error) {
if exists {
return value, nil
}
return UnknownRetention, errUnknownRetention
return UnknownRetentionValue, errUnknownRetention
}

var (
Expand Down
2 changes: 1 addition & 1 deletion policy/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestValidRetentionValue(t *testing.T) {

func TestInvalidRetentionValue(t *testing.T) {
inputs := []RetentionValue{
UnknownRetention,
UnknownRetentionValue,
RetentionValue(100),
}
for _, value := range inputs {
Expand Down
12 changes: 8 additions & 4 deletions protocol/msgpack/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,25 @@ var (
},
},
},
// Retain this metric at 10 second resolution for 6 hours,
// then 1 minute resolution for 2 days
// Retain this metric at 100 second resolution for 6 hours,
// then custom resolution for 2 days, then 1 minute for 25 days
{
metric: testBatchTimer,
versionedPolicies: policy.VersionedPolicies{
Version: 2,
Policies: []policy.Policy{
{
Resolution: policy.Resolution{Window: time.Duration(10), Precision: xtime.Second},
Resolution: policy.Resolution{Window: time.Duration(100), Precision: xtime.Second},
Retention: policy.Retention(6 * time.Hour),
},
{
Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Minute},
Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Unit(100)},
Retention: policy.Retention(2 * 24 * time.Hour),
},
{
Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Minute},
Retention: policy.Retention(25 * 24 * time.Hour),
},
},
},
},
Expand Down
27 changes: 22 additions & 5 deletions protocol/msgpack/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ package msgpack

type objectType int

const (
// Current version for encoding unaggregated metrics
unaggregatedVersion int = 1
)

const (
unknownType = iota

Expand All @@ -60,8 +65,12 @@ const (
batchTimerType
gaugeType
policyType
defaultVersionedPolicyType
customVersionedPolicyType
knownResolutionType
unknownResolutionType
knownRetentionType
unknownRetentionType
defaultVersionedPoliciesType
customVersionedPoliciesType

// Total number of object types
numObjectTypes = iota
Expand All @@ -76,8 +85,12 @@ const (
numBatchTimerFields = 2
numGaugeFields = 2
numPolicyFields = 2
numKnownResolutionFields = 2
numUnknownResolutionFields = 3
numKnownRetentionFields = 2
numUnknownRetentionFields = 2
numDefaultVersionedPolicyFields = 1
numCustomVersionedPolicyFields = 2
numCustomVersionedPolicyFields = 3
)

var numObjectFields []int
Expand All @@ -100,7 +113,11 @@ func init() {
setNumFieldsForType(batchTimerType, numBatchTimerFields)
setNumFieldsForType(gaugeType, numGaugeFields)
setNumFieldsForType(policyType, numPolicyFields)
setNumFieldsForType(defaultVersionedPolicyType, numDefaultVersionedPolicyFields)
setNumFieldsForType(customVersionedPolicyType, numCustomVersionedPolicyFields)
setNumFieldsForType(knownResolutionType, numKnownResolutionFields)
setNumFieldsForType(unknownResolutionType, numUnknownResolutionFields)
setNumFieldsForType(knownRetentionType, numKnownRetentionFields)
setNumFieldsForType(unknownRetentionType, numKnownRetentionFields)
setNumFieldsForType(defaultVersionedPoliciesType, numDefaultVersionedPolicyFields)
setNumFieldsForType(customVersionedPoliciesType, numCustomVersionedPolicyFields)

}
4 changes: 0 additions & 4 deletions protocol/msgpack/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ import (
"gopkg.in/vmihailenco/msgpack.v2"
)

const (
supportedVersion int = 1
)

// BufferedEncoder is an messagePack-based encoder backed by byte buffers
type BufferedEncoder struct {
*msgpack.Encoder
Expand Down
71 changes: 42 additions & 29 deletions protocol/msgpack/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (enc *unaggregatedEncoder) EncodeGaugeWithPolicies(
}

func (enc *unaggregatedEncoder) encodeRootObject(objType objectType) {
enc.encodeVersion(supportedVersion)
enc.encodeVersion(unaggregatedVersion)
enc.encodeNumObjectFields(numFieldsForType(rootObjectType))
enc.encodeObjectType(objType)
}
Expand Down Expand Up @@ -189,18 +189,55 @@ func (enc *unaggregatedEncoder) encodePolicy(p policy.Policy) {
enc.encodeRetention(p.Retention)
}

func (enc *unaggregatedEncoder) encodeResolution(resolution policy.Resolution) {
if enc.err != nil {
return
}
// If this is a known resolution, only encode its corresponding value
if resolutionValue, err := policy.ValueFromResolution(resolution); err == nil {
enc.encodeNumObjectFields(numFieldsForType(knownResolutionType))
enc.encodeObjectType(knownResolutionType)
enc.encodeVarintFn(int64(resolutionValue))
return
}
// Otherwise encode the entire resolution object
// TODO(xichen): validate the resolution before putting it on the wire
enc.encodeNumObjectFields(numFieldsForType(unknownResolutionType))
enc.encodeObjectType(unknownResolutionType)
enc.encodeVarintFn(int64(resolution.Window))
enc.encodeVarintFn(int64(resolution.Precision))
}

func (enc *unaggregatedEncoder) encodeRetention(retention policy.Retention) {
if enc.err != nil {
return
}
// If this is a known retention, only encode its corresponding value
if retentionValue, err := policy.ValueFromRetention(retention); err == nil {
enc.encodeNumObjectFields(numFieldsForType(knownRetentionType))
enc.encodeObjectType(knownRetentionType)
enc.encodeVarintFn(int64(retentionValue))
return
}
// Otherwise encode the entire retention object
// TODO(xichen): validate the retention before putting it on the wire
enc.encodeNumObjectFields(numFieldsForType(unknownRetentionType))
enc.encodeObjectType(unknownRetentionType)
enc.encodeVarintFn(int64(retention))
}

func (enc *unaggregatedEncoder) encodeVersionedPolicies(vp policy.VersionedPolicies) {
// NB(xichen): if this is a default policy, we only encode the policy version
// and not the actual policies to optimize for the common case where the policies
// are the default ones
if vp.Version == policy.DefaultPolicyVersion {
enc.encodeNumObjectFields(numFieldsForType(defaultVersionedPolicyType))
enc.encodeVersion(vp.Version)
enc.encodeNumObjectFields(numFieldsForType(defaultVersionedPoliciesType))
enc.encodeObjectType(defaultVersionedPoliciesType)
return
}

// Otherwise fallback to encoding the entire object
enc.encodeNumObjectFields(numFieldsForType(customVersionedPolicyType))
enc.encodeNumObjectFields(numFieldsForType(customVersionedPoliciesType))
enc.encodeObjectType(customVersionedPoliciesType)
enc.encodeVersion(vp.Version)
enc.encodeArrayLenFn(len(vp.Policies))
for _, policy := range vp.Policies {
Expand All @@ -224,30 +261,6 @@ func (enc *unaggregatedEncoder) encodeID(id metric.ID) {
enc.encodeBytesFn([]byte(id))
}

func (enc *unaggregatedEncoder) encodeResolution(resolution policy.Resolution) {
if enc.err != nil {
return
}
resolutionValue, err := policy.ValueFromResolution(resolution)
if err != nil {
enc.err = err
return
}
enc.encodeVarintFn(int64(resolutionValue))
}

func (enc *unaggregatedEncoder) encodeRetention(retention policy.Retention) {
if enc.err != nil {
return
}
retentionValue, err := policy.ValueFromRetention(retention)
if err != nil {
enc.err = err
return
}
enc.encodeVarintFn(int64(retentionValue))
}

// NB(xichen): the underlying msgpack encoder implementation
// always cast an integer value to an int64 and encodes integer
// values as varints, regardless of the actual integer type
Expand Down
40 changes: 32 additions & 8 deletions protocol/msgpack/unaggregated_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func testCapturingUnaggregatedEncoder(t *testing.T) (UnaggregatedEncoder, *[]int

func getExpectedResultsForMetricWithPolicies(t *testing.T, m *unaggregated.MetricUnion, p policy.VersionedPolicies) []interface{} {
results := []interface{}{
int64(supportedVersion),
int64(unaggregatedVersion),
numFieldsForType(rootObjectType),
}

Expand Down Expand Up @@ -100,25 +100,49 @@ func getExpectedResultsForMetricWithPolicies(t *testing.T, m *unaggregated.Metri

if p.Version == policy.DefaultPolicyVersion {
results = append(results, []interface{}{
numFieldsForType(defaultVersionedPolicyType),
int64(p.Version),
numFieldsForType(defaultVersionedPoliciesType),
int64(defaultVersionedPoliciesType),
}...)
} else {
results = append(results, []interface{}{
numFieldsForType(customVersionedPolicyType),
numFieldsForType(customVersionedPoliciesType),
int64(customVersionedPoliciesType),
int64(p.Version),
len(p.Policies),
}...)
for _, p := range p.Policies {
results = append(results, numFieldsForType(policyType))

resolutionValue, err := policy.ValueFromResolution(p.Resolution)
require.NoError(t, err)
results = append(results, int64(resolutionValue))
if err == nil {
results = append(results, []interface{}{
numFieldsForType(knownResolutionType),
int64(knownResolutionType),
int64(resolutionValue),
}...)
} else {
results = append(results, []interface{}{
numFieldsForType(unknownResolutionType),
int64(unknownResolutionType),
int64(p.Resolution.Window),
int64(p.Resolution.Precision),
}...)
}

retentionValue, err := policy.ValueFromRetention(p.Retention)
require.NoError(t, err)
results = append(results, int64(retentionValue))
if err == nil {
results = append(results, []interface{}{
numFieldsForType(knownRetentionType),
int64(knownRetentionType),
int64(retentionValue),
}...)
} else {
results = append(results, []interface{}{
numFieldsForType(unknownRetentionType),
int64(unknownRetentionType),
int64(p.Retention),
}...)
}
}
}

Expand Down
Loading

0 comments on commit f817202

Please sign in to comment.