Skip to content

Commit

Permalink
Add gRPC client
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Mar 5, 2019
1 parent ffd5077 commit e5b40aa
Show file tree
Hide file tree
Showing 37 changed files with 1,770 additions and 493 deletions.
34 changes: 34 additions & 0 deletions calculation/calculation_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/generated/proto/servicepb"
)

// Op represents a calculation operator.
Expand Down Expand Up @@ -113,6 +114,39 @@ func (f *Op) UnmarshalJSON(data []byte) error {
return nil
}

// ToOptionalProto converts the calculation op to an optional calculation op protobuf message.
func (f *Op) ToOptionalProto() (servicepb.OptionalCalculationOp, error) {
if f == nil {
noValue := &servicepb.OptionalCalculationOp_NoValue{NoValue: true}
return servicepb.OptionalCalculationOp{Value: noValue}, nil
}
v, err := f.ToProto()
if err != nil {
return servicepb.OptionalCalculationOp{}, err
}
return servicepb.OptionalCalculationOp{
Value: &servicepb.OptionalCalculationOp_Data{Data: v},
}, nil
}

// ToProto converts the calculation op to a calculation op protobuf message.
func (f Op) ToProto() (servicepb.Calculation_Op, error) {
switch f {
case Count:
return servicepb.Calculation_COUNT, nil
case Sum:
return servicepb.Calculation_SUM, nil
case Avg:
return servicepb.Calculation_AVG, nil
case Min:
return servicepb.Calculation_MIN, nil
case Max:
return servicepb.Calculation_MAX, nil
default:
return servicepb.Calculation_UNKNOWNOP, fmt.Errorf("invalid calculation op %v", f)
}
}

var (
validFilterOps = map[Op]struct{}{
Count: struct{}{},
Expand Down
35 changes: 35 additions & 0 deletions calculation/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ type ValueUnion struct {
StringVal string
}

// NewValueFromProto creates a value from protobuf message.
func NewValueFromProto(pbValue servicepb.CalculationValue) (ValueUnion, error) {
var v ValueUnion
switch pbValue.Type {
case servicepb.CalculationValue_NUMBER:
v.Type = NumberType
v.NumberVal = pbValue.NumberVal
case servicepb.CalculationValue_STRING:
v.Type = StringType
v.StringVal = pbValue.StringVal
default:
return v, fmt.Errorf("invalid protobuf calculation value type %v", pbValue.Type)
}
return v, nil
}

// MarshalJSON marshals value as a JSON object.
func (u ValueUnion) MarshalJSON() ([]byte, error) {
switch u.Type {
Expand Down Expand Up @@ -170,6 +186,25 @@ func timeToValue(v *field.ValueUnion) ValueUnion {
return NewNumberUnion(float64(v.TimeNanosVal))
}

// Values is a list of calculation values.
type Values []ValueUnion

// NewValuesFromProto creates a list of calculation values from protobuf message.
func NewValuesFromProto(pbValues []servicepb.CalculationValue) (Values, error) {
if len(pbValues) == 0 {
return nil, nil
}
values := make(Values, 0, len(pbValues))
for _, pbValue := range pbValues {
value, err := NewValueFromProto(pbValue)
if err != nil {
return nil, err
}
values = append(values, value)
}
return values, nil
}

var (
toValueFnsByFieldType = map[field.ValueType]FieldValueToValueFn{
field.NullType: nullToValue,
Expand Down
39 changes: 39 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package client

import (
"context"

"github.com/xichen2020/eventdb/document"
"github.com/xichen2020/eventdb/query"
)

// Client is the database client.
type Client interface {
// Write writes a batch of documents.
Write(
ctx context.Context,
namespace []byte,
documents []document.Document,
) error

// QueryRaw performs a raw query.
QueryRaw(
ctx context.Context,
q query.UnparsedRawQuery,
) (*query.RawQueryResults, error)

// QueryGrouped performs a grouped query.
QueryGrouped(
ctx context.Context,
q query.UnparsedGroupedQuery,
) (*query.GroupedQueryResults, error)

// QueryTimeBucket performs a time bucket query.
QueryTimeBucket(
ctx context.Context,
q query.UnparsedGroupedQuery,
) (*query.TimeBucketQueryResults, error)

// Close closes the client.
Close() error
}
236 changes: 236 additions & 0 deletions client/grpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package grpc

import (
"context"
"strconv"
"time"

"github.com/xichen2020/eventdb/document"
"github.com/xichen2020/eventdb/generated/proto/servicepb"
"github.com/xichen2020/eventdb/query"

"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/instrument"
"github.com/uber-go/tally"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip" // Register gzip compressor
"google.golang.org/grpc/keepalive"
)

const (
batchSizeBucketVersion = 1
bucketSize = 500
numBuckets = 40
)

type clientMetrics struct {
write instrument.MethodMetrics
queryRaw instrument.MethodMetrics
queryGrouped instrument.MethodMetrics
queryTimeBucket instrument.MethodMetrics
batchSizeHist tally.Histogram
}

func newClientMetrics(
scope tally.Scope,
samplingRate float64,
) clientMetrics {
batchSizeBuckets := tally.MustMakeLinearValueBuckets(0, bucketSize, numBuckets)
return clientMetrics{
write: instrument.NewMethodMetrics(scope, "write", samplingRate),
queryRaw: instrument.NewMethodMetrics(scope, "queryRaw", samplingRate),
queryGrouped: instrument.NewMethodMetrics(scope, "queryGrouped", samplingRate),
queryTimeBucket: instrument.NewMethodMetrics(scope, "queryTimeBucket", samplingRate),
batchSizeHist: scope.Tagged(map[string]string{
"bucket-version": strconv.Itoa(batchSizeBucketVersion),
}).Histogram("batch-size", batchSizeBuckets),
}
}

// Client is a GRPC client.
type Client struct {
readTimeout time.Duration
writeTimeout time.Duration
conn *grpc.ClientConn
client servicepb.EventdbClient

metrics clientMetrics
nowFn clock.NowFn
}

// NewClient creates a new client.
func NewClient(
address string,
opts *Options,
) (*Client, error) {
if opts == nil {
opts = NewOptions()
}
dialOpts := []grpc.DialOption{
grpc.WithWriteBufferSize(opts.WriteBufferSize()),
grpc.WithMaxMsgSize(opts.MaxRecvMsgSize()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: opts.KeepAlivePeriod(),
}),
}
if opts.DialTimeout() != 0 {
dialOpts = append(dialOpts,
grpc.WithBlock(),
grpc.WithTimeout(opts.DialTimeout()),
)
}
if opts.UseInsecure() {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if opts.UseCompression() {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name),
))
}
conn, err := grpc.Dial(address, dialOpts...)
if err != nil {
return nil, err
}
client := servicepb.NewEventdbClient(conn)
instrumentOpts := opts.InstrumentOptions()
return &Client{
readTimeout: opts.ReadTimeout(),
writeTimeout: opts.WriteTimeout(),
conn: conn,
client: client,
metrics: newClientMetrics(instrumentOpts.MetricsScope(), instrumentOpts.MetricsSamplingRate()),
nowFn: opts.ClockOptions().NowFn(),
}, nil
}

// Write writes a batch of documents.
func (c *Client) Write(
ctx context.Context,
namespace []byte,
documents []document.Document,
) error {
callStart := c.nowFn()

pbDocs, err := document.Documents(documents).ToProto()
if err != nil {
c.metrics.write.ReportError(c.nowFn().Sub(callStart))
return err
}

ctx, cancelFn := context.WithTimeout(ctx, c.writeTimeout)
defer cancelFn()

req := &servicepb.WriteRequest{
Namespace: namespace,
Docs: pbDocs,
}
_, err = c.client.Write(ctx, req)
c.metrics.write.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
return err
}

// QueryRaw performs a raw query.
func (c *Client) QueryRaw(
ctx context.Context,
q query.UnparsedRawQuery,
) (*query.RawQueryResults, error) {
callStart := c.nowFn()

pbRawQuery, err := q.ToProto()
if err != nil {
c.metrics.queryRaw.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

ctx, cancelFn := context.WithTimeout(ctx, c.readTimeout)
defer cancelFn()

pbRes, err := c.client.QueryRaw(ctx, pbRawQuery)
if err != nil {
c.metrics.queryRaw.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

res, err := query.NewRawQueryResultsFromProto(pbRes)
if err != nil {
c.metrics.queryRaw.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

c.metrics.queryRaw.ReportSuccess(c.nowFn().Sub(callStart))
return res, nil
}

// QueryGrouped performs a grouped query.
func (c *Client) QueryGrouped(
ctx context.Context,
q query.UnparsedGroupedQuery,
) (*query.GroupedQueryResults, error) {
callStart := c.nowFn()

pbGroupedQuery, err := q.ToProto()
if err != nil {
c.metrics.queryGrouped.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

ctx, cancelFn := context.WithTimeout(ctx, c.readTimeout)
defer cancelFn()

pbRes, err := c.client.QueryGrouped(ctx, pbGroupedQuery)
if err != nil {
c.metrics.queryGrouped.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

res, err := query.NewGroupedQueryResultsFromProto(pbRes)
if err != nil {
c.metrics.queryGrouped.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

c.metrics.queryGrouped.ReportSuccess(c.nowFn().Sub(callStart))
return res, nil
}

// QueryTimeBucket performs a time bucket query.
func (c *Client) QueryTimeBucket(
ctx context.Context,
q query.UnparsedTimeBucketQuery,
) (*query.TimeBucketQueryResults, error) {
callStart := c.nowFn()

pbTimeBucketQuery, err := q.ToProto()
if err != nil {
c.metrics.queryTimeBucket.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

ctx, cancelFn := context.WithTimeout(ctx, c.readTimeout)
defer cancelFn()

pbRes, err := c.client.QueryTimeBucket(ctx, pbTimeBucketQuery)
if err != nil {
c.metrics.queryTimeBucket.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

res, err := query.NewTimeBucketQueryResultsFromProto(pbRes)
if err != nil {
c.metrics.queryTimeBucket.ReportError(c.nowFn().Sub(callStart))
return nil, err
}

c.metrics.queryTimeBucket.ReportSuccess(c.nowFn().Sub(callStart))
return res, nil
}

// Close closes the client.
func (c *Client) Close() error {
var err error
if c.conn != nil {
err = c.conn.Close()
c.conn = nil
}
return err
}
Loading

0 comments on commit e5b40aa

Please sign in to comment.