diff --git a/glide.lock b/glide.lock index e30a45c..d8c24a5 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 5562b649a1a9f318893ccf7bf26795bdc215d6170cd947d057d6faa87702ec05 -updated: 2017-05-23T19:50:01.633707628-04:00 +hash: 07a1a24c24370b50b8cb4f21bdd6fd9b1b787f00c87b1f3f95715f93d84594f2 +updated: 2017-05-26T12:06:55.555982654-04:00 imports: - name: github.com/apache/thrift version: 9549b25c77587b29be4e0b5c258221a4ed85d37a @@ -51,7 +51,7 @@ imports: - m3/thrift - m3/thriftudp - name: github.com/willf/bitset - version: 1ea0245d2bc8ce44623f24a1ae162beb06ad8cd6 + version: 988f4f24992fc745de53c42df0da6581e42a6686 - name: golang.org/x/net version: f2499483f923065a842d38eb4c7f1927e6fc6e6d subpackages: @@ -74,8 +74,6 @@ imports: - name: gopkg.in/yaml.v2 version: a83829b6f1293c91addabc89d0571c246397bbf4 testImports: -- name: github.com/cw9/bitset - version: d79da395ef228e37d81befaf8c71827ab0cb2275 - name: github.com/davecgh/go-spew version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d subpackages: diff --git a/glide.yaml b/glide.yaml index 83fbbf8..91c346e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -58,5 +58,3 @@ testImport: version: d77da356e56a7428ad25149ca77381849a6a5232 subpackages: - require -- package: github.com/cw9/bitset - version: d79da395ef228e37d81befaf8c71827ab0cb2275 diff --git a/policy/aggregation_id_compress.go b/policy/aggregation_id_compress.go index 17f251c..03db74d 100644 --- a/policy/aggregation_id_compress.go +++ b/policy/aggregation_id_compress.go @@ -23,7 +23,7 @@ package policy import ( "fmt" - "github.com/cw9/bitset" + "github.com/willf/bitset" ) // AggregationIDCompressor can compress AggregationTypes into an AggregationID. @@ -35,15 +35,15 @@ type AggregationIDCompressor interface { type AggregationIDDecompressor interface { // Decompress decompresses aggregation types, // returns error if any invalid aggregation type is encountered. - Decompress(pool AggregationTypesPool, compressed AggregationID) (AggregationTypes, error) + Decompress(compressed AggregationID) (AggregationTypes, error) } type aggregationIDCompressor struct { bs *bitset.BitSet } -// NewAggregationTypeCompressor returns a new AggregationTypeCompressor. -func NewAggregationTypeCompressor() AggregationIDCompressor { +// NewAggregationIDCompressor returns a new AggregationIDCompressor. +func NewAggregationIDCompressor() AggregationIDCompressor { // NB(cw): If we start to support more than 64 types, the library will // expand the underlying word list itself. return &aggregationIDCompressor{ @@ -63,7 +63,7 @@ func (c *aggregationIDCompressor) Compress(aggTypes AggregationTypes) (Aggregati codes := c.bs.Bytes() var id AggregationID // NB(cw) it's guaranteed that len(id) == len(codes) == AggregationIDLen, we need to copy - // the words in bitset out because the bitset contains a slice internally + // the words in bitset out because the bitset contains a slice internally. for i := 0; i < AggregationIDLen; i++ { id[i] = codes[i] } @@ -71,12 +71,13 @@ func (c *aggregationIDCompressor) Compress(aggTypes AggregationTypes) (Aggregati } type aggregationIDDecompressor struct { - bs *bitset.BitSet - buf []uint64 + bs *bitset.BitSet + buf []uint64 + pool AggregationTypesPool } -// NewAggregationTypeDecompressor returns a new AggregationTypeDecompressor. -func NewAggregationTypeDecompressor() AggregationIDDecompressor { +// NewAggregationIDDecompressor returns a new AggregationIDDecompressor. +func NewAggregationIDDecompressor() AggregationIDDecompressor { bs := bitset.New(totalAggregationTypes) return &aggregationIDDecompressor{ bs: bs, @@ -84,20 +85,28 @@ func NewAggregationTypeDecompressor() AggregationIDDecompressor { } } -func (c *aggregationIDDecompressor) Decompress(pool AggregationTypesPool, id AggregationID) (AggregationTypes, error) { +// NewPooledAggregationIDDecompressor returns a new pooled AggregationTypeDecompressor. +func NewPooledAggregationIDDecompressor(pool AggregationTypesPool) AggregationIDDecompressor { + bs := bitset.New(totalAggregationTypes) + return &aggregationIDDecompressor{ + bs: bs, + buf: bs.Bytes(), + pool: pool, + } +} + +func (c *aggregationIDDecompressor) Decompress(id AggregationID) (AggregationTypes, error) { // NB(cw) it's guaranteed that len(c.buf) == len(id) == AggregationIDLen, we need to copy - // the words from id into a slice to be used in bitset + // the words from id into a slice to be used in bitset. for i := range id { c.buf[i] = id[i] } - c.bs.Reset(c.buf) - var res AggregationTypes - if pool == nil { + if c.pool == nil { res = make(AggregationTypes, 0, totalAggregationTypes) } else { - res = pool.Get() + res = c.pool.Get() } for i, e := c.bs.NextSet(0); e; i, e = c.bs.NextSet(i + 1) { diff --git a/policy/aggregation_id_compress_test.go b/policy/aggregation_id_compress_test.go index c7b474f..4246ed2 100644 --- a/policy/aggregation_id_compress_test.go +++ b/policy/aggregation_id_compress_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/m3db/m3x/pool" + "github.com/stretchr/testify/require" ) @@ -47,28 +48,28 @@ func TestAggregationIDCompressRoundTrip(t *testing.T) { p.Init(func() AggregationTypes { return make(AggregationTypes, 0, totalAggregationTypes) }) - compressor, decompressor := NewAggregationTypeCompressor(), NewAggregationTypeDecompressor() + compressor, decompressor := NewAggregationIDCompressor(), NewPooledAggregationIDDecompressor(p) for _, test := range testcases { codes, err := compressor.Compress(test.input) if test.expectErr { require.Error(t, err) continue } - res, err := decompressor.Decompress(p, codes) + res, err := decompressor.Decompress(codes) require.NoError(t, err) require.Equal(t, test.result, res) } } func TestAggregationIDDecompressError(t *testing.T) { - compressor, decompressor := NewAggregationTypeCompressor(), NewAggregationTypeDecompressor() - _, err := decompressor.Decompress(nil, [AggregationIDLen]uint64{1}) // aggregation type: Unknown. + compressor, decompressor := NewAggregationIDCompressor(), NewAggregationIDDecompressor() + _, err := decompressor.Decompress([AggregationIDLen]uint64{1}) // aggregation type: Unknown. require.Error(t, err) max, err := compressor.Compress([]AggregationType{Last, Lower, Upper, Mean, Median, Count, Sum, SumSq, Stdev, P95, P99, P999, P9999}) require.NoError(t, err) max[0] = max[0] << 1 - _, err = decompressor.Decompress(nil, max) + _, err = decompressor.Decompress(max) require.Error(t, err) } diff --git a/policy/aggregation_type.go b/policy/aggregation_type.go index a09fd01..4d9afc6 100644 --- a/policy/aggregation_type.go +++ b/policy/aggregation_type.go @@ -58,6 +58,7 @@ const ( var ( emptyStruct struct{} + // DefaultAggregationTypes is a default list of aggregation types. DefaultAggregationTypes AggregationTypes @@ -239,7 +240,7 @@ func NewAggregationIDFromSchema(input []schema.AggregationType) (AggregationID, // TODO(cw): consider pooling these compressors, // this allocates one extra slice of length one per call. - id, err := NewAggregationTypeCompressor().Compress(aggTypes) + id, err := NewAggregationIDCompressor().Compress(aggTypes) if err != nil { return DefaultAggregationID, err } @@ -271,7 +272,7 @@ func (id AggregationID) Merge(other AggregationID) (AggregationID, bool) { // String for debugging. func (id AggregationID) String() string { - aggTypes, err := NewAggregationTypeDecompressor().Decompress(nil, id) + aggTypes, err := NewAggregationIDDecompressor().Decompress(id) if err != nil { return fmt.Sprintf("[invalid AggregationID: %v]", err) } diff --git a/policy/aggregation_type_test.go b/policy/aggregation_type_test.go index d851a17..e3e0291 100644 --- a/policy/aggregation_type_test.go +++ b/policy/aggregation_type_test.go @@ -92,7 +92,7 @@ func TestCompressedAggregationTypesMerge(t *testing.T) { } func mustCompress(aggTypes ...AggregationType) AggregationID { - res, err := NewAggregationTypeCompressor().Compress(aggTypes) + res, err := NewAggregationIDCompressor().Compress(aggTypes) if err != nil { panic(err.Error()) } diff --git a/policy/policy.go b/policy/policy.go index 4d3d1a0..2b53397 100644 --- a/policy/policy.go +++ b/policy/policy.go @@ -105,7 +105,7 @@ func ParsePolicy(str string) (Policy, error) { return DefaultPolicy, err } - id, err = NewAggregationTypeCompressor().Compress(aggTypes) + id, err = NewAggregationIDCompressor().Compress(aggTypes) if err != nil { return DefaultPolicy, err } diff --git a/policy/staged_policy.go b/policy/staged_policy.go index 0f3cabb..0131d0e 100644 --- a/policy/staged_policy.go +++ b/policy/staged_policy.go @@ -113,7 +113,7 @@ func (l PoliciesList) IsDefault() bool { } // SetDefaultAggregation updates the PoliciesList with default aggregation types. -// NB(cw) This function updates the PoliciesList in place +// NB(cw) This function updates the PoliciesList in place. func (l PoliciesList) SetDefaultAggregation() PoliciesList { for _, sp := range l { pl, ok := sp.Policies() diff --git a/rules/ruleset_test.go b/rules/ruleset_test.go index c50a90b..9b62cf1 100644 --- a/rules/ruleset_test.go +++ b/rules/ruleset_test.go @@ -35,7 +35,7 @@ import ( ) var ( - compressor = policy.NewAggregationTypeCompressor() + compressor = policy.NewAggregationIDCompressor() compressedUpper, _ = compressor.Compress(policy.AggregationTypes{policy.Upper}) compressedCount, _ = compressor.Compress(policy.AggregationTypes{policy.Count}) compressedLower, _ = compressor.Compress(policy.AggregationTypes{policy.Lower})