Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
reuse the slice for bitset
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed May 22, 2017
1 parent ee49c04 commit dc33b9a
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 20 deletions.
18 changes: 17 additions & 1 deletion policy/aggregation_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,23 @@ func ParseAggregationTypes(str string) (AggregationTypes, error) {
}

// CompressedAggregationTypes represents a compressed AggregationTypes.
type CompressedAggregationTypes [CompressedSize]int64
type CompressedAggregationTypes [CompressedSize]uint64

// NewCompressedAggregationTypesFromSchema creates a list of aggregation types from a schema
func NewCompressedAggregationTypesFromSchema(input []schema.AggregationType) (CompressedAggregationTypes, error) {
aggTypes, err := NewAggregationTypesFromSchema(input)
if err != nil {
return EmptyCompressedAggregationTypes, err
}

// TODO(cw): consider pooling these compressors,
// this allocates one extra slice of length one per call.
compressed, err := NewAggregationTypeCompressor().Compress(aggTypes)
if err != nil {
return EmptyCompressedAggregationTypes, err
}
return compressed, nil
}

// IsDefault checks if the CompressedAggregationTypes is the default aggregation type.
func (compressed CompressedAggregationTypes) IsDefault() bool {
Expand Down
16 changes: 10 additions & 6 deletions policy/aggregation_type_compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type aggregationCompresser struct {
func NewAggregationTypeCompressor() AggregationTypeCompressor {
// NB(cw): If we start to support more than 64 types, the library will
// expand the underlying word list itself.
return &aggregationCompresser{bs: bitset.New(CompressedSize)}
return &aggregationCompresser{bs: bitset.New(TotalAggregationTypes)}
}

func (c *aggregationCompresser) Compress(aggTypes AggregationTypes) (CompressedAggregationTypes, error) {
Expand All @@ -91,18 +91,22 @@ func (c *aggregationCompresser) Compress(aggTypes AggregationTypes) (CompressedA
codes := c.bs.Bytes()
var res CompressedAggregationTypes
for i := 0; i < CompressedSize && i < len(codes); i++ {
res[i] = int64(codes[i])
res[i] = codes[i]
}
return res, nil
}

type aggregationDecompresser struct {
buf []uint64
res []AggregationType
}

// NewAggregationTypeDecompressor returns a new AggregationTypeDecompressor.
func NewAggregationTypeDecompressor() AggregationTypeDecompressor {
return &aggregationDecompresser{res: make([]AggregationType, 0, TotalAggregationTypes)}
return &aggregationDecompresser{
buf: make([]uint64, CompressedSize),
res: make([]AggregationType, 0, TotalAggregationTypes),
}
}

func (c *aggregationDecompresser) Decompress(compressed CompressedAggregationTypes) (AggregationTypes, error) {
Expand All @@ -122,11 +126,11 @@ func (c *aggregationDecompresser) DecompressForTimer(compressed CompressedAggreg
}

func (c *aggregationDecompresser) decompress(compressed CompressedAggregationTypes, validate validateFn, err errorFn) (AggregationTypes, error) {
codes := make([]uint64, CompressedSize)
for i := range compressed {
codes[i] = uint64(compressed[i])
c.buf[i] = compressed[i]
}
bs := bitset.From([]uint64(codes))

bs := bitset.From(c.buf)

c.res = c.res[:0]
for i, e := bs.NextSet(0); e; i, e = bs.NextSet(i + 1) {
Expand Down
2 changes: 1 addition & 1 deletion policy/aggregation_type_compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAggregationTypeCompressRoundTrip(t *testing.T) {

func TestAggregationTypeDecompressError(t *testing.T) {
compressor, decompressor := NewAggregationTypeCompressor(), NewAggregationTypeDecompressor()
_, err := decompressor.Decompress([CompressedSize]int64{1})
_, err := decompressor.Decompress([CompressedSize]uint64{1}) // aggregation type: Unknown
require.Error(t, err)

max, err := compressor.Compress([]AggregationType{Last, Lower, Upper, Mean, Median, Count, Sum, SumSq, Stdev, P95, P99, P999, P9999})
Expand Down
10 changes: 2 additions & 8 deletions policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,7 @@ func NewUnaggregatedPolicyFromSchema(p *schema.Policy) (UnaggregatedPolicy, erro
return EmptyUnaggregatedPolicy, err
}

aggTypes, err := NewAggregationTypesFromSchema(p.AggregationTypes)
if err != nil {
return EmptyUnaggregatedPolicy, err
}

// TODO(cw): consider pooling these compressors
compressed, err := NewAggregationTypeCompressor().Compress(aggTypes)
compressedAggTypes, err := NewCompressedAggregationTypesFromSchema(p.AggregationTypes)
if err != nil {
return EmptyUnaggregatedPolicy, err
}
Expand All @@ -159,7 +153,7 @@ func NewUnaggregatedPolicyFromSchema(p *schema.Policy) (UnaggregatedPolicy, erro
Precision: unit,
},
retention: Retention(p.Retention.Period),
}, compressed), nil
}, compressedAggTypes), nil

}

Expand Down
4 changes: 2 additions & 2 deletions protocol/msgpack/base_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (enc *baseEncoder) encodeCompressedAggregationTypes(aggTypes policy.Compres
if policy.CompressedSize == 1 {
enc.encodeNumObjectFields(numFieldsForType(compressedAggregationTypesShort))
enc.encodeObjectType(compressedAggregationTypesShort)
enc.encodeVarintFn(aggTypes[0])
enc.encodeVarintFn(int64(aggTypes[0]))
return
}

Expand All @@ -118,7 +118,7 @@ func (enc *baseEncoder) encodeCompressedAggregationTypes(aggTypes policy.Compres
enc.encodeObjectType(compressedAggregationTypesLong)
enc.encodeArrayLen(policy.CompressedSize)
for _, v := range aggTypes {
enc.encodeVarint(v)
enc.encodeVarint(int64(v))
}
}

Expand Down
4 changes: 2 additions & 2 deletions protocol/msgpack/base_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (it *baseIterator) decodeCompressedAggregationTypes() policy.CompressedAggr
case defaultAggregationTypes:
case compressedAggregationTypesShort:
value := it.decodeVarint()
aggTypes[0] = value
aggTypes[0] = uint64(value)
case compressedAggregationTypesLong:
numValues := it.decodeArrayLen()
if numValues > policy.CompressedSize {
Expand All @@ -106,7 +106,7 @@ func (it *baseIterator) decodeCompressedAggregationTypes() policy.CompressedAggr
}

for i := 0; i < numValues; i++ {
aggTypes[i] = it.decodeVarint()
aggTypes[i] = uint64(it.decodeVarint())
}
default:
it.decodeErr = fmt.Errorf("unrecognized aggregation encode type %v", aggregationEncodeType)
Expand Down

0 comments on commit dc33b9a

Please sign in to comment.