Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support archive storage in the query-service #604

Merged
merged 2 commits into from
Jan 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
istorage "github.com/jaegertracing/jaeger/storage"
)

func main() {
Expand Down Expand Up @@ -107,12 +108,16 @@ func main() {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

apiHandlerOptions := []app.HandlerOption{
app.HandlerOptions.Prefix(queryOpts.Prefix),
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer),
}
apiHandlerOptions = append(apiHandlerOptions, archiveOptions(storageFactory, logger)...)
apiHandler := app.NewAPIHandler(
spanReader,
dependencyReader,
app.HandlerOptions.Prefix(queryOpts.Prefix),
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer))
apiHandlerOptions...)
r := mux.NewRouter()
apiHandler.RegisterRoutes(r)
registerStaticHandler(r, logger, queryOpts)
Expand Down Expand Up @@ -174,3 +179,26 @@ func registerStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *app.QueryOp
logger.Info("Static handler is not registered")
}
}

func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) []app.HandlerOption {
reader, err := storageFactory.CreateSpanReader()
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this fail silently when ErrArchiveStorageNotConfigured is false; meaning it's configured; and when ErrArchiveStorageNotSupported is true?
Shouldn't we return the ErrArchiveStorageNotSupported when someone mistakenly configures archive storage on a storage that doesn't support archiving?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not possible to configure archiving on a storage that doesn't support it. For example, with Cassandra you need to pass --cassandra-archive.enabled=true.

}
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return nil
}
writer, err := storageFactory.CreateSpanWriter()
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
return nil
}
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return nil
}
return []app.HandlerOption{
app.HandlerOptions.ArchiveSpanReader(reader),
app.HandlerOptions.ArchiveSpanWriter(writer),
}
}
39 changes: 36 additions & 3 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
primaryStorageConfig = "cassandra"
archiveStorageConfig = "cassandra-archive"
)

// Factory implements storage.Factory for Cassandra backend.
type Factory struct {
Options *Options
Expand All @@ -38,13 +44,14 @@ type Factory struct {

primaryConfig config.SessionBuilder
primarySession cassandra.Session
// archiveSession cassandra.Session TODO
archiveConfig config.SessionBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that the archiveConfig can be set to a higher default consistency level than the primaryConfig. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fully configurable by user. Given how marginal this feature is overall, I don't think introducing custom defaults per storage class is worth the complexity in the code.

archiveSession cassandra.Session
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
Options: NewOptions("cassandra"), // TODO add "cassandra-archive" once supported
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}

Expand All @@ -57,6 +64,9 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
func (f *Factory) InitFromViper(v *viper.Viper) {
f.Options.InitFromViper(v)
f.primaryConfig = f.Options.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
}

// Initialize implements storage.Factory
Expand All @@ -68,7 +78,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return err
}
f.primarySession = primarySession
// TODO init archive (cf. https://github.com/jaegertracing/jaeger/pull/604)

if f.archiveConfig != nil {
if archiveSession, err := f.archiveConfig.NewSession(); err == nil {
f.archiveSession = archiveSession
} else {
return err
}
}
return nil
}

Expand All @@ -86,3 +103,19 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return cDepStore.NewDependencyStore(f.primarySession, f.Options.DepStoreDataFrequency, f.metricsFactory, f.logger), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.metricsFactory, f.logger), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused, is f.Options.SpanStoreWriteCacheTTL shared between both the archive and primary span writers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the moment, yes. The retention period of archive storage is expected to be much longer that for primary, so the short cache TTL for the primary is fine for archive too. Strictly speaking we don't even need this cache for archive, but whatever.

}
23 changes: 22 additions & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ppppphhhhh

"github.com/jaegertracing/jaeger/storage"
)

var _ storage.Factory = new(Factory)
var _ storage.ArchiveFactory = new(Factory)

type mockSessionBuilder struct {
err error
Expand All @@ -44,7 +46,7 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) {
func TestCassandraFactory(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{})
command.ParseFlags([]string{"--cassandra-archive.enabled=true"})
f.InitFromViper(v)

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
Expand All @@ -53,6 +55,10 @@ func TestCassandraFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.primaryConfig = &mockSessionBuilder{}
f.archiveConfig = &mockSessionBuilder{err: errors.New("made-up error")}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err := f.CreateSpanReader()
Expand All @@ -63,4 +69,19 @@ func TestCassandraFactory(t *testing.T) {

_, err = f.CreateDependencyReader()
assert.NoError(t, err)

_, err = f.CreateArchiveSpanReader()
assert.EqualError(t, err, "Archive storage not configured")

_, err = f.CreateArchiveSpanWriter()
assert.EqualError(t, err, "Archive storage not configured")

f.archiveConfig = &mockSessionBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err = f.CreateArchiveSpanReader()
assert.NoError(t, err)

_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}
19 changes: 18 additions & 1 deletion plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
Expand Down Expand Up @@ -65,6 +66,8 @@ type namespaceConfig struct {
config.Configuration
servers string
namespace string
primary bool
Enabled bool
}

// NewOptions creates a new Options struct.
Expand All @@ -78,12 +81,14 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
EnableHostVerification: true,
},
MaxRetryAttempts: 3,
Keyspace: "jaeger_v1_local",
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
},
servers: "127.0.0.1",
namespace: primaryNamespace,
primary: true,
Enabled: true,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
SpanStoreWriteCacheTTL: time.Hour * 12,
Expand Down Expand Up @@ -112,6 +117,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
if !nsConfig.primary {
flagSet.Bool(
nsConfig.namespace+suffixEnabled,
false,
"Enable extra storage")
}
flagSet.Int(
nsConfig.namespace+suffixConnPerHost,
nsConfig.ConnectionsPerHost,
Expand Down Expand Up @@ -189,6 +200,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
if !cfg.primary {
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
Expand Down Expand Up @@ -220,6 +234,9 @@ func (opt *Options) Get(namespace string) *config.Configuration {
nsCfg = &namespaceConfig{}
opt.others[namespace] = nsCfg
}
if !nsCfg.Enabled {
return nil
}
nsCfg.Configuration.ApplyDefaults(&opt.primary.Configuration)
if nsCfg.servers == "" {
nsCfg.servers = opt.primary.servers
Expand Down
19 changes: 14 additions & 5 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
)
Expand All @@ -31,13 +32,19 @@ func TestOptions(t *testing.T) {
assert.Equal(t, 2, primary.ConnectionsPerHost)

aux := opts.Get("archive")
assert.Nil(t, aux)

assert.NotNil(t, opts.others["archive"])
opts.others["archive"].Enabled = true
aux = opts.Get("archive")
require.NotNil(t, aux)
assert.Equal(t, primary.Keyspace, aux.Keyspace)
assert.Equal(t, primary.Servers, aux.Servers)
assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost)
}

func TestOptionsWithFlags(t *testing.T) {
opts := NewOptions("cas", "cas.aux")
opts := NewOptions("cas", "cas-aux")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--cas.keyspace=jaeger",
Expand All @@ -48,17 +55,19 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.port=4242",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
// a couple overrides
"--cas.aux.keyspace=jaeger-archive",
"--cas.aux.servers=3.3.3.3,4.4.4.4",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
"--cas-aux.servers=3.3.3.3,4.4.4.4",
})
opts.InitFromViper(v)

primary := opts.GetPrimary()
assert.Equal(t, "jaeger", primary.Keyspace)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)

aux := opts.Get("cas.aux")
aux := opts.Get("cas-aux")
require.NotNil(t, aux)
assert.Equal(t, "jaeger-archive", aux.Keyspace)
assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers)
assert.Equal(t, 42, aux.ConnectionsPerHost)
Expand Down
26 changes: 26 additions & 0 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,29 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
}
}
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanStorageType]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means that the span store and archive span store both have to be using the same storage type? I think that's a fair assumption

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be changed in the future by introducing another env var.

if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
}
archive, ok := factory.(storage.ArchiveFactory)
if !ok {
return nil, storage.ErrArchiveStorageNotSupported
}
return archive.CreateArchiveSpanReader()
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
factory, ok := f.factories[f.SpanStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
}
archive, ok := factory.(storage.ArchiveFactory)
if !ok {
return nil, storage.ErrArchiveStorageNotSupported
}
return archive.CreateArchiveSpanWriter()
}
Loading