Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add migration decoder (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Apr 12, 2018
1 parent f398976 commit 3ef5d19
Show file tree
Hide file tree
Showing 9 changed files with 1,399 additions and 64 deletions.
130 changes: 130 additions & 0 deletions encoding/migration/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import (
"fmt"

"github.com/m3db/m3metrics/encoding"
"github.com/m3db/m3metrics/metadata"
"github.com/m3db/m3metrics/metric/unaggregated"
"github.com/m3db/m3metrics/policy"
)

// toUnaggregatedMessageUnion converts a legacy metric union and its
// associated policies list into the equivalent unaggregated message
// union, translating the policies list into staged metadatas along
// the way. It returns an error for unrecognized metric types.
func toUnaggregatedMessageUnion(
	metric unaggregated.MetricUnion,
	policiesList policy.PoliciesList,
) (encoding.UnaggregatedMessageUnion, error) {
	sm := toStagedMetadatas(policiesList)
	var msg encoding.UnaggregatedMessageUnion
	switch metric.Type {
	case unaggregated.CounterType:
		msg.Type = encoding.CounterWithMetadatasType
		msg.CounterWithMetadatas = unaggregated.CounterWithMetadatas{
			Counter:         metric.Counter(),
			StagedMetadatas: sm,
		}
	case unaggregated.BatchTimerType:
		msg.Type = encoding.BatchTimerWithMetadatasType
		msg.BatchTimerWithMetadatas = unaggregated.BatchTimerWithMetadatas{
			BatchTimer:      metric.BatchTimer(),
			StagedMetadatas: sm,
		}
	case unaggregated.GaugeType:
		msg.Type = encoding.GaugeWithMetadatasType
		msg.GaugeWithMetadatas = unaggregated.GaugeWithMetadatas{
			Gauge:           metric.Gauge(),
			StagedMetadatas: sm,
		}
	default:
		return encoding.UnaggregatedMessageUnion{}, fmt.Errorf("unknown metric type: %v", metric.Type)
	}
	return msg, nil
}

// toStagedMetadatas translates a policies list into the equivalent
// staged metadatas representation, mapping default staged policies to
// the default staged metadata.
// TODO: look into reusing metadatas during conversion.
func toStagedMetadatas(
	policiesList policy.PoliciesList,
) metadata.StagedMetadatas {
	converted := make(metadata.StagedMetadatas, 0, len(policiesList))
	for _, stagedPolicies := range policiesList {
		if stagedPolicies.IsDefault() {
			converted = append(converted, metadata.DefaultStagedMetadata)
			continue
		}
		// Second return value is intentionally ignored here, matching
		// the original conversion logic — presumably it reports whether
		// the policies are the defaults, which IsDefault already ruled
		// out above (NOTE(review): confirm against policy package).
		policies, _ := stagedPolicies.Policies()
		converted = append(converted, metadata.StagedMetadata{
			CutoverNanos: stagedPolicies.CutoverNanos,
			Tombstoned:   stagedPolicies.Tombstoned,
			Metadata:     toMetadata(policies),
		})
	}
	return converted
}

// toMetadata merges a flat list of policies into a single metadata
// value, grouping storage policies into one pipeline per aggregation
// ID. Within each pipeline, storage policies keep their original
// relative ordering and duplicates are dropped.
// TODO: look into reusing metadata during conversion.
func toMetadata(policies []policy.Policy) metadata.Metadata {
	var res metadata.Metadata
	for _, p := range policies {
		// Locate the pipeline carrying this policy's aggregation ID,
		// creating a fresh one if no such pipeline exists yet.
		idx := -1
		for i := range res.Pipelines {
			if res.Pipelines[i].AggregationID == p.AggregationID {
				idx = i
				break
			}
		}
		if idx < 0 {
			res.Pipelines = append(res.Pipelines, metadata.PipelineMetadata{
				AggregationID: p.AggregationID,
			})
			idx = len(res.Pipelines) - 1
		}

		// Skip the policy if its storage policy is already recorded in
		// the pipeline; otherwise append it at the end so the incoming
		// ordering (if any) is preserved.
		found := false
		for _, sp := range res.Pipelines[idx].StoragePolicies {
			if sp == p.StoragePolicy {
				found = true
				break
			}
		}
		if !found {
			res.Pipelines[idx].StoragePolicies = append(res.Pipelines[idx].StoragePolicies, p.StoragePolicy)
		}
	}
	return res
}
222 changes: 222 additions & 0 deletions encoding/migration/convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import (
"strings"
"testing"
"time"

"github.com/m3db/m3metrics/aggregation"
"github.com/m3db/m3metrics/encoding"
"github.com/m3db/m3metrics/metadata"
"github.com/m3db/m3metrics/metric/unaggregated"
"github.com/m3db/m3metrics/policy"
xtime "github.com/m3db/m3x/time"

"github.com/stretchr/testify/require"
)

// Shared fixtures for the conversion tests below: one metric union per
// supported metric type, a policies list exercising default, single-,
// and multi-pipeline cases, and the metric/metadata values the
// conversion is expected to produce from them.
var (
	// Counter union input for conversion tests.
	testConvertCounterUnion = unaggregated.MetricUnion{
		Type:       unaggregated.CounterType,
		ID:         []byte("testConvertCounter"),
		CounterVal: 1234,
	}
	// Batch timer union input for conversion tests.
	testConvertBatchTimerUnion = unaggregated.MetricUnion{
		Type:          unaggregated.BatchTimerType,
		ID:            []byte("testConvertBatchTimer"),
		BatchTimerVal: []float64{222.22, 345.67, 901.23345},
	}
	// Gauge union input for conversion tests.
	testConvertGaugeUnion = unaggregated.MetricUnion{
		Type:     unaggregated.GaugeType,
		ID:       []byte("testConvertGauge"),
		GaugeVal: 123.456,
	}
	// Policies list covering all conversion paths; its expected
	// translation is testConvertStagedMetadatas below.
	testConvertPoliciesList = policy.PoliciesList{
		// Default staged policies.
		policy.DefaultStagedPolicies,

		// Single pipeline.
		policy.NewStagedPolicies(
			1234,
			false,
			[]policy.Policy{
				policy.NewPolicy(
					policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour),
					aggregation.DefaultID,
				),
				policy.NewPolicy(
					policy.NewStoragePolicy(time.Minute, xtime.Minute, 6*time.Hour),
					aggregation.DefaultID,
				),
				// A duplicate policy.
				policy.NewPolicy(
					policy.NewStoragePolicy(time.Minute, xtime.Minute, 6*time.Hour),
					aggregation.DefaultID,
				),
			},
		),

		// Multiple pipelines.
		policy.NewStagedPolicies(
			5678,
			true,
			[]policy.Policy{
				policy.NewPolicy(
					policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour),
					aggregation.DefaultID,
				),
				policy.NewPolicy(
					policy.NewStoragePolicy(time.Minute, xtime.Minute, 6*time.Hour),
					aggregation.MustCompressTypes(aggregation.Count, aggregation.Last),
				),
				policy.NewPolicy(
					policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 24*time.Hour),
					aggregation.MustCompressTypes(aggregation.Count, aggregation.Last),
				),
				policy.NewPolicy(
					policy.NewStoragePolicy(time.Hour, xtime.Hour, 30*24*time.Hour),
					aggregation.MustCompressTypes(aggregation.Sum),
				),
			},
		),
	}
	// Expected counter extracted from testConvertCounterUnion.
	testConvertCounter = unaggregated.Counter{
		ID:    []byte("testConvertCounter"),
		Value: 1234,
	}
	// Expected batch timer extracted from testConvertBatchTimerUnion.
	testConvertBatchTimer = unaggregated.BatchTimer{
		ID:     []byte("testConvertBatchTimer"),
		Values: []float64{222.22, 345.67, 901.23345},
	}
	// Expected gauge extracted from testConvertGaugeUnion.
	testConvertGauge = unaggregated.Gauge{
		ID:    []byte("testConvertGauge"),
		Value: 123.456,
	}
	// Expected translation of testConvertPoliciesList: the duplicate
	// storage policy is deduplicated, and policies sharing an
	// aggregation ID are grouped into a single pipeline.
	testConvertStagedMetadatas = metadata.StagedMetadatas{
		metadata.DefaultStagedMetadata,
		metadata.StagedMetadata{
			CutoverNanos: 1234,
			Tombstoned:   false,
			Metadata: metadata.Metadata{
				Pipelines: []metadata.PipelineMetadata{
					{
						AggregationID: aggregation.DefaultID,
						StoragePolicies: []policy.StoragePolicy{
							policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour),
							policy.NewStoragePolicy(time.Minute, xtime.Minute, 6*time.Hour),
						},
					},
				},
			},
		},
		metadata.StagedMetadata{
			CutoverNanos: 5678,
			Tombstoned:   true,
			Metadata: metadata.Metadata{
				Pipelines: []metadata.PipelineMetadata{
					{
						AggregationID: aggregation.DefaultID,
						StoragePolicies: []policy.StoragePolicy{
							policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour),
						},
					},
					{
						AggregationID: aggregation.MustCompressTypes(aggregation.Count, aggregation.Last),
						StoragePolicies: []policy.StoragePolicy{
							policy.NewStoragePolicy(time.Minute, xtime.Minute, 6*time.Hour),
							policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 24*time.Hour),
						},
					},
					{
						AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
						StoragePolicies: []policy.StoragePolicy{
							policy.NewStoragePolicy(time.Hour, xtime.Hour, 30*24*time.Hour),
						},
					},
				},
			},
		},
	}
)

// TestToStagedMetadatas verifies that a full policies list converts to
// the expected staged metadatas.
func TestToStagedMetadatas(t *testing.T) {
	actual := toStagedMetadatas(testConvertPoliciesList)
	require.Equal(t, testConvertStagedMetadatas, actual)
}

// TestToUnaggregatedMessageUnion verifies that each supported metric
// type (counter, batch timer, gauge) converts to the corresponding
// message union carrying the converted staged metadatas.
func TestToUnaggregatedMessageUnion(t *testing.T) {
	tests := []struct {
		metricUnion  unaggregated.MetricUnion
		policiesList policy.PoliciesList
		want         encoding.UnaggregatedMessageUnion
	}{
		{
			metricUnion:  testConvertCounterUnion,
			policiesList: testConvertPoliciesList,
			want: encoding.UnaggregatedMessageUnion{
				Type: encoding.CounterWithMetadatasType,
				CounterWithMetadatas: unaggregated.CounterWithMetadatas{
					Counter:         testConvertCounter,
					StagedMetadatas: testConvertStagedMetadatas,
				},
			},
		},
		{
			metricUnion:  testConvertBatchTimerUnion,
			policiesList: testConvertPoliciesList,
			want: encoding.UnaggregatedMessageUnion{
				Type: encoding.BatchTimerWithMetadatasType,
				BatchTimerWithMetadatas: unaggregated.BatchTimerWithMetadatas{
					BatchTimer:      testConvertBatchTimer,
					StagedMetadatas: testConvertStagedMetadatas,
				},
			},
		},
		{
			metricUnion:  testConvertGaugeUnion,
			policiesList: testConvertPoliciesList,
			want: encoding.UnaggregatedMessageUnion{
				Type: encoding.GaugeWithMetadatasType,
				GaugeWithMetadatas: unaggregated.GaugeWithMetadatas{
					Gauge:           testConvertGauge,
					StagedMetadatas: testConvertStagedMetadatas,
				},
			},
		},
	}

	for _, tt := range tests {
		actual, err := toUnaggregatedMessageUnion(tt.metricUnion, tt.policiesList)
		require.NoError(t, err)
		require.Equal(t, tt.want, actual)
	}
}

// TestToUnaggregatedMessageUnionError verifies that converting a metric
// union of unknown type fails with a descriptive error.
func TestToUnaggregatedMessageUnionError(t *testing.T) {
	metric := unaggregated.MetricUnion{Type: unaggregated.UnknownType}
	_, err := toUnaggregatedMessageUnion(metric, testConvertPoliciesList)
	require.Error(t, err)
	require.True(t, strings.Contains(err.Error(), "unknown metric type"))
}
Loading

0 comments on commit 3ef5d19

Please sign in to comment.