Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add protobuf message definitions and encoding/decoding APIs (#148) (#169
Browse files Browse the repository at this point in the history
)
  • Loading branch information
xichen2020 committed Jun 7, 2018
1 parent f869595 commit f9d74d6
Show file tree
Hide file tree
Showing 100 changed files with 13,750 additions and 1,106 deletions.
2 changes: 1 addition & 1 deletion .ci
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ license_dir := .ci/uber-licence
license_node_modules := $(license_dir)/node_modules
auto_gen := .ci/auto-gen.sh
protoc_go_package := github.com/golang/protobuf/protoc-gen-go
proto_output_dir := generated/proto/schema
proto_output_dir := generated/proto
proto_rules_dir := generated/proto
mocks_output_dir := generated/mocks/mocks
mocks_rules_dir := generated/mocks
Expand Down
22 changes: 20 additions & 2 deletions aggregation/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package aggregation
import (
"fmt"

"github.com/m3db/m3metrics/generated/proto/schema"
"github.com/m3db/m3metrics/generated/proto/aggregationpb"
)

const (
Expand All @@ -45,7 +45,7 @@ var (
type ID [IDLen]uint64

// NewIDFromSchema creates an ID from schema.
func NewIDFromSchema(input []schema.AggregationType) (ID, error) {
func NewIDFromSchema(input []aggregationpb.AggregationType) (ID, error) {
aggTypes, err := NewTypesFromSchema(input)
if err != nil {
return DefaultID, err
Expand Down Expand Up @@ -98,3 +98,21 @@ func (id ID) String() string {
}
return aggTypes.String()
}

// ToProto converts the aggregation id to a protobuf message in place.
func (id ID) ToProto(pb *aggregationpb.AggregationID) error {
if IDLen != 1 {
return fmt.Errorf("id length %d cannot be represented by a single integer", IDLen)
}
pb.Id = id[0]
return nil
}

// FromProto converts the protobuf message to an aggregation id in place.
func (id *ID) FromProto(pb aggregationpb.AggregationID) error {
if IDLen != 1 {
return fmt.Errorf("id length %d cannot be represented by a single integer", IDLen)
}
(*id)[0] = pb.Id
return nil
}
61 changes: 24 additions & 37 deletions protocol/msgpack/schema_test.go → aggregation/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,39 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package msgpack
package aggregation

import (
"testing"

"github.com/m3db/m3metrics/generated/proto/aggregationpb"

"github.com/stretchr/testify/require"
)

func TestObjectTypeIsValid(t *testing.T) {
inputs := []objectType{
rootObjectType,
counterWithPoliciesListType,
batchTimerWithPoliciesListType,
gaugeWithPoliciesListType,
rawMetricWithStoragePolicyType,
counterType,
batchTimerType,
gaugeType,
metricType,
defaultPoliciesListType,
customPoliciesListType,
stagedPoliciesType,
storagePolicyType,
knownResolutionType,
unknownResolutionType,
knownRetentionType,
unknownRetentionType,
defaultAggregationID,
shortAggregationID,
longAggregationID,
policyType,
rawMetricWithStoragePolicyAndEncodeTimeType,
}
var (
testID = ID{6}
testIDProto = aggregationpb.AggregationID{Id: 6}
)

for _, input := range inputs {
require.True(t, input.isValid())
}
func TestIDToProto(t *testing.T) {
var pb aggregationpb.AggregationID
require.NoError(t, testID.ToProto(&pb))
require.Equal(t, testIDProto, pb)
}

func TestObjectTypeIsValidInvalidType(t *testing.T) {
inputs := []objectType{
unknownType,
numObjectTypes + 1,
}
func TestIDFromProto(t *testing.T) {
var res ID
require.NoError(t, res.FromProto(testIDProto))
require.Equal(t, testID, res)
}

for _, input := range inputs {
require.False(t, input.isValid())
}
func TestIDRoundTrip(t *testing.T) {
var (
pb aggregationpb.AggregationID
res ID
)
require.NoError(t, testID.ToProto(&pb))
require.NoError(t, res.FromProto(pb))
require.Equal(t, testID, res)
}
20 changes: 10 additions & 10 deletions aggregation/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"fmt"
"strings"

"github.com/m3db/m3metrics/generated/proto/schema"
"github.com/m3db/m3metrics/generated/proto/aggregationpb"
"github.com/m3db/m3x/pool"
)

Expand Down Expand Up @@ -107,7 +107,7 @@ var (
type Type int

// NewTypeFromSchema creates an aggregation type from a schema.
func NewTypeFromSchema(input schema.AggregationType) (Type, error) {
func NewTypeFromSchema(input aggregationpb.AggregationType) (Type, error) {
aggType := Type(input)
if !aggType.IsValid() {
return UnknownType, fmt.Errorf("invalid aggregation type from schema: %s", input)
Expand Down Expand Up @@ -191,10 +191,10 @@ func (a Type) Quantile() (float64, bool) {
}

// Schema returns the schema of the aggregation type.
func (a Type) Schema() (schema.AggregationType, error) {
s := schema.AggregationType(a)
func (a Type) Schema() (aggregationpb.AggregationType, error) {
s := aggregationpb.AggregationType(a)
if err := validateSchemaType(s); err != nil {
return schema.AggregationType_UNKNOWN, err
return aggregationpb.AggregationType_UNKNOWN, err
}
return s, nil
}
Expand All @@ -214,8 +214,8 @@ func (a *Type) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

func validateSchemaType(a schema.AggregationType) error {
_, ok := schema.AggregationType_name[int32(a)]
func validateSchemaType(a aggregationpb.AggregationType) error {
_, ok := aggregationpb.AggregationType_name[int32(a)]
if !ok {
return fmt.Errorf("invalid schema aggregation type: %v", a)
}
Expand All @@ -235,7 +235,7 @@ func ParseType(str string) (Type, error) {
type Types []Type

// NewTypesFromSchema creates a list of aggregation types from a schema.
func NewTypesFromSchema(input []schema.AggregationType) (Types, error) {
func NewTypesFromSchema(input []aggregationpb.AggregationType) (Types, error) {
res := make([]Type, len(input))
for i, t := range input {
aggType, err := NewTypeFromSchema(t)
Expand Down Expand Up @@ -359,14 +359,14 @@ func (aggTypes Types) PooledQuantiles(p pool.FloatsPool) ([]float64, bool) {
}

// Schema returns the schema of the aggregation types.
func (aggTypes Types) Schema() ([]schema.AggregationType, error) {
func (aggTypes Types) Schema() ([]aggregationpb.AggregationType, error) {
// This is the same as returning an empty slice from the functionality perspective.
// It makes creating testing fixtures much simpler.
if aggTypes == nil {
return nil, nil
}

res := make([]schema.AggregationType, len(aggTypes))
res := make([]aggregationpb.AggregationType, len(aggTypes))
for i, aggType := range aggTypes {
s, err := aggType.Schema()
if err != nil {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
96 changes: 96 additions & 0 deletions encoding/protobuf/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package protobuf

import (
"math"

"github.com/m3db/m3x/pool"
)

// Buffer contains a byte slice backed by an optional bytes pool.
type Buffer struct {
buf []byte
pool pool.BytesPool
closed bool
}

// NewBuffer create a new buffer.
func NewBuffer(buf []byte, p pool.BytesPool) Buffer {
return Buffer{buf: buf, pool: p}
}

// Bytes returns the raw byte slice.
func (b *Buffer) Bytes() []byte { return b.buf }

// Close closes the buffer.
func (b *Buffer) Close() {
if b.closed {
return
}
b.closed = true
if b.pool != nil && b.buf != nil {
b.pool.Put(b.buf)
}
b.pool = nil
b.buf = nil
}

type copyDataMode int

const (
dontCopyData copyDataMode = iota
copyData
)

// ensureBufferSize returns a buffer with at least the specified target size.
// If the specified buffer has enough capacity, it is returned as is. Otherwise,
// a new buffer is allocated with at least the specified target size, and the
// specified buffer is returned to pool if possible.
func ensureBufferSize(
buf []byte,
p pool.BytesPool,
targetSize int,
copyDataMode copyDataMode,
) []byte {
bufSize := len(buf)
if bufSize >= targetSize {
return buf
}
newSize := int(math.Max(float64(targetSize), float64(bufSize*2)))
newBuf := allocate(p, newSize)
if copyDataMode == copyData {
copy(newBuf, buf)
}
if p != nil && buf != nil {
p.Put(buf)
}
return newBuf
}

// allocate allocates a byte slice with at least the specified size.
func allocate(p pool.BytesPool, targetSize int) []byte {
if p == nil {
return make([]byte, targetSize)
}
b := p.Get(targetSize)
return b[:cap(b)]
}
Loading

0 comments on commit f9d74d6

Please sign in to comment.