Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Move aggregation logic to its own package
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Mar 15, 2018
1 parent 1ed6b29 commit f05ebf4
Show file tree
Hide file tree
Showing 42 changed files with 1,342 additions and 1,265 deletions.
95 changes: 95 additions & 0 deletions aggregation/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregation

import (
"fmt"

"github.com/m3db/m3metrics/generated/proto/schema"
)

const (
// IDLen is the number of uint64 words in an ID (see the ID type).
// The IDLen will be 1 when maxTypeID <= 63.
IDLen = (maxTypeID)/64 + 1

// ID uses an array of uint64 words to represent aggregation types.
// idBitShift and idBitMask split a type's numeric value into a word index
// (value >> 6, i.e. value / 64) and a bit offset within that word
// (value & 63, i.e. value % 64).
idBitShift = 6
idBitMask = 63
)

// ID represents a compressed view of Types: a fixed-size array of IDLen
// uint64 words in which the presence of an aggregation type is recorded
// as a single bit (see Contains).
type ID [IDLen]uint64

// NewIDFromSchema converts the given schema aggregation types into Types
// and compresses them into an ID. If either the conversion or the
// compression fails, DefaultID is returned together with the error.
func NewIDFromSchema(input []schema.AggregationType) (ID, error) {
	types, convErr := NewTypesFromSchema(input)
	if convErr != nil {
		return DefaultID, convErr
	}

	// TODO(cw): consider pooling these compressors,
	// this allocates one extra slice of length one per call.
	compressor := NewIDCompressor()
	compressed, compressErr := compressor.Compress(types)
	if compressErr != nil {
		return DefaultID, compressErr
	}
	return compressed, nil
}

// MustCompressTypes compresses the given aggregation types into an ID
// using a freshly created compressor. It panics with the error's message
// if compression fails.
func MustCompressTypes(aggTypes ...Type) ID {
	id, err := NewIDCompressor().Compress(aggTypes)
	if err == nil {
		return id
	}
	panic(err.Error())
}

// IsDefault reports whether the ID is equal to DefaultID.
func (id ID) IsDefault() bool {
	return DefaultID == id
}

// Contains reports whether the bit for the given aggregation type is set in
// the ID. An aggregation type that is not valid is never contained.
func (id ID) Contains(aggType Type) bool {
	if aggType.IsValid() {
		word := int(aggType) >> idBitShift // word index: aggType / 64
		bit := uint(aggType) & idBitMask   // bit within the word: aggType % 64
		mask := uint64(1) << bit
		return id[word]&mask != 0
	}
	return false
}

// Types decompresses the ID into the aggregation types it encodes, using a
// newly created decompressor. Any decompression error is returned as is.
func (id ID) Types() (Types, error) {
	decompressor := NewIDDecompressor()
	return decompressor.Decompress(id)
}

// String renders the ID for debugging: the string form of its decompressed
// aggregation types, or an "[invalid ID: ...]" marker carrying the error
// when the ID cannot be decompressed.
func (id ID) String() string {
	decoded, err := id.Types()
	if err == nil {
		return decoded.String()
	}
	return fmt.Sprintf("[invalid ID: %v]", err)
}
70 changes: 35 additions & 35 deletions policy/aggregation_id_compress.go → aggregation/id_compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,113 +18,113 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package policy
package aggregation

import (
"fmt"

"github.com/willf/bitset"
)

// AggregationIDCompressor can compress AggregationTypes into an AggregationID.
type AggregationIDCompressor interface {
// IDCompressor can compress Types into an ID.
type IDCompressor interface {
// Compress compresses a set of aggregation types into an aggregation id.
Compress(aggTypes AggregationTypes) (AggregationID, error)
Compress(aggTypes Types) (ID, error)

// MustCompress compresses a set of aggregation types into an aggregation id,
// and panics if an error is encountered.
MustCompress(aggTypes AggregationTypes) AggregationID
MustCompress(aggTypes Types) ID
}

// AggregationIDDecompressor can decompress AggregationID.
type AggregationIDDecompressor interface {
// IDDecompressor can decompress ID.
type IDDecompressor interface {
// Decompress decompresses aggregation types,
// returns error if any invalid aggregation type is encountered.
Decompress(compressed AggregationID) (AggregationTypes, error)
Decompress(compressed ID) (Types, error)
}

type aggregationIDCompressor struct {
type idCompressor struct {
bs *bitset.BitSet
}

// NewAggregationIDCompressor returns a new AggregationIDCompressor.
func NewAggregationIDCompressor() AggregationIDCompressor {
// NewIDCompressor returns a new IDCompressor.
func NewIDCompressor() IDCompressor {
// NB(cw): If we start to support more than 64 types, the library will
// expand the underlying word list itself.
return &aggregationIDCompressor{
bs: bitset.New(MaxAggregationTypeID),
return &idCompressor{
bs: bitset.New(maxTypeID),
}
}

func (c *aggregationIDCompressor) Compress(aggTypes AggregationTypes) (AggregationID, error) {
func (c *idCompressor) Compress(aggTypes Types) (ID, error) {
c.bs.ClearAll()
for _, aggType := range aggTypes {
if !aggType.IsValid() {
return DefaultAggregationID, fmt.Errorf("could not compress invalid AggregationType %v", aggType)
return DefaultID, fmt.Errorf("could not compress invalid Type %v", aggType)
}
c.bs.Set(uint(aggType.ID()))
}

codes := c.bs.Bytes()
var id AggregationID
// NB(cw) it's guaranteed that len(id) == len(codes) == AggregationIDLen, we need to copy
var id ID
// NB(cw) it's guaranteed that len(id) == len(codes) == IDLen, we need to copy
// the words in bitset out because the bitset contains a slice internally.
for i := 0; i < AggregationIDLen; i++ {
for i := 0; i < IDLen; i++ {
id[i] = codes[i]
}
return id, nil
}

func (c *aggregationIDCompressor) MustCompress(aggTypes AggregationTypes) AggregationID {
func (c *idCompressor) MustCompress(aggTypes Types) ID {
id, err := c.Compress(aggTypes)
if err != nil {
panic(fmt.Errorf("unable to compress %v: %v", aggTypes, err))
}
return id
}

type aggregationIDDecompressor struct {
type idDecompressor struct {
bs *bitset.BitSet
buf []uint64
pool AggregationTypesPool
pool TypesPool
}

// NewAggregationIDDecompressor returns a new AggregationIDDecompressor.
func NewAggregationIDDecompressor() AggregationIDDecompressor {
return NewPooledAggregationIDDecompressor(nil)
// NewIDDecompressor returns a new IDDecompressor.
func NewIDDecompressor() IDDecompressor {
return NewPooledIDDecompressor(nil)
}

// NewPooledAggregationIDDecompressor returns a new pooled AggregationTypeDecompressor.
func NewPooledAggregationIDDecompressor(pool AggregationTypesPool) AggregationIDDecompressor {
bs := bitset.New(MaxAggregationTypeID)
return &aggregationIDDecompressor{
// NewPooledIDDecompressor returns a new pooled IDDecompressor.
func NewPooledIDDecompressor(pool TypesPool) IDDecompressor {
bs := bitset.New(maxTypeID)
return &idDecompressor{
bs: bs,
buf: bs.Bytes(),
pool: pool,
}
}

func (c *aggregationIDDecompressor) Decompress(id AggregationID) (AggregationTypes, error) {
func (c *idDecompressor) Decompress(id ID) (Types, error) {
if id.IsDefault() {
return DefaultAggregationTypes, nil
return DefaultTypes, nil
}
// NB(cw) it's guaranteed that len(c.buf) == len(id) == AggregationIDLen, we need to copy
// NB(cw) it's guaranteed that len(c.buf) == len(id) == IDLen, we need to copy
// the words from id into a slice to be used in bitset.
for i := range id {
c.buf[i] = id[i]
}

var res AggregationTypes
var res Types
if c.pool == nil {
res = make(AggregationTypes, 0, MaxAggregationTypeID)
res = make(Types, 0, maxTypeID)
} else {
res = c.pool.Get()
}

for i, e := c.bs.NextSet(0); e; i, e = c.bs.NextSet(i + 1) {
aggType := AggregationType(i)
aggType := Type(i)
if !aggType.IsValid() {
return DefaultAggregationTypes, fmt.Errorf("invalid AggregationType: %s", aggType.String())
return DefaultTypes, fmt.Errorf("invalid Type: %s", aggType.String())
}

res = append(res, aggType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package policy
package aggregation

import (
"testing"
Expand All @@ -28,27 +28,27 @@ import (
"github.com/stretchr/testify/require"
)

func TestAggregationIDCompressRoundTrip(t *testing.T) {
func TestIDCompressRoundTrip(t *testing.T) {
testcases := []struct {
input AggregationTypes
result AggregationTypes
input Types
result Types
expectErr bool
}{
{DefaultAggregationTypes, DefaultAggregationTypes, false},
{[]AggregationType{UnknownAggregationType}, DefaultAggregationTypes, true},
{[]AggregationType{Min, Max}, []AggregationType{Min, Max}, false},
{[]AggregationType{Last}, []AggregationType{Last}, false},
{[]AggregationType{P999, P9999}, []AggregationType{P999, P9999}, false},
{[]AggregationType{1, 5, 9, 3, 2}, []AggregationType{1, 2, 3, 5, 9}, false},
{DefaultTypes, DefaultTypes, false},
{[]Type{UnknownType}, DefaultTypes, true},
{[]Type{Min, Max}, []Type{Min, Max}, false},
{[]Type{Last}, []Type{Last}, false},
{[]Type{P999, P9999}, []Type{P999, P9999}, false},
{[]Type{1, 5, 9, 3, 2}, []Type{1, 2, 3, 5, 9}, false},
// 50 is an unknown aggregation type.
{[]AggregationType{10, 50}, DefaultAggregationTypes, true},
{[]Type{10, 50}, DefaultTypes, true},
}

p := NewAggregationTypesPool(pool.NewObjectPoolOptions().SetSize(1))
p.Init(func() AggregationTypes {
return make(AggregationTypes, 0, MaxAggregationTypeID)
p := NewTypesPool(pool.NewObjectPoolOptions().SetSize(1))
p.Init(func() Types {
return make(Types, 0, maxTypeID)
})
compressor, decompressor := NewAggregationIDCompressor(), NewPooledAggregationIDDecompressor(p)
compressor, decompressor := NewIDCompressor(), NewPooledIDDecompressor(p)
for _, test := range testcases {
codes, err := compressor.Compress(test.input)
if test.expectErr {
Expand All @@ -61,12 +61,12 @@ func TestAggregationIDCompressRoundTrip(t *testing.T) {
}
}

func TestAggregationIDDecompressError(t *testing.T) {
compressor, decompressor := NewAggregationIDCompressor(), NewAggregationIDDecompressor()
_, err := decompressor.Decompress([AggregationIDLen]uint64{1}) // aggregation type: UnknownAggregationType.
func TestIDDecompressError(t *testing.T) {
compressor, decompressor := NewIDCompressor(), NewIDDecompressor()
_, err := decompressor.Decompress([IDLen]uint64{1})
require.Error(t, err)

max, err := compressor.Compress([]AggregationType{Last, Min, Max, Mean, Median, Count, Sum, SumSq, Stdev, P95, P99, P999, P9999})
max, err := compressor.Compress([]Type{Last, Min, Max, Mean, Median, Count, Sum, SumSq, Stdev, P95, P99, P999, P9999})
require.NoError(t, err)

max[0] = max[0] << 1
Expand Down

0 comments on commit f05ebf4

Please sign in to comment.