From 53a2a083dd980a855d1ff3539e465d2d0c910c85 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Tue, 20 Mar 2018 17:44:58 -0400 Subject: [PATCH] Add storage policies to pipeline struct --- op/applied/type.go | 17 ++++++++++------- op/type.go | 16 +++++++++++----- op/type_test.go | 6 +++--- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/op/applied/type.go b/op/applied/type.go index b04a225..d54269c 100644 --- a/op/applied/type.go +++ b/op/applied/type.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3metrics/aggregation" "github.com/m3db/m3metrics/op" + "github.com/m3db/m3metrics/policy" ) // Rollup captures the rollup metadata after the operation is applied against a metric ID. @@ -37,15 +38,12 @@ type Rollup struct { } func (op Rollup) String() string { - var b bytes.Buffer - b.WriteString("{") - fmt.Fprintf(&b, "id: %s, ", op.ID) - fmt.Fprintf(&b, "aggregation: %v", op.AggregationID) - b.WriteString("}") - return b.String() + return fmt.Sprintf("{id: %s, aggregation: %v}", op.ID, op.AggregationID) } // Union is a union of different types of operation. +// NB: It does not contain an aggregation operation since that +// is already captured by the aggregation ID in the metadata. type Union struct { Type op.Type Aggregation op.Aggregation @@ -74,6 +72,9 @@ func (u Union) String() string { type Pipeline struct { // a list of pipeline operations. Operations []Union + // A list of storage policies that are applied to metrics + // generated from this pipeline. + StoragePolicies []policy.StoragePolicy } // IsEmpty determines whether a pipeline is empty. @@ -90,6 +91,8 @@ func (p Pipeline) String() string { b.WriteString(", ") } } - b.WriteString("]}") + b.WriteString("], ") + fmt.Fprintf(&b, "storagePolicies: %v", p.StoragePolicies) + b.WriteString("}") return b.String() } diff --git a/op/type.go b/op/type.go index b0c439a..463183b 100644 --- a/op/type.go +++ b/op/type.go @@ -25,6 +25,7 @@ import ( "fmt" "github.com/m3db/m3metrics/aggregation" + "github.com/m3db/m3metrics/policy" "github.com/m3db/m3metrics/transformation" ) @@ -41,12 +42,12 @@ const ( // Aggregation is an aggregation operation. type Aggregation struct { - // Type of aggregation performed. - Type aggregation.Type + // Type of aggregations performed. + ID aggregation.ID } func (op Aggregation) String() string { - return op.Type.String() + return op.ID.String() } // Transformation is a transformation operation. @@ -113,8 +114,11 @@ func (u Union) String() string { // Pipeline is a pipeline of operations. type Pipeline struct { - // a list of pipeline operations. + // A list of pipeline operations. Operations []Union + // A list of storage policies that are applied to metrics + // generated from this pipeline. + StoragePolicies []policy.StoragePolicy } func (p Pipeline) String() string { @@ -126,6 +130,8 @@ func (p Pipeline) String() string { b.WriteString(", ") } } - b.WriteString("]}") + b.WriteString("], ") + fmt.Fprintf(&b, "storagePolicies: %v", p.StoragePolicies) + b.WriteString("}") return b.String() } diff --git a/op/type_test.go b/op/type_test.go index d1609f3..41cbc4c 100644 --- a/op/type_test.go +++ b/op/type_test.go @@ -39,7 +39,7 @@ func TestPipelineString(t *testing.T) { Operations: []Union{ { Type: AggregationType, - Aggregation: Aggregation{Type: aggregation.Last}, + Aggregation: Aggregation{ID: aggregation.MustCompressTypes(aggregation.Last)}, }, { Type: TransformationType, @@ -55,7 +55,7 @@ func TestPipelineString(t *testing.T) { }, }, }, - expected: "{operations: [{aggregation: Last}, {transformation: PerSecond}, {rollup: {name: foo, tags: [tag1, tag2], aggregation: Sum}}]}", + expected: "{operations: [{aggregation: Last}, {transformation: PerSecond}, {rollup: {name: foo, tags: [tag1, tag2], aggregation: Sum}}], storagePolicies: []}", }, { p: Pipeline{ @@ -65,7 +65,7 @@ func TestPipelineString(t *testing.T) { }, }, }, - expected: "{operations: [{unknown op type: Type(10)}]}", + expected: "{operations: [{unknown op type: Type(10)}], storagePolicies: []}", }, }