Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Address comments, use original bitset repo
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed May 26, 2017
1 parent 7137e28 commit 7058fb8
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 33 deletions.
8 changes: 3 additions & 5 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,3 @@ testImport:
version: d77da356e56a7428ad25149ca77381849a6a5232
subpackages:
- require
- package: github.com/cw9/bitset
version: d79da395ef228e37d81befaf8c71827ab0cb2275
39 changes: 24 additions & 15 deletions policy/aggregation_id_compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package policy
import (
"fmt"

"github.com/cw9/bitset"
"github.com/willf/bitset"
)

// AggregationIDCompressor can compress AggregationTypes into an AggregationID.
Expand All @@ -35,15 +35,15 @@ type AggregationIDCompressor interface {
type AggregationIDDecompressor interface {
// Decompress decompresses aggregation types,
// returns error if any invalid aggregation type is encountered.
Decompress(pool AggregationTypesPool, compressed AggregationID) (AggregationTypes, error)
Decompress(compressed AggregationID) (AggregationTypes, error)
}

type aggregationIDCompressor struct {
bs *bitset.BitSet
}

// NewAggregationTypeCompressor returns a new AggregationTypeCompressor.
func NewAggregationTypeCompressor() AggregationIDCompressor {
// NewAggregationIDCompressor returns a new AggregationIDCompressor.
func NewAggregationIDCompressor() AggregationIDCompressor {
// NB(cw): If we start to support more than 64 types, the library will
// expand the underlying word list itself.
return &aggregationIDCompressor{
Expand All @@ -63,41 +63,50 @@ func (c *aggregationIDCompressor) Compress(aggTypes AggregationTypes) (Aggregati
codes := c.bs.Bytes()
var id AggregationID
// NB(cw) it's guaranteed that len(id) == len(codes) == AggregationIDLen, we need to copy
// the words in bitset out because the bitset contains a slice internally
// the words in bitset out because the bitset contains a slice internally.
for i := 0; i < AggregationIDLen; i++ {
id[i] = codes[i]
}
return id, nil
}

type aggregationIDDecompressor struct {
bs *bitset.BitSet
buf []uint64
bs *bitset.BitSet
buf []uint64
pool AggregationTypesPool
}

// NewAggregationTypeDecompressor returns a new AggregationTypeDecompressor.
func NewAggregationTypeDecompressor() AggregationIDDecompressor {
// NewAggregationIDDecompressor returns a new AggregationIDDecompressor.
func NewAggregationIDDecompressor() AggregationIDDecompressor {
bs := bitset.New(totalAggregationTypes)
return &aggregationIDDecompressor{
bs: bs,
buf: bs.Bytes(),
}
}

func (c *aggregationIDDecompressor) Decompress(pool AggregationTypesPool, id AggregationID) (AggregationTypes, error) {
// NewPooledAggregationIDDecompressor returns a new pooled AggregationTypeDecompressor.
func NewPooledAggregationIDDecompressor(pool AggregationTypesPool) AggregationIDDecompressor {
bs := bitset.New(totalAggregationTypes)
return &aggregationIDDecompressor{
bs: bs,
buf: bs.Bytes(),
pool: pool,
}
}

func (c *aggregationIDDecompressor) Decompress(id AggregationID) (AggregationTypes, error) {
// NB(cw) it's guaranteed that len(c.buf) == len(id) == AggregationIDLen, we need to copy
// the words from id into a slice to be used in bitset
// the words from id into a slice to be used in bitset.
for i := range id {
c.buf[i] = id[i]
}

c.bs.Reset(c.buf)

var res AggregationTypes
if pool == nil {
if c.pool == nil {
res = make(AggregationTypes, 0, totalAggregationTypes)
} else {
res = pool.Get()
res = c.pool.Get()
}

for i, e := c.bs.NextSet(0); e; i, e = c.bs.NextSet(i + 1) {
Expand Down
11 changes: 6 additions & 5 deletions policy/aggregation_id_compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"

"github.com/m3db/m3x/pool"

"github.com/stretchr/testify/require"
)

Expand All @@ -47,28 +48,28 @@ func TestAggregationIDCompressRoundTrip(t *testing.T) {
p.Init(func() AggregationTypes {
return make(AggregationTypes, 0, totalAggregationTypes)
})
compressor, decompressor := NewAggregationTypeCompressor(), NewAggregationTypeDecompressor()
compressor, decompressor := NewAggregationIDCompressor(), NewPooledAggregationIDDecompressor(p)
for _, test := range testcases {
codes, err := compressor.Compress(test.input)
if test.expectErr {
require.Error(t, err)
continue
}
res, err := decompressor.Decompress(p, codes)
res, err := decompressor.Decompress(codes)
require.NoError(t, err)
require.Equal(t, test.result, res)
}
}

func TestAggregationIDDecompressError(t *testing.T) {
compressor, decompressor := NewAggregationTypeCompressor(), NewAggregationTypeDecompressor()
_, err := decompressor.Decompress(nil, [AggregationIDLen]uint64{1}) // aggregation type: Unknown.
compressor, decompressor := NewAggregationIDCompressor(), NewAggregationIDDecompressor()
_, err := decompressor.Decompress([AggregationIDLen]uint64{1}) // aggregation type: Unknown.
require.Error(t, err)

max, err := compressor.Compress([]AggregationType{Last, Lower, Upper, Mean, Median, Count, Sum, SumSq, Stdev, P95, P99, P999, P9999})
require.NoError(t, err)

max[0] = max[0] << 1
_, err = decompressor.Decompress(nil, max)
_, err = decompressor.Decompress(max)
require.Error(t, err)
}
5 changes: 3 additions & 2 deletions policy/aggregation_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (

var (
emptyStruct struct{}

// DefaultAggregationTypes is a default list of aggregation types.
DefaultAggregationTypes AggregationTypes

Expand Down Expand Up @@ -239,7 +240,7 @@ func NewAggregationIDFromSchema(input []schema.AggregationType) (AggregationID,

// TODO(cw): consider pooling these compressors,
// this allocates one extra slice of length one per call.
id, err := NewAggregationTypeCompressor().Compress(aggTypes)
id, err := NewAggregationIDCompressor().Compress(aggTypes)
if err != nil {
return DefaultAggregationID, err
}
Expand Down Expand Up @@ -271,7 +272,7 @@ func (id AggregationID) Merge(other AggregationID) (AggregationID, bool) {

// String for debugging.
func (id AggregationID) String() string {
aggTypes, err := NewAggregationTypeDecompressor().Decompress(nil, id)
aggTypes, err := NewAggregationIDDecompressor().Decompress(id)
if err != nil {
return fmt.Sprintf("[invalid AggregationID: %v]", err)
}
Expand Down
2 changes: 1 addition & 1 deletion policy/aggregation_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestCompressedAggregationTypesMerge(t *testing.T) {
}

func mustCompress(aggTypes ...AggregationType) AggregationID {
res, err := NewAggregationTypeCompressor().Compress(aggTypes)
res, err := NewAggregationIDCompressor().Compress(aggTypes)
if err != nil {
panic(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func ParsePolicy(str string) (Policy, error) {
return DefaultPolicy, err
}

id, err = NewAggregationTypeCompressor().Compress(aggTypes)
id, err = NewAggregationIDCompressor().Compress(aggTypes)
if err != nil {
return DefaultPolicy, err
}
Expand Down
2 changes: 1 addition & 1 deletion policy/staged_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (l PoliciesList) IsDefault() bool {
}

// SetDefaultAggregation updates the PoliciesList with default aggregation types.
// NB(cw) This function updates the PoliciesList in place
// NB(cw) This function updates the PoliciesList in place.
func (l PoliciesList) SetDefaultAggregation() PoliciesList {
for _, sp := range l {
pl, ok := sp.Policies()
Expand Down
2 changes: 1 addition & 1 deletion rules/ruleset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

var (
compressor = policy.NewAggregationTypeCompressor()
compressor = policy.NewAggregationIDCompressor()
compressedUpper, _ = compressor.Compress(policy.AggregationTypes{policy.Upper})
compressedCount, _ = compressor.Compress(policy.AggregationTypes{policy.Count})
compressedLower, _ = compressor.Compress(policy.AggregationTypes{policy.Lower})
Expand Down

0 comments on commit 7058fb8

Please sign in to comment.