Skip to content

Commit

Permalink
feature: add streamingwriterspanplugin
Browse files Browse the repository at this point in the history
Signed-off-by: vuuihc <zshyou@gmail.com>
  • Loading branch information
vuuihc committed Apr 28, 2022
1 parent b891773 commit ab32fd2
Show file tree
Hide file tree
Showing 26 changed files with 1,071 additions and 124 deletions.
21 changes: 13 additions & 8 deletions examples/memstore-plugin/main.go
Expand Up @@ -16,7 +16,6 @@ package main

import (
"flag"
"path"
"strings"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
Expand All @@ -42,15 +41,17 @@ func main() {
flag.StringVar(&configPath, "config", "", "A path to the plugin's configuration file")
flag.Parse()

if configPath != "" {
viper.SetConfigFile(path.Base(configPath))
viper.AddConfigPath(path.Dir(configPath))
}

v := viper.New()
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_"))

if configPath != "" {
v.SetConfigFile(configPath)
if err := v.ReadInConfig(); err != nil {
panic(err)
}
}

opts := memory.Options{}
opts.InitFromViper(v)

Expand All @@ -71,10 +72,14 @@ func main() {
opentracing.SetGlobalTracer(tracer)

memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore())
grpc.ServeWithGRPCServer(&shared.PluginServices{
service := &shared.PluginServices{
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
}, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
}
if opts.Configuration.EnableStreamingWriter {
service.StreamingSpanWriter = memStorePlugin
}
grpc.ServeWithGRPCServer(service, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{
googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)),
Expand Down
3 changes: 2 additions & 1 deletion pkg/memory/config/config.go
Expand Up @@ -16,5 +16,6 @@ package config

// Configuration describes the options to customize the storage behavior
type Configuration struct {
MaxTraces int `yaml:"max-traces" mapstructure:"max_traces"`
MaxTraces int `yaml:"max-traces" mapstructure:"max_traces"`
EnableStreamingWriter bool `yaml:"enable-streaming-writer" mapstructure:"enable_streaming_writer"`
}
2 changes: 2 additions & 0 deletions plugin/storage/grpc/README.md
Expand Up @@ -109,6 +109,8 @@ grpc.Serve(&shared.PluginServices{
})
```

To support writing span via client stream (which can enlarge the throughput), you must fill `StreamingSpanWriter` with the same plugin with `Store` field. Note that use streaming spanWriter may make the `save_by_svr` metric inaccurate, in which case users will need to pay attention to the metrics provided by plugin.

Running with a plugin
---------------------
A plugin can be run using the `all-in-one` application within the top level `cmd` package of the Jaeger project. To do this
Expand Down
10 changes: 8 additions & 2 deletions plugin/storage/grpc/config/config.go
Expand Up @@ -159,6 +159,11 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
streamingSpanWriterPlugin, ok := raw.(shared.StreamingSpanWriterPlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
Expand All @@ -171,8 +176,9 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,

return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
}, nil
Expand Down
13 changes: 10 additions & 3 deletions plugin/storage/grpc/factory.go
Expand Up @@ -38,9 +38,10 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities
}

var _ io.Closer = (*Factory)(nil)
Expand Down Expand Up @@ -81,6 +82,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -92,6 +94,11 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
if f.capabilities != nil && f.streamingSpanWriter != nil {
if capabilities, err := f.capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter {
return f.streamingSpanWriter.StreamingSpanWriter(), nil
}
}
return f.store.SpanWriter(), nil
}

Expand Down
109 changes: 93 additions & 16 deletions plugin/storage/grpc/factory_test.go
Expand Up @@ -38,8 +38,9 @@ import (
var _ storage.Factory = new(Factory)

type mockPluginBuilder struct {
plugin *mockPlugin
err error
plugin *mockPlugin
writerType string
err error
}

func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginServices, error) {
Expand All @@ -53,6 +54,9 @@ func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginS
ArchiveStore: b.plugin,
},
}
if b.writerType == "streaming" {
services.PluginServices.StreamingSpanWriter = b.plugin
}
if b.plugin.capabilities != nil {
services.Capabilities = b.plugin
}
Expand All @@ -65,12 +69,13 @@ func (b *mockPluginBuilder) Close() error {
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
streamingSpanWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) {
Expand All @@ -93,6 +98,10 @@ func (mp *mockPlugin) SpanWriter() spanstore.Writer {
return mp.spanWriter
}

func (mp *mockPlugin) StreamingSpanWriter() spanstore.Writer {
return mp.streamingSpanWriter
}

func (mp *mockPlugin) DependencyReader() dependencystore.Reader {
return mp.dependencyReader
}
Expand Down Expand Up @@ -143,16 +152,19 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
StreamingSpanWriter: true,
}, nil).Times(3)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
streamingSpanWriter: new(spanStoreMocks.Writer),
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

Expand All @@ -163,6 +175,9 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.NotNil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.streamingSpanWriter.StreamingSpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
Expand All @@ -173,15 +188,17 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
StreamingSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -193,6 +210,9 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
Expand All @@ -210,6 +230,7 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -221,6 +242,9 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
Expand All @@ -232,6 +256,7 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -243,6 +268,9 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestWithConfiguration(t *testing.T) {
Expand Down Expand Up @@ -272,3 +300,52 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o, f.options)
assert.Equal(t, &o.Configuration, f.builder)
}

func TestStreamingSpanWriterFactory_CapabilitiesNil(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v, zap.NewNop())

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestStreamingSpanWriterFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v, zap.NewNop())

capabilities := new(mocks.PluginCapabilities)
customError := errors.New("made-up error")
capabilities.On("Capabilities").
Return(nil, customError).Once().
On("Capabilities").Return(&shared.Capabilities{}, nil).Once()

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
capabilities: capabilities,
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return error

writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return false
}
1 change: 1 addition & 0 deletions plugin/storage/grpc/grpc.go
Expand Up @@ -37,6 +37,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc
shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
StreamImpl: services.StreamingSpanWriter,
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/grpc/memory/plugin.go
Expand Up @@ -41,6 +41,10 @@ func (ns *storagePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) StreamingSpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/grpc/memory/plugin_test.go
Expand Up @@ -31,6 +31,7 @@ func TestPluginUsesMemoryStorage(t *testing.T) {
assert.Equal(t, mainStorage, memStorePlugin.DependencyReader())
assert.Equal(t, mainStorage, memStorePlugin.SpanReader())
assert.Equal(t, mainStorage, memStorePlugin.SpanWriter())
assert.Equal(t, mainStorage, memStorePlugin.StreamingSpanWriter())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter())

Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
Expand Up @@ -146,6 +146,10 @@ service SpanWriterPlugin {
rpc Close(CloseWriterRequest) returns (CloseWriterResponse);
}

service StreamingSpanWriterPlugin {
rpc WriteSpanStream(stream WriteSpanRequest) returns (WriteSpanResponse);
}

service SpanReaderPlugin {
// spanstore/Reader
rpc GetTrace(GetTraceRequest) returns (stream SpansResponseChunk);
Expand Down Expand Up @@ -178,6 +182,7 @@ message CapabilitiesRequest {
message CapabilitiesResponse {
bool archiveSpanReader = 1;
bool archiveSpanWriter = 2;
bool streamingSpanWriter = 3;
}

service PluginCapabilities {
Expand Down

0 comments on commit ab32fd2

Please sign in to comment.