Skip to content

Commit

Permalink
Add grcp handler
Browse files Browse the repository at this point in the history
Signed-off-by: albertteoh <albert.teoh@logz.io>
  • Loading branch information
albertteoh committed Jun 15, 2021
1 parent c9d6957 commit 2698575
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 36 deletions.
32 changes: 32 additions & 0 deletions cmd/query/app/default_params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Contains default parameter values used by handlers when optional request parameters are missing.

package app

import (
"time"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

var (
defaultDependencyLookbackDuration = time.Hour * 24
defaultTraceQueryLookbackDuration = time.Hour * 24 * 2
defaultMetricsQueryLookbackDuration = time.Hour
defaultMetricsQueryStepDuration = 5 * time.Second
defaultMetricsQueryRateDuration = 10 * time.Minute
defaultMetricsSpanKinds = []string{metrics.SpanKind_SPAN_KIND_SERVER.String()}
)
151 changes: 142 additions & 9 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package app

import (
"context"
"errors"

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
Expand All @@ -24,8 +25,11 @@ import (

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" //force gogo codec registration
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -35,21 +39,30 @@ const (
msgTraceNotFound = "trace not found"
)

var (
errGRPCMetricsQueryDisabled = status.Error(codes.Unimplemented, "metrics querying is currently disabled")
errMissingServiceNames = status.Error(codes.InvalidArgument, "please provide at least one service name")
errMissingQuantile = status.Error(codes.InvalidArgument, "please provide a quantile between (0, 1]")
)

// GRPCHandler implements the gRPC endpoint of the query service.
type GRPCHandler struct {
queryService *querysvc.QueryService
logger *zap.Logger
tracer opentracing.Tracer
queryService *querysvc.QueryService
metricsQueryService querysvc.MetricsQueryService
logger *zap.Logger
tracer opentracing.Tracer
clock clock
}

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(queryService *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
func NewGRPCHandler(queryService *querysvc.QueryService, metricsQueryService querysvc.MetricsQueryService, logger *zap.Logger, tracer opentracing.Tracer, clock clock) *GRPCHandler {
gH := &GRPCHandler{
queryService: queryService,
logger: logger,
tracer: tracer,
queryService: queryService,
metricsQueryService: metricsQueryService,
logger: logger,
tracer: tracer,
clock: clock,
}

return gH
}

Expand Down Expand Up @@ -177,3 +190,123 @@ func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependen

return &api_v2.GetDependenciesResponse{Dependencies: dependencies}, nil
}

// GetLatencies is the gRPC handler to fetch latency metrics.
func (g *GRPCHandler) GetLatencies(ctx context.Context, r *metrics.GetLatenciesRequest) (*metrics.GetMetricsResponse, error) {
// Check for cases where clients do not provide the Quantile, which defaults to the float64's zero value.
if r.Quantile == 0 {
return nil, errMissingQuantile
}
bqp, err := g.newBaseQueryParameters(r.BaseRequest)
if err := g.handleErr("failed to build parameters", err); err != nil {
return nil, err
}
queryParams := metricsstore.LatenciesQueryParameters{
BaseQueryParameters: bqp,
Quantile: r.Quantile,
}
m, err := g.metricsQueryService.GetLatencies(ctx, &queryParams)
if err := g.handleErr("failed to fetch latencies", err); err != nil {
return nil, err
}
return &metrics.GetMetricsResponse{Metrics: *m}, nil
}

// GetCallRates is the gRPC handler to fetch call rate metrics.
func (g *GRPCHandler) GetCallRates(ctx context.Context, r *metrics.GetCallRatesRequest) (*metrics.GetMetricsResponse, error) {
bqp, err := g.newBaseQueryParameters(r.BaseRequest)
if err := g.handleErr("failed to build parameters", err); err != nil {
return nil, err
}
queryParams := metricsstore.CallRateQueryParameters{
BaseQueryParameters: bqp,
}
m, err := g.metricsQueryService.GetCallRates(ctx, &queryParams)
if err := g.handleErr("failed to fetch call rates", err); err != nil {
return nil, err
}
return &metrics.GetMetricsResponse{Metrics: *m}, nil
}

// GetErrorRates is the gRPC handler to fetch error rate metrics.
func (g *GRPCHandler) GetErrorRates(ctx context.Context, r *metrics.GetErrorRatesRequest) (*metrics.GetMetricsResponse, error) {
bqp, err := g.newBaseQueryParameters(r.BaseRequest)
if err := g.handleErr("failed to build parameters", err); err != nil {
return nil, err
}
queryParams := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: bqp,
}
m, err := g.metricsQueryService.GetErrorRates(ctx, &queryParams)
if err := g.handleErr("failed to fetch error rates", err); err != nil {
return nil, err
}
return &metrics.GetMetricsResponse{Metrics: *m}, nil
}

// GetMinStepDuration is the gRPC handler to fetch the minimum step duration supported by the underlying metrics store.
func (g *GRPCHandler) GetMinStepDuration(ctx context.Context, _ *metrics.GetMinStepDurationRequest) (*metrics.GetMinStepDurationResponse, error) {
minStep, err := g.metricsQueryService.GetMinStepDuration(ctx, &metricsstore.MinStepDurationQueryParameters{})
if err := g.handleErr("failed to fetch min step duration", err); err != nil {
return nil, err
}
return &metrics.GetMinStepDurationResponse{MinStep: minStep}, nil
}

func (g *GRPCHandler) handleErr(msg string, err error) error {
if err == nil {
return nil
}
g.logger.Error(msg, zap.Error(err))

// Avoid wrapping "expected" errors with an "Internal Server" error.
switch {
case errors.Is(err, disabled.ErrDisabled):
return errGRPCMetricsQueryDisabled
case errors.Is(err, errMissingServiceNames), errors.Is(err, errMissingQuantile):
return err
}

// Received an "unexpected" error.
return status.Errorf(codes.Internal, "%s: %v", msg, err)
}

func (g *GRPCHandler) newBaseQueryParameters(r *metrics.MetricsQueryBaseRequest) (bqp metricsstore.BaseQueryParameters, err error) {
if r == nil || len(r.ServiceNames) == 0 {
return bqp, errMissingServiceNames
}

// Copy non-nullable params.
bqp.GroupByOperation = r.GroupByOperation
bqp.ServiceNames = r.ServiceNames

// Initialize nullable params with defaults.
defaultEndTime := g.clock.Now()
bqp.EndTime = &defaultEndTime
bqp.Lookback = &defaultMetricsQueryLookbackDuration
bqp.RatePer = &defaultMetricsQueryRateDuration
bqp.SpanKinds = defaultMetricsSpanKinds
bqp.Step = &defaultMetricsQueryStepDuration

// ... and override defaults with any provided request params.
if r.EndTime != nil {
bqp.EndTime = r.EndTime
}
if r.Lookback != nil {
bqp.Lookback = r.Lookback
}
if r.Step != nil {
bqp.Step = r.Step
}
if r.RatePer != nil {
bqp.RatePer = r.RatePer
}
if len(r.SpanKinds) > 0 {
spanKinds := make([]string, len(r.SpanKinds))
for i, v := range r.SpanKinds {
spanKinds[i] = v.String()
}
bqp.SpanKinds = spanKinds
}
return bqp, nil
}
Loading

0 comments on commit 2698575

Please sign in to comment.