diff --git a/policy/aggregation_type.go b/policy/aggregation_type.go index cee2f70..76cce85 100644 --- a/policy/aggregation_type.go +++ b/policy/aggregation_type.go @@ -183,7 +183,23 @@ func ParseAggregationTypes(str string) (AggregationTypes, error) { } // CompressedAggregationTypes represents a compressed AggregationTypes. -type CompressedAggregationTypes [CompressedSize]int64 +type CompressedAggregationTypes [CompressedSize]uint64 + +// NewCompressedAggregationTypesFromSchema creates a list of aggregation types from a schema +func NewCompressedAggregationTypesFromSchema(input []schema.AggregationType) (CompressedAggregationTypes, error) { + aggTypes, err := NewAggregationTypesFromSchema(input) + if err != nil { + return EmptyCompressedAggregationTypes, err + } + + // TODO(cw): consider pooling these compressors, + // this allocates one extra slice of length one per call. + compressed, err := NewAggregationTypeCompressor().Compress(aggTypes) + if err != nil { + return EmptyCompressedAggregationTypes, err + } + return compressed, nil +} // IsDefault checks if the CompressedAggregationTypes is the default aggregation type. func (compressed CompressedAggregationTypes) IsDefault() bool { diff --git a/policy/aggregation_type_compress.go b/policy/aggregation_type_compress.go index 795d919..55a843b 100644 --- a/policy/aggregation_type_compress.go +++ b/policy/aggregation_type_compress.go @@ -77,7 +77,7 @@ type aggregationCompresser struct { func NewAggregationTypeCompressor() AggregationTypeCompressor { // NB(cw): If we start to support more than 64 types, the library will // expand the underlying word list itself. - return &aggregationCompresser{bs: bitset.New(CompressedSize)} + return &aggregationCompresser{bs: bitset.New(TotalAggregationTypes)} } func (c *aggregationCompresser) Compress(aggTypes AggregationTypes) (CompressedAggregationTypes, error) { @@ -91,18 +91,22 @@ func (c *aggregationCompresser) Compress(aggTypes AggregationTypes) (CompressedA codes := c.bs.Bytes() var res CompressedAggregationTypes for i := 0; i < CompressedSize && i < len(codes); i++ { - res[i] = int64(codes[i]) + res[i] = codes[i] } return res, nil } type aggregationDecompresser struct { + buf []uint64 res []AggregationType } // NewAggregationTypeDecompressor returns a new AggregationTypeDecompressor. func NewAggregationTypeDecompressor() AggregationTypeDecompressor { - return &aggregationDecompresser{res: make([]AggregationType, 0, TotalAggregationTypes)} + return &aggregationDecompresser{ + buf: make([]uint64, CompressedSize), + res: make([]AggregationType, 0, TotalAggregationTypes), + } } func (c *aggregationDecompresser) Decompress(compressed CompressedAggregationTypes) (AggregationTypes, error) { @@ -122,11 +126,11 @@ func (c *aggregationDecompresser) DecompressForTimer(compressed CompressedAggreg } func (c *aggregationDecompresser) decompress(compressed CompressedAggregationTypes, validate validateFn, err errorFn) (AggregationTypes, error) { - codes := make([]uint64, CompressedSize) for i := range compressed { - codes[i] = uint64(compressed[i]) + c.buf[i] = compressed[i] } - bs := bitset.From([]uint64(codes)) + + bs := bitset.From(c.buf) c.res = c.res[:0] for i, e := bs.NextSet(0); e; i, e = bs.NextSet(i + 1) { diff --git a/policy/aggregation_type_compress_test.go b/policy/aggregation_type_compress_test.go index 835c292..c0e8f77 100644 --- a/policy/aggregation_type_compress_test.go +++ b/policy/aggregation_type_compress_test.go @@ -57,7 +57,7 @@ func TestAggregationTypeCompressRoundTrip(t *testing.T) { func TestAggregationTypeDecompressError(t *testing.T) { compressor, decompressor := NewAggregationTypeCompressor(), NewAggregationTypeDecompressor() - _, err := decompressor.Decompress([CompressedSize]int64{1}) + _, err := decompressor.Decompress([CompressedSize]uint64{1}) // aggregation type: Unknown require.Error(t, err) max, err := compressor.Compress([]AggregationType{Last, Lower, Upper, Mean, Median, Count, Sum, SumSq, Stdev, P95, P99, P999, P9999}) diff --git a/policy/policy.go b/policy/policy.go index 108d0e1..1ab5084 100644 --- a/policy/policy.go +++ b/policy/policy.go @@ -142,13 +142,7 @@ func NewUnaggregatedPolicyFromSchema(p *schema.Policy) (UnaggregatedPolicy, erro return EmptyUnaggregatedPolicy, err } - aggTypes, err := NewAggregationTypesFromSchema(p.AggregationTypes) - if err != nil { - return EmptyUnaggregatedPolicy, err - } - - // TODO(cw): consider pooling these compressors - compressed, err := NewAggregationTypeCompressor().Compress(aggTypes) + compressedAggTypes, err := NewCompressedAggregationTypesFromSchema(p.AggregationTypes) if err != nil { return EmptyUnaggregatedPolicy, err } @@ -159,7 +153,7 @@ func NewUnaggregatedPolicyFromSchema(p *schema.Policy) (UnaggregatedPolicy, erro Precision: unit, }, retention: Retention(p.Retention.Period), - }, compressed), nil + }, compressedAggTypes), nil } diff --git a/protocol/msgpack/base_encoder.go b/protocol/msgpack/base_encoder.go index c4161da..d572451 100644 --- a/protocol/msgpack/base_encoder.go +++ b/protocol/msgpack/base_encoder.go @@ -109,7 +109,7 @@ func (enc *baseEncoder) encodeCompressedAggregationTypes(aggTypes policy.Compres if policy.CompressedSize == 1 { enc.encodeNumObjectFields(numFieldsForType(compressedAggregationTypesShort)) enc.encodeObjectType(compressedAggregationTypesShort) - enc.encodeVarintFn(aggTypes[0]) + enc.encodeVarintFn(int64(aggTypes[0])) return } @@ -118,7 +118,7 @@ func (enc *baseEncoder) encodeCompressedAggregationTypes(aggTypes policy.Compres enc.encodeObjectType(compressedAggregationTypesLong) enc.encodeArrayLen(policy.CompressedSize) for _, v := range aggTypes { - enc.encodeVarint(v) + enc.encodeVarint(int64(v)) } } diff --git a/protocol/msgpack/base_iterator.go b/protocol/msgpack/base_iterator.go index a4ea1fc..f1976e5 100644 --- a/protocol/msgpack/base_iterator.go +++ b/protocol/msgpack/base_iterator.go @@ -97,7 +97,7 @@ func (it *baseIterator) decodeCompressedAggregationTypes() policy.CompressedAggr case defaultAggregationTypes: case compressedAggregationTypesShort: value := it.decodeVarint() - aggTypes[0] = value + aggTypes[0] = uint64(value) case compressedAggregationTypesLong: numValues := it.decodeArrayLen() if numValues > policy.CompressedSize { @@ -106,7 +106,7 @@ func (it *baseIterator) decodeCompressedAggregationTypes() policy.CompressedAggr } for i := 0; i < numValues; i++ { - aggTypes[i] = it.decodeVarint() + aggTypes[i] = uint64(it.decodeVarint()) } default: it.decodeErr = fmt.Errorf("unrecognized aggregation encode type %v", aggregationEncodeType)