Skip to content

Commit

Permalink
feat: implement gRPC metrics plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobmarble committed May 2, 2023
1 parent 40053a3 commit 0707fd9
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 6 deletions.
8 changes: 4 additions & 4 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func main() {
log.Fatalf("Cannot initialize storage factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
metricsReaderFactory, err := metricsPlugin.NewFactory(fc)
metricsReaderFactory, err := metricsPlugin.NewFactory(metricsPlugin.FactoryConfigFromEnv())
if err != nil {
log.Fatalf("Cannot initialize metrics factory: %v", err)
}
Expand Down Expand Up @@ -179,12 +178,13 @@ func createMetricsQueryService(
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {
// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)

if err := metricsReaderFactory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
MetricsReader: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
StreamImpl: services.StreamingSpanWriter,
MetricsImpl: services.MetricsReader,
},
},
},
Expand Down
9 changes: 9 additions & 0 deletions pkg/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -38,6 +39,7 @@ const BearerTokenKey = "bearer.token"
var (
_ StoragePlugin = (*grpcClient)(nil)
_ ArchiveStoragePlugin = (*grpcClient)(nil)
_ MetricsReaderPlugin = (*grpcClient)(nil)
_ PluginCapabilities = (*grpcClient)(nil)

// upgradeContext composites several steps of upgrading context
Expand All @@ -53,6 +55,7 @@ type grpcClient struct {
capabilitiesClient storage_v1.PluginCapabilitiesClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
metricsReaderClient storage_v1.MetricsReaderPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *grpcClient {
Expand All @@ -64,6 +67,7 @@ func NewGRPCClient(c *grpc.ClientConn) *grpcClient {
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c),
metricsReaderClient: storage_v1.NewMetricsReaderPluginClient(c),
}
}

Expand Down Expand Up @@ -124,6 +128,10 @@ func (c *grpcClient) ArchiveSpanWriter() spanstore.Writer {
return &archiveWriter{client: c.archiveWriterClient}
}

func (c *grpcClient) MetricsReader() metricsstore.Reader {
return &metricsReader{client: c.metricsReaderClient}
}

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{
Expand Down Expand Up @@ -286,6 +294,7 @@ func (c *grpcClient) Capabilities() (*Capabilities, error) {
ArchiveSpanReader: capabilities.ArchiveSpanReader,
ArchiveSpanWriter: capabilities.ArchiveSpanWriter,
StreamingSpanWriter: capabilities.StreamingSpanWriter,
MetricsReader: capabilities.MetricsReader,
}, nil
}

Expand Down
111 changes: 111 additions & 0 deletions pkg/grpc/shared/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/gogo/protobuf/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -49,6 +52,8 @@ type GRPCHandlerStorageImpl struct {
ArchiveSpanWriter func() spanstore.Writer

StreamingSpanWriter func() spanstore.Writer

MetricsReader func() metricsstore.Reader
}

// NewGRPCHandler creates a handler given individual storage implementations.
Expand All @@ -61,6 +66,7 @@ func NewGRPCHandlerWithPlugins(
mainImpl StoragePlugin,
archiveImpl ArchiveStoragePlugin,
streamImpl StreamingSpanWriterPlugin,
metricsImpl MetricsReaderPlugin,
) *GRPCHandler {
impl := &GRPCHandlerStorageImpl{
SpanReader: mainImpl.SpanReader,
Expand All @@ -78,6 +84,9 @@ func NewGRPCHandlerWithPlugins(
if streamImpl != nil {
impl.StreamingSpanWriter = streamImpl.StreamingSpanWriter
}
if metricsImpl != nil {
impl.MetricsReader = metricsImpl.MetricsReader
}
return NewGRPCHandler(impl)
}

Expand All @@ -90,6 +99,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server) error {
storage_v1.RegisterPluginCapabilitiesServer(ss, s)
storage_v1.RegisterDependenciesReaderPluginServer(ss, s)
storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s)
storage_v1.RegisterMetricsReaderPluginServer(ss, s)
return nil
}

Expand Down Expand Up @@ -266,6 +276,7 @@ func (s *GRPCHandler) Capabilities(ctx context.Context, request *storage_v1.Capa
ArchiveSpanReader: s.impl.ArchiveSpanReader() != nil,
ArchiveSpanWriter: s.impl.ArchiveSpanWriter() != nil,
StreamingSpanWriter: s.impl.StreamingSpanWriter() != nil,
MetricsReader: s.impl.MetricsReader() != nil,
}, nil
}

Expand Down Expand Up @@ -301,3 +312,103 @@ func (s *GRPCHandler) WriteArchiveSpan(ctx context.Context, r *storage_v1.WriteS
}
return &storage_v1.WriteSpanResponse{}, nil
}

func (s *GRPCHandler) GetLatencies(ctx context.Context, request *storage_v1.GetLatenciesRequest) (*storage_v1.GetLatenciesResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
params := &metricsstore.LatenciesQueryParameters{
BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters),
Quantile: float64(request.Quantile),
}

mf, err := reader.GetLatencies(ctx, params)
if err != nil {
return nil, err
}
return &storage_v1.GetLatenciesResponse{
MetricFamily: mf,
}, nil
}

func (s *GRPCHandler) GetCallRates(ctx context.Context, request *storage_v1.GetCallRatesRequest) (*storage_v1.GetCallRatesResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
params := &metricsstore.CallRateQueryParameters{
BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters),
}

mf, err := reader.GetCallRates(ctx, params)
if err != nil {
return nil, err
}
return &storage_v1.GetCallRatesResponse{
MetricFamily: mf,
}, nil
}

func (s *GRPCHandler) GetErrorRates(ctx context.Context, request *storage_v1.GetErrorRatesRequest) (*storage_v1.GetErrorRatesResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
params := &metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters),
}

mf, err := reader.GetErrorRates(ctx, params)
if err != nil {
return nil, err
}
return &storage_v1.GetErrorRatesResponse{
MetricFamily: mf,
}, nil
}

func (s *GRPCHandler) GetMinStepDuration(ctx context.Context, _ *storage_v1.GetMinStepDurationRequest) (*storage_v1.GetMinStepDurationResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}

minStep, err := reader.GetMinStepDuration(ctx, &metricsstore.MinStepDurationQueryParameters{})
if err != nil {
return nil, err
}
return &storage_v1.GetMinStepDurationResponse{
MinStep: &types.Duration{
Seconds: int64(minStep / time.Second),
Nanos: int32(minStep % time.Second),
},
}, nil
}

func baseQueryParametersFromProto(proto *storage_v1.MetricsBaseQueryParameters) metricsstore.BaseQueryParameters {
pogo := metricsstore.BaseQueryParameters{
ServiceNames: proto.ServiceNames,
GroupByOperation: proto.GroupByOperation,
SpanKinds: proto.SpanKinds,
}

if proto.EndTime != nil {
endTime := time.Unix(proto.EndTime.Seconds, int64(proto.EndTime.Nanos))
pogo.EndTime = &endTime
}
if proto.Lookback != nil {
lookback := time.Duration(proto.Lookback.Seconds)*time.Second + time.Duration(proto.Lookback.Nanos)*time.Nanosecond
pogo.Lookback = &lookback
}
if proto.Step != nil {
step := time.Duration(proto.Step.Seconds)*time.Second + time.Duration(proto.Step.Nanos)*time.Nanosecond
pogo.Step = &step
}
if proto.RatePer != nil {
ratePer := time.Duration(proto.RatePer.Seconds)*time.Second + time.Duration(proto.RatePer.Nanos)*time.Nanosecond
pogo.RatePer = &ratePer
}

return pogo
}
7 changes: 7 additions & 0 deletions pkg/grpc/shared/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/go-plugin"

"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -53,6 +54,10 @@ type StreamingSpanWriterPlugin interface {
StreamingSpanWriter() spanstore.Writer
}

type MetricsReaderPlugin interface {
MetricsReader() metricsstore.Reader
}

// PluginCapabilities allow expose plugin its capabilities.
type PluginCapabilities interface {
Capabilities() (*Capabilities, error)
Expand All @@ -63,11 +68,13 @@ type Capabilities struct {
ArchiveSpanReader bool
ArchiveSpanWriter bool
StreamingSpanWriter bool
MetricsReader bool
}

// PluginServices defines services plugin can expose
type PluginServices struct {
Store StoragePlugin
ArchiveStore ArchiveStoragePlugin
StreamingSpanWriter StreamingSpanWriterPlugin
MetricsReader MetricsReaderPlugin
}
106 changes: 106 additions & 0 deletions pkg/grpc/shared/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package shared

import (
"context"
"time"

"github.com/gogo/protobuf/types"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

var _ metricsstore.Reader = (*metricsReader)(nil)

type metricsReader struct {
client storage_v1.MetricsReaderPluginClient
}

func (m *metricsReader) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
request := &storage_v1.GetLatenciesRequest{
BaseQueryParameters: baseQueryParametersFromPogo(params.BaseQueryParameters),
Quantile: float32(params.Quantile),
}

response, err := m.client.GetLatencies(upgradeContext(ctx), request)
if err != nil {
return nil, err
}
return response.MetricFamily, nil
}

func (m *metricsReader) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
request := &storage_v1.GetCallRatesRequest{
BaseQueryParameters: baseQueryParametersFromPogo(params.BaseQueryParameters),
}

response, err := m.client.GetCallRates(upgradeContext(ctx), request)
if err != nil {
return nil, err
}
return response.MetricFamily, nil
}

func (m *metricsReader) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
request := &storage_v1.GetErrorRatesRequest{
BaseQueryParameters: baseQueryParametersFromPogo(params.BaseQueryParameters),
}

response, err := m.client.GetErrorRates(upgradeContext(ctx), request)
if err != nil {
return nil, err
}
return response.MetricFamily, nil
}

func (m *metricsReader) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
response, err := m.client.GetMinStepDuration(upgradeContext(ctx), &storage_v1.GetMinStepDurationRequest{})
if err != nil {
return 0, err
}

minStep := time.Second
if response.MinStep != nil {
minStep = time.Duration(response.MinStep.Seconds)*time.Second + time.Duration(response.MinStep.Nanos)
}

return minStep, nil
}

func baseQueryParametersFromPogo(pogo metricsstore.BaseQueryParameters) *storage_v1.MetricsBaseQueryParameters {
proto := &storage_v1.MetricsBaseQueryParameters{
ServiceNames: pogo.ServiceNames,
GroupByOperation: pogo.GroupByOperation,
SpanKinds: pogo.SpanKinds,
}

if pogo.EndTime != nil {
proto.EndTime = &types.Timestamp{Seconds: pogo.EndTime.Unix(), Nanos: int32(pogo.EndTime.Nanosecond())}
}
if pogo.Lookback != nil {
proto.Lookback = &types.Duration{Seconds: int64(*pogo.Lookback / time.Second), Nanos: int32(*pogo.Lookback % time.Second)}
}
if pogo.Step != nil {
proto.Step = &types.Duration{Seconds: int64(*pogo.Step / time.Second), Nanos: int32(*pogo.Step % time.Second)}
}
if pogo.RatePer != nil {
proto.RatePer = &types.Duration{Seconds: int64(*pogo.RatePer / time.Second), Nanos: int32(*pogo.RatePer % time.Second)}
}

return proto
}

0 comments on commit 0707fd9

Please sign in to comment.