Skip to content

Commit

Permalink
feat: Implement gRPC metrics plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Marble <jacobmarble@gmail.com>
  • Loading branch information
jacobmarble committed May 3, 2023
1 parent 21de208 commit 4872990
Show file tree
Hide file tree
Showing 13 changed files with 586 additions and 9 deletions.
8 changes: 4 additions & 4 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func main() {
log.Fatalf("Cannot initialize storage factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
metricsReaderFactory, err := metricsPlugin.NewFactory(fc)
metricsReaderFactory, err := metricsPlugin.NewFactory(metricsPlugin.FactoryConfigFromEnv())
if err != nil {
log.Fatalf("Cannot initialize metrics factory: %v", err)
}
Expand Down Expand Up @@ -179,12 +178,13 @@ func createMetricsQueryService(
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {
// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)

if err := metricsReaderFactory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
MetricsReader: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
StreamImpl: services.StreamingSpanWriter,
MetricsImpl: services.MetricsReader,
},
},
},
Expand Down
9 changes: 9 additions & 0 deletions pkg/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -38,6 +39,7 @@ const BearerTokenKey = "bearer.token"
var (
_ StoragePlugin = (*grpcClient)(nil)
_ ArchiveStoragePlugin = (*grpcClient)(nil)
_ MetricsReaderPlugin = (*grpcClient)(nil)
_ PluginCapabilities = (*grpcClient)(nil)

// upgradeContext composites several steps of upgrading context
Expand All @@ -53,6 +55,7 @@ type grpcClient struct {
capabilitiesClient storage_v1.PluginCapabilitiesClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
metricsReaderClient storage_v1.MetricsReaderPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *grpcClient {
Expand All @@ -64,6 +67,7 @@ func NewGRPCClient(c *grpc.ClientConn) *grpcClient {
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c),
metricsReaderClient: storage_v1.NewMetricsReaderPluginClient(c),
}
}

Expand Down Expand Up @@ -124,6 +128,10 @@ func (c *grpcClient) ArchiveSpanWriter() spanstore.Writer {
return &archiveWriter{client: c.archiveWriterClient}
}

func (c *grpcClient) MetricsReader() metricsstore.Reader {
return &metricsReader{client: c.metricsReaderClient}
}

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{
Expand Down Expand Up @@ -286,6 +294,7 @@ func (c *grpcClient) Capabilities() (*Capabilities, error) {
ArchiveSpanReader: capabilities.ArchiveSpanReader,
ArchiveSpanWriter: capabilities.ArchiveSpanWriter,
StreamingSpanWriter: capabilities.StreamingSpanWriter,
MetricsReader: capabilities.MetricsReader,
}, nil
}

Expand Down
113 changes: 113 additions & 0 deletions pkg/grpc/shared/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/gogo/protobuf/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -49,6 +52,8 @@ type GRPCHandlerStorageImpl struct {
ArchiveSpanWriter func() spanstore.Writer

StreamingSpanWriter func() spanstore.Writer

MetricsReader func() metricsstore.Reader
}

// NewGRPCHandler creates a handler given individual storage implementations.
Expand All @@ -61,6 +66,7 @@ func NewGRPCHandlerWithPlugins(
mainImpl StoragePlugin,
archiveImpl ArchiveStoragePlugin,
streamImpl StreamingSpanWriterPlugin,
metricsImpl MetricsReaderPlugin,
) *GRPCHandler {
impl := &GRPCHandlerStorageImpl{
SpanReader: mainImpl.SpanReader,
Expand All @@ -70,6 +76,8 @@ func NewGRPCHandlerWithPlugins(
ArchiveSpanReader: func() spanstore.Reader { return nil },
ArchiveSpanWriter: func() spanstore.Writer { return nil },
StreamingSpanWriter: func() spanstore.Writer { return nil },

MetricsReader: func() metricsstore.Reader { return nil },
}
if archiveImpl != nil {
impl.ArchiveSpanReader = archiveImpl.ArchiveSpanReader
Expand All @@ -78,6 +86,9 @@ func NewGRPCHandlerWithPlugins(
if streamImpl != nil {
impl.StreamingSpanWriter = streamImpl.StreamingSpanWriter
}
if metricsImpl != nil {
impl.MetricsReader = metricsImpl.MetricsReader
}
return NewGRPCHandler(impl)
}

Expand All @@ -90,6 +101,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server) error {
storage_v1.RegisterPluginCapabilitiesServer(ss, s)
storage_v1.RegisterDependenciesReaderPluginServer(ss, s)
storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s)
storage_v1.RegisterMetricsReaderPluginServer(ss, s)
return nil
}

Expand Down Expand Up @@ -266,6 +278,7 @@ func (s *GRPCHandler) Capabilities(ctx context.Context, request *storage_v1.Capa
ArchiveSpanReader: s.impl.ArchiveSpanReader() != nil,
ArchiveSpanWriter: s.impl.ArchiveSpanWriter() != nil,
StreamingSpanWriter: s.impl.StreamingSpanWriter() != nil,
MetricsReader: s.impl.MetricsReader() != nil,
}, nil
}

Expand Down Expand Up @@ -301,3 +314,103 @@ func (s *GRPCHandler) WriteArchiveSpan(ctx context.Context, r *storage_v1.WriteS
}
return &storage_v1.WriteSpanResponse{}, nil
}

func (s *GRPCHandler) GetLatencies(ctx context.Context, request *storage_v1.GetLatenciesRequest) (*storage_v1.GetLatenciesResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
params := &metricsstore.LatenciesQueryParameters{
BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters),
Quantile: float64(request.Quantile),
}

mf, err := reader.GetLatencies(ctx, params)
if err != nil {
return nil, err
}
return &storage_v1.GetLatenciesResponse{
MetricFamily: mf,
}, nil
}

func (s *GRPCHandler) GetCallRates(ctx context.Context, request *storage_v1.GetCallRatesRequest) (*storage_v1.GetCallRatesResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
params := &metricsstore.CallRateQueryParameters{
BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters),
}

mf, err := reader.GetCallRates(ctx, params)
if err != nil {
return nil, err
}
return &storage_v1.GetCallRatesResponse{
MetricFamily: mf,
}, nil
}

func (s *GRPCHandler) GetErrorRates(ctx context.Context, request *storage_v1.GetErrorRatesRequest) (*storage_v1.GetErrorRatesResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
params := &metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters),
}

mf, err := reader.GetErrorRates(ctx, params)
if err != nil {
return nil, err
}
return &storage_v1.GetErrorRatesResponse{
MetricFamily: mf,
}, nil
}

func (s *GRPCHandler) GetMinStepDuration(ctx context.Context, _ *storage_v1.GetMinStepDurationRequest) (*storage_v1.GetMinStepDurationResponse, error) {
reader := s.impl.MetricsReader()
if reader == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}

minStep, err := reader.GetMinStepDuration(ctx, &metricsstore.MinStepDurationQueryParameters{})
if err != nil {
return nil, err
}
return &storage_v1.GetMinStepDurationResponse{
MinStep: &types.Duration{
Seconds: int64(minStep / time.Second),
Nanos: int32(minStep % time.Second),
},
}, nil
}

func baseQueryParametersFromProto(proto *storage_v1.MetricsBaseQueryParameters) metricsstore.BaseQueryParameters {
pogo := metricsstore.BaseQueryParameters{
ServiceNames: proto.ServiceNames,
GroupByOperation: proto.GroupByOperation,
SpanKinds: proto.SpanKinds,
}

if proto.EndTime != nil {
endTime := time.Unix(proto.EndTime.Seconds, int64(proto.EndTime.Nanos)).UTC()
pogo.EndTime = &endTime
}
if proto.Lookback != nil {
lookback := time.Duration(proto.Lookback.Seconds)*time.Second + time.Duration(proto.Lookback.Nanos)*time.Nanosecond
pogo.Lookback = &lookback
}
if proto.Step != nil {
step := time.Duration(proto.Step.Seconds)*time.Second + time.Duration(proto.Step.Nanos)*time.Nanosecond
pogo.Step = &step
}
if proto.RatePer != nil {
ratePer := time.Duration(proto.RatePer.Seconds)*time.Second + time.Duration(proto.RatePer.Nanos)*time.Nanosecond
pogo.RatePer = &ratePer
}

return pogo
}

0 comments on commit 4872990

Please sign in to comment.