Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add policy compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerome Froelich committed May 2, 2017
1 parent 0535033 commit 09fe1e7
Show file tree
Hide file tree
Showing 31 changed files with 1,938 additions and 339 deletions.
10 changes: 10 additions & 0 deletions generated/proto/policy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ message Policy {
Resolution resolution = 1;
Retention retention = 2;
}

message CompressedPolicy {
Policy policy = 1;
int64 id = 2;
}

message ActivePolicies {
repeated CompressedPolicy compressedPolicies = 1;
int64 cutoverTime = 2;
}
4 changes: 3 additions & 1 deletion generated/proto/schema/namespace.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 54 additions & 13 deletions generated/proto/schema/policy.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion generated/proto/schema/rule.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 48 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,61 @@
package: github.com/m3db/m3metrics
import:
- package: github.com/m3db/m3cluster
version: 6d7addb1bcc8d015e4d7bb7d8c28c435709383b0
subpackages:
- kv
- kv/mem
- kv/util/runtime
- package: github.com/m3db/m3x
version: d39b3685ef8430f7fbeedc666f7da781d7f55ba9
version: 84544f6528903a3213e5f6cb2500fa4b4499f715
subpackages:
- checked
- close
- instrument
- log
- pool
- time
- watch
- package: github.com/golang/protobuf
version: 7390af9dcd3c33042ebaf2474a1724a83cf1a7e6
subpackages:
- proto
- package: gopkg.in/vmihailenco/msgpack.v2
version: a1382b1ce0c749733b814157c245e02cc1f41076
- package: github.com/uber-go/tally
version: 08e86c3c1eb55364d563bd6beb982040225f4bb7
- package: github.com/facebookgo/clock
version: 600d898af40aa09a7a93ecb9265d87b0504b6f03
- package: github.com/golang/mock
version: bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
subpackages:
- gomock
- mockgen
- package: golang.org/x/net
version: f2499483f923065a842d38eb4c7f1927e6fc6e6d
subpackages:
- context
- package: google.golang.org/appengine
version: 4f7eeb5305a4ba1966344836ba4af9996b7b4e05
subpackages:
- datastore
- internal
- internal/app_identity
- internal/base
- internal/datastore
- internal/log
- internal/modules
- internal/remote_api
testImport:
- package: github.com/stretchr/testify
version: d77da356e56a7428ad25149ca77381849a6a5232
subpackages:
- require
- package: github.com/davecgh/go-spew
version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
subpackages:
- spew
- package: github.com/pmezard/go-difflib
version: d8ed2627bdf02c080bf22230dbb337003b7aba2d
subpackages:
- difflib
146 changes: 146 additions & 0 deletions policy/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package policy

import (
"errors"
"sync"

"github.com/m3db/m3cluster/kv"
"github.com/m3db/m3cluster/kv/util/runtime"
"github.com/m3db/m3metrics/generated/proto/schema"
)

var (
errNilValue = errors.New("nil value received")
errNilPolicies = errors.New("policies cannot be nil")
errNilCompressedPolicy = errors.New("compressed policy cannot be nil")
errNilCompressedPolicies = errors.New("compressed policies cannot be nil")
errNilInternalPolicy = errors.New("internal policy for compressed policy cannot be nil")
)

// CompressionMap is a mapping from Policy to integer.
type CompressionMap map[Policy]int64

// NewCompressionMap constructs a new CompressionMap from a slice of
// compressed policies.
func NewCompressionMap(policies []*schema.CompressedPolicy) (CompressionMap, error) {
if policies == nil {
return nil, errNilPolicies
}

compressionMap := make(CompressionMap, len(policies))
for _, compressedPolicy := range policies {
if compressedPolicy == nil {
return nil, errNilCompressedPolicy
}

internalPolicy := compressedPolicy.GetPolicy()
if internalPolicy == nil {
return nil, errNilInternalPolicy
}

policy, err := NewPolicyFromSchema(internalPolicy)
if err != nil {
return nil, err
}
compressionMap[policy] = compressedPolicy.Id
}

return compressionMap, nil
}

// Compressor maintains a mapping of policies to integers. It is useful when
// encoding policies so that an integer representation of a policy can be
// sent instead of the entire Policy.
type Compressor interface {
ID(p Policy) (int64, bool)
}

type noopCompressor struct{}

// NewNoopCompressor returns a new Compressor which never matches a policy.
func NewNoopCompressor() Compressor {
return noopCompressor{}
}

func (m noopCompressor) ID(p Policy) (int64, bool) {
return 0, false
}

type staticCompressor struct {
policies CompressionMap
}

// NewStaticCompressor returns a new static Compressor.
func NewStaticCompressor(policies CompressionMap) Compressor {
return staticCompressor{policies: policies}
}

func (m staticCompressor) ID(p Policy) (int64, bool) {
i, ok := m.policies[p]
return i, ok
}

type dynamicCompressor struct {
sync.RWMutex
runtime.Value

policies CompressionMap

proto *schema.ActivePolicies
}

// NewDynamicCompressor returns a new dynamic Compressor.
func NewDynamicCompressor(opts Options) Compressor {
compressor := &dynamicCompressor{
policies: make(CompressionMap),
proto: &schema.ActivePolicies{},
}

valueOpts := runtime.NewOptions().
SetInstrumentOptions(opts.InstrumentOptions()).
SetInitWatchTimeout(opts.InitWatchTimeout()).
SetKVStore(opts.KVStore()).
SetUnmarshalFn(compressor.toCompressionMap).
SetProcessFn(compressor.process)

compressor.Value = runtime.NewValue(opts.CompressionKey(), valueOpts)

return compressor
}

func (c *dynamicCompressor) ID(p Policy) (int64, bool) {
c.RLock()
i, ok := c.policies[p]
c.RUnlock()
return i, ok
}

func (c *dynamicCompressor) process(value interface{}) error {
c.Lock()

p := value.(CompressionMap)
c.policies = p

c.Unlock()
return nil
}

func (c *dynamicCompressor) toCompressionMap(v kv.Value) (interface{}, error) {
if v == nil {
return nil, errNilValue
}

c.Lock()
defer c.Unlock()

if err := v.Unmarshal(c.proto); err != nil {
return nil, err
}

compressedPolicies := c.proto.GetCompressedPolicies()
if compressedPolicies == nil {
return nil, errNilCompressedPolicies
}

return NewCompressionMap(compressedPolicies)
}
Loading

0 comments on commit 09fe1e7

Please sign in to comment.