Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
First round of fixes to address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerome Froelich committed Apr 19, 2017
1 parent 5d4a46c commit 4792c45
Show file tree
Hide file tree
Showing 23 changed files with 587 additions and 473 deletions.
2 changes: 1 addition & 1 deletion generated/proto/policy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ message CompressedPolicy {

message ActivePolicies {
repeated CompressedPolicy compressedPolicies = 1;
}
}
16 changes: 3 additions & 13 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import:
version: 7390af9dcd3c33042ebaf2474a1724a83cf1a7e6
subpackages:
- proto
- package: go.uber.org/zap
version: 1.1.0
- package: go.uber.org/atomic
version: 1.1.0
- package: gopkg.in/vmihailenco/msgpack.v2
version: a1382b1ce0c749733b814157c245e02cc1f41076
testImport:
Expand Down
153 changes: 153 additions & 0 deletions policy/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package policy

import (
"errors"
"sync"

"github.com/m3db/m3cluster/kv"
"github.com/m3db/m3metrics/generated/proto/schema"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/log"
)

var (
errNilPolicies = errors.New("policies cannot be nil")
errNilCompressedPolicy = errors.New("compressed policy cannot be nil")
errNilInternalPolicy = errors.New("internal policy for compressed policy cannot be nil")
)

// CompressionMap is a mapping from Policy to integer.
type CompressionMap map[Policy]int64

// NewCompressionMap constructs a new CompressionMap from a slice of
// compressed policies.
func NewCompressionMap(policies []*schema.CompressedPolicy) (CompressionMap, error) {
if policies == nil {
return nil, errNilPolicies
}

compressionMap := make(CompressionMap, len(policies))
for _, compressedPolicy := range policies {
if compressedPolicy == nil {
return nil, errNilCompressedPolicy
}

internalPolicy := compressedPolicy.GetPolicy()
if internalPolicy == nil {
return nil, errNilInternalPolicy
}

policy, err := NewPolicyFromSchema(internalPolicy)
if err != nil {
return nil, err
}
compressionMap[policy] = compressedPolicy.Id
}

return compressionMap, nil
}

// Compressor maintains a mapping of policies to integers. It is useful when
// encoding policies so that an integer representation of a policy can be
// sent instead of the entire Policy.
type Compressor interface {
ID(p Policy) (int64, bool)
}

type noopCompressor struct{}

// NewNoopCompressor returns a new Compressor which never matches a policy.
func NewNoopCompressor() Compressor {
return noopCompressor{}
}

func (m noopCompressor) ID(p Policy) (int64, bool) {
return 0, false
}

type staticCompressor struct {
policies CompressionMap
}

// NewStaticCompressor returns a new static Compressor.
func NewStaticCompressor(policies CompressionMap) Compressor {
return staticCompressor{policies: policies}
}

func (m staticCompressor) ID(p Policy) (int64, bool) {
i, ok := m.policies[p]
return i, ok
}

type dynamicCompressor struct {
sync.RWMutex

policies CompressionMap
log xlog.Logger
}

// NewDynamicCompressor returns a new dynamic Compressor.
func NewDynamicCompressor(w kv.ValueWatch, opts instrument.Options) Compressor {
m := &dynamicCompressor{
policies: make(CompressionMap),
log: opts.Logger(),
}
m.update(w.Get())

go m.run(w)

return m
}

func (m *dynamicCompressor) ID(p Policy) (int64, bool) {
m.RLock()
i, ok := m.policies[p]
m.RUnlock()
return i, ok
}

func (m *dynamicCompressor) run(w kv.ValueWatch) {
for {
_, ok := <-w.C()
if ok {
m.log.Debug("received an update to compressor policies")
m.update(w.Get())
} else {
m.log.Debug("watch for compressor policies was closed")
return
}
}
}

func (m *dynamicCompressor) update(v kv.Value) {
if v == nil {
m.log.Warn("received a nil Value for compressor update")
return
}

activePolicies := schema.ActivePolicies{}
if err := v.Unmarshal(&activePolicies); err != nil {
m.log.WithFields(
xlog.NewLogField("version", v.Version()),
xlog.NewLogErrField(err),
).Error("failed to unmarshal compressor update")
return
}

compressedPolicies := activePolicies.GetCompressedPolicies()
if compressedPolicies == nil {
m.log.Warn("encoded policies in update to compressor is nil")
return
}

newPolicies, err := NewCompressionMap(compressedPolicies)
if err != nil {
m.log.WithFields(
xlog.NewLogErrField(err),
).Error("failed to construct new compressed policies map")
}

m.Lock()
m.policies = newPolicies
m.Unlock()
}
72 changes: 58 additions & 14 deletions policy/compressor_test.go → policy/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/m3db/m3metrics/generated/proto/schema"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/time"

"github.com/stretchr/testify/assert"
)

var (
compressorPolicies = map[Policy]int64{
compressorPolicies = CompressionMap{
Policy{
resolution: Resolution{Window: 10 * time.Second, Precision: xtime.Second},
retention: Retention(6 * time.Hour),
Expand Down Expand Up @@ -42,11 +43,53 @@ var (
}
)

func TestNewCompressionMap(t *testing.T) {
sampleSchemaPolicy := &schema.Policy{
Resolution: &schema.Resolution{WindowSize: int64(10 * time.Second), Precision: int64(time.Second)},
Retention: &schema.Retention{Period: int64(2 * time.Hour)},
}
samplePolicy, _ := NewPolicyFromSchema(sampleSchemaPolicy)

tests := []struct {
policies []*schema.CompressedPolicy
expectedErr bool
expectedMap CompressionMap
}{
// Test case for when policies is nil.
{nil, true, nil},
// Test case for when a compressed policy is nil.
{[]*schema.CompressedPolicy{nil}, true, nil},
// Test case for when an internal policy is nil.
{[]*schema.CompressedPolicy{&schema.CompressedPolicy{Policy: nil}}, true, nil},
// Test case for valid policies.
{
[]*schema.CompressedPolicy{
&schema.CompressedPolicy{
Policy: sampleSchemaPolicy,
Id: 1,
},
},
false,
CompressionMap{samplePolicy: 1},
},
}

for _, test := range tests {
actualMap, actualErr := NewCompressionMap(test.policies)
if test.expectedErr {
assert.Error(t, actualErr)
continue
}
assert.NoError(t, actualErr)
assert.Equal(t, test.expectedMap, actualMap)
}
}

func TestNoopCompressor(t *testing.T) {
c := NewNoopCompressor()

for policy := range compressorPolicies {
_, ok := c.Get(policy)
_, ok := c.ID(policy)
assert.False(t, ok)
}
}
Expand All @@ -55,7 +98,7 @@ func TestStaticCompressor(t *testing.T) {
c := NewStaticCompressor(compressorPolicies)

for policy, expectedID := range compressorPolicies {
actualID, ok := c.Get(policy)
actualID, ok := c.ID(policy)
assert.True(t, ok)
assert.Equal(t, expectedID, actualID)
}
Expand All @@ -64,12 +107,12 @@ func TestStaticCompressor(t *testing.T) {
resolution: Resolution{Window: 10 * time.Minute, Precision: xtime.Minute},
retention: Retention(48 * time.Hour),
}
_, ok := c.Get(unusedPolicy)
_, ok := c.ID(unusedPolicy)
assert.False(t, ok)
}

func TestDynamicCompressor(t *testing.T) {
policies := make(map[Policy]int64, len(compressorPolicies))
policies := make(CompressionMap, len(compressorPolicies))
for k, v := range compressorPolicies {
policies[k] = v
}
Expand All @@ -81,12 +124,12 @@ func TestDynamicCompressor(t *testing.T) {
c := NewDynamicCompressor(w, instrument.NewOptions())

for policy, expectedID := range compressorPolicies {
actualID, ok := c.Get(policy)
actualID, ok := c.ID(policy)
assert.True(t, ok)
assert.Equal(t, expectedID, actualID)
}

// Insert a new policy
// Insert a new policy.
newPolicy := Policy{
resolution: Resolution{Window: 10 * time.Minute, Precision: xtime.Minute},
retention: Retention(48 * time.Hour),
Expand All @@ -96,33 +139,34 @@ func TestDynamicCompressor(t *testing.T) {
proto = compressorProtoFromPolicies(policies)
vw.Update(mem.NewValue(2, proto))

for i := 0; i < 100; i++ {
if _, ok := c.Get(newPolicy); ok {
// Wait for an update.
for {
if _, ok := c.ID(newPolicy); ok {
break
}
time.Sleep(10 * time.Millisecond)
}

actualID, ok := c.Get(newPolicy)
actualID, ok := c.ID(newPolicy)
assert.True(t, ok)
assert.Equal(t, id, actualID)

// Remove the new policy
// Remove the new policy.
delete(policies, newPolicy)
proto = compressorProtoFromPolicies(policies)
vw.Update(mem.NewValue(3, proto))

for i := 0; i < 100; i++ {
if _, ok := c.Get(newPolicy); !ok {
if _, ok := c.ID(newPolicy); !ok {
break
}
time.Sleep(10 * time.Millisecond)
}
_, ok = c.Get(newPolicy)
_, ok = c.ID(newPolicy)
assert.False(t, ok)
}

func compressorProtoFromPolicies(policies map[Policy]int64) proto.Message {
func compressorProtoFromPolicies(policies CompressionMap) proto.Message {
activePolicies := &schema.ActivePolicies{}
for policy, id := range policies {
precision, err := xtime.DurationFromUnit(policy.resolution.Precision)
Expand Down
Loading

0 comments on commit 4792c45

Please sign in to comment.