diff --git a/cmd/query/main.go b/cmd/query/main.go index b6d7c8a6a5e..b74090de784 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -39,6 +39,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" "github.com/jaegertracing/jaeger/plugin/storage" + istorage "github.com/jaegertracing/jaeger/storage" ) func main() { @@ -107,12 +108,16 @@ func main() { logger.Fatal("Failed to create dependency reader", zap.Error(err)) } + apiHandlerOptions := []app.HandlerOption{ + app.HandlerOptions.Prefix(queryOpts.Prefix), + app.HandlerOptions.Logger(logger), + app.HandlerOptions.Tracer(tracer), + } + apiHandlerOptions = append(apiHandlerOptions, archiveOptions(storageFactory, logger)...) apiHandler := app.NewAPIHandler( spanReader, dependencyReader, - app.HandlerOptions.Prefix(queryOpts.Prefix), - app.HandlerOptions.Logger(logger), - app.HandlerOptions.Tracer(tracer)) + apiHandlerOptions...) r := mux.NewRouter() apiHandler.RegisterRoutes(r) registerStaticHandler(r, logger, queryOpts) @@ -174,3 +179,26 @@ func registerStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *app.QueryOp logger.Info("Static handler is not registered") } } + +func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) []app.HandlerOption { + reader, err := storageFactory.CreateSpanReader() + if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported { + return nil + } + if err != nil { + logger.Error("Cannot init archive storage reader", zap.Error(err)) + return nil + } + writer, err := storageFactory.CreateSpanWriter() + if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported { + return nil + } + if err != nil { + logger.Error("Cannot init archive storage writer", zap.Error(err)) + return nil + } + return []app.HandlerOption{ + app.HandlerOptions.ArchiveSpanReader(reader), + app.HandlerOptions.ArchiveSpanWriter(writer), + } +} diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 00e84e4fbdd..1f19c787a12 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -25,10 +25,16 @@ import ( "github.com/jaegertracing/jaeger/pkg/cassandra/config" cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" + "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" ) +const ( + primaryStorageConfig = "cassandra" + archiveStorageConfig = "cassandra-archive" +) + // Factory implements storage.Factory for Cassandra backend. type Factory struct { Options *Options @@ -38,13 +44,14 @@ type Factory struct { primaryConfig config.SessionBuilder primarySession cassandra.Session - // archiveSession cassandra.Session TODO + archiveConfig config.SessionBuilder + archiveSession cassandra.Session } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - Options: NewOptions("cassandra"), // TODO add "cassandra-archive" once supported + Options: NewOptions(primaryStorageConfig, archiveStorageConfig), } } @@ -57,6 +64,9 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { func (f *Factory) InitFromViper(v *viper.Viper) { f.Options.InitFromViper(v) f.primaryConfig = f.Options.GetPrimary() + if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { + f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error + } } // Initialize implements storage.Factory @@ -68,7 +78,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return err } f.primarySession = primarySession - // TODO init archive (cf. https://github.com/jaegertracing/jaeger/pull/604) + + if f.archiveConfig != nil { + if archiveSession, err := f.archiveConfig.NewSession(); err == nil { + f.archiveSession = archiveSession + } else { + return err + } + } return nil } @@ -86,3 +103,19 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return cDepStore.NewDependencyStore(f.primarySession, f.Options.DepStoreDataFrequency, f.metricsFactory, f.logger), nil } + +// CreateArchiveSpanReader implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { + if f.archiveSession == nil { + return nil, storage.ErrArchiveStorageNotConfigured + } + return cSpanStore.NewSpanReader(f.archiveSession, f.metricsFactory, f.logger), nil +} + +// CreateArchiveSpanWriter implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { + if f.archiveSession == nil { + return nil, storage.ErrArchiveStorageNotConfigured + } + return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger), nil +} diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 8b39f6a546a..1fc0c43adba 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -25,10 +25,12 @@ import ( "github.com/jaegertracing/jaeger/pkg/cassandra" "github.com/jaegertracing/jaeger/pkg/cassandra/mocks" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/storage" ) var _ storage.Factory = new(Factory) +var _ storage.ArchiveFactory = new(Factory) type mockSessionBuilder struct { err error @@ -44,7 +46,7 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { func TestCassandraFactory(t *testing.T) { f := NewFactory() v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{}) + command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) f.InitFromViper(v) // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, @@ -53,6 +55,10 @@ func TestCassandraFactory(t *testing.T) { assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") f.primaryConfig = &mockSessionBuilder{} + f.archiveConfig = &mockSessionBuilder{err: errors.New("made-up error")} + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + + f.archiveConfig = nil assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err := f.CreateSpanReader() @@ -63,4 +69,19 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateDependencyReader() assert.NoError(t, err) + + _, err = f.CreateArchiveSpanReader() + assert.EqualError(t, err, "Archive storage not configured") + + _, err = f.CreateArchiveSpanWriter() + assert.EqualError(t, err, "Archive storage not configured") + + f.archiveConfig = &mockSessionBuilder{} + assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + + _, err = f.CreateArchiveSpanReader() + assert.NoError(t, err) + + _, err = f.CreateArchiveSpanWriter() + assert.NoError(t, err) } diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index d6a84cffc1d..a80faaef976 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -26,6 +26,7 @@ import ( const ( // session settings + suffixEnabled = ".enabled" suffixConnPerHost = ".connections-per-host" suffixMaxRetryAttempts = ".max-retry-attempts" suffixTimeout = ".timeout" @@ -65,6 +66,8 @@ type namespaceConfig struct { config.Configuration servers string namespace string + primary bool + Enabled bool } // NewOptions creates a new Options struct. @@ -78,12 +81,14 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { EnableHostVerification: true, }, MaxRetryAttempts: 3, - Keyspace: "jaeger_v1_local", + Keyspace: "jaeger_v1_test", ProtoVersion: 4, ConnectionsPerHost: 2, }, servers: "127.0.0.1", namespace: primaryNamespace, + primary: true, + Enabled: true, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), SpanStoreWriteCacheTTL: time.Hour * 12, @@ -112,6 +117,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { } func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { + if !nsConfig.primary { + flagSet.Bool( + nsConfig.namespace+suffixEnabled, + false, + "Enable extra storage") + } flagSet.Int( nsConfig.namespace+suffixConnPerHost, nsConfig.ConnectionsPerHost, @@ -189,6 +200,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) { } func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { + if !cfg.primary { + cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) + } cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost) cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) @@ -220,6 +234,9 @@ func (opt *Options) Get(namespace string) *config.Configuration { nsCfg = &namespaceConfig{} opt.others[namespace] = nsCfg } + if !nsCfg.Enabled { + return nil + } nsCfg.Configuration.ApplyDefaults(&opt.primary.Configuration) if nsCfg.servers == "" { nsCfg.servers = opt.primary.servers diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 54394896a57..8bc68010a64 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" ) @@ -31,13 +32,19 @@ func TestOptions(t *testing.T) { assert.Equal(t, 2, primary.ConnectionsPerHost) aux := opts.Get("archive") + assert.Nil(t, aux) + + assert.NotNil(t, opts.others["archive"]) + opts.others["archive"].Enabled = true + aux = opts.Get("archive") + require.NotNil(t, aux) assert.Equal(t, primary.Keyspace, aux.Keyspace) assert.Equal(t, primary.Servers, aux.Servers) assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost) } func TestOptionsWithFlags(t *testing.T) { - opts := NewOptions("cas", "cas.aux") + opts := NewOptions("cas", "cas-aux") v, command := config.Viperize(opts.AddFlags) command.ParseFlags([]string{ "--cas.keyspace=jaeger", @@ -48,9 +55,10 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.port=4242", "--cas.proto-version=3", "--cas.socket-keep-alive=42s", - // a couple overrides - "--cas.aux.keyspace=jaeger-archive", - "--cas.aux.servers=3.3.3.3,4.4.4.4", + // enable aux with a couple overrides + "--cas-aux.enabled=true", + "--cas-aux.keyspace=jaeger-archive", + "--cas-aux.servers=3.3.3.3,4.4.4.4", }) opts.InitFromViper(v) @@ -58,7 +66,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "jaeger", primary.Keyspace) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) - aux := opts.Get("cas.aux") + aux := opts.Get("cas-aux") + require.NotNil(t, aux) assert.Equal(t, "jaeger-archive", aux.Keyspace) assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) assert.Equal(t, 42, aux.ConnectionsPerHost) diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index c6c7659aa36..d3213595658 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -131,3 +131,29 @@ func (f *Factory) InitFromViper(v *viper.Viper) { } } } + +// CreateArchiveSpanReader implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { + factory, ok := f.factories[f.SpanStorageType] + if !ok { + return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType) + } + archive, ok := factory.(storage.ArchiveFactory) + if !ok { + return nil, storage.ErrArchiveStorageNotSupported + } + return archive.CreateArchiveSpanReader() +} + +// CreateArchiveSpanWriter implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { + factory, ok := f.factories[f.SpanStorageType] + if !ok { + return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType) + } + archive, ok := factory.(storage.ArchiveFactory) + if !ok { + return nil, storage.ErrArchiveStorageNotSupported + } + return archive.CreateArchiveSpanWriter() +} diff --git a/plugin/storage/factory_test.go b/plugin/storage/factory_test.go index 6884613a647..7034081a0ab 100644 --- a/plugin/storage/factory_test.go +++ b/plugin/storage/factory_test.go @@ -32,6 +32,7 @@ import ( ) var _ storage.Factory = new(Factory) +var _ storage.ArchiveFactory = new(Factory) func defaultCfg() FactoryConfig { return FactoryConfig{ @@ -113,6 +114,38 @@ func TestCreate(t *testing.T) { d, err := f.CreateDependencyReader() assert.Equal(t, depReader, d) assert.EqualError(t, err, "dep-reader-error") + + _, err = f.CreateArchiveSpanReader() + assert.EqualError(t, err, "Archive storage not supported") + + _, err = f.CreateArchiveSpanWriter() + assert.EqualError(t, err, "Archive storage not supported") +} + +func TestCreateArchive(t *testing.T) { + f, err := NewFactory(defaultCfg()) + require.NoError(t, err) + assert.NotEmpty(t, f.factories[cassandraStorageType]) + + mock := &struct { + mocks.Factory + mocks.ArchiveFactory + }{} + f.factories[cassandraStorageType] = mock + + archiveSpanReader := new(spanStoreMocks.Reader) + archiveSpanWriter := new(spanStoreMocks.Writer) + + mock.ArchiveFactory.On("CreateArchiveSpanReader").Return(archiveSpanReader, errors.New("archive-span-reader-error")) + mock.ArchiveFactory.On("CreateArchiveSpanWriter").Return(archiveSpanWriter, errors.New("archive-span-writer-error")) + + ar, err := f.CreateArchiveSpanReader() + assert.Equal(t, archiveSpanReader, ar) + assert.EqualError(t, err, "archive-span-reader-error") + + aw, err := f.CreateArchiveSpanWriter() + assert.Equal(t, archiveSpanWriter, aw) + assert.EqualError(t, err, "archive-span-writer-error") } func TestCreateError(t *testing.T) { @@ -123,7 +156,8 @@ func TestCreateError(t *testing.T) { delete(f.factories, cassandraStorageType) expectedErr := "No cassandra backend registered for span store" - { // scope the vars to avoid bugs in the test + // scope the vars to avoid bugs in the test + { r, err := f.CreateSpanReader() assert.Nil(t, r) assert.EqualError(t, err, expectedErr) @@ -140,6 +174,18 @@ func TestCreateError(t *testing.T) { assert.Nil(t, d) assert.EqualError(t, err, expectedErr) } + + { + r, err := f.CreateArchiveSpanReader() + assert.Nil(t, r) + assert.EqualError(t, err, expectedErr) + } + + { + w, err := f.CreateArchiveSpanWriter() + assert.Nil(t, w) + assert.EqualError(t, err, expectedErr) + } } type configurable struct { diff --git a/storage/factory.go b/storage/factory.go index 9228defbbec..836fa502a47 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -15,6 +15,8 @@ package storage import ( + "errors" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" @@ -42,3 +44,20 @@ type Factory interface { // CreateDependencyReader creates a dependencystore.Reader. CreateDependencyReader() (dependencystore.Reader, error) } + +var ( + // ErrArchiveStorageNotConfigured can be returned by the ArchiveFactory when the archive storage is not configured. + ErrArchiveStorageNotConfigured = errors.New("Archive storage not configured") + + // ErrArchiveStorageNotSupported can be returned by the ArchiveFactory when the archive storage is not supported by the backend. + ErrArchiveStorageNotSupported = errors.New("Archive storage not supported") +) + +// ArchiveFactory is an additional interface that can be implemented by a factory to support trace archiving. +type ArchiveFactory interface { + // CreateArchiveSpanReader creates a spanstore.Reader. + CreateArchiveSpanReader() (spanstore.Reader, error) + + // CreateArchiveSpanWriter creates a spanstore.Writer. + CreateArchiveSpanWriter() (spanstore.Writer, error) +} diff --git a/storage/mocks/ArchiveFactory.go b/storage/mocks/ArchiveFactory.go new file mode 100644 index 00000000000..d0139b3cc75 --- /dev/null +++ b/storage/mocks/ArchiveFactory.go @@ -0,0 +1,72 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import spanstore "github.com/jaegertracing/jaeger/storage/spanstore" +import storage "github.com/jaegertracing/jaeger/storage" + +// ArchiveFactory is an autogenerated mock type for the ArchiveFactory type +type ArchiveFactory struct { + mock.Mock +} + +// CreateArchiveSpanReader provides a mock function with given fields: +func (_m *ArchiveFactory) CreateArchiveSpanReader() (spanstore.Reader, error) { + ret := _m.Called() + + var r0 spanstore.Reader + if rf, ok := ret.Get(0).(func() spanstore.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(spanstore.Reader) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateArchiveSpanWriter provides a mock function with given fields: +func (_m *ArchiveFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) { + ret := _m.Called() + + var r0 spanstore.Writer + if rf, ok := ret.Get(0).(func() spanstore.Writer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(spanstore.Writer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +var _ storage.ArchiveFactory = (*ArchiveFactory)(nil)