Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert MetricsQueryService to interface #3089

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand Down Expand Up @@ -116,7 +115,7 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsReader, err := createMetricsReader(metricsReaderFactory, v, logger)
metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger)
if err != nil {
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}
Expand Down Expand Up @@ -171,7 +170,7 @@ by default uses only in-memory database.`,
// query
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsReader,
spanReader, dependencyReader, metricsQueryService,
metricsFactory,
)

Expand Down Expand Up @@ -244,13 +243,12 @@ func startQuery(
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
depReader dependencystore.Reader,
metricsReader metricsstore.Reader,
metricsQueryService querysvc.MetricsQueryService,
baseFactory metrics.Factory,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
mqs := querysvc.NewMetricsQueryService(metricsReader)
server, err := queryApp.NewServer(svc.Logger, qs, mqs, qOpts, opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down Expand Up @@ -289,7 +287,7 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
return closer
}

func createMetricsReader(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (metricsstore.Reader, error) {
func createMetricsQueryService(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (querysvc.MetricsQueryService, error) {
if err := factory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}
Expand Down
39 changes: 3 additions & 36 deletions cmd/query/app/querysvc/metrics_query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,9 @@

package querysvc

import (
"context"
"time"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)
import "github.com/jaegertracing/jaeger/storage/metricsstore"

// MetricsQueryService provides a means of querying R.E.D metrics from an underlying metrics store.
type MetricsQueryService struct {
metricsReader metricsstore.Reader
}

// NewMetricsQueryService returns a new MetricsQueryService.
func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {
return &MetricsQueryService{
metricsReader: reader,
}
}

// GetLatencies is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
return mqs.metricsReader.GetLatencies(ctx, params)
}

// GetCallRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
return mqs.metricsReader.GetCallRates(ctx, params)
}

// GetErrorRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
return mqs.metricsReader.GetErrorRates(ctx, params)
}

// GetMinStepDuration is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
return mqs.metricsReader.GetMinStepDuration(ctx, params)
type MetricsQueryService interface {
metricsstore.Reader
}
96 changes: 0 additions & 96 deletions cmd/query/app/querysvc/metrics_query_service_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
Expand Down Expand Up @@ -94,7 +94,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.TLSGRPC.Enabled {
Expand All @@ -118,7 +118,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc
return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
// TODO: Add HandlerOptions.MetricsQueryService
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
Expand Down
27 changes: 11 additions & 16 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -77,7 +77,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -90,7 +90,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand Down Expand Up @@ -331,8 +331,7 @@ func TestServerHTTPTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
server, err := NewServer(flagsSvc.Logger, querySvc, nil,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -492,8 +491,7 @@ func TestServerGRPCTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
server, err := NewServer(flagsSvc.Logger, querySvc, nil,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -547,12 +545,12 @@ func TestServerGRPCTLS(t *testing.T) {

}
func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
opentracing.NoopTracer{})

assert.NotNil(t, err)
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
opentracing.NoopTracer{})

Expand All @@ -578,7 +576,7 @@ func TestServerInUseHostPort(t *testing.T) {
server, err := NewServer(
zap.NewNop(),
&querysvc.QueryService{},
querysvc.NewMetricsQueryService(nil),
nil,
&QueryOptions{
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
Expand Down Expand Up @@ -611,8 +609,7 @@ func TestServerSinglePort(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
server, err := NewServer(flagsSvc.Logger, querySvc, nil,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -661,10 +658,9 @@ func TestServerGracefulExit(t *testing.T) {
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
tracer := opentracing.NoopTracer{}

server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
Expand All @@ -690,9 +686,8 @@ func TestServerHandlesPortZero(t *testing.T) {
flagsSvc.Logger = zap.New(zapCore)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()
Expand Down
10 changes: 3 additions & 7 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,12 @@ func main() {
}
}

func createMetricsQueryService(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (*querysvc.MetricsQueryService, error) {
func createMetricsQueryService(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (querysvc.MetricsQueryService, error) {
if err := factory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics factory: %w", err)
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
factory.InitFromViper(v)
metricsReader, err := factory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
}
return querysvc.NewMetricsQueryService(metricsReader), nil
return factory.CreateMetricsReader()
}