Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Provide APIs to match multiple policies within a time range and send …
Browse files Browse the repository at this point in the history
…multiple policies across network
  • Loading branch information
xichen2020 committed May 3, 2017
1 parent 0535033 commit 38323d3
Show file tree
Hide file tree
Showing 24 changed files with 1,076 additions and 839 deletions.
14 changes: 7 additions & 7 deletions metric/aggregated/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,25 @@ import (
// Metric is a metric, which is essentially a named value at certain time.
type Metric struct {
metric.ID
Timestamp time.Time
Value float64
TimeNs int64
Value float64
}

// String is the string representation of a metric.
func (m Metric) String() string {
return fmt.Sprintf(
"{id:%s,timestamp:%s,value:%f}",
m.ID.String(),
m.Timestamp.String(),
time.Unix(0, m.TimeNs).String(),
m.Value,
)
}

// ChunkedMetric is a metric with a chunked ID.
type ChunkedMetric struct {
metric.ChunkedID
Timestamp time.Time
Value float64
TimeNs int64
Value float64
}

// RawMetric is a metric in its raw form (e.g., encoded bytes associated with
Expand All @@ -58,8 +58,8 @@ type RawMetric interface {
// ID is the metric identifier.
ID() (metric.ID, error)

// Timestamp is the metric timestamp.
Timestamp() (time.Time, error)
// TimeNs is the metric timestamp in nanoseconds.
TimeNs() (int64, error)

// Value is the metric value.
Value() (float64, error)
Expand Down
18 changes: 9 additions & 9 deletions metric/unaggregated/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,22 @@ type Gauge struct {
Value float64
}

// CounterWithPolicies is a counter with applicable policies.
type CounterWithPolicies struct {
// CounterWithPoliciesList is a counter with applicable policies list.
type CounterWithPoliciesList struct {
Counter
policy.VersionedPolicies
policy.PoliciesList
}

// BatchTimerWithPolicies is a batch timer with applicable policies.
type BatchTimerWithPolicies struct {
// BatchTimerWithPoliciesList is a batch timer with applicable policies list.
type BatchTimerWithPoliciesList struct {
BatchTimer
policy.VersionedPolicies
policy.PoliciesList
}

// GaugeWithPolicies is a gauge with applicable policies.
type GaugeWithPolicies struct {
// GaugeWithPoliciesList is a gauge with applicable policies list.
type GaugeWithPoliciesList struct {
Gauge
policy.VersionedPolicies
policy.PoliciesList
}

// MetricUnion is a union of different types of metrics, only one of which is valid
Expand Down
96 changes: 56 additions & 40 deletions policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@ var (
// EmptyPolicy represents an empty policy.
EmptyPolicy Policy

// EmptyVersionedPolicies represents an empty VersionPolicies.
EmptyVersionedPolicies VersionedPolicies
// EmptyStagedPolicies represents an empty staged policies.
EmptyStagedPolicies StagedPolicies

errNilPolicySchema = errors.New("nil policy schema")
// DefaultPoliciesList represents a default policies list.
DefaultPoliciesList = PoliciesList{EmptyStagedPolicies}

// defaultPolicies are the default policies.
// TODO(xichen): possibly make this dynamically configurable in the future.
defaultPolicies = []Policy{
NewPolicy(10*time.Second, xtime.Second, 2*24*time.Hour),
NewPolicy(time.Minute, xtime.Minute, 30*24*time.Hour),
}

errNilPolicySchema = errors.New("nil policy schema")
)

// Policy represents the resolution and retention period metric datapoints
Expand Down Expand Up @@ -148,66 +151,79 @@ func (pr ByResolutionAsc) Less(i, j int) bool {
return pr[i].Resolution().Precision < pr[i].Resolution().Precision
}

// VersionedPolicies represent a list of policies at a specified version.
type VersionedPolicies struct {
// Version is the version of the policies.
Version int

// StagedPolicies represent a list of policies at a specified version.
type StagedPolicies struct {
// Cutover is when the policies take effect.
Cutover time.Time
CutoverNs int64

// isDefault determines whether the policies are the default policies.
isDefault bool
// Tombstoned determines whether the associated (rollup) metric has been tombstoned.
Tombstoned bool

// policies represent the list of policies.
policies []Policy
}

// IsDefault determines whether the policies are the default policies.
func (vp VersionedPolicies) IsDefault() bool { return vp.isDefault }
// NewStagedPolicies create a new staged policies.
func NewStagedPolicies(cutoverNs int64, tombstoned bool, policies []Policy) StagedPolicies {
return StagedPolicies{CutoverNs: cutoverNs, Tombstoned: tombstoned, policies: policies}
}

// Reset resets the staged policies.
func (p *StagedPolicies) Reset() { *p = EmptyStagedPolicies }

// Policies returns the policies.
func (vp VersionedPolicies) Policies() []Policy {
if vp.isDefault {
func (p StagedPolicies) Policies() []Policy {
if p.hasDefaultPolicies() {
return defaultPolicies
}
return vp.policies
return p.policies
}

// String is the representation of versioned policies.
func (vp VersionedPolicies) String() string {
// SamePolicies returns whether two staged policies have the same policy list,
// assuming the policies are sorted in the same order.
func (p StagedPolicies) SamePolicies(other StagedPolicies) bool {
if p.hasDefaultPolicies() && other.hasDefaultPolicies() {
return true
}
currPolicies := p.Policies()
otherPolicies := other.Policies()
if len(currPolicies) != len(otherPolicies) {
return false
}
for i := 0; i < len(currPolicies); i++ {
if currPolicies[i] != otherPolicies[i] {
return false
}
}
return true
}

// String is the representation of staged policies.
func (p StagedPolicies) String() string {
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("{version:%d,cutover:%s,isDefault:%v,policies:[", vp.Version, vp.Cutover.String(), vp.isDefault))
for i := range vp.policies {
buf.WriteString(vp.policies[i].String())
if i < len(vp.policies)-1 {
buf.WriteString(fmt.Sprintf("{cutover:%s,tombstoned:%v,policies:[", time.Unix(0, p.CutoverNs).String(), p.Tombstoned))
for i := range p.policies {
buf.WriteString(p.policies[i].String())
if i < len(p.policies)-1 {
buf.WriteString(",")
}
}
buf.WriteString("]}")
return buf.String()
}

// Reset resets the versioned policies.
func (vp *VersionedPolicies) Reset() {
*vp = EmptyVersionedPolicies
func (p StagedPolicies) isEmpty() bool {
return p.CutoverNs == 0 && !p.Tombstoned && p.hasDefaultPolicies()
}

// DefaultVersionedPolicies creates a new default versioned policies.
func DefaultVersionedPolicies(version int, cutover time.Time) VersionedPolicies {
return VersionedPolicies{
Version: version,
Cutover: cutover,
isDefault: true,
}
func (p StagedPolicies) hasDefaultPolicies() bool {
return len(p.policies) == 0
}

// CustomVersionedPolicies creates a new custom versioned policies.
func CustomVersionedPolicies(version int, cutover time.Time, policies []Policy) VersionedPolicies {
return VersionedPolicies{
Version: version,
Cutover: cutover,
isDefault: false,
policies: policies,
}
// PoliciesList is a list of staged policies.
type PoliciesList []StagedPolicies

// IsDefault determines whether this is a default policies list.
func (l PoliciesList) IsDefault() bool {
return len(l) == 1 && l[0].isEmpty()
}
74 changes: 35 additions & 39 deletions policy/policy_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,81 +25,77 @@ import (
"time"
)

func BenchmarkVersionedPoliciesAsStruct(b *testing.B) {
vp := CustomVersionedPolicies(InitPolicyVersion, time.Now(), defaultPolicies)
var (
testNowNs = time.Now().UnixNano()
)

func BenchmarkStagedPoliciesAsStruct(b *testing.B) {
sp := NewStagedPolicies(testNowNs, false, defaultPolicies)
for n := 0; n < b.N; n++ {
validatePolicyByValue(b, vp)
validatePolicyByValue(b, sp)
}
}

func BenchmarkVersionedPoliciesAsPointer(b *testing.B) {
vp := CustomVersionedPolicies(InitPolicyVersion, time.Now(), defaultPolicies)
func BenchmarkStagedPoliciesAsPointer(b *testing.B) {
sp := NewStagedPolicies(testNowNs, false, defaultPolicies)
for n := 0; n < b.N; n++ {
validatePolicyByPointer(b, &vp)
validatePolicyByPointer(b, &sp)
}
}

func BenchmarkVersionedPoliciesAsInterface(b *testing.B) {
vp := &testVersionedPolicies{version: InitPolicyVersion, cutover: time.Now(), policies: defaultPolicies}
func BenchmarkStagedPoliciesAsInterface(b *testing.B) {
sp := &testStagedPolicies{cutoverNs: testNowNs, policies: defaultPolicies}
for n := 0; n < b.N; n++ {
validatePolicyByInterface(b, vp)
validatePolicyByInterface(b, sp)
}
}

func BenchmarkVersionedPoliciesAsStructExported(b *testing.B) {
vp := testVersionedPolicies{version: InitPolicyVersion, cutover: time.Now(), policies: defaultPolicies}
func BenchmarkStagedPoliciesAsStructExported(b *testing.B) {
sp := testStagedPolicies{cutoverNs: testNowNs, policies: defaultPolicies}
for n := 0; n < b.N; n++ {
validatePolicyByStructExported(b, vp)
validatePolicyByStructExported(b, sp)
}
}

type testVersionedPoliciesInt interface {
Version() int
type testStagedPoliciesInt64 interface {
CutoverNs() int64
}

// VersionedPolicies represent a list of policies at a specified version.
type testVersionedPolicies struct {
// Version is the version of the policies.
version int

// Cutover is when the policies take effect.
cutover time.Time

// isDefault determines whether the policies are the default policies.
isDefault bool

// policies represent the list of policies.
policies []Policy
// StagedPolicies represent a list of policies at a specified version.
type testStagedPolicies struct {
cutoverNs int64
tombstoned bool
policies []Policy
}

func (v testVersionedPolicies) ValVersion() int {
return v.version
func (v testStagedPolicies) ValCutoverNs() int64 {
return v.cutoverNs
}

func (v *testVersionedPolicies) Version() int {
return v.version
func (v *testStagedPolicies) CutoverNs() int64 {
return v.cutoverNs
}

func validatePolicyByValue(b *testing.B, vps VersionedPolicies) {
if vps.Version != InitPolicyVersion {
func validatePolicyByValue(b *testing.B, sp StagedPolicies) {
if sp.CutoverNs != testNowNs {
b.FailNow()
}
}

func validatePolicyByPointer(b *testing.B, vps *VersionedPolicies) {
if vps.Version != InitPolicyVersion {
func validatePolicyByPointer(b *testing.B, sp *StagedPolicies) {
if sp.CutoverNs != testNowNs {
b.FailNow()
}
}

func validatePolicyByInterface(b *testing.B, vps testVersionedPoliciesInt) {
if vps.Version() != InitPolicyVersion {
func validatePolicyByInterface(b *testing.B, sp testStagedPoliciesInt64) {
if sp.CutoverNs() != testNowNs {
b.FailNow()
}
}

func validatePolicyByStructExported(b *testing.B, vps testVersionedPolicies) {
if vps.ValVersion() != InitPolicyVersion {
func validatePolicyByStructExported(b *testing.B, sp testStagedPolicies) {
if sp.ValCutoverNs() != testNowNs {
b.FailNow()
}
}
38 changes: 14 additions & 24 deletions policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,20 @@ func TestPoliciesByResolutionAsc(t *testing.T) {
require.Equal(t, expected, inputs)
}

func TestDefaultVersionedPolicies(t *testing.T) {
var (
version = 2
cutover = time.Now()
)
vp := DefaultVersionedPolicies(version, cutover)
require.Equal(t, version, vp.Version)
require.Equal(t, cutover, vp.Cutover)
require.True(t, vp.IsDefault())
require.Equal(t, defaultPolicies, vp.Policies())
func TestStagedPoliciesHasDefaultPolicies(t *testing.T) {
sp := NewStagedPolicies(testNowNs, true, nil)
require.Equal(t, testNowNs, sp.CutoverNs)
require.True(t, sp.hasDefaultPolicies())
require.Equal(t, defaultPolicies, sp.Policies())
}

func TestCustomVersionedPolicies(t *testing.T) {
var (
version = 2
cutover = time.Now()
policies = []Policy{
NewPolicy(10*time.Second, xtime.Second, 6*time.Hour),
NewPolicy(10*time.Second, xtime.Second, 2*time.Hour),
}
)
vp := CustomVersionedPolicies(version, cutover, policies)
require.Equal(t, version, vp.Version)
require.Equal(t, cutover, vp.Cutover)
require.False(t, vp.IsDefault())
require.Equal(t, policies, vp.Policies())
func TestStagedPoliciesHasCustomPolicies(t *testing.T) {
policies := []Policy{
NewPolicy(10*time.Second, xtime.Second, 6*time.Hour),
NewPolicy(10*time.Second, xtime.Second, 2*time.Hour),
}
sp := NewStagedPolicies(testNowNs, false, policies)
require.Equal(t, testNowNs, sp.CutoverNs)
require.False(t, sp.hasDefaultPolicies())
require.Equal(t, policies, sp.Policies())
}
4 changes: 2 additions & 2 deletions protocol/msgpack/aggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (enc *aggregatedEncoder) encodeMetricAsRaw(m aggregated.Metric) []byte {
enc.buf.resetData()
enc.encodeMetricProlog()
enc.buf.encodeID(m.ID)
enc.buf.encodeTime(m.Timestamp)
enc.buf.encodeVarint(m.TimeNs)
enc.buf.encodeFloat64(m.Value)
return enc.buf.encoder().Bytes()
}
Expand All @@ -112,7 +112,7 @@ func (enc *aggregatedEncoder) encodeChunkedMetricAsRaw(m aggregated.ChunkedMetri
enc.buf.resetData()
enc.encodeMetricProlog()
enc.buf.encodeChunkedID(m.ChunkedID)
enc.buf.encodeTime(m.Timestamp)
enc.buf.encodeVarint(m.TimeNs)
enc.buf.encodeFloat64(m.Value)
return enc.buf.encoder().Bytes()
}
Expand Down
Loading

0 comments on commit 38323d3

Please sign in to comment.