Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement gRPC storage plugin for metrics #4414

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ proto: init-submodules proto-prepare-otel
$(PROTOC) \
$(PROTO_INCLUDES) \
-Iplugin/storage/grpc/proto \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/storage_v1 \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS),Mopenmetrics.proto=github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics:$(PWD)/proto-gen/storage_v1 \
plugin/storage/grpc/proto/storage.proto

$(PROTOC) \
Expand Down
8 changes: 4 additions & 4 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func main() {
log.Fatalf("Cannot initialize storage factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
metricsReaderFactory, err := metricsPlugin.NewFactory(fc)
metricsReaderFactory, err := metricsPlugin.NewFactory(metricsPlugin.FactoryConfigFromEnv())
if err != nil {
log.Fatalf("Cannot initialize metrics factory: %v", err)
}
Expand Down Expand Up @@ -179,12 +178,13 @@ func createMetricsQueryService(
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {
// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)

if err := metricsReaderFactory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
Expand Down
8 changes: 7 additions & 1 deletion plugin/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/plugin/metrics/grpc"
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/metricsstore"
Expand All @@ -33,6 +34,7 @@
disabledStorageType = ""

prometheusStorageType = "prometheus"
grpcStorageType = "grpc-plugin"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have --metrics-storage-type=grpc-plugin but then factory flags under --grpc-metrics.*? I can't think if a better name, but at least we should keep these in sync, so let's go with grpc-metrics

)

// AllStorageTypes defines all available storage backends.
Expand Down Expand Up @@ -67,6 +69,8 @@
switch factoryType {
case prometheusStorageType:
return prometheus.NewFactory(), nil
case grpcStorageType:
return grpc.NewFactory(), nil

Check warning on line 73 in plugin/metrics/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/factory.go#L72-L73

Added lines #L72 - L73 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add a test?

case disabledStorageType:
return disabled.NewFactory(), nil
}
Expand All @@ -76,7 +80,9 @@
// Initialize implements storage.MetricsFactory.
func (f *Factory) Initialize(logger *zap.Logger) error {
for _, factory := range f.factories {
factory.Initialize(logger)
if err := factory.Initialize(logger); err != nil {
return err
}

Check warning on line 85 in plugin/metrics/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/factory.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}
return nil
}
Expand Down
99 changes: 99 additions & 0 deletions plugin/metrics/grpc/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"flag"
"fmt"
"io"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

var (
_ plugin.Configurable = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
)

// Factory implements storage.MetricsFactory
type Factory struct {
options Options
logger *zap.Logger

builder config.PluginBuilder

metricsReader shared.MetricsReaderPlugin
capabilities shared.PluginCapabilities
}

// NewFactory creates a new Factory
func NewFactory() *Factory {
return &Factory{}

Check warning on line 50 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L49-L50

Added lines #L49 - L50 were not covered by tests
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.options.AddFlags(flagSet)

Check warning on line 55 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L54-L55

Added lines #L54 - L55 were not covered by tests
}

// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
if err := f.options.InitFromViper(v); err != nil {
logger.Fatal("unable to initialize gRPC metrics factory", zap.Error(err))
}
f.builder = &f.options.Configuration

Check warning on line 63 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L59-L63

Added lines #L59 - L63 were not covered by tests
}

// Initialize implements storage.MetricsFactory
func (f *Factory) Initialize(logger *zap.Logger) error {
f.logger = logger

services, err := f.builder.Build(logger)
if err != nil {
return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err)
}

Check warning on line 73 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L67-L73

Added lines #L67 - L73 were not covered by tests

f.metricsReader = services.MetricsReader
f.capabilities = services.Capabilities
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil

Check warning on line 78 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L75-L78

Added lines #L75 - L78 were not covered by tests
}

// CreateMetricsReader implements storage.MetricsFactory
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
if f.capabilities == nil {
return nil, storage.ErrMetricsStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.MetricsReader {
return nil, storage.ErrMetricsStorageNotSupported
}
return f.metricsReader.MetricsReader(), nil

Check warning on line 93 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L82-L93

Added lines #L82 - L93 were not covered by tests
}

// Close closes the resources held by the factory
func (f *Factory) Close() error {
return f.builder.Close()

Check warning on line 98 in plugin/metrics/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/factory.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}
17 changes: 17 additions & 0 deletions plugin/metrics/grpc/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

// TODO write tests
69 changes: 69 additions & 0 deletions plugin/metrics/grpc/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"flag"
"fmt"
"time"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

const (
remotePrefix = "grpc-metrics"
remoteServer = remotePrefix + ".server"
remoteConnectionTimeout = remotePrefix + ".connection-timeout"
defaultConnectionTimeout = time.Duration(5 * time.Second)
)

type Options struct {
Configuration config.Configuration `mapstructure:",squash"`
}

func NewOptions() *Options {
return &Options{}

Check warning on line 41 in plugin/metrics/grpc/options.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/options.go#L40-L41

Added lines #L40 - L41 were not covered by tests
}

func tlsFlagsConfig() tlscfg.ClientFlagsConfig {
return tlscfg.ClientFlagsConfig{
Prefix: remotePrefix,
}

Check warning on line 47 in plugin/metrics/grpc/options.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/options.go#L44-L47

Added lines #L44 - L47 were not covered by tests
}

// AddFlags adds flags for Options
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
tlsFlagsConfig().AddFlags(flagSet)

flagSet.String(remoteServer, "", "The remote metrics gRPC server address as host:port")
flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote metrics gRPC server connection timeout")

Check warning on line 55 in plugin/metrics/grpc/options.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/options.go#L51-L55

Added lines #L51 - L55 were not covered by tests
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) error {
opt.Configuration.RemoteServerAddr = v.GetString(remoteServer)
var err error
opt.Configuration.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err)
}
opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout)
opt.Configuration.TenancyOpts = tenancy.InitFromViper(v)
return nil

Check warning on line 68 in plugin/metrics/grpc/options.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/grpc/options.go#L59-L68

Added lines #L59 - L68 were not covered by tests
}
17 changes: 17 additions & 0 deletions plugin/metrics/grpc/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

// TODO write tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests?

1 change: 1 addition & 0 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
MetricsReader: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
StreamImpl: services.StreamingSpanWriter,
MetricsImpl: services.MetricsReader,

Check warning on line 41 in plugin/storage/grpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/grpc.go#L41

Added line #L41 was not covered by tests
},
},
},
Expand Down
52 changes: 52 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

import "model.proto";
import "openmetrics.proto";

// Enable gogoprotobuf extensions (https://github.com/gogo/protobuf/blob/master/extensions.md).
// Enable custom Marshal method.
Expand Down Expand Up @@ -174,6 +175,56 @@ service DependenciesReaderPlugin {
rpc GetDependencies(GetDependenciesRequest) returns (GetDependenciesResponse);
}

service MetricsReaderPlugin {
// metricsstore/Reader
rpc GetLatencies(GetLatenciesRequest) returns (GetLatenciesResponse);
rpc GetCallRates(GetCallRatesRequest) returns (GetCallRatesResponse);
rpc GetErrorRates(GetErrorRatesRequest) returns (GetErrorRatesResponse);
rpc GetMinStepDuration(GetMinStepDurationRequest) returns (GetMinStepDurationResponse);
}

message MetricsBaseQueryParameters {
repeated string service_names = 1;
bool group_by_operation = 2;
google.protobuf.Timestamp end_time = 3;
google.protobuf.Duration lookback = 4;
google.protobuf.Duration step = 5;
google.protobuf.Duration rate_per = 6;
repeated string span_kinds = 7;
}

message GetLatenciesRequest {
MetricsBaseQueryParameters base_query_parameters = 1;
float quantile = 2;
}

message GetLatenciesResponse {
jaeger.api_v2.metrics.MetricFamily metric_family = 1;
}

message GetCallRatesRequest {
MetricsBaseQueryParameters base_query_parameters = 1;
}

message GetCallRatesResponse {
jaeger.api_v2.metrics.MetricFamily metric_family = 1;
}

message GetErrorRatesRequest {
MetricsBaseQueryParameters base_query_parameters = 1;
}

message GetErrorRatesResponse {
jaeger.api_v2.metrics.MetricFamily metric_family = 1;
}

message GetMinStepDurationRequest {
}

message GetMinStepDurationResponse {
google.protobuf.Duration min_step = 1;
}

// empty; extensible in the future
message CapabilitiesRequest {

Expand All @@ -183,6 +234,7 @@ message CapabilitiesResponse {
bool archiveSpanReader = 1;
bool archiveSpanWriter = 2;
bool streamingSpanWriter = 3;
bool metricsReader = 4;
}

service PluginCapabilities {
Expand Down