Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new grpc storage configuration to align with OTEL #5331

Merged
merged 30 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4da9754
add more grpc config options
akagami-harsh Apr 5, 2024
7371b30
Merge branch 'main' into grpc-config
akagami-harsh Apr 5, 2024
5c07802
Merge branch 'main' into grpc-config
akagami-harsh Apr 6, 2024
3986d8a
reuse otel's client config
akagami-harsh Apr 7, 2024
6cfb2dd
fix
akagami-harsh Apr 7, 2024
45467be
fix
akagami-harsh Apr 7, 2024
940f02c
log not supported when using auth
akagami-harsh Apr 8, 2024
9edf36a
fix
akagami-harsh Apr 8, 2024
3695b30
Merge branch 'main' into grpc-config
akagami-harsh Apr 8, 2024
b4ba240
fix
akagami-harsh Apr 8, 2024
e4dbf8e
Merge branch 'main' into grpc-config
akagami-harsh Apr 15, 2024
03b0f4e
Merge branch 'main' into grpc-config
akagami-harsh May 9, 2024
947501e
add config V2
akagami-harsh May 10, 2024
5eb9218
Merge branch 'main' into grpc-config
akagami-harsh May 12, 2024
e476ece
fix
akagami-harsh May 12, 2024
54736cc
fix
akagami-harsh May 13, 2024
e14a3f2
fix
akagami-harsh May 13, 2024
15be346
fix
akagami-harsh May 16, 2024
71baf51
fix
akagami-harsh May 16, 2024
2139a46
Merge branch 'main' into grpc-config
akagami-harsh May 16, 2024
c4a34b0
add ToOtelClientConfig()
akagami-harsh May 19, 2024
ba44b7f
fix
akagami-harsh May 19, 2024
a2143b3
add remote conn in *ClientPluginServices
akagami-harsh May 19, 2024
148000c
refactor
yurishkuro May 20, 2024
14f9c94
finish-tests
yurishkuro May 20, 2024
ff9ea18
cleanup
yurishkuro May 20, 2024
dd8701e
simplify
yurishkuro May 20, 2024
cd77995
add tests
akagami-harsh May 20, 2024
7c6e513
fix
akagami-harsh May 20, 2024
f3c2ccf
Merge branch 'main' into grpc-config
akagami-harsh May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/jaeger/config-remote-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ extensions:
jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
endpoint: localhost:17271
timeout: 5s
tls:
insecure: true

receivers:
otlp:
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
grpcStarter := &starter[grpcCfg.ConfigV2, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
Expand Down
17 changes: 17 additions & 0 deletions pkg/config/tlscfg/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"time"

"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -135,6 +136,22 @@ func (p Options) loadCertPool() (*x509.CertPool, error) {
return certPool, nil
}

func (o *Options) ToOtelClientConfig(v2 *configtls.ClientConfig) configtls.ClientConfig {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
v2.Insecure = !o.Enabled

v2.ServerName = o.ServerName
v2.InsecureSkipVerify = o.SkipHostVerify
v2.CAFile = o.CAPath
v2.CertFile = o.CertPath
v2.KeyFile = o.KeyPath
v2.CipherSuites = o.CipherSuites
v2.MinVersion = o.MinVersion
v2.MaxVersion = o.MaxVersion
v2.ReloadInterval = o.ReloadInterval

return *v2
}

func addCertToPool(caPath string, certPool *x509.CertPool) error {
caPEM, err := os.ReadFile(filepath.Clean(caPath))
if err != nil {
Expand Down
68 changes: 42 additions & 26 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
package config

import (
"errors"
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand All @@ -37,72 +39,86 @@ type Configuration struct {
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options
}

type ConfigV2 struct {
TenancyOpts tenancy.Options
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
configgrpc.ClientConfig `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
}

func (c *Configuration) translateToConfigV2() *ConfigV2 {
v2 := &ConfigV2{}
v2.Endpoint = c.RemoteServerAddr
v2.Timeout = c.RemoteConnectTimeout
v2.TLSSetting = c.RemoteTLS.ToOtelClientConfig(&v2.TLSSetting)

remoteConn *grpc.ClientConn
return v2
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
remoteConn *grpc.ClientConn
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
// TODO this interface should be removed and the building capability moved to Factory.
type PluginBuilder interface {
Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error)
Close() error
}

// Build instantiates a PluginServices
func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
return c.buildRemote(logger, tracerProvider, grpc.NewClient)
v2Cfg := c.translateToConfigV2()
return v2Cfg.Build(logger, tracerProvider)
}

type newClientFn func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
func (c *ConfigV2) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
s, err := newRemoteStorage(c, tracerProvider)
if err != nil {
return nil, err
}
return s, nil
}

func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider, newClient newClientFn) (*ClientPluginServices, error) {
func newRemoteStorage(c *ConfigV2, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
}
if c.RemoteTLS.Enabled {
tlsCfg, err := c.RemoteTLS.Config(logger)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
creds := credentials.NewTLS(tlsCfg)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
var err error
c.remoteConn, err = newClient(c.RemoteServerAddr, opts...)

remoteConn, err := c.ToClientConn(context.Background(), componenttest.NewNopHost(), component.TelemetrySettings{}, opts...)
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}

grpcClient := shared.NewGRPCClient(c.remoteConn)
grpcClient := shared.NewGRPCClient(remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
remoteConn: remoteConn,
}, nil
}

func (c *Configuration) Close() error {
var errs []error
func (c *ClientPluginServices) Close() error {
if c.remoteConn != nil {
errs = append(errs, c.remoteConn.Close())
err := c.remoteConn.Close()
if err != nil {
return err
}
}
errs = append(errs, c.RemoteTLS.Close())
return errors.Join(errs...)
return nil
}
10 changes: 3 additions & 7 deletions plugin/storage/grpc/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,16 @@
package config

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
"go.opentelemetry.io/otel/trace/noop"
)

func TestBuildRemoteNewClientError(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
c := &Configuration{}
_, err := c.buildRemote(zap.NewNop(), nil, func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
})
c := &ConfigV2{}
_, err := newRemoteStorage(c, noop.NewTracerProvider())
require.Error(t, err)
require.Contains(t, err.Error(), "error creating remote storage client")
}
28 changes: 19 additions & 9 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -53,6 +54,7 @@ type Factory struct {
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities
services *config.ClientPluginServices
}

// NewFactory creates a new Factory.
Expand All @@ -62,12 +64,12 @@ func NewFactory() *Factory {

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
cfg config.ConfigV2,
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.configureFromOptions(Options{Configuration: cfg})
f.configureFromOptions(Options{ConfigV2: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
Expand All @@ -92,23 +94,25 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
func (f *Factory) configureFromOptions(opts Options) {
f.options = opts
f.builder = &f.options.Configuration
f.builder = &f.options.ConfigV2
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.tracerProvider = otel.GetTracerProvider()

services, err := f.builder.Build(logger, f.tracerProvider)
var err error
f.services, err = f.builder.Build(logger, f.tracerProvider)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}

f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
logger.Info("Remote storage configuration", zap.Any("configuration", f.options.Configuration))
f.store = f.services.Store
f.archiveStore = f.services.ArchiveStore
f.capabilities = f.services.Capabilities
f.streamingSpanWriter = f.services.StreamingSpanWriter
logger.Info("Remote storage configuration", zap.Any("configuration", f.options.ConfigV2))
return nil
}

Expand Down Expand Up @@ -165,5 +169,11 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
// Close closes the resources held by the factory
func (f *Factory) Close() error {
// TODO Close should move into Services type, instead of being in the Config.
return f.builder.Close()
var errs []error
err := f.services.Close()
if err != nil {
errs = append(errs, err)
}
errs = append(errs, f.options.Configuration.RemoteTLS.Close())
return errors.Join(errs...)
}
15 changes: 12 additions & 3 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -148,6 +150,9 @@ func TestGRPCStorageFactory(t *testing.T) {
}

func TestGRPCStorageFactoryWithConfig(t *testing.T) {
cfg := grpcConfig.ConfigV2{}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage")
lis, err := net.Listen("tcp", ":0")
require.NoError(t, err, "failed to listen")

Expand All @@ -159,9 +164,13 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) {
}()
defer s.Stop()

cfg := grpcConfig.Configuration{
RemoteServerAddr: lis.Addr().String(),
RemoteConnectTimeout: 1 * time.Second,
cfg = grpcConfig.ConfigV2{
ClientConfig: configgrpc.ClientConfig{
Endpoint: lis.Addr().String(),
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 1 * time.Second,
},
}
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
// to bind them to command line flags
type Options struct {
Configuration config.Configuration `mapstructure:",squash"`
ConfigV2 config.ConfigV2 `mapstructure:",squash"`
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

func tlsFlagsConfig() tlscfg.ClientFlagsConfig {
Expand Down
Loading