diff --git a/examples/memstore-plugin/main.go b/examples/memstore-plugin/main.go index 155a52adc84..30ef6b6610f 100644 --- a/examples/memstore-plugin/main.go +++ b/examples/memstore-plugin/main.go @@ -16,7 +16,6 @@ package main import ( "flag" - "path" "strings" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" @@ -42,15 +41,17 @@ func main() { flag.StringVar(&configPath, "config", "", "A path to the plugin's configuration file") flag.Parse() - if configPath != "" { - viper.SetConfigFile(path.Base(configPath)) - viper.AddConfigPath(path.Dir(configPath)) - } - v := viper.New() v.AutomaticEnv() v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) + if configPath != "" { + v.SetConfigFile(configPath) + if err := v.ReadInConfig(); err != nil { + panic(err) + } + } + opts := memory.Options{} opts.InitFromViper(v) @@ -71,10 +72,14 @@ func main() { opentracing.SetGlobalTracer(tracer) memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore()) - grpc.ServeWithGRPCServer(&shared.PluginServices{ + service := &shared.PluginServices{ Store: memStorePlugin, ArchiveStore: memStorePlugin, - }, func(options []googleGRPC.ServerOption) *googleGRPC.Server { + } + if opts.Configuration.EnableStreamingWriter { + service.StreamingSpanWriter = memStorePlugin + } + grpc.ServeWithGRPCServer(service, func(options []googleGRPC.ServerOption) *googleGRPC.Server { return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{ googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)), googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)), diff --git a/pkg/memory/config/config.go b/pkg/memory/config/config.go index ff5104152a2..d6563f4d94c 100644 --- a/pkg/memory/config/config.go +++ b/pkg/memory/config/config.go @@ -16,5 +16,6 @@ package config // Configuration describes the options to customize the storage behavior type Configuration struct { - MaxTraces int `yaml:"max-traces" mapstructure:"max_traces"` + MaxTraces int `yaml:"max-traces" mapstructure:"max_traces"` + EnableStreamingWriter bool `yaml:"enable-streaming-writer" mapstructure:"enable_streaming_writer"` } diff --git a/plugin/storage/grpc/README.md b/plugin/storage/grpc/README.md index 0efca2d4dd0..92bd9455272 100644 --- a/plugin/storage/grpc/README.md +++ b/plugin/storage/grpc/README.md @@ -109,6 +109,8 @@ grpc.Serve(&shared.PluginServices{ }) ``` +To support writing span via client stream (which can enlarge the throughput), you must fill `StreamingSpanWriter` with the same plugin with `Store` field. Note that use streaming spanWriter may make the `save_by_svr` metric inaccurate, in which case users will need to pay attention to the metrics provided by plugin. + Running with a plugin --------------------- A plugin can be run using the `all-in-one` application within the top level `cmd` package of the Jaeger project. To do this diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 0ce184ce192..5bbc6f2876c 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -159,6 +159,11 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"", raw, shared.StoragePluginIdentifier) } + streamingSpanWriterPlugin, ok := raw.(shared.StreamingSpanWriterPlugin) + if !ok { + return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"", + raw, shared.StoragePluginIdentifier) + } capabilities, ok := raw.(shared.PluginCapabilities) if !ok { return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"", @@ -171,8 +176,9 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, return &ClientPluginServices{ PluginServices: shared.PluginServices{ - Store: storagePlugin, - ArchiveStore: archiveStoragePlugin, + Store: storagePlugin, + ArchiveStore: archiveStoragePlugin, + StreamingSpanWriter: streamingSpanWriterPlugin, }, Capabilities: capabilities, }, nil diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 093584aecbe..e8867059c5b 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -38,9 +38,10 @@ type Factory struct { builder config.PluginBuilder - store shared.StoragePlugin - archiveStore shared.ArchiveStoragePlugin - capabilities shared.PluginCapabilities + store shared.StoragePlugin + archiveStore shared.ArchiveStoragePlugin + streamingSpanWriter shared.StreamingSpanWriterPlugin + capabilities shared.PluginCapabilities } var _ io.Closer = (*Factory)(nil) @@ -81,6 +82,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.store = services.Store f.archiveStore = services.ArchiveStore f.capabilities = services.Capabilities + f.streamingSpanWriter = services.StreamingSpanWriter logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration)) return nil } @@ -92,6 +94,11 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { + if f.capabilities != nil && f.streamingSpanWriter != nil { + if capabilities, err := f.capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter { + return f.streamingSpanWriter.StreamingSpanWriter(), nil + } + } return f.store.SpanWriter(), nil } diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index f2d6ea10c2c..2379f71b2a1 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -38,8 +38,9 @@ import ( var _ storage.Factory = new(Factory) type mockPluginBuilder struct { - plugin *mockPlugin - err error + plugin *mockPlugin + writerType string + err error } func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginServices, error) { @@ -53,6 +54,9 @@ func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginS ArchiveStore: b.plugin, }, } + if b.writerType == "streaming" { + services.PluginServices.StreamingSpanWriter = b.plugin + } if b.plugin.capabilities != nil { services.Capabilities = b.plugin } @@ -65,12 +69,13 @@ func (b *mockPluginBuilder) Close() error { } type mockPlugin struct { - spanReader spanstore.Reader - spanWriter spanstore.Writer - archiveReader spanstore.Reader - archiveWriter spanstore.Writer - capabilities shared.PluginCapabilities - dependencyReader dependencystore.Reader + spanReader spanstore.Reader + spanWriter spanstore.Writer + archiveReader spanstore.Reader + archiveWriter spanstore.Writer + streamingSpanWriter spanstore.Writer + capabilities shared.PluginCapabilities + dependencyReader dependencystore.Reader } func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) { @@ -93,6 +98,10 @@ func (mp *mockPlugin) SpanWriter() spanstore.Writer { return mp.spanWriter } +func (mp *mockPlugin) StreamingSpanWriter() spanstore.Writer { + return mp.streamingSpanWriter +} + func (mp *mockPlugin) DependencyReader() dependencystore.Reader { return mp.dependencyReader } @@ -143,16 +152,19 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) { capabilities := new(mocks.PluginCapabilities) capabilities.On("Capabilities"). Return(&shared.Capabilities{ - ArchiveSpanReader: true, - ArchiveSpanWriter: true, - }, nil) + ArchiveSpanReader: true, + ArchiveSpanWriter: true, + StreamingSpanWriter: true, + }, nil).Times(3) f.builder = &mockPluginBuilder{ plugin: &mockPlugin{ - capabilities: capabilities, - archiveWriter: new(spanStoreMocks.Writer), - archiveReader: new(spanStoreMocks.Reader), + capabilities: capabilities, + archiveWriter: new(spanStoreMocks.Writer), + archiveReader: new(spanStoreMocks.Reader), + streamingSpanWriter: new(spanStoreMocks.Writer), }, + writerType: "streaming", } assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -163,6 +175,9 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) { writer, err := f.CreateArchiveSpanWriter() assert.NoError(t, err) assert.NotNil(t, writer) + writer, err = f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.streamingSpanWriter.StreamingSpanWriter(), writer) } func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { @@ -173,8 +188,9 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { capabilities := new(mocks.PluginCapabilities) capabilities.On("Capabilities"). Return(&shared.Capabilities{ - ArchiveSpanReader: false, - ArchiveSpanWriter: false, + ArchiveSpanReader: false, + ArchiveSpanWriter: false, + StreamingSpanWriter: false, }, nil) f.builder = &mockPluginBuilder{ @@ -182,6 +198,7 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { capabilities: capabilities, archiveWriter: new(spanStoreMocks.Writer), archiveReader: new(spanStoreMocks.Reader), + spanWriter: new(spanStoreMocks.Writer), }, } assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -193,6 +210,9 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { writer, err := f.CreateArchiveSpanWriter() assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error()) assert.Nil(t, writer) + writer, err = f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) } func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) { @@ -210,6 +230,7 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) { capabilities: capabilities, archiveWriter: new(spanStoreMocks.Writer), archiveReader: new(spanStoreMocks.Reader), + spanWriter: new(spanStoreMocks.Writer), }, } assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -221,6 +242,9 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) { writer, err := f.CreateArchiveSpanWriter() assert.EqualError(t, err, customError.Error()) assert.Nil(t, writer) + writer, err = f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) } func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) { @@ -232,6 +256,7 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) { plugin: &mockPlugin{ archiveWriter: new(spanStoreMocks.Writer), archiveReader: new(spanStoreMocks.Reader), + spanWriter: new(spanStoreMocks.Writer), }, } assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) @@ -243,6 +268,9 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) { writer, err := f.CreateArchiveSpanWriter() assert.Equal(t, err, storage.ErrArchiveStorageNotSupported) assert.Nil(t, writer) + writer, err = f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) } func TestWithConfiguration(t *testing.T) { @@ -272,3 +300,52 @@ func TestInitFromOptions(t *testing.T) { assert.Equal(t, o, f.options) assert.Equal(t, &o.Configuration, f.builder) } + +func TestStreamingSpanWriterFactory_CapabilitiesNil(t *testing.T) { + f := NewFactory() + v := viper.New() + f.InitFromViper(v, zap.NewNop()) + + f.builder = &mockPluginBuilder{ + plugin: &mockPlugin{ + archiveWriter: new(spanStoreMocks.Writer), + archiveReader: new(spanStoreMocks.Reader), + spanWriter: new(spanStoreMocks.Writer), + }, + writerType: "streaming", + } + assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + writer, err := f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) +} + +func TestStreamingSpanWriterFactory_Capabilities(t *testing.T) { + f := NewFactory() + v := viper.New() + f.InitFromViper(v, zap.NewNop()) + + capabilities := new(mocks.PluginCapabilities) + customError := errors.New("made-up error") + capabilities.On("Capabilities"). + Return(nil, customError).Once(). + On("Capabilities").Return(&shared.Capabilities{}, nil).Once() + + f.builder = &mockPluginBuilder{ + plugin: &mockPlugin{ + archiveWriter: new(spanStoreMocks.Writer), + archiveReader: new(spanStoreMocks.Reader), + spanWriter: new(spanStoreMocks.Writer), + capabilities: capabilities, + }, + writerType: "streaming", + } + assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + writer, err := f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return error + + writer, err = f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return false +} diff --git a/plugin/storage/grpc/grpc.go b/plugin/storage/grpc/grpc.go index 562d1957391..09ffffcd399 100644 --- a/plugin/storage/grpc/grpc.go +++ b/plugin/storage/grpc/grpc.go @@ -37,6 +37,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{ Impl: services.Store, ArchiveImpl: services.ArchiveStore, + StreamImpl: services.StreamingSpanWriter, }, }, }, diff --git a/plugin/storage/grpc/memory/plugin.go b/plugin/storage/grpc/memory/plugin.go index 56f605b099d..b0d3b5c9e0e 100644 --- a/plugin/storage/grpc/memory/plugin.go +++ b/plugin/storage/grpc/memory/plugin.go @@ -41,6 +41,10 @@ func (ns *storagePlugin) SpanWriter() spanstore.Writer { return ns.store } +func (ns *storagePlugin) StreamingSpanWriter() spanstore.Writer { + return ns.store +} + func (ns *storagePlugin) ArchiveSpanReader() spanstore.Reader { return ns.archiveStore } diff --git a/plugin/storage/grpc/memory/plugin_test.go b/plugin/storage/grpc/memory/plugin_test.go index 11cc961eb74..fbcb893c17d 100644 --- a/plugin/storage/grpc/memory/plugin_test.go +++ b/plugin/storage/grpc/memory/plugin_test.go @@ -31,6 +31,7 @@ func TestPluginUsesMemoryStorage(t *testing.T) { assert.Equal(t, mainStorage, memStorePlugin.DependencyReader()) assert.Equal(t, mainStorage, memStorePlugin.SpanReader()) assert.Equal(t, mainStorage, memStorePlugin.SpanWriter()) + assert.Equal(t, mainStorage, memStorePlugin.StreamingSpanWriter()) assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader()) assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter()) diff --git a/plugin/storage/grpc/proto/storage.proto b/plugin/storage/grpc/proto/storage.proto index 38c9df5b78c..51c804948fb 100644 --- a/plugin/storage/grpc/proto/storage.proto +++ b/plugin/storage/grpc/proto/storage.proto @@ -146,6 +146,10 @@ service SpanWriterPlugin { rpc Close(CloseWriterRequest) returns (CloseWriterResponse); } +service StreamingSpanWriterPlugin { + rpc WriteSpanStream(stream WriteSpanRequest) returns (WriteSpanResponse); +} + service SpanReaderPlugin { // spanstore/Reader rpc GetTrace(GetTraceRequest) returns (stream SpansResponseChunk); @@ -178,6 +182,7 @@ message CapabilitiesRequest { message CapabilitiesResponse { bool archiveSpanReader = 1; bool archiveSpanWriter = 2; + bool streamingSpanWriter = 3; } service PluginCapabilities { diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index 2e1af1a246c..a13dfbea754 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -52,6 +52,7 @@ type grpcClient struct { archiveWriterClient storage_v1.ArchiveSpanWriterPluginClient capabilitiesClient storage_v1.PluginCapabilitiesClient depsReaderClient storage_v1.DependenciesReaderPluginClient + streamWriterClient storage_v1.StreamingSpanWriterPluginClient } func NewGRPCClient(c *grpc.ClientConn) *grpcClient { @@ -62,6 +63,7 @@ func NewGRPCClient(c *grpc.ClientConn) *grpcClient { archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), + streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c), } } @@ -110,6 +112,10 @@ func (c *grpcClient) SpanWriter() spanstore.Writer { return c } +func (c *grpcClient) StreamingSpanWriter() spanstore.Writer { + return newStreamingSpanWriter(c.streamWriterClient) +} + func (c *grpcClient) ArchiveSpanReader() spanstore.Reader { return &archiveReader{client: c.archiveReaderClient} } @@ -278,8 +284,9 @@ func (c *grpcClient) Capabilities() (*Capabilities, error) { } return &Capabilities{ - ArchiveSpanReader: capabilities.ArchiveSpanReader, - ArchiveSpanWriter: capabilities.ArchiveSpanWriter, + ArchiveSpanReader: capabilities.ArchiveSpanReader, + ArchiveSpanWriter: capabilities.ArchiveSpanWriter, + StreamingSpanWriter: capabilities.StreamingSpanWriter, }, nil } diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index 51e1252a5d9..010bf578a0b 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -78,6 +79,7 @@ type grpcClientTest struct { archiveWriter *grpcMocks.ArchiveSpanWriterPluginClient capabilities *grpcMocks.PluginCapabilitiesClient depsReader *grpcMocks.DependenciesReaderPluginClient + streamWriter *grpcMocks.StreamingSpanWriterPluginClient } func withGRPCClient(fn func(r *grpcClientTest)) { @@ -86,6 +88,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) { spanWriter := new(grpcMocks.SpanWriterPluginClient) archiveWriter := new(grpcMocks.ArchiveSpanWriterPluginClient) depReader := new(grpcMocks.DependenciesReaderPluginClient) + streamWriter := new(grpcMocks.StreamingSpanWriterPluginClient) capabilities := new(grpcMocks.PluginCapabilitiesClient) r := &grpcClientTest{ @@ -96,6 +99,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) { archiveWriterClient: archiveWriter, capabilitiesClient: capabilities, depsReaderClient: depReader, + streamWriterClient: streamWriter, }, spanReader: spanReader, spanWriter: spanWriter, @@ -103,10 +107,25 @@ func withGRPCClient(fn func(r *grpcClientTest)) { archiveWriter: archiveWriter, depsReader: depReader, capabilities: capabilities, + streamWriter: streamWriter, } fn(r) } +func TestNewGRPCClient(t *testing.T) { + conn := &grpc.ClientConn{} + client := NewGRPCClient(conn) + assert.NotNil(t, client) + + assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient) + assert.Implements(t, (*storage_v1.SpanWriterPluginClient)(nil), client.writerClient) + assert.Implements(t, (*storage_v1.ArchiveSpanReaderPluginClient)(nil), client.archiveReaderClient) + assert.Implements(t, (*storage_v1.ArchiveSpanWriterPluginClient)(nil), client.archiveWriterClient) + assert.Implements(t, (*storage_v1.PluginCapabilitiesClient)(nil), client.capabilitiesClient) + assert.Implements(t, (*storage_v1.DependenciesReaderPluginClient)(nil), client.depsReaderClient) + assert.Implements(t, (*storage_v1.StreamingSpanWriterPluginClient)(nil), client.streamWriterClient) +} + func TestContextUpgradeWithToken(t *testing.T) { testBearerToken := "test-bearer-token" ctx := bearertoken.ContextWithBearerToken(context.Background(), testBearerToken) @@ -294,7 +313,7 @@ func TestGRPCClientWriteSpan(t *testing.T) { Span: &mockTraceSpans[0], }).Return(&storage_v1.WriteSpanResponse{}, nil) - err := r.client.WriteSpan(context.Background(), &mockTraceSpans[0]) + err := r.client.SpanWriter().WriteSpan(context.Background(), &mockTraceSpans[0]) assert.NoError(t, err) }) } @@ -361,6 +380,16 @@ func TestGrpcClientWriteArchiveSpan_Error(t *testing.T) { }) } +func TestGrpcClientStreamWriterWriteSpan(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + stream := new(grpcMocks.StreamingSpanWriterPlugin_WriteSpanStreamClient) + r.streamWriter.On("WriteSpanStream", mock.Anything).Return(stream, nil) + stream.On("Send", &storage_v1.WriteSpanRequest{Span: &mockTraceSpans[0]}).Return(nil) + err := r.client.StreamingSpanWriter().WriteSpan(context.Background(), &mockTraceSpans[0]) + assert.NoError(t, err) + }) +} + func TestGrpcClientGetArchiveTrace(t *testing.T) { withGRPCClient(func(r *grpcClientTest) { traceClient := new(grpcMocks.ArchiveSpanReaderPlugin_GetArchiveTraceClient) @@ -428,13 +457,14 @@ func TestGrpcClientGetArchiveTrace_StreamErrorTraceNotFound(t *testing.T) { func TestGrpcClientCapabilities(t *testing.T) { withGRPCClient(func(r *grpcClientTest) { r.capabilities.On("Capabilities", mock.Anything, &storage_v1.CapabilitiesRequest{}). - Return(&storage_v1.CapabilitiesResponse{ArchiveSpanReader: true, ArchiveSpanWriter: true}, nil) + Return(&storage_v1.CapabilitiesResponse{ArchiveSpanReader: true, ArchiveSpanWriter: true, StreamingSpanWriter: true}, nil) capabilities, err := r.client.Capabilities() assert.NoError(t, err) assert.Equal(t, &Capabilities{ - ArchiveSpanReader: true, - ArchiveSpanWriter: true, + ArchiveSpanReader: true, + ArchiveSpanWriter: true, + StreamingSpanWriter: true, }, capabilities) }) } @@ -447,8 +477,9 @@ func TestGrpcClientCapabilities_NotSupported(t *testing.T) { capabilities, err := r.client.Capabilities() assert.NoError(t, err) assert.Equal(t, &Capabilities{ - ArchiveSpanReader: false, - ArchiveSpanWriter: false, + ArchiveSpanReader: false, + ArchiveSpanWriter: false, + StreamingSpanWriter: false, }, capabilities) }) } diff --git a/plugin/storage/grpc/shared/grpc_server.go b/plugin/storage/grpc/shared/grpc_server.go index 45ad763b42e..e78beab94fd 100644 --- a/plugin/storage/grpc/shared/grpc_server.go +++ b/plugin/storage/grpc/shared/grpc_server.go @@ -33,6 +33,7 @@ const spanBatchSize = 1000 type grpcServer struct { Impl StoragePlugin ArchiveImpl ArchiveStoragePlugin + StreamImpl StreamingSpanWriterPlugin } // GetDependencies returns all interservice dependencies @@ -46,6 +47,21 @@ func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDepen }, nil } +// WriteSpanStream recieve the span from stream and save it +func (s *grpcServer) WriteSpanStream(stream storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamServer) error { + for { + in, err := stream.Recv() + if err == io.EOF { + break + } + err = s.StreamImpl.StreamingSpanWriter().WriteSpan(stream.Context(), in.Span) + if err != nil { + return err + } + } + return stream.SendAndClose(&storage_v1.WriteSpanResponse{}) +} + // WriteSpan saves the span func (s *grpcServer) WriteSpan(ctx context.Context, r *storage_v1.WriteSpanRequest) (*storage_v1.WriteSpanResponse, error) { err := s.Impl.SpanWriter().WriteSpan(ctx, r.Span) @@ -183,8 +199,9 @@ func (s *grpcServer) sendSpans(spans []*model.Span, sendFn func(*storage_v1.Span func (s *grpcServer) Capabilities(ctx context.Context, request *storage_v1.CapabilitiesRequest) (*storage_v1.CapabilitiesResponse, error) { return &storage_v1.CapabilitiesResponse{ - ArchiveSpanReader: s.ArchiveImpl != nil, - ArchiveSpanWriter: s.ArchiveImpl != nil, + ArchiveSpanReader: s.ArchiveImpl != nil, + ArchiveSpanWriter: s.ArchiveImpl != nil, + StreamingSpanWriter: s.StreamImpl != nil, }, nil } diff --git a/plugin/storage/grpc/shared/grpc_server_test.go b/plugin/storage/grpc/shared/grpc_server_test.go index fcd676874c0..db4c283cb1a 100644 --- a/plugin/storage/grpc/shared/grpc_server_test.go +++ b/plugin/storage/grpc/shared/grpc_server_test.go @@ -17,6 +17,7 @@ package shared import ( "context" "fmt" + "io" "testing" "time" @@ -40,6 +41,7 @@ type mockStoragePlugin struct { archiveReader *spanStoreMocks.Reader archiveWriter *spanStoreMocks.Writer depsReader *dependencyStoreMocks.Reader + streamWriter *spanStoreMocks.Writer } func (plugin *mockStoragePlugin) ArchiveSpanReader() spanstore.Reader { @@ -62,6 +64,10 @@ func (plugin *mockStoragePlugin) DependencyReader() dependencystore.Reader { return plugin.depsReader } +func (plugin *mockStoragePlugin) StreamingSpanWriter() spanstore.Writer { + return plugin.streamWriter +} + type grpcServerTest struct { server *grpcServer impl *mockStoragePlugin @@ -73,6 +79,7 @@ func withGRPCServer(fn func(r *grpcServerTest)) { archiveReader := new(spanStoreMocks.Reader) archiveWriter := new(spanStoreMocks.Writer) depReader := new(dependencyStoreMocks.Reader) + streamWriter := new(spanStoreMocks.Writer) impl := &mockStoragePlugin{ spanReader: spanReader, @@ -80,12 +87,14 @@ func withGRPCServer(fn func(r *grpcServerTest)) { archiveReader: archiveReader, archiveWriter: archiveWriter, depsReader: depReader, + streamWriter: streamWriter, } r := &grpcServerTest{ server: &grpcServer{ Impl: impl, ArchiveImpl: impl, + StreamImpl: impl, }, impl: impl, } @@ -221,6 +230,25 @@ func TestGRPCServerWriteSpan(t *testing.T) { }) } +func TestGRPCServerWriteSpanStream(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + stream := new(grpcMocks.StreamingSpanWriterPlugin_WriteSpanStreamServer) + stream.On("Recv").Return(&storage_v1.WriteSpanRequest{Span: &mockTraceSpans[0]}, nil).Twice(). + On("Recv").Return(nil, io.EOF).Once() + stream.On("SendAndClose", &storage_v1.WriteSpanResponse{}).Return(nil) + stream.On("Context").Return(context.Background()) + r.impl.streamWriter.On("WriteSpan", context.Background(), &mockTraceSpans[0]). + Return(fmt.Errorf("some error")).Once(). + On("WriteSpan", context.Background(), &mockTraceSpans[0]). + Return(nil) + + err := r.server.WriteSpanStream(stream) + assert.Error(t, err) + err = r.server.WriteSpanStream(stream) + assert.NoError(t, err) + }) +} + func TestGRPCServerGetDependencies(t *testing.T) { withGRPCServer(func(r *grpcServerTest) { lookback := time.Duration(1 * time.Second) @@ -370,7 +398,7 @@ func TestGRPCServerCapabilities(t *testing.T) { withGRPCServer(func(r *grpcServerTest) { capabilities, err := r.server.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{}) assert.NoError(t, err) - assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: true, ArchiveSpanWriter: true}, capabilities) + assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: true, ArchiveSpanWriter: true, StreamingSpanWriter: true}, capabilities) }) } @@ -380,6 +408,16 @@ func TestGRPCServerCapabilities_NoArchive(t *testing.T) { capabilities, err := r.server.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{}) assert.NoError(t, err) - assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: false, ArchiveSpanWriter: false}, capabilities) + assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: false, ArchiveSpanWriter: false, StreamingSpanWriter: true}, capabilities) + }) +} + +func TestGRPCServerCapabilities_NoStreamWriter(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + r.server.StreamImpl = nil + + capabilities, err := r.server.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{}) + assert.NoError(t, err) + assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: true, ArchiveSpanWriter: true}, capabilities) }) } diff --git a/plugin/storage/grpc/shared/interface.go b/plugin/storage/grpc/shared/interface.go index 89cc0049b81..c6345a29b82 100644 --- a/plugin/storage/grpc/shared/interface.go +++ b/plugin/storage/grpc/shared/interface.go @@ -48,6 +48,11 @@ type ArchiveStoragePlugin interface { ArchiveSpanWriter() spanstore.Writer } +// StreamingSpanWriterPlugin is the interface we're exposing as a plugin. +type StreamingSpanWriterPlugin interface { + StreamingSpanWriter() spanstore.Writer +} + // PluginCapabilities allow expose plugin its capabilities. type PluginCapabilities interface { Capabilities() (*Capabilities, error) @@ -55,12 +60,14 @@ type PluginCapabilities interface { // Capabilities contains information about plugin capabilities type Capabilities struct { - ArchiveSpanReader bool - ArchiveSpanWriter bool + ArchiveSpanReader bool + ArchiveSpanWriter bool + StreamingSpanWriter bool } // PluginServices defines services plugin can expose type PluginServices struct { - Store StoragePlugin - ArchiveStore ArchiveStoragePlugin + Store StoragePlugin + ArchiveStore ArchiveStoragePlugin + StreamingSpanWriter StreamingSpanWriterPlugin } diff --git a/plugin/storage/grpc/shared/plugin.go b/plugin/storage/grpc/shared/plugin.go index 44884745d1d..89347fef5cf 100644 --- a/plugin/storage/grpc/shared/plugin.go +++ b/plugin/storage/grpc/shared/plugin.go @@ -34,6 +34,7 @@ type StorageGRPCPlugin struct { // Concrete implementation, This is only used for plugins that are written in Go. Impl StoragePlugin ArchiveImpl ArchiveStoragePlugin + StreamImpl StreamingSpanWriterPlugin } // RegisterHandlers registers the plugin with the server @@ -41,6 +42,7 @@ func (p *StorageGRPCPlugin) RegisterHandlers(s *grpc.Server) error { server := &grpcServer{ Impl: p.Impl, ArchiveImpl: p.ArchiveImpl, + StreamImpl: p.StreamImpl, } storage_v1.RegisterSpanReaderPluginServer(s, server) storage_v1.RegisterSpanWriterPluginServer(s, server) @@ -48,6 +50,7 @@ func (p *StorageGRPCPlugin) RegisterHandlers(s *grpc.Server) error { storage_v1.RegisterArchiveSpanWriterPluginServer(s, server) storage_v1.RegisterPluginCapabilitiesServer(s, server) storage_v1.RegisterDependenciesReaderPluginServer(s, server) + storage_v1.RegisterStreamingSpanWriterPluginServer(s, server) return nil } diff --git a/plugin/storage/grpc/shared/streaming_writer.go b/plugin/storage/grpc/shared/streaming_writer.go new file mode 100644 index 00000000000..d739a41d7f1 --- /dev/null +++ b/plugin/storage/grpc/shared/streaming_writer.go @@ -0,0 +1,103 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "errors" + "fmt" + + "go.uber.org/atomic" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +var ( + _ spanstore.Writer = (*streamingSpanWriter)(nil) +) + +const ( + defaultMaxPoolSize = 1000 +) + +// streamingSpanWriter wraps storage_v1.StreamingSpanWriterPluginClient into spanstore.Writer +type streamingSpanWriter struct { + client storage_v1.StreamingSpanWriterPluginClient + streamPool chan storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient + closed *atomic.Bool +} + +func newStreamingSpanWriter(client storage_v1.StreamingSpanWriterPluginClient) *streamingSpanWriter { + s := &streamingSpanWriter{ + client: client, + closed: atomic.NewBool(false), + streamPool: make(chan storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient, defaultMaxPoolSize), + } + return s +} + +// WriteSpan write span into stream +func (s *streamingSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error { + stream, err := s.getStream(ctx) + if err != nil { + return fmt.Errorf("plugin getStream error: %w", err) + } + if err := stream.Send(&storage_v1.WriteSpanRequest{Span: span}); err != nil { + return fmt.Errorf("plugin Send error: %w", err) + } + s.putStream(stream) + return nil +} + +func (s *streamingSpanWriter) Close() error { + if !s.closed.CAS(false, true) { + return errors.New("already closed") + } + close(s.streamPool) + for stream := range s.streamPool { + if _, err := stream.CloseAndRecv(); err != nil { + return err + } + } + return nil +} + +func (s *streamingSpanWriter) getStream(ctx context.Context) (storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient, error) { + select { + case st, ok := <-s.streamPool: + if ok { + return st, nil + } + return nil, fmt.Errorf("plugin is closed") + default: + return s.client.WriteSpanStream(ctx) + } +} + +func (s *streamingSpanWriter) putStream(stream storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient) error { + if s.closed.Load() { + _, err := stream.CloseAndRecv() + return err + } + select { + case s.streamPool <- stream: + return nil + default: + _, err := stream.CloseAndRecv() + return err + } +} diff --git a/plugin/storage/grpc/shared/streaming_writer_test.go b/plugin/storage/grpc/shared/streaming_writer_test.go new file mode 100644 index 00000000000..d726b24e9be --- /dev/null +++ b/plugin/storage/grpc/shared/streaming_writer_test.go @@ -0,0 +1,101 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + grpcMocks "github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks" +) + +type streamingSpanWriterTest struct { + client *streamingSpanWriter + streamingSpanWriter *grpcMocks.StreamingSpanWriterPluginClient +} + +func withStreamingWriterGRPCClient(fn func(r *streamingSpanWriterTest)) { + streamingWriterClient := new(grpcMocks.StreamingSpanWriterPluginClient) + r := &streamingSpanWriterTest{ + client: newStreamingSpanWriter(streamingWriterClient), + streamingSpanWriter: streamingWriterClient, + } + fn(r) +} + +func TestStreamClientWriteSpan(t *testing.T) { + withStreamingWriterGRPCClient(func(r *streamingSpanWriterTest) { + stream := new(grpcMocks.StreamingSpanWriterPlugin_WriteSpanStreamClient) + stream.On("Send", &storage_v1.WriteSpanRequest{Span: &mockTraceSpans[0]}).Return(io.EOF).Once(). + On("Send", &storage_v1.WriteSpanRequest{Span: &mockTraceSpans[0]}).Return(nil).Twice() + r.streamingSpanWriter.On("WriteSpanStream", mock.Anything).Return(nil, status.Error(codes.DeadlineExceeded, "timeout")).Once(). + On("WriteSpanStream", mock.Anything).Return(stream, nil) + + err := r.client.WriteSpan(context.Background(), &mockTraceSpans[0]) + assert.ErrorContains(t, err, "timeout") + err = r.client.WriteSpan(context.Background(), &mockTraceSpans[0]) + assert.ErrorContains(t, err, "EOF") + err = r.client.WriteSpan(context.Background(), &mockTraceSpans[0]) + assert.NoError(t, err) + err = r.client.WriteSpan(context.Background(), &mockTraceSpans[0]) // get stream from pool should succeed + assert.NoError(t, err) + + stream.On("CloseAndRecv").Return(nil, status.Error(codes.DeadlineExceeded, "timeout")) + for i := 0; i < defaultMaxPoolSize; i++ { // putStream when pool is full should call CloseAndRecv + err = r.client.putStream(stream) + if i == defaultMaxPoolSize-1 { + assert.ErrorContains(t, err, "timeout", i) + } else { + assert.NoError(t, err, i) + } + } + }) +} + +func TestStreamClientClose(t *testing.T) { + withStreamingWriterGRPCClient(func(r *streamingSpanWriterTest) { + stream := new(grpcMocks.StreamingSpanWriterPlugin_WriteSpanStreamClient) + stream.On("CloseAndRecv").Return(&storage_v1.WriteSpanResponse{}, nil).Once() + r.client.streamPool <- stream + + err := r.client.Close() + assert.NoError(t, err) + err = r.client.Close() + assert.ErrorContains(t, err, "already closed") + + err = r.client.WriteSpan(context.Background(), &mockTraceSpans[0]) // getStream from pool should fail when closed + assert.ErrorContains(t, err, "closed") + }) +} + +func TestStreamClientCloseFail(t *testing.T) { + withStreamingWriterGRPCClient(func(r *streamingSpanWriterTest) { + stream := new(grpcMocks.StreamingSpanWriterPlugin_WriteSpanStreamClient) + stream.On("CloseAndRecv").Return(nil, status.Error(codes.DeadlineExceeded, "timeout")).Twice() + r.client.streamPool <- stream + + err := r.client.Close() + assert.ErrorContains(t, err, "timeout") + err = r.client.putStream(stream) + assert.ErrorContains(t, err, "timeout") // putStream after closed should call CloseAndRecv + }) +} diff --git a/plugin/storage/integration/fixtures/grpc_plugin_conf.yaml b/plugin/storage/integration/fixtures/grpc_plugin_conf.yaml new file mode 100644 index 00000000000..9585c34e9df --- /dev/null +++ b/plugin/storage/integration/fixtures/grpc_plugin_conf.yaml @@ -0,0 +1 @@ +memory.enable-streaming-writer: true \ No newline at end of file diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 53b7ebd8f1b..26e3dbb9e36 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -20,6 +20,7 @@ package integration import ( "net" "os" + "path" "sync" "testing" @@ -36,7 +37,10 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/memory" ) -const defaultPluginBinaryPath = "../../../examples/memstore-plugin/memstore-plugin" +const ( + defaultPluginBinaryPath = "../../../examples/memstore-plugin/memstore-plugin" + streamingPluginConfigPath = "fixtures/grpc_plugin_conf.yaml" +) type gRPCServer struct { errChan chan error @@ -138,24 +142,39 @@ func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { return s.initialize() } -func TestGRPCStorage(t *testing.T) { +func getPluginFlags(t *testing.T) []string { binaryPath := os.Getenv("PLUGIN_BINARY_PATH") if binaryPath == "" { t.Logf("PLUGIN_BINARY_PATH env var not set, using %s", defaultPluginBinaryPath) binaryPath = defaultPluginBinaryPath } + + return []string{ + "--grpc-storage-plugin.binary", binaryPath, + "--grpc-storage-plugin.log-level", "debug", + } +} + +func TestGRPCStorage(t *testing.T) { + flags := getPluginFlags(t) configPath := os.Getenv("PLUGIN_CONFIG_PATH") if configPath == "" { t.Log("PLUGIN_CONFIG_PATH env var not set") } + flags = append(flags, "--grpc-storage-plugin.configuration-file", configPath) - flags := []string{ - "--grpc-storage-plugin.binary", binaryPath, - "--grpc-storage-plugin.log-level", "debug", + s := &GRPCStorageIntegrationTestSuite{ + flags: flags, } - flags = append(flags, - "--grpc-storage-plugin.configuration-file", configPath, - ) + require.NoError(t, s.initialize()) + s.IntegrationTestAll(t) +} + +func TestGRPCStreamingWriter(t *testing.T) { + flags := getPluginFlags(t) + wd, err := os.Getwd() + require.NoError(t, err) + flags = append(flags, "--grpc-storage-plugin.configuration-file", path.Join(wd, streamingPluginConfigPath)) s := &GRPCStorageIntegrationTestSuite{ flags: flags, diff --git a/plugin/storage/memory/options.go b/plugin/storage/memory/options.go index d2ec03d21b4..a18bec6ae17 100644 --- a/plugin/storage/memory/options.go +++ b/plugin/storage/memory/options.go @@ -22,7 +22,10 @@ import ( "github.com/jaegertracing/jaeger/pkg/memory/config" ) -const limit = "memory.max-traces" +const ( + limit = "memory.max-traces" + enableStreamingWriter = "memory.enable-streaming-writer" +) // Options stores the configuration entries for this storage type Options struct { @@ -32,9 +35,11 @@ type Options struct { // AddFlags from this storage to the CLI func AddFlags(flagSet *flag.FlagSet) { flagSet.Int(limit, 0, "The maximum amount of traces to store in memory. The default number of traces is unbounded.") + flagSet.Bool(enableStreamingWriter, false, "The switch that controls whether to enable streaming span writer, the default is false") } // InitFromViper initializes the options struct with values from Viper func (opt *Options) InitFromViper(v *viper.Viper) { opt.Configuration.MaxTraces = v.GetInt(limit) + opt.Configuration.EnableStreamingWriter = v.GetBool(enableStreamingWriter) } diff --git a/proto-gen/storage_v1/mocks/StreamingSpanWriterPluginClient.go b/proto-gen/storage_v1/mocks/StreamingSpanWriterPluginClient.go new file mode 100644 index 00000000000..df393963213 --- /dev/null +++ b/proto-gen/storage_v1/mocks/StreamingSpanWriterPluginClient.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.10.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + grpc "google.golang.org/grpc" + + mock "github.com/stretchr/testify/mock" + + storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" +) + +// StreamingSpanWriterPluginClient is an autogenerated mock type for the StreamingSpanWriterPluginClient type +type StreamingSpanWriterPluginClient struct { + mock.Mock +} + +// WriteSpanStream provides a mock function with given fields: ctx, opts +func (_m *StreamingSpanWriterPluginClient) WriteSpanStream(ctx context.Context, opts ...grpc.CallOption) (storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient + if rf, ok := ret.Get(0).(func(context.Context, ...grpc.CallOption) storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient); ok { + r0 = rf(ctx, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamClient) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, ...grpc.CallOption) error); ok { + r1 = rf(ctx, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/proto-gen/storage_v1/mocks/StreamingSpanWriterPluginServer.go b/proto-gen/storage_v1/mocks/StreamingSpanWriterPluginServer.go new file mode 100644 index 00000000000..cf4b24cc63e --- /dev/null +++ b/proto-gen/storage_v1/mocks/StreamingSpanWriterPluginServer.go @@ -0,0 +1,27 @@ +// Code generated by mockery v2.10.4. DO NOT EDIT. + +package mocks + +import ( + storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + mock "github.com/stretchr/testify/mock" +) + +// StreamingSpanWriterPluginServer is an autogenerated mock type for the StreamingSpanWriterPluginServer type +type StreamingSpanWriterPluginServer struct { + mock.Mock +} + +// WriteSpanStream provides a mock function with given fields: _a0 +func (_m *StreamingSpanWriterPluginServer) WriteSpanStream(_a0 storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamServer) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(storage_v1.StreamingSpanWriterPlugin_WriteSpanStreamServer) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/proto-gen/storage_v1/mocks/StreamingSpanWriterPlugin_WriteSpanStreamClient.go b/proto-gen/storage_v1/mocks/StreamingSpanWriterPlugin_WriteSpanStreamClient.go new file mode 100644 index 00000000000..5f8beaa25ba --- /dev/null +++ b/proto-gen/storage_v1/mocks/StreamingSpanWriterPlugin_WriteSpanStreamClient.go @@ -0,0 +1,151 @@ +// Code generated by mockery v2.10.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + metadata "google.golang.org/grpc/metadata" + + storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" +) + +// StreamingSpanWriterPlugin_WriteSpanStreamClient is an autogenerated mock type for the StreamingSpanWriterPlugin_WriteSpanStreamClient type +type StreamingSpanWriterPlugin_WriteSpanStreamClient struct { + mock.Mock +} + +// CloseAndRecv provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) CloseAndRecv() (*storage_v1.WriteSpanResponse, error) { + ret := _m.Called() + + var r0 *storage_v1.WriteSpanResponse + if rf, ok := ret.Get(0).(func() *storage_v1.WriteSpanResponse); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.WriteSpanResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CloseSend provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) CloseSend() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Context provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// Header provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) Header() (metadata.MD, error) { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Send provides a mock function with given fields: _a0 +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) Send(_a0 *storage_v1.WriteSpanRequest) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*storage_v1.WriteSpanRequest) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Trailer provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamClient) Trailer() metadata.MD { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + return r0 +} diff --git a/proto-gen/storage_v1/mocks/StreamingSpanWriterPlugin_WriteSpanStreamServer.go b/proto-gen/storage_v1/mocks/StreamingSpanWriterPlugin_WriteSpanStreamServer.go new file mode 100644 index 00000000000..02efac90131 --- /dev/null +++ b/proto-gen/storage_v1/mocks/StreamingSpanWriterPlugin_WriteSpanStreamServer.go @@ -0,0 +1,131 @@ +// Code generated by mockery v2.10.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + metadata "google.golang.org/grpc/metadata" + + storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" +) + +// StreamingSpanWriterPlugin_WriteSpanStreamServer is an autogenerated mock type for the StreamingSpanWriterPlugin_WriteSpanStreamServer type +type StreamingSpanWriterPlugin_WriteSpanStreamServer struct { + mock.Mock +} + +// Context provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// Recv provides a mock function with given fields: +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) Recv() (*storage_v1.WriteSpanRequest, error) { + ret := _m.Called() + + var r0 *storage_v1.WriteSpanRequest + if rf, ok := ret.Get(0).(func() *storage_v1.WriteSpanRequest); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.WriteSpanRequest) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendAndClose provides a mock function with given fields: _a0 +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) SendAndClose(_a0 *storage_v1.WriteSpanResponse) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*storage_v1.WriteSpanResponse) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendHeader provides a mock function with given fields: _a0 +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) SendHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetHeader provides a mock function with given fields: _a0 +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) SetHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetTrailer provides a mock function with given fields: _a0 +func (_m *StreamingSpanWriterPlugin_WriteSpanStreamServer) SetTrailer(_a0 metadata.MD) { + _m.Called(_a0) +} diff --git a/proto-gen/storage_v1/storage.pb.go b/proto-gen/storage_v1/storage.pb.go index 473e3966217..5564abb7ed4 100644 --- a/proto-gen/storage_v1/storage.pb.go +++ b/proto-gen/storage_v1/storage.pb.go @@ -920,6 +920,7 @@ var xxx_messageInfo_CapabilitiesRequest proto.InternalMessageInfo type CapabilitiesResponse struct { ArchiveSpanReader bool `protobuf:"varint,1,opt,name=archiveSpanReader,proto3" json:"archiveSpanReader,omitempty"` ArchiveSpanWriter bool `protobuf:"varint,2,opt,name=archiveSpanWriter,proto3" json:"archiveSpanWriter,omitempty"` + StreamingSpanWriter bool `protobuf:"varint,3,opt,name=streamingSpanWriter,proto3" json:"streamingSpanWriter,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -972,6 +973,13 @@ func (m *CapabilitiesResponse) GetArchiveSpanWriter() bool { return false } +func (m *CapabilitiesResponse) GetStreamingSpanWriter() bool { + if m != nil { + return m.StreamingSpanWriter + } + return false +} + func init() { proto.RegisterType((*GetDependenciesRequest)(nil), "jaeger.storage.v1.GetDependenciesRequest") proto.RegisterType((*GetDependenciesResponse)(nil), "jaeger.storage.v1.GetDependenciesResponse") @@ -998,75 +1006,77 @@ func init() { func init() { proto.RegisterFile("storage.proto", fileDescriptor_0d2c4ccf1453ffdb) } var fileDescriptor_0d2c4ccf1453ffdb = []byte{ - // 1074 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4f, 0x73, 0xdb, 0x44, - 0x14, 0x47, 0x89, 0x5d, 0xdb, 0xcf, 0x4e, 0x49, 0xd6, 0x2e, 0x15, 0x82, 0xc6, 0x41, 0xd0, 0x24, - 0x30, 0x20, 0x13, 0x73, 0x80, 0x81, 0x32, 0x50, 0x27, 0xa9, 0x27, 0x40, 0xa1, 0xa8, 0x99, 0x76, - 0x86, 0x42, 0x3d, 0x6b, 0x6b, 0x51, 0x44, 0xac, 0x95, 0xaa, 0x3f, 0x1e, 0xfb, 0xc0, 0x8d, 0x0f, - 0xc0, 0x91, 0x13, 0x57, 0xbe, 0x07, 0xa7, 0x1e, 0x39, 0x73, 0x08, 0x4c, 0xae, 0x7c, 0x09, 0x46, - 0xbb, 0x2b, 0x59, 0xb2, 0x34, 0x49, 0x9a, 0xc9, 0x4d, 0xfb, 0xf6, 0xb7, 0xbf, 0xf7, 0xff, 0x3d, - 0xc1, 0x8a, 0x1f, 0x38, 0x1e, 0x36, 0x89, 0xe6, 0x7a, 0x4e, 0xe0, 0xa0, 0xb5, 0x9f, 0x30, 0x31, - 0x89, 0xa7, 0xc5, 0xd2, 0xc9, 0x8e, 0xd2, 0x32, 0x1d, 0xd3, 0x61, 0xb7, 0x9d, 0xe8, 0x8b, 0x03, - 0x95, 0xb6, 0xe9, 0x38, 0xe6, 0x98, 0x74, 0xd8, 0x69, 0x18, 0xfe, 0xd8, 0x09, 0x2c, 0x9b, 0xf8, - 0x01, 0xb6, 0x5d, 0x01, 0x58, 0x5f, 0x04, 0x18, 0xa1, 0x87, 0x03, 0xcb, 0xa1, 0xe2, 0xbe, 0x6e, - 0x3b, 0x06, 0x19, 0xf3, 0x83, 0xfa, 0xbb, 0x04, 0xaf, 0xf4, 0x49, 0xb0, 0x47, 0x5c, 0x42, 0x0d, - 0x42, 0x47, 0x16, 0xf1, 0x75, 0xf2, 0x2c, 0x24, 0x7e, 0x80, 0x76, 0x01, 0xfc, 0x00, 0x7b, 0xc1, - 0x20, 0x52, 0x20, 0x4b, 0x1b, 0xd2, 0x76, 0xbd, 0xab, 0x68, 0x9c, 0x5c, 0x8b, 0xc9, 0xb5, 0xc3, - 0x58, 0x7b, 0xaf, 0xfa, 0xfc, 0xa4, 0xfd, 0xd2, 0xaf, 0xff, 0xb4, 0x25, 0xbd, 0xc6, 0xde, 0x45, - 0x37, 0xe8, 0x33, 0xa8, 0x12, 0x6a, 0x70, 0x8a, 0xa5, 0x17, 0xa0, 0xa8, 0x10, 0x6a, 0x44, 0x72, - 0x75, 0x08, 0x37, 0x73, 0xf6, 0xf9, 0xae, 0x43, 0x7d, 0x82, 0xfa, 0xd0, 0x30, 0x52, 0x72, 0x59, - 0xda, 0x58, 0xde, 0xae, 0x77, 0x6f, 0x69, 0x22, 0x92, 0xd8, 0xb5, 0x06, 0x93, 0xae, 0x96, 0x3c, - 0x9d, 0x7d, 0x65, 0xd1, 0xe3, 0x5e, 0x29, 0x52, 0xa1, 0x67, 0x1e, 0xaa, 0x9f, 0xc0, 0xea, 0x63, - 0xcf, 0x0a, 0xc8, 0x43, 0x17, 0xd3, 0xd8, 0xfb, 0x2d, 0x28, 0xf9, 0x2e, 0xa6, 0xc2, 0xef, 0xe6, - 0x02, 0x29, 0x43, 0x32, 0x80, 0xda, 0x84, 0xb5, 0xd4, 0x63, 0x6e, 0x9a, 0xda, 0x02, 0xb4, 0x3b, - 0x76, 0x7c, 0xc2, 0x6e, 0x3c, 0xc1, 0xa9, 0xde, 0x80, 0x66, 0x46, 0x2a, 0xc0, 0x14, 0x5e, 0xee, - 0x93, 0xe0, 0xd0, 0xc3, 0x23, 0x12, 0x6b, 0x7f, 0x02, 0xd5, 0x20, 0x3a, 0x0f, 0x2c, 0x83, 0x59, - 0xd0, 0xe8, 0x7d, 0x1e, 0xd9, 0xfd, 0xf7, 0x49, 0xfb, 0x3d, 0xd3, 0x0a, 0x8e, 0xc2, 0xa1, 0x36, - 0x72, 0xec, 0x0e, 0xb7, 0x29, 0x02, 0x5a, 0xd4, 0x14, 0xa7, 0x0e, 0xcf, 0x2e, 0x63, 0x3b, 0xd8, - 0x3b, 0x3d, 0x69, 0x57, 0xc4, 0xa7, 0x5e, 0x61, 0x8c, 0x07, 0x46, 0x64, 0x5c, 0x9f, 0x04, 0x0f, - 0x89, 0x37, 0xb1, 0x46, 0x49, 0xba, 0xd5, 0x1d, 0x68, 0x66, 0xa4, 0x22, 0xc8, 0x0a, 0x54, 0x7d, - 0x21, 0x63, 0x01, 0xae, 0xe9, 0xc9, 0x59, 0xbd, 0x0f, 0xad, 0x3e, 0x09, 0xbe, 0x71, 0x09, 0xaf, - 0xaf, 0xa4, 0x72, 0x64, 0xa8, 0x08, 0x0c, 0x33, 0xbe, 0xa6, 0xc7, 0x47, 0xf4, 0x1a, 0xd4, 0xa2, - 0xa0, 0x0d, 0x8e, 0x2d, 0x6a, 0xb0, 0x7a, 0x88, 0xe8, 0x5c, 0x4c, 0xbf, 0xb4, 0xa8, 0xa1, 0xde, - 0x81, 0x5a, 0xc2, 0x85, 0x10, 0x94, 0x28, 0xb6, 0x63, 0x02, 0xf6, 0x7d, 0xf6, 0xeb, 0x9f, 0xe1, - 0xc6, 0x82, 0x31, 0xc2, 0x83, 0x4d, 0xb8, 0xee, 0xc4, 0xd2, 0xaf, 0xb1, 0x9d, 0xf8, 0xb1, 0x20, - 0x45, 0x77, 0x00, 0x12, 0x89, 0x2f, 0x2f, 0xb1, 0x62, 0x7a, 0x5d, 0xcb, 0xb5, 0xa5, 0x96, 0xa8, - 0xd0, 0x53, 0x78, 0xf5, 0x8f, 0x12, 0xb4, 0x58, 0xa4, 0xbf, 0x0d, 0x89, 0x37, 0x7b, 0x80, 0x3d, - 0x6c, 0x93, 0x80, 0x78, 0x3e, 0x7a, 0x03, 0x1a, 0xc2, 0xfb, 0x41, 0xca, 0xa1, 0xba, 0x90, 0x45, - 0xaa, 0xd1, 0xed, 0x94, 0x85, 0x1c, 0xc4, 0x9d, 0x5b, 0xc9, 0x58, 0x88, 0xf6, 0xa1, 0x14, 0x60, - 0xd3, 0x97, 0x97, 0x99, 0x69, 0x3b, 0x05, 0xa6, 0x15, 0x19, 0xa0, 0x1d, 0x62, 0xd3, 0xdf, 0xa7, - 0x81, 0x37, 0xd3, 0xd9, 0x73, 0xf4, 0x05, 0x5c, 0x9f, 0xf7, 0xf5, 0xc0, 0xb6, 0xa8, 0x5c, 0x7a, - 0x81, 0xc6, 0x6c, 0x24, 0xbd, 0x7d, 0xdf, 0xa2, 0x8b, 0x5c, 0x78, 0x2a, 0x97, 0x2f, 0xc7, 0x85, - 0xa7, 0xe8, 0x1e, 0x34, 0xe2, 0x49, 0xc5, 0xac, 0xba, 0xc6, 0x98, 0x5e, 0xcd, 0x31, 0xed, 0x09, - 0x10, 0x27, 0xfa, 0x2d, 0x22, 0xaa, 0xc7, 0x0f, 0x23, 0x9b, 0x32, 0x3c, 0x78, 0x2a, 0x57, 0x2e, - 0xc3, 0x83, 0xa7, 0xe8, 0x16, 0x00, 0x0d, 0xed, 0x01, 0xeb, 0x1a, 0x5f, 0xae, 0x6e, 0x48, 0xdb, - 0x65, 0xbd, 0x46, 0x43, 0x9b, 0x05, 0xd9, 0x57, 0x3e, 0x84, 0x5a, 0x12, 0x59, 0xb4, 0x0a, 0xcb, - 0xc7, 0x64, 0x26, 0x72, 0x1b, 0x7d, 0xa2, 0x16, 0x94, 0x27, 0x78, 0x1c, 0xc6, 0xa9, 0xe4, 0x87, - 0x8f, 0x97, 0x3e, 0x92, 0x54, 0x1d, 0xd6, 0xee, 0x59, 0xd4, 0xe0, 0x34, 0x71, 0xcb, 0x7c, 0x0a, - 0xe5, 0x67, 0x51, 0xde, 0xc4, 0xbc, 0xd9, 0xba, 0x60, 0x72, 0x75, 0xfe, 0x4a, 0xdd, 0x07, 0x14, - 0xcd, 0x9f, 0xa4, 0xe8, 0x77, 0x8f, 0x42, 0x7a, 0x8c, 0x3a, 0x50, 0x8e, 0xda, 0x23, 0x9e, 0x8c, - 0x45, 0x43, 0x4c, 0xcc, 0x43, 0x8e, 0x53, 0x0f, 0xa1, 0x99, 0x98, 0x76, 0xb0, 0x77, 0x55, 0xc6, - 0x4d, 0xa0, 0x95, 0x65, 0x15, 0x8d, 0xf9, 0x14, 0x6a, 0xf1, 0x90, 0xe3, 0x26, 0x36, 0x7a, 0x77, - 0x2f, 0x3b, 0xe5, 0xaa, 0x09, 0x7b, 0x55, 0x8c, 0x39, 0x9f, 0x8d, 0x5b, 0xec, 0xe2, 0xa1, 0x35, - 0xb6, 0x82, 0xf9, 0x5e, 0x53, 0x3d, 0x68, 0x65, 0xc5, 0xc2, 0x9c, 0x77, 0x61, 0x0d, 0x7b, 0xa3, - 0x23, 0x6b, 0x22, 0x46, 0x39, 0x36, 0x88, 0xc7, 0x3c, 0xae, 0xea, 0xf9, 0x8b, 0x05, 0x34, 0x9f, - 0xe8, 0x2c, 0xd7, 0x59, 0x34, 0xbf, 0xe8, 0xfe, 0x29, 0xc1, 0xea, 0xfc, 0xf8, 0x60, 0x1c, 0x9a, - 0x16, 0x45, 0x8f, 0xa0, 0x96, 0x6c, 0x0e, 0xf4, 0x66, 0x41, 0x50, 0x17, 0x97, 0x92, 0xf2, 0xd6, - 0xd9, 0x20, 0xe1, 0xc8, 0x23, 0x28, 0xb3, 0x35, 0x83, 0x6e, 0x17, 0xc0, 0xf3, 0x6b, 0x49, 0xd9, - 0x3c, 0x0f, 0xc6, 0x79, 0xbb, 0xff, 0x2d, 0x73, 0x27, 0x78, 0x04, 0x84, 0x13, 0x8f, 0xa1, 0x1a, - 0x2f, 0x2f, 0xa4, 0x16, 0x10, 0x2d, 0x6c, 0x36, 0xa5, 0xc8, 0xa6, 0x7c, 0xe9, 0xbe, 0x2f, 0xa1, - 0xef, 0xa1, 0x9e, 0xda, 0x47, 0x85, 0xbe, 0xe4, 0xb7, 0x58, 0xa1, 0x2f, 0x45, 0x6b, 0x6d, 0x08, - 0x2b, 0x99, 0x6d, 0x81, 0xb6, 0x8a, 0x1f, 0xe6, 0x96, 0x9b, 0xb2, 0x7d, 0x3e, 0x50, 0xe8, 0x78, - 0x02, 0x30, 0x6f, 0x74, 0x54, 0x94, 0xbb, 0xdc, 0x1c, 0xb8, 0x78, 0x78, 0x06, 0xd0, 0x48, 0x37, - 0x15, 0xda, 0x3c, 0x8b, 0x7e, 0xde, 0xcb, 0xca, 0xd6, 0xb9, 0x38, 0x91, 0xed, 0x29, 0xdc, 0xbc, - 0xbb, 0x58, 0xc7, 0x22, 0xe7, 0x3f, 0x88, 0xff, 0xa5, 0xd4, 0xfd, 0x15, 0xd6, 0x6f, 0x77, 0x96, - 0xd1, 0x9c, 0xa9, 0xb6, 0xa7, 0xec, 0x57, 0x49, 0xdc, 0x5e, 0x7d, 0xd1, 0x75, 0x7f, 0x91, 0x40, - 0xce, 0xfe, 0x6b, 0xa6, 0x94, 0x1f, 0x31, 0xe5, 0xe9, 0x6b, 0xf4, 0x76, 0xb1, 0xf2, 0x82, 0xdf, - 0x69, 0xe5, 0x9d, 0x8b, 0x40, 0x45, 0x04, 0x42, 0x40, 0x5c, 0x67, 0x7a, 0x50, 0x45, 0x29, 0xcf, - 0x9c, 0x0b, 0xfb, 0x36, 0x3f, 0xf0, 0x0a, 0x53, 0x5e, 0x34, 0x01, 0x7b, 0xf2, 0xf3, 0xd3, 0x75, - 0xe9, 0xaf, 0xd3, 0x75, 0xe9, 0xdf, 0xd3, 0x75, 0xe9, 0x3b, 0x10, 0xf0, 0xc1, 0x64, 0x67, 0x78, - 0x8d, 0x6d, 0xcd, 0x0f, 0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x16, 0x1d, 0x1e, 0xa6, 0xb5, 0x0c, - 0x00, 0x00, + // 1117 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4f, 0x6f, 0xdc, 0xc4, + 0x1b, 0xfe, 0x39, 0xd9, 0x6d, 0x76, 0xdf, 0xdd, 0xb4, 0xc9, 0xec, 0xf6, 0x57, 0xd7, 0xd0, 0x24, + 0x18, 0x9a, 0x04, 0x04, 0xde, 0x66, 0x39, 0x80, 0xa0, 0x08, 0x9a, 0x3f, 0x8d, 0x02, 0x14, 0x8a, + 0x13, 0xb5, 0x12, 0x85, 0xac, 0x66, 0xe3, 0xc1, 0x19, 0xb2, 0x1e, 0x6f, 0xed, 0xf1, 0x2a, 0x11, + 0xe2, 0xc6, 0x07, 0xe0, 0xc8, 0x89, 0x13, 0x12, 0xdf, 0x83, 0x53, 0x8f, 0x9c, 0x39, 0x04, 0x94, + 0x2b, 0x5f, 0x02, 0x79, 0x66, 0xec, 0xd8, 0x6b, 0x2b, 0x49, 0xa3, 0xdc, 0x3c, 0xef, 0x3c, 0xf3, + 0xbc, 0xff, 0x66, 0x9e, 0xd7, 0x30, 0x1d, 0x72, 0x3f, 0xc0, 0x2e, 0xb1, 0x86, 0x81, 0xcf, 0x7d, + 0x34, 0xfb, 0x3d, 0x26, 0x2e, 0x09, 0xac, 0xc4, 0x3a, 0x5a, 0x31, 0xda, 0xae, 0xef, 0xfa, 0x62, + 0xb7, 0x13, 0x7f, 0x49, 0xa0, 0x31, 0xef, 0xfa, 0xbe, 0x3b, 0x20, 0x1d, 0xb1, 0xea, 0x47, 0xdf, + 0x75, 0x38, 0xf5, 0x48, 0xc8, 0xb1, 0x37, 0x54, 0x80, 0xb9, 0x71, 0x80, 0x13, 0x05, 0x98, 0x53, + 0x9f, 0xa9, 0xfd, 0x86, 0xe7, 0x3b, 0x64, 0x20, 0x17, 0xe6, 0xaf, 0x1a, 0xfc, 0x7f, 0x93, 0xf0, + 0x75, 0x32, 0x24, 0xcc, 0x21, 0x6c, 0x8f, 0x92, 0xd0, 0x26, 0xcf, 0x23, 0x12, 0x72, 0xb4, 0x06, + 0x10, 0x72, 0x1c, 0xf0, 0x5e, 0xec, 0x40, 0xd7, 0x16, 0xb4, 0xe5, 0x46, 0xd7, 0xb0, 0x24, 0xb9, + 0x95, 0x90, 0x5b, 0x3b, 0x89, 0xf7, 0xd5, 0xda, 0x8b, 0xe3, 0xf9, 0xff, 0xfd, 0xfc, 0xf7, 0xbc, + 0x66, 0xd7, 0xc5, 0xb9, 0x78, 0x07, 0x7d, 0x0c, 0x35, 0xc2, 0x1c, 0x49, 0x31, 0xf1, 0x12, 0x14, + 0x53, 0x84, 0x39, 0xb1, 0xdd, 0xec, 0xc3, 0xad, 0x42, 0x7c, 0xe1, 0xd0, 0x67, 0x21, 0x41, 0x9b, + 0xd0, 0x74, 0x32, 0x76, 0x5d, 0x5b, 0x98, 0x5c, 0x6e, 0x74, 0xef, 0x58, 0xaa, 0x92, 0x78, 0x48, + 0x7b, 0xa3, 0xae, 0x95, 0x1e, 0x3d, 0xfa, 0x9c, 0xb2, 0x83, 0xd5, 0x4a, 0xec, 0xc2, 0xce, 0x1d, + 0x34, 0x3f, 0x84, 0x99, 0xa7, 0x01, 0xe5, 0x64, 0x7b, 0x88, 0x59, 0x92, 0xfd, 0x12, 0x54, 0xc2, + 0x21, 0x66, 0x2a, 0xef, 0xd6, 0x18, 0xa9, 0x40, 0x0a, 0x80, 0xd9, 0x82, 0xd9, 0xcc, 0x61, 0x19, + 0x9a, 0xd9, 0x06, 0xb4, 0x36, 0xf0, 0x43, 0x22, 0x76, 0x02, 0xc5, 0x69, 0xde, 0x84, 0x56, 0xce, + 0xaa, 0xc0, 0x0c, 0x6e, 0x6c, 0x12, 0xbe, 0x13, 0xe0, 0x3d, 0x92, 0x78, 0x7f, 0x06, 0x35, 0x1e, + 0xaf, 0x7b, 0xd4, 0x11, 0x11, 0x34, 0x57, 0x3f, 0x89, 0xe3, 0xfe, 0xeb, 0x78, 0xfe, 0x1d, 0x97, + 0xf2, 0xfd, 0xa8, 0x6f, 0xed, 0xf9, 0x5e, 0x47, 0xc6, 0x14, 0x03, 0x29, 0x73, 0xd5, 0xaa, 0x23, + 0xbb, 0x2b, 0xd8, 0xb6, 0xd6, 0x4f, 0x8e, 0xe7, 0xa7, 0xd4, 0xa7, 0x3d, 0x25, 0x18, 0xb7, 0x9c, + 0x38, 0xb8, 0x4d, 0xc2, 0xb7, 0x49, 0x30, 0xa2, 0x7b, 0x69, 0xbb, 0xcd, 0x15, 0x68, 0xe5, 0xac, + 0xaa, 0xc8, 0x06, 0xd4, 0x42, 0x65, 0x13, 0x05, 0xae, 0xdb, 0xe9, 0xda, 0x7c, 0x04, 0xed, 0x4d, + 0xc2, 0xbf, 0x1c, 0x12, 0x79, 0xbf, 0xd2, 0x9b, 0xa3, 0xc3, 0x94, 0xc2, 0x88, 0xe0, 0xeb, 0x76, + 0xb2, 0x44, 0xaf, 0x40, 0x3d, 0x2e, 0x5a, 0xef, 0x80, 0x32, 0x47, 0xdc, 0x87, 0x98, 0x6e, 0x88, + 0xd9, 0x67, 0x94, 0x39, 0xe6, 0x7d, 0xa8, 0xa7, 0x5c, 0x08, 0x41, 0x85, 0x61, 0x2f, 0x21, 0x10, + 0xdf, 0x67, 0x9f, 0xfe, 0x11, 0x6e, 0x8e, 0x05, 0xa3, 0x32, 0x58, 0x84, 0xeb, 0x7e, 0x62, 0xfd, + 0x02, 0x7b, 0x69, 0x1e, 0x63, 0x56, 0x74, 0x1f, 0x20, 0xb5, 0x84, 0xfa, 0x84, 0xb8, 0x4c, 0xaf, + 0x5a, 0x85, 0x67, 0x69, 0xa5, 0x2e, 0xec, 0x0c, 0xde, 0xfc, 0xbd, 0x02, 0x6d, 0x51, 0xe9, 0xaf, + 0x22, 0x12, 0x1c, 0x3d, 0xc6, 0x01, 0xf6, 0x08, 0x27, 0x41, 0x88, 0x5e, 0x83, 0xa6, 0xca, 0xbe, + 0x97, 0x49, 0xa8, 0xa1, 0x6c, 0xb1, 0x6b, 0x74, 0x37, 0x13, 0xa1, 0x04, 0xc9, 0xe4, 0xa6, 0x73, + 0x11, 0xa2, 0x0d, 0xa8, 0x70, 0xec, 0x86, 0xfa, 0xa4, 0x08, 0x6d, 0xa5, 0x24, 0xb4, 0xb2, 0x00, + 0xac, 0x1d, 0xec, 0x86, 0x1b, 0x8c, 0x07, 0x47, 0xb6, 0x38, 0x8e, 0x3e, 0x85, 0xeb, 0xa7, 0xef, + 0xba, 0xe7, 0x51, 0xa6, 0x57, 0x5e, 0xe2, 0x61, 0x36, 0xd3, 0xb7, 0xfd, 0x88, 0xb2, 0x71, 0x2e, + 0x7c, 0xa8, 0x57, 0x2f, 0xc7, 0x85, 0x0f, 0xd1, 0x43, 0x68, 0x26, 0x4a, 0x25, 0xa2, 0xba, 0x26, + 0x98, 0x6e, 0x17, 0x98, 0xd6, 0x15, 0x48, 0x12, 0xfd, 0x12, 0x13, 0x35, 0x92, 0x83, 0x71, 0x4c, + 0x39, 0x1e, 0x7c, 0xa8, 0x4f, 0x5d, 0x86, 0x07, 0x1f, 0xa2, 0x3b, 0x00, 0x2c, 0xf2, 0x7a, 0xe2, + 0xd5, 0x84, 0x7a, 0x6d, 0x41, 0x5b, 0xae, 0xda, 0x75, 0x16, 0x79, 0xa2, 0xc8, 0xa1, 0xf1, 0x1e, + 0xd4, 0xd3, 0xca, 0xa2, 0x19, 0x98, 0x3c, 0x20, 0x47, 0xaa, 0xb7, 0xf1, 0x27, 0x6a, 0x43, 0x75, + 0x84, 0x07, 0x51, 0xd2, 0x4a, 0xb9, 0xf8, 0x60, 0xe2, 0x7d, 0xcd, 0xb4, 0x61, 0xf6, 0x21, 0x65, + 0x8e, 0xa4, 0x49, 0x9e, 0xcc, 0x47, 0x50, 0x7d, 0x1e, 0xf7, 0x4d, 0xe9, 0xcd, 0xd2, 0x05, 0x9b, + 0x6b, 0xcb, 0x53, 0xe6, 0x06, 0xa0, 0x58, 0x7f, 0xd2, 0x4b, 0xbf, 0xb6, 0x1f, 0xb1, 0x03, 0xd4, + 0x81, 0x6a, 0xfc, 0x3c, 0x12, 0x65, 0x2c, 0x13, 0x31, 0xa5, 0x87, 0x12, 0x67, 0xee, 0x40, 0x2b, + 0x0d, 0x6d, 0x6b, 0xfd, 0xaa, 0x82, 0x1b, 0x41, 0x3b, 0xcf, 0xaa, 0x1e, 0xe6, 0x2e, 0xd4, 0x13, + 0x91, 0x93, 0x21, 0x36, 0x57, 0x1f, 0x5c, 0x56, 0xe5, 0x6a, 0x29, 0x7b, 0x4d, 0xc9, 0x5c, 0x28, + 0xe4, 0x16, 0x0f, 0x71, 0x9f, 0x0e, 0x28, 0x3f, 0x9d, 0x6b, 0xe6, 0x6f, 0x1a, 0xb4, 0xf3, 0x76, + 0x15, 0xcf, 0xdb, 0x30, 0x8b, 0x83, 0xbd, 0x7d, 0x3a, 0x52, 0x5a, 0x8e, 0x1d, 0x12, 0x88, 0x94, + 0x6b, 0x76, 0x71, 0x63, 0x0c, 0x2d, 0x25, 0x5d, 0x34, 0x3b, 0x8f, 0x96, 0x1b, 0xe8, 0x1e, 0xb4, + 0x42, 0x1e, 0x10, 0xec, 0x51, 0xe6, 0x66, 0xf0, 0x93, 0x02, 0x5f, 0xb6, 0xd5, 0xfd, 0x43, 0x83, + 0x99, 0xd3, 0xe5, 0xe3, 0x41, 0xe4, 0x52, 0x86, 0x9e, 0x40, 0x3d, 0x1d, 0x36, 0xe8, 0xf5, 0x92, + 0x3e, 0x8c, 0xcf, 0x31, 0xe3, 0x8d, 0xb3, 0x41, 0x2a, 0xf5, 0x27, 0x50, 0x15, 0x93, 0x09, 0xdd, + 0x2d, 0x81, 0x17, 0x27, 0x99, 0xb1, 0x78, 0x1e, 0x4c, 0xf2, 0x76, 0x7f, 0x80, 0xdb, 0xdb, 0xc5, + 0xdc, 0x54, 0x32, 0xbb, 0x70, 0x23, 0x8d, 0x44, 0xa2, 0xae, 0x30, 0xa5, 0x65, 0xad, 0xfb, 0xef, + 0xa4, 0xac, 0xa0, 0x6c, 0x98, 0x72, 0xfa, 0x14, 0x6a, 0xc9, 0xb0, 0x45, 0x66, 0x09, 0xd1, 0xd8, + 0x24, 0x36, 0xca, 0x0a, 0x52, 0x7c, 0x6a, 0xf7, 0x34, 0xf4, 0x0d, 0x34, 0x32, 0xf3, 0xb3, 0xb4, + 0x90, 0xc5, 0xa9, 0x5b, 0x5a, 0xc8, 0xb2, 0x31, 0xdc, 0x87, 0xe9, 0xdc, 0x74, 0x43, 0x4b, 0xe5, + 0x07, 0x0b, 0xc3, 0xd8, 0x58, 0x3e, 0x1f, 0xa8, 0x7c, 0x3c, 0x03, 0x38, 0x15, 0x26, 0x54, 0x56, + 0xe5, 0x82, 0x6e, 0x5d, 0xbc, 0x3c, 0x3d, 0x68, 0x66, 0x45, 0x00, 0x2d, 0x9e, 0x45, 0x7f, 0xaa, + 0x3d, 0xc6, 0xd2, 0xb9, 0x38, 0x75, 0xd5, 0x0e, 0xe1, 0xd6, 0x83, 0xf1, 0x67, 0xa7, 0x7a, 0xfe, + 0xad, 0xfa, 0xbf, 0xcb, 0xec, 0x5f, 0xe1, 0x4d, 0xeb, 0x1e, 0xe5, 0x3c, 0xe7, 0x6e, 0xdb, 0xae, + 0xf8, 0xb5, 0x53, 0xbb, 0x57, 0x7f, 0xe9, 0xba, 0x3f, 0x69, 0xa0, 0xe7, 0xff, 0x8d, 0x33, 0xce, + 0xf7, 0x85, 0xf3, 0xec, 0x36, 0x7a, 0xb3, 0xdc, 0x79, 0xc9, 0xef, 0xbf, 0xf1, 0xd6, 0x45, 0xa0, + 0xaa, 0x02, 0x11, 0x20, 0xe9, 0x33, 0xab, 0xab, 0x71, 0xcb, 0x73, 0xeb, 0x52, 0xd1, 0x28, 0x0a, + 0x74, 0x69, 0xcb, 0xcb, 0x04, 0x7b, 0x55, 0x7f, 0x71, 0x32, 0xa7, 0xfd, 0x79, 0x32, 0xa7, 0xfd, + 0x73, 0x32, 0xa7, 0x7d, 0x0d, 0x0a, 0xde, 0x1b, 0xad, 0xf4, 0xaf, 0x89, 0x29, 0xff, 0xee, 0x7f, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x6c, 0xa3, 0xc0, 0x72, 0x65, 0x0d, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1187,6 +1197,112 @@ var _SpanWriterPlugin_serviceDesc = grpc.ServiceDesc{ Metadata: "storage.proto", } +// StreamingSpanWriterPluginClient is the client API for StreamingSpanWriterPlugin service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type StreamingSpanWriterPluginClient interface { + WriteSpanStream(ctx context.Context, opts ...grpc.CallOption) (StreamingSpanWriterPlugin_WriteSpanStreamClient, error) +} + +type streamingSpanWriterPluginClient struct { + cc *grpc.ClientConn +} + +func NewStreamingSpanWriterPluginClient(cc *grpc.ClientConn) StreamingSpanWriterPluginClient { + return &streamingSpanWriterPluginClient{cc} +} + +func (c *streamingSpanWriterPluginClient) WriteSpanStream(ctx context.Context, opts ...grpc.CallOption) (StreamingSpanWriterPlugin_WriteSpanStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_StreamingSpanWriterPlugin_serviceDesc.Streams[0], "/jaeger.storage.v1.StreamingSpanWriterPlugin/WriteSpanStream", opts...) + if err != nil { + return nil, err + } + x := &streamingSpanWriterPluginWriteSpanStreamClient{stream} + return x, nil +} + +type StreamingSpanWriterPlugin_WriteSpanStreamClient interface { + Send(*WriteSpanRequest) error + CloseAndRecv() (*WriteSpanResponse, error) + grpc.ClientStream +} + +type streamingSpanWriterPluginWriteSpanStreamClient struct { + grpc.ClientStream +} + +func (x *streamingSpanWriterPluginWriteSpanStreamClient) Send(m *WriteSpanRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *streamingSpanWriterPluginWriteSpanStreamClient) CloseAndRecv() (*WriteSpanResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(WriteSpanResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StreamingSpanWriterPluginServer is the server API for StreamingSpanWriterPlugin service. +type StreamingSpanWriterPluginServer interface { + WriteSpanStream(StreamingSpanWriterPlugin_WriteSpanStreamServer) error +} + +// UnimplementedStreamingSpanWriterPluginServer can be embedded to have forward compatible implementations. +type UnimplementedStreamingSpanWriterPluginServer struct { +} + +func (*UnimplementedStreamingSpanWriterPluginServer) WriteSpanStream(srv StreamingSpanWriterPlugin_WriteSpanStreamServer) error { + return status.Errorf(codes.Unimplemented, "method WriteSpanStream not implemented") +} + +func RegisterStreamingSpanWriterPluginServer(s *grpc.Server, srv StreamingSpanWriterPluginServer) { + s.RegisterService(&_StreamingSpanWriterPlugin_serviceDesc, srv) +} + +func _StreamingSpanWriterPlugin_WriteSpanStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(StreamingSpanWriterPluginServer).WriteSpanStream(&streamingSpanWriterPluginWriteSpanStreamServer{stream}) +} + +type StreamingSpanWriterPlugin_WriteSpanStreamServer interface { + SendAndClose(*WriteSpanResponse) error + Recv() (*WriteSpanRequest, error) + grpc.ServerStream +} + +type streamingSpanWriterPluginWriteSpanStreamServer struct { + grpc.ServerStream +} + +func (x *streamingSpanWriterPluginWriteSpanStreamServer) SendAndClose(m *WriteSpanResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *streamingSpanWriterPluginWriteSpanStreamServer) Recv() (*WriteSpanRequest, error) { + m := new(WriteSpanRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _StreamingSpanWriterPlugin_serviceDesc = grpc.ServiceDesc{ + ServiceName: "jaeger.storage.v1.StreamingSpanWriterPlugin", + HandlerType: (*StreamingSpanWriterPluginServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "WriteSpanStream", + Handler: _StreamingSpanWriterPlugin_WriteSpanStream_Handler, + ClientStreams: true, + }, + }, + Metadata: "storage.proto", +} + // SpanReaderPluginClient is the client API for SpanReaderPlugin service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. @@ -2525,6 +2641,16 @@ func (m *CapabilitiesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.StreamingSpanWriter { + i-- + if m.StreamingSpanWriter { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x18 + } if m.ArchiveSpanWriter { i-- if m.ArchiveSpanWriter { @@ -2884,6 +3010,9 @@ func (m *CapabilitiesResponse) Size() (n int) { if m.ArchiveSpanWriter { n += 2 } + if m.StreamingSpanWriter { + n += 2 + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -4761,6 +4890,26 @@ func (m *CapabilitiesResponse) Unmarshal(dAtA []byte) error { } } m.ArchiveSpanWriter = bool(v != 0) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StreamingSpanWriter", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.StreamingSpanWriter = bool(v != 0) default: iNdEx = preIndex skippy, err := skipStorage(dAtA[iNdEx:])