Permalink
Browse files

Support archive storage in the query-service (#604)

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information...
yurishkuro committed Jan 8, 2018
1 parent 8db7a11 commit 7f1b9a2e358a78fedc9be6e83c3b9b4c382f7af4
View
@@ -39,6 +39,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
istorage "github.com/jaegertracing/jaeger/storage"
)
func main() {
@@ -107,12 +108,16 @@ func main() {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}
apiHandlerOptions := []app.HandlerOption{
app.HandlerOptions.Prefix(queryOpts.Prefix),
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer),
}
apiHandlerOptions = append(apiHandlerOptions, archiveOptions(storageFactory, logger)...)
apiHandler := app.NewAPIHandler(
spanReader,
dependencyReader,
app.HandlerOptions.Prefix(queryOpts.Prefix),
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer))
apiHandlerOptions...)
r := mux.NewRouter()
apiHandler.RegisterRoutes(r)
registerStaticHandler(r, logger, queryOpts)
@@ -174,3 +179,26 @@ func registerStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *app.QueryOp
logger.Info("Static handler is not registered")
}
}
func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) []app.HandlerOption {
reader, err := storageFactory.CreateSpanReader()
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
return nil
}
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return nil
}
writer, err := storageFactory.CreateSpanWriter()
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
return nil
}
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return nil
}
return []app.HandlerOption{
app.HandlerOptions.ArchiveSpanReader(reader),
app.HandlerOptions.ArchiveSpanWriter(writer),
}
}
@@ -25,10 +25,16 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
const (
primaryStorageConfig = "cassandra"
archiveStorageConfig = "cassandra-archive"
)
// Factory implements storage.Factory for Cassandra backend.
type Factory struct {
Options *Options
@@ -38,13 +44,14 @@ type Factory struct {
primaryConfig config.SessionBuilder
primarySession cassandra.Session
// archiveSession cassandra.Session TODO
archiveConfig config.SessionBuilder
archiveSession cassandra.Session
}
// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
Options: NewOptions("cassandra"), // TODO add "cassandra-archive" once supported
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}
@@ -57,6 +64,9 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
func (f *Factory) InitFromViper(v *viper.Viper) {
f.Options.InitFromViper(v)
f.primaryConfig = f.Options.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
}
// Initialize implements storage.Factory
@@ -68,7 +78,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return err
}
f.primarySession = primarySession
// TODO init archive (cf. https://github.com/jaegertracing/jaeger/pull/604)
if f.archiveConfig != nil {
if archiveSession, err := f.archiveConfig.NewSession(); err == nil {
f.archiveSession = archiveSession
} else {
return err
}
}
return nil
}
@@ -86,3 +103,19 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return cDepStore.NewDependencyStore(f.primarySession, f.Options.DepStoreDataFrequency, f.metricsFactory, f.logger), nil
}
// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.metricsFactory, f.logger), nil
}
// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger), nil
}
@@ -25,10 +25,12 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/storage"
)
var _ storage.Factory = new(Factory)
var _ storage.ArchiveFactory = new(Factory)
type mockSessionBuilder struct {
err error
@@ -44,7 +46,7 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) {
func TestCassandraFactory(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{})
command.ParseFlags([]string{"--cassandra-archive.enabled=true"})
f.InitFromViper(v)
// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
@@ -53,6 +55,10 @@ func TestCassandraFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")
f.primaryConfig = &mockSessionBuilder{}
f.archiveConfig = &mockSessionBuilder{err: errors.New("made-up error")}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")
f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
_, err := f.CreateSpanReader()
@@ -63,4 +69,19 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateDependencyReader()
assert.NoError(t, err)
_, err = f.CreateArchiveSpanReader()
assert.EqualError(t, err, "Archive storage not configured")
_, err = f.CreateArchiveSpanWriter()
assert.EqualError(t, err, "Archive storage not configured")
f.archiveConfig = &mockSessionBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
_, err = f.CreateArchiveSpanReader()
assert.NoError(t, err)
_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}
@@ -26,6 +26,7 @@ import (
const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
@@ -65,6 +66,8 @@ type namespaceConfig struct {
config.Configuration
servers string
namespace string
primary bool
Enabled bool
}
// NewOptions creates a new Options struct.
@@ -78,12 +81,14 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
EnableHostVerification: true,
},
MaxRetryAttempts: 3,
Keyspace: "jaeger_v1_local",
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
},
servers: "127.0.0.1",
namespace: primaryNamespace,
primary: true,
Enabled: true,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
SpanStoreWriteCacheTTL: time.Hour * 12,
@@ -112,6 +117,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
}
func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
if !nsConfig.primary {
flagSet.Bool(
nsConfig.namespace+suffixEnabled,
false,
"Enable extra storage")
}
flagSet.Int(
nsConfig.namespace+suffixConnPerHost,
nsConfig.ConnectionsPerHost,
@@ -189,6 +200,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
}
func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
if !cfg.primary {
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
@@ -220,6 +234,9 @@ func (opt *Options) Get(namespace string) *config.Configuration {
nsCfg = &namespaceConfig{}
opt.others[namespace] = nsCfg
}
if !nsCfg.Enabled {
return nil
}
nsCfg.Configuration.ApplyDefaults(&opt.primary.Configuration)
if nsCfg.servers == "" {
nsCfg.servers = opt.primary.servers
@@ -19,6 +19,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/jaegertracing/jaeger/pkg/config"
)
@@ -31,13 +32,19 @@ func TestOptions(t *testing.T) {
assert.Equal(t, 2, primary.ConnectionsPerHost)
aux := opts.Get("archive")
assert.Nil(t, aux)
assert.NotNil(t, opts.others["archive"])
opts.others["archive"].Enabled = true
aux = opts.Get("archive")
require.NotNil(t, aux)
assert.Equal(t, primary.Keyspace, aux.Keyspace)
assert.Equal(t, primary.Servers, aux.Servers)
assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost)
}
func TestOptionsWithFlags(t *testing.T) {
opts := NewOptions("cas", "cas.aux")
opts := NewOptions("cas", "cas-aux")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--cas.keyspace=jaeger",
@@ -48,17 +55,19 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.port=4242",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
// a couple overrides
"--cas.aux.keyspace=jaeger-archive",
"--cas.aux.servers=3.3.3.3,4.4.4.4",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
"--cas-aux.servers=3.3.3.3,4.4.4.4",
})
opts.InitFromViper(v)
primary := opts.GetPrimary()
assert.Equal(t, "jaeger", primary.Keyspace)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
aux := opts.Get("cas.aux")
aux := opts.Get("cas-aux")
require.NotNil(t, aux)
assert.Equal(t, "jaeger-archive", aux.Keyspace)
assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers)
assert.Equal(t, 42, aux.ConnectionsPerHost)
View
@@ -131,3 +131,29 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
}
}
}
// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
}
archive, ok := factory.(storage.ArchiveFactory)
if !ok {
return nil, storage.ErrArchiveStorageNotSupported
}
return archive.CreateArchiveSpanReader()
}
// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
factory, ok := f.factories[f.SpanStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
}
archive, ok := factory.(storage.ArchiveFactory)
if !ok {
return nil, storage.ErrArchiveStorageNotSupported
}
return archive.CreateArchiveSpanWriter()
}
Oops, something went wrong.

0 comments on commit 7f1b9a2

Please sign in to comment.