From 0707fd94447cb887352324f2fac6de5707c3837f Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Wed, 26 Apr 2023 20:07:49 -0700 Subject: [PATCH] feat: implement gRPC metrics plugin --- cmd/query/main.go | 8 +-- pkg/grpc/config/config.go | 1 + pkg/grpc/grpc.go | 1 + pkg/grpc/shared/grpc_client.go | 9 +++ pkg/grpc/shared/grpc_handler.go | 111 ++++++++++++++++++++++++++++++++ pkg/grpc/shared/interface.go | 7 ++ pkg/grpc/shared/metrics.go | 106 ++++++++++++++++++++++++++++++ pkg/grpc/shared/plugin.go | 3 +- plugin/metrics/factory.go | 8 ++- plugin/metrics/grpc/factory.go | 99 ++++++++++++++++++++++++++++ plugin/metrics/grpc/options.go | 69 ++++++++++++++++++++ storage/factory.go | 3 + 12 files changed, 419 insertions(+), 6 deletions(-) create mode 100644 pkg/grpc/shared/metrics.go create mode 100644 plugin/metrics/grpc/factory.go create mode 100644 plugin/metrics/grpc/options.go diff --git a/cmd/query/main.go b/cmd/query/main.go index af6b6116378..a84a70554e5 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -55,8 +55,7 @@ func main() { log.Fatalf("Cannot initialize storage factory: %v", err) } - fc := metricsPlugin.FactoryConfigFromEnv() - metricsReaderFactory, err := metricsPlugin.NewFactory(fc) + metricsReaderFactory, err := metricsPlugin.NewFactory(metricsPlugin.FactoryConfigFromEnv()) if err != nil { log.Fatalf("Cannot initialize metrics factory: %v", err) } @@ -179,12 +178,13 @@ func createMetricsQueryService( logger *zap.Logger, metricsReaderMetricsFactory metrics.Factory, ) (querysvc.MetricsQueryService, error) { + // Ensure default parameter values are loaded correctly. + metricsReaderFactory.InitFromViper(v, logger) + if err := metricsReaderFactory.Initialize(logger); err != nil { return nil, fmt.Errorf("failed to init metrics reader factory: %w", err) } - // Ensure default parameter values are loaded correctly. - metricsReaderFactory.InitFromViper(v, logger) reader, err := metricsReaderFactory.CreateMetricsReader() if err != nil { return nil, fmt.Errorf("failed to create metrics reader: %w", err) diff --git a/pkg/grpc/config/config.go b/pkg/grpc/config/config.go index beeac6acd8f..854737fd19c 100644 --- a/pkg/grpc/config/config.go +++ b/pkg/grpc/config/config.go @@ -118,6 +118,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, Store: grpcClient, ArchiveStore: grpcClient, StreamingSpanWriter: grpcClient, + MetricsReader: grpcClient, }, Capabilities: grpcClient, }, nil diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index 0ea75452413..32386894e0a 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -38,6 +38,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc Impl: services.Store, ArchiveImpl: services.ArchiveStore, StreamImpl: services.StreamingSpanWriter, + MetricsImpl: services.MetricsReader, }, }, }, diff --git a/pkg/grpc/shared/grpc_client.go b/pkg/grpc/shared/grpc_client.go index 4d230cec9af..44d45c5e1d9 100644 --- a/pkg/grpc/shared/grpc_client.go +++ b/pkg/grpc/shared/grpc_client.go @@ -29,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/bearertoken" "github.com/jaegertracing/jaeger/proto-gen/storage_v1" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/metricsstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -38,6 +39,7 @@ const BearerTokenKey = "bearer.token" var ( _ StoragePlugin = (*grpcClient)(nil) _ ArchiveStoragePlugin = (*grpcClient)(nil) + _ MetricsReaderPlugin = (*grpcClient)(nil) _ PluginCapabilities = (*grpcClient)(nil) // upgradeContext composites several steps of upgrading context @@ -53,6 +55,7 @@ type grpcClient struct { capabilitiesClient storage_v1.PluginCapabilitiesClient depsReaderClient storage_v1.DependenciesReaderPluginClient streamWriterClient storage_v1.StreamingSpanWriterPluginClient + metricsReaderClient storage_v1.MetricsReaderPluginClient } func NewGRPCClient(c *grpc.ClientConn) *grpcClient { @@ -64,6 +67,7 @@ func NewGRPCClient(c *grpc.ClientConn) *grpcClient { capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c), + metricsReaderClient: storage_v1.NewMetricsReaderPluginClient(c), } } @@ -124,6 +128,10 @@ func (c *grpcClient) ArchiveSpanWriter() spanstore.Writer { return &archiveWriter{client: c.archiveWriterClient} } +func (c *grpcClient) MetricsReader() metricsstore.Reader { + return &metricsReader{client: c.metricsReaderClient} +} + // GetTrace takes a traceID and returns a Trace associated with that traceID func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{ @@ -286,6 +294,7 @@ func (c *grpcClient) Capabilities() (*Capabilities, error) { ArchiveSpanReader: capabilities.ArchiveSpanReader, ArchiveSpanWriter: capabilities.ArchiveSpanWriter, StreamingSpanWriter: capabilities.StreamingSpanWriter, + MetricsReader: capabilities.MetricsReader, }, nil } diff --git a/pkg/grpc/shared/grpc_handler.go b/pkg/grpc/shared/grpc_handler.go index 32c1434d2ef..6891078d38d 100644 --- a/pkg/grpc/shared/grpc_handler.go +++ b/pkg/grpc/shared/grpc_handler.go @@ -18,7 +18,9 @@ import ( "context" "fmt" "io" + "time" + "github.com/gogo/protobuf/types" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -26,6 +28,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/storage_v1" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/metricsstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -49,6 +52,8 @@ type GRPCHandlerStorageImpl struct { ArchiveSpanWriter func() spanstore.Writer StreamingSpanWriter func() spanstore.Writer + + MetricsReader func() metricsstore.Reader } // NewGRPCHandler creates a handler given individual storage implementations. @@ -61,6 +66,7 @@ func NewGRPCHandlerWithPlugins( mainImpl StoragePlugin, archiveImpl ArchiveStoragePlugin, streamImpl StreamingSpanWriterPlugin, + metricsImpl MetricsReaderPlugin, ) *GRPCHandler { impl := &GRPCHandlerStorageImpl{ SpanReader: mainImpl.SpanReader, @@ -78,6 +84,9 @@ func NewGRPCHandlerWithPlugins( if streamImpl != nil { impl.StreamingSpanWriter = streamImpl.StreamingSpanWriter } + if metricsImpl != nil { + impl.MetricsReader = metricsImpl.MetricsReader + } return NewGRPCHandler(impl) } @@ -90,6 +99,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server) error { storage_v1.RegisterPluginCapabilitiesServer(ss, s) storage_v1.RegisterDependenciesReaderPluginServer(ss, s) storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s) + storage_v1.RegisterMetricsReaderPluginServer(ss, s) return nil } @@ -266,6 +276,7 @@ func (s *GRPCHandler) Capabilities(ctx context.Context, request *storage_v1.Capa ArchiveSpanReader: s.impl.ArchiveSpanReader() != nil, ArchiveSpanWriter: s.impl.ArchiveSpanWriter() != nil, StreamingSpanWriter: s.impl.StreamingSpanWriter() != nil, + MetricsReader: s.impl.MetricsReader() != nil, }, nil } @@ -301,3 +312,103 @@ func (s *GRPCHandler) WriteArchiveSpan(ctx context.Context, r *storage_v1.WriteS } return &storage_v1.WriteSpanResponse{}, nil } + +func (s *GRPCHandler) GetLatencies(ctx context.Context, request *storage_v1.GetLatenciesRequest) (*storage_v1.GetLatenciesResponse, error) { + reader := s.impl.MetricsReader() + if reader == nil { + return nil, status.Error(codes.Unimplemented, "not implemented") + } + params := &metricsstore.LatenciesQueryParameters{ + BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters), + Quantile: float64(request.Quantile), + } + + mf, err := reader.GetLatencies(ctx, params) + if err != nil { + return nil, err + } + return &storage_v1.GetLatenciesResponse{ + MetricFamily: mf, + }, nil +} + +func (s *GRPCHandler) GetCallRates(ctx context.Context, request *storage_v1.GetCallRatesRequest) (*storage_v1.GetCallRatesResponse, error) { + reader := s.impl.MetricsReader() + if reader == nil { + return nil, status.Error(codes.Unimplemented, "not implemented") + } + params := &metricsstore.CallRateQueryParameters{ + BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters), + } + + mf, err := reader.GetCallRates(ctx, params) + if err != nil { + return nil, err + } + return &storage_v1.GetCallRatesResponse{ + MetricFamily: mf, + }, nil +} + +func (s *GRPCHandler) GetErrorRates(ctx context.Context, request *storage_v1.GetErrorRatesRequest) (*storage_v1.GetErrorRatesResponse, error) { + reader := s.impl.MetricsReader() + if reader == nil { + return nil, status.Error(codes.Unimplemented, "not implemented") + } + params := &metricsstore.ErrorRateQueryParameters{ + BaseQueryParameters: baseQueryParametersFromProto(request.BaseQueryParameters), + } + + mf, err := reader.GetErrorRates(ctx, params) + if err != nil { + return nil, err + } + return &storage_v1.GetErrorRatesResponse{ + MetricFamily: mf, + }, nil +} + +func (s *GRPCHandler) GetMinStepDuration(ctx context.Context, _ *storage_v1.GetMinStepDurationRequest) (*storage_v1.GetMinStepDurationResponse, error) { + reader := s.impl.MetricsReader() + if reader == nil { + return nil, status.Error(codes.Unimplemented, "not implemented") + } + + minStep, err := reader.GetMinStepDuration(ctx, &metricsstore.MinStepDurationQueryParameters{}) + if err != nil { + return nil, err + } + return &storage_v1.GetMinStepDurationResponse{ + MinStep: &types.Duration{ + Seconds: int64(minStep / time.Second), + Nanos: int32(minStep % time.Second), + }, + }, nil +} + +func baseQueryParametersFromProto(proto *storage_v1.MetricsBaseQueryParameters) metricsstore.BaseQueryParameters { + pogo := metricsstore.BaseQueryParameters{ + ServiceNames: proto.ServiceNames, + GroupByOperation: proto.GroupByOperation, + SpanKinds: proto.SpanKinds, + } + + if proto.EndTime != nil { + endTime := time.Unix(proto.EndTime.Seconds, int64(proto.EndTime.Nanos)) + pogo.EndTime = &endTime + } + if proto.Lookback != nil { + lookback := time.Duration(proto.Lookback.Seconds)*time.Second + time.Duration(proto.Lookback.Nanos)*time.Nanosecond + pogo.Lookback = &lookback + } + if proto.Step != nil { + step := time.Duration(proto.Step.Seconds)*time.Second + time.Duration(proto.Step.Nanos)*time.Nanosecond + pogo.Step = &step + } + if proto.RatePer != nil { + ratePer := time.Duration(proto.RatePer.Seconds)*time.Second + time.Duration(proto.RatePer.Nanos)*time.Nanosecond + pogo.RatePer = &ratePer + } + + return pogo +} diff --git a/pkg/grpc/shared/interface.go b/pkg/grpc/shared/interface.go index c6345a29b82..e036de60de3 100644 --- a/pkg/grpc/shared/interface.go +++ b/pkg/grpc/shared/interface.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/metricsstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -53,6 +54,10 @@ type StreamingSpanWriterPlugin interface { StreamingSpanWriter() spanstore.Writer } +type MetricsReaderPlugin interface { + MetricsReader() metricsstore.Reader +} + // PluginCapabilities allow expose plugin its capabilities. type PluginCapabilities interface { Capabilities() (*Capabilities, error) @@ -63,6 +68,7 @@ type Capabilities struct { ArchiveSpanReader bool ArchiveSpanWriter bool StreamingSpanWriter bool + MetricsReader bool } // PluginServices defines services plugin can expose @@ -70,4 +76,5 @@ type PluginServices struct { Store StoragePlugin ArchiveStore ArchiveStoragePlugin StreamingSpanWriter StreamingSpanWriterPlugin + MetricsReader MetricsReaderPlugin } diff --git a/pkg/grpc/shared/metrics.go b/pkg/grpc/shared/metrics.go new file mode 100644 index 00000000000..a9e3e87c3b9 --- /dev/null +++ b/pkg/grpc/shared/metrics.go @@ -0,0 +1,106 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "time" + + "github.com/gogo/protobuf/types" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + "github.com/jaegertracing/jaeger/storage/metricsstore" +) + +var _ metricsstore.Reader = (*metricsReader)(nil) + +type metricsReader struct { + client storage_v1.MetricsReaderPluginClient +} + +func (m *metricsReader) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) { + request := &storage_v1.GetLatenciesRequest{ + BaseQueryParameters: baseQueryParametersFromPogo(params.BaseQueryParameters), + Quantile: float32(params.Quantile), + } + + response, err := m.client.GetLatencies(upgradeContext(ctx), request) + if err != nil { + return nil, err + } + return response.MetricFamily, nil +} + +func (m *metricsReader) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) { + request := &storage_v1.GetCallRatesRequest{ + BaseQueryParameters: baseQueryParametersFromPogo(params.BaseQueryParameters), + } + + response, err := m.client.GetCallRates(upgradeContext(ctx), request) + if err != nil { + return nil, err + } + return response.MetricFamily, nil +} + +func (m *metricsReader) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) { + request := &storage_v1.GetErrorRatesRequest{ + BaseQueryParameters: baseQueryParametersFromPogo(params.BaseQueryParameters), + } + + response, err := m.client.GetErrorRates(upgradeContext(ctx), request) + if err != nil { + return nil, err + } + return response.MetricFamily, nil +} + +func (m *metricsReader) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) { + response, err := m.client.GetMinStepDuration(upgradeContext(ctx), &storage_v1.GetMinStepDurationRequest{}) + if err != nil { + return 0, err + } + + minStep := time.Second + if response.MinStep != nil { + minStep = time.Duration(response.MinStep.Seconds)*time.Second + time.Duration(response.MinStep.Nanos) + } + + return minStep, nil +} + +func baseQueryParametersFromPogo(pogo metricsstore.BaseQueryParameters) *storage_v1.MetricsBaseQueryParameters { + proto := &storage_v1.MetricsBaseQueryParameters{ + ServiceNames: pogo.ServiceNames, + GroupByOperation: pogo.GroupByOperation, + SpanKinds: pogo.SpanKinds, + } + + if pogo.EndTime != nil { + proto.EndTime = &types.Timestamp{Seconds: pogo.EndTime.Unix(), Nanos: int32(pogo.EndTime.Nanosecond())} + } + if pogo.Lookback != nil { + proto.Lookback = &types.Duration{Seconds: int64(*pogo.Lookback / time.Second), Nanos: int32(*pogo.Lookback % time.Second)} + } + if pogo.Step != nil { + proto.Step = &types.Duration{Seconds: int64(*pogo.Step / time.Second), Nanos: int32(*pogo.Step % time.Second)} + } + if pogo.RatePer != nil { + proto.RatePer = &types.Duration{Seconds: int64(*pogo.RatePer / time.Second), Nanos: int32(*pogo.RatePer % time.Second)} + } + + return proto +} diff --git a/pkg/grpc/shared/plugin.go b/pkg/grpc/shared/plugin.go index b4a3fabf93e..323ac5b2d09 100644 --- a/pkg/grpc/shared/plugin.go +++ b/pkg/grpc/shared/plugin.go @@ -34,11 +34,12 @@ type StorageGRPCPlugin struct { Impl StoragePlugin ArchiveImpl ArchiveStoragePlugin StreamImpl StreamingSpanWriterPlugin + MetricsImpl MetricsReaderPlugin } // RegisterHandlers registers the plugin with the server func (p *StorageGRPCPlugin) RegisterHandlers(s *grpc.Server) error { - handler := NewGRPCHandlerWithPlugins(p.Impl, p.ArchiveImpl, p.StreamImpl) + handler := NewGRPCHandlerWithPlugins(p.Impl, p.ArchiveImpl, p.StreamImpl, p.MetricsImpl) return handler.Register(s) } diff --git a/plugin/metrics/factory.go b/plugin/metrics/factory.go index a9f8e23fc59..a31316f05b6 100644 --- a/plugin/metrics/factory.go +++ b/plugin/metrics/factory.go @@ -23,6 +23,7 @@ import ( "github.com/jaegertracing/jaeger/plugin" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" + "github.com/jaegertracing/jaeger/plugin/metrics/grpc" "github.com/jaegertracing/jaeger/plugin/metrics/prometheus" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/metricsstore" @@ -33,6 +34,7 @@ const ( disabledStorageType = "" prometheusStorageType = "prometheus" + grpcStorageType = "grpc-plugin" ) // AllStorageTypes defines all available storage backends. @@ -67,6 +69,8 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.MetricsFactory, switch factoryType { case prometheusStorageType: return prometheus.NewFactory(), nil + case grpcStorageType: + return grpc.NewFactory(), nil case disabledStorageType: return disabled.NewFactory(), nil } @@ -76,7 +80,9 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.MetricsFactory, // Initialize implements storage.MetricsFactory. func (f *Factory) Initialize(logger *zap.Logger) error { for _, factory := range f.factories { - factory.Initialize(logger) + if err := factory.Initialize(logger); err != nil { + return err + } } return nil } diff --git a/plugin/metrics/grpc/factory.go b/plugin/metrics/grpc/factory.go new file mode 100644 index 00000000000..25462b72679 --- /dev/null +++ b/plugin/metrics/grpc/factory.go @@ -0,0 +1,99 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "flag" + "fmt" + "io" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/grpc/config" + "github.com/jaegertracing/jaeger/pkg/grpc/shared" + "github.com/jaegertracing/jaeger/plugin" + "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/metricsstore" +) + +var ( + _ plugin.Configurable = (*Factory)(nil) + _ io.Closer = (*Factory)(nil) +) + +// Factory implements storage.MetricsFactory +type Factory struct { + options Options + logger *zap.Logger + + builder config.PluginBuilder + + metricsReader shared.MetricsReaderPlugin + capabilities shared.PluginCapabilities +} + +// NewFactory creates a new Factory +func NewFactory() *Factory { + return &Factory{} +} + +// AddFlags implements plugin.Configurable +func (f *Factory) AddFlags(flagSet *flag.FlagSet) { + f.options.AddFlags(flagSet) +} + +// InitFromViper implements plugin.Configurable +func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { + if err := f.options.InitFromViper(v); err != nil { + logger.Fatal("unable to initialize gRPC metrics factory", zap.Error(err)) + } + f.builder = &f.options.Configuration +} + +// Initialize implements storage.MetricsFactory +func (f *Factory) Initialize(logger *zap.Logger) error { + f.logger = logger + + services, err := f.builder.Build(logger) + if err != nil { + return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err) + } + + f.metricsReader = services.MetricsReader + f.capabilities = services.Capabilities + logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration)) + return nil +} + +// CreateMetricsReader implements storage.MetricsFactory +func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) { + if f.capabilities == nil { + return nil, storage.ErrMetricsStorageNotSupported + } + capabilities, err := f.capabilities.Capabilities() + if err != nil { + return nil, err + } + if capabilities == nil || !capabilities.MetricsReader { + return nil, storage.ErrMetricsStorageNotSupported + } + return f.metricsReader.MetricsReader(), nil +} + +// Close closes the resources held by the factory +func (f *Factory) Close() error { + return f.builder.Close() +} diff --git a/plugin/metrics/grpc/options.go b/plugin/metrics/grpc/options.go new file mode 100644 index 00000000000..08bab1390a9 --- /dev/null +++ b/plugin/metrics/grpc/options.go @@ -0,0 +1,69 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "flag" + "fmt" + "time" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/grpc/config" + "github.com/jaegertracing/jaeger/pkg/tenancy" +) + +const ( + remotePrefix = "grpc-metrics" + remoteServer = remotePrefix + ".server" + remoteConnectionTimeout = remotePrefix + ".connection-timeout" + defaultConnectionTimeout = time.Duration(5 * time.Second) +) + +type Options struct { + Configuration config.Configuration `mapstructure:",squash"` +} + +func NewOptions() *Options { + return &Options{} +} + +func tlsFlagsConfig() tlscfg.ClientFlagsConfig { + return tlscfg.ClientFlagsConfig{ + Prefix: remotePrefix, + } +} + +// AddFlags adds flags for Options +func (opt *Options) AddFlags(flagSet *flag.FlagSet) { + tlsFlagsConfig().AddFlags(flagSet) + + flagSet.String(remoteServer, "", "The remote metrics gRPC server address as host:port") + flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote metrics gRPC server connection timeout") +} + +// InitFromViper initializes Options with properties from viper +func (opt *Options) InitFromViper(v *viper.Viper) error { + opt.Configuration.RemoteServerAddr = v.GetString(remoteServer) + var err error + opt.Configuration.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v) + if err != nil { + return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err) + } + opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout) + opt.Configuration.TenancyOpts = tenancy.InitFromViper(v) + return nil +} diff --git a/storage/factory.go b/storage/factory.go index f6d3394840b..11aa52977f2 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -75,6 +75,9 @@ type ArchiveFactory interface { CreateArchiveSpanWriter() (spanstore.Writer, error) } +// ErrMetricsStorageNotSupported can be returned by the MetricsFactory when the metrics storage is not supported by the backend. +var ErrMetricsStorageNotSupported = errors.New("metrics storage not supported") + // MetricsFactory defines an interface for a factory that can create implementations of different metrics storage components. // Implementations are also encouraged to implement plugin.Configurable interface. //