Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Elasticsearch archive storage optional #1334

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/es/config/config.go
Expand Up @@ -55,6 +55,7 @@ type Configuration struct {
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}
Expand All @@ -80,6 +81,7 @@ type ClientBuilder interface {
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
GetTokenFilePath() string
IsEnabled() bool
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -233,6 +235,11 @@ func (c *Configuration) GetTokenFilePath() string {
return c.TokenFilePath
}

// IsEnabled determines whether storage is enabled
func (c *Configuration) IsEnabled() bool {
return c.Enabled
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
Expand Down
21 changes: 15 additions & 6 deletions plugin/storage/es/factory.go
Expand Up @@ -82,11 +82,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return errors.Wrap(err, "failed to create primary Elasticsearch client")
}
f.primaryClient = primaryClient
archiveClient, err := f.archiveConfig.NewClient(logger, metricsFactory)
if err != nil {
return errors.Wrap(err, "failed to create archive Elasticsearch client")
if f.archiveConfig.IsEnabled() {
f.archiveClient, err = f.archiveConfig.NewClient(logger, metricsFactory)
if err != nil {
return errors.Wrap(err, "failed to create archive Elasticsearch client")
}
}
f.archiveClient = archiveClient
return nil
}

Expand Down Expand Up @@ -125,12 +126,20 @@ func loadTagsFromFile(filePath string) ([]string, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
}

func createSpanReader(
Expand Down
36 changes: 31 additions & 5 deletions plugin/storage/es/factory_test.go
Expand Up @@ -60,7 +60,7 @@ func TestElasticsearchFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error")

f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2")}
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2"), Configuration:escfg.Configuration{Enabled:true}}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2")

f.archiveConfig = &mockClientBuilder{}
Expand Down Expand Up @@ -127,12 +127,12 @@ func TestLoadTagsFromFile(t *testing.T) {

func TestFactory_LoadMapping(t *testing.T) {
spanMapping, serviceMapping := GetMappings(10, 0)
tests := []struct{
name string
tests := []struct {
name string
toTest string
}{
{name: "jaeger-span.json", toTest:spanMapping},
{name: "jaeger-service.json", toTest:serviceMapping},
{name: "jaeger-span.json", toTest: spanMapping},
{name: "jaeger-service.json", toTest: serviceMapping},
}
for _, test := range tests {
mapping := loadMapping(test.name)
Expand All @@ -149,3 +149,29 @@ func TestFactory_LoadMapping(t *testing.T) {
assert.Equal(t, expectedMapping, test.toTest)
}
}

func TestArchiveDisabled(t *testing.T) {
f := NewFactory()
f.Options.Get(archiveNamespace).Enabled = false
w, err := f.CreateArchiveSpanWriter()
assert.Nil(t, w)
assert.Nil(t, err)
r, err := f.CreateArchiveSpanReader()
assert.Nil(t, r)
assert.Nil(t, err)
}

func TestArchiveEnabled2(t *testing.T) {
f := NewFactory()
f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{}
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
f.Options.Get(archiveNamespace).Enabled = true
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
}
9 changes: 9 additions & 0 deletions plugin/storage/es/options.go
Expand Up @@ -49,6 +49,7 @@ const (
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixEnabled = ".enabled"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
TagDotReplacement: "@",
Enabled: true,
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
Expand Down Expand Up @@ -206,6 +208,12 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
"(experimental) Use read and write aliases for indices. Use this option with Elasticsearch rollover "+
"API. It requires an external component to create aliases before startup and then performing its management. "+
"Note that "+nsConfig.namespace+suffixMaxSpanAge+" is not taken into the account and has to be substituted by external component managing read alias.")
if nsConfig.namespace == archiveNamespace {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if this needs to be conditional? The primary namespace is enabled by default, so it just means a user could disable the primary namespace, which is unlikely.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to show this flag for the primary NS. We do the same for c*

flagSet.Bool(
nsConfig.namespace+suffixEnabled,
nsConfig.Enabled,
"Enable extra storage")
}
}

// InitFromViper initializes Options with properties from viper
Expand Down Expand Up @@ -240,6 +248,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}

// GetPrimary returns primary configuration.
Expand Down