Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Support policy bitflags in msgpack and add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerome Froelich committed May 16, 2017
1 parent 21e1131 commit 0b35bfe
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 31 deletions.
49 changes: 39 additions & 10 deletions policy/hpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,22 @@ func (i BitflagIterator) Index() int {
return i.idx
}

// Encoder encodes policies into a bitflag.
type Encoder struct {
// Encoder is capable of encoding policies into a bitflag representation.
type Encoder interface {
Encode(policies, buffer []Policy) (Bitflag, []Policy)

Reset()
}

// encoder encodes policies into a bitflag.
type encoder struct {
dynamicTable map[Policy]uint
}

// NewEncoder returns a new Encoder.
func NewEncoder() Encoder {
dt := make(map[Policy]uint, defaultTableSize)
return Encoder{dt}
return &encoder{dt}
}

// Encode encodes a slice of polices into a bitflag. It returns a bitflag
Expand All @@ -113,16 +120,18 @@ func NewEncoder() Encoder {
// its table so that subsequent calls to Encode will be able to encode them
// in the returned bitflag as well. It accepts a buffer argument so that
// slices of policies can be reused between calls.
func (e *Encoder) Encode(policies, buffer []Policy) (Bitflag, []Policy) {
func (e *encoder) Encode(policies, buffer []Policy) (Bitflag, []Policy) {
var flag Bitflag
if buffer == nil {
buffer = make([]Policy, 0)
}

for _, policy := range policies {
id, ok := e.dynamicTable[policy]
if !ok && len(e.dynamicTable) <= defaultTableSize {
e.dynamicTable[policy] = uint(len(e.dynamicTable))
if !ok {
if len(e.dynamicTable) <= defaultTableSize {
e.dynamicTable[policy] = uint(len(e.dynamicTable))
}
buffer = append(buffer, policy)
continue
}
Expand All @@ -133,15 +142,31 @@ func (e *Encoder) Encode(policies, buffer []Policy) (Bitflag, []Policy) {
return flag, buffer
}

// Decoder decodes a bitflag representing a slice of policies.
type Decoder struct {
// Reset clears the encoder's internal table of policies so that all
// policies will be new policies.
func (e *encoder) Reset() {
for key := range e.dynamicTable {
delete(e.dynamicTable, key)
}
}

// Decoder is capable of decoding a bitflag representation of policies into
// the corresponding policies.
type Decoder interface {
Decode(policies []Policy, flag Bitflag) (Bitflag, []Policy, error)

Reset()
}

// decoder decodes a bitflag representing a slice of policies.
type decoder struct {
dynamicTable []Policy
}

// NewDecoder returns a new decoder for policy bitflags.
func NewDecoder() Decoder {
dt := make([]Policy, 0, defaultTableSize)
return Decoder{dt}
return &decoder{dt}
}

// Decode decodes a policies bitflag, appending the associated policies
Expand All @@ -150,7 +175,7 @@ func NewDecoder() Decoder {
// to Decode. Decode also returns a bitflag representing all the policies
// which were passed into it, including those in both the policies bitflag
// and the slice of policies.
func (d *Decoder) Decode(
func (d *decoder) Decode(
policies []Policy,
flag Bitflag,
) (Bitflag, []Policy, error) {
Expand Down Expand Up @@ -189,3 +214,7 @@ func (d *Decoder) Decode(

return flag, policies, nil
}

// Reset clears the decoder's internal table of policies so that all
// policies will be new policies.
func (d *decoder) Reset() { d.dynamicTable = d.dynamicTable[:0] }
77 changes: 76 additions & 1 deletion policy/hpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,60 @@ func TestEncoder(t *testing.T) {
}
}

func TestEncoderReset(t *testing.T) {
var (
enc = NewEncoder()
policy = NewPolicy(time.Second, xtime.Second, time.Hour)
policies = []Policy{policy}
buffer = make([]Policy, 0)
expectedBitflag = EmptyBitflag
)

actualBitflag, actualPolicies := enc.Encode(policies, buffer)
require.Equal(t, expectedBitflag, actualBitflag)
require.Equal(t, policies, actualPolicies)

enc.Reset()

// After a Reset we expect that the previously seen policy will not be
// encoded in the bitflag.
actualBitflag, actualPolicies = enc.Encode(policies, buffer)
require.Equal(t, expectedBitflag, actualBitflag)
require.Equal(t, policies, actualPolicies)
}

func TestEncoderOverflow(t *testing.T) {
var (
enc = NewEncoder()
policies = make([]Policy, 0, 64)
expectedPolicies = make([]Policy, 0, 1)
buffer = make([]Policy, 0)
expectedBitflag = EmptyBitflag
)

for i := 0; i <= 64; i++ {
policy := NewPolicy(time.Duration(i)*time.Second, xtime.Second, time.Hour)
policies = append(policies, policy)
if i == 64 {
expectedPolicies = append(expectedPolicies, policy)
continue
}
expectedBitflag = expectedBitflag.Set(uint(i))
}

actualBitflag, actualPolicies := enc.Encode(policies, buffer)
require.Equal(t, EmptyBitflag, actualBitflag)
require.Equal(t, policies, actualPolicies)

buffer = buffer[:0]

// An encoder can only encode up to 63 policies so any policies over that
// limit should not be encoded in the bitflag.
actualBitflag, actualPolicies = enc.Encode(policies, buffer)
require.Equal(t, expectedBitflag, actualBitflag)
require.Equal(t, expectedPolicies, actualPolicies)
}

func TestDecoder(t *testing.T) {
var (
dec = NewDecoder()
Expand Down Expand Up @@ -185,7 +239,7 @@ func TestDecoder(t *testing.T) {
}
}

func TestDecodeError(t *testing.T) {
func TestDecoderError(t *testing.T) {
var (
dec = NewDecoder()
policies = make([]Policy, 0)
Expand All @@ -198,6 +252,27 @@ func TestDecodeError(t *testing.T) {
require.Error(t, err)
}

func TestDecoderReset(t *testing.T) {
var (
dec = NewDecoder()
policy = NewPolicy(time.Second, xtime.Second, time.Hour)
policies = []Policy{policy}
bitflag = EmptyBitflag
)

_, _, err := dec.Decode(policies, bitflag)
require.NoError(t, err)

dec.Reset()
bitflag = bitflag.Set(0)
policies = []Policy{}

// After a Reset the decoder should not be aware of any previously seen
// policies.
_, _, err = dec.Decode(policies, bitflag)
require.Error(t, err)
}

func TestRoundTrip(t *testing.T) {
var (
enc = NewEncoder()
Expand Down
2 changes: 1 addition & 1 deletion protocol/msgpack/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const (
numMetricFields = 3
numDefaultStagedPoliciesListFields = 1
numCustomStagedPoliciesListFields = 2
numStagedPoliciesFields = 3
numStagedPoliciesFields = 4
numPolicyFields = 2
numKnownResolutionFields = 2
numUnknownResolutionFields = 3
Expand Down
25 changes: 22 additions & 3 deletions protocol/msgpack/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
"github.com/m3db/m3metrics/policy"
)

const (
defaultPolicyBufferSize = 8
)

// Various object-level encoding functions to facilitate testing.
type encodeRootObjectFn func(objType objectType)
type encodeCounterWithPoliciesListFn func(cp unaggregated.CounterWithPoliciesList)
Expand All @@ -40,6 +44,9 @@ type encodePoliciesListFn func(spl policy.PoliciesList)
type unaggregatedEncoder struct {
encoderBase

policiesEncoder policy.Encoder
policiesBuffer []policy.Policy

encodeRootObjectFn encodeRootObjectFn
encodeCounterWithPoliciesListFn encodeCounterWithPoliciesListFn
encodeBatchTimerWithPoliciesListFn encodeBatchTimerWithPoliciesListFn
Expand All @@ -52,7 +59,11 @@ type unaggregatedEncoder struct {

// NewUnaggregatedEncoder creates a new unaggregated encoder.
func NewUnaggregatedEncoder(encoder BufferedEncoder) UnaggregatedEncoder {
enc := &unaggregatedEncoder{encoderBase: newBaseEncoder(encoder)}
enc := &unaggregatedEncoder{
encoderBase: newBaseEncoder(encoder),
policiesEncoder: policy.NewEncoder(),
policiesBuffer: make([]policy.Policy, 0, defaultPolicyBufferSize),
}

enc.encodeRootObjectFn = enc.encodeRootObject
enc.encodeCounterWithPoliciesListFn = enc.encodeCounterWithPoliciesList
Expand All @@ -66,8 +77,11 @@ func NewUnaggregatedEncoder(encoder BufferedEncoder) UnaggregatedEncoder {
return enc
}

func (enc *unaggregatedEncoder) Encoder() BufferedEncoder { return enc.encoder() }
func (enc *unaggregatedEncoder) Reset(encoder BufferedEncoder) { enc.reset(encoder) }
func (enc *unaggregatedEncoder) Encoder() BufferedEncoder { return enc.encoder() }
func (enc *unaggregatedEncoder) Reset(encoder BufferedEncoder) {
enc.reset(encoder)
enc.policiesEncoder.Reset()
}

func (enc *unaggregatedEncoder) EncodeCounter(c unaggregated.Counter) error {
if err := enc.err(); err != nil {
Expand Down Expand Up @@ -188,6 +202,11 @@ func (enc *unaggregatedEncoder) encodeStagedPolicies(sp policy.StagedPolicies) {
enc.encodeVarint(sp.CutoverNanos)
enc.encodeBool(sp.Tombstoned)
policies, _ := sp.Policies()
// NB(jeromefroe): We need to ensure the policies buffer has been reset
// before being passed to Encode.
enc.policiesBuffer = enc.policiesBuffer[:0]
bitflag, policies := enc.policiesEncoder.Encode(policies, enc.policiesBuffer)
enc.encodeVarint(int64(bitflag))
enc.encodeArrayLen(len(policies))
for _, policy := range policies {
enc.encodePolicy(policy)
Expand Down
Loading

0 comments on commit 0b35bfe

Please sign in to comment.