Skip to content

Commit

Permalink
Introduce storage factory framework and composable CLI (#625)
Browse files Browse the repository at this point in the history
* deprecate --span-storage.type CLI flag
* instead use env variable SPAN_STORAGE (and optionally DEPENDENCY_STORAGE)
* CLI options only include flags relevant to the selected storage type(s)
( introduce storage.Factory interface for creating different storage components like spanstore.Reader
* introduce meta-factory plugin/storage.Factory that instantiates concrete factories based on the above env vars
* concrete factories may implement plugin.Configurable interface that allows them to define custom CLI flags and to be initialized from Viper
* remove a lot of duplicated code for storage initialization in collector/query/standalone
* add new cobra command env that prints help about configuring via env vars
  • Loading branch information
yurishkuro committed Jan 7, 2018
1 parent d2f85a4 commit 8db7a11
Show file tree
Hide file tree
Showing 41 changed files with 1,288 additions and 750 deletions.
3 changes: 2 additions & 1 deletion cmd/agent/app/flags.go
Expand Up @@ -66,7 +66,7 @@ func AddFlags(flags *flag.FlagSet) {
}

// InitFromViper initializes Builder with properties retrieved from Viper.
func (b *Builder) InitFromViper(v *viper.Viper) {
func (b *Builder) InitFromViper(v *viper.Viper) *Builder {
b.Metrics.InitFromViper(v)

for _, processor := range defaultProcessors {
Expand All @@ -84,4 +84,5 @@ func (b *Builder) InitFromViper(v *viper.Viper) {
}
b.HTTPServer.HostPort = v.GetString(httpServerHostPort)
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
return b
}
33 changes: 2 additions & 31 deletions cmd/builder/builder_options.go
Expand Up @@ -17,24 +17,16 @@ package builder
import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
escfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore/memory"
)

// TODO combine with SharedFlags

// BasicOptions is a set of basic building blocks for most Jaeger executables
type BasicOptions struct {
// Logger is a generic logger used by most executables
Logger *zap.Logger
// MetricsFactory is the basic metrics factory used by most executables
MetricsFactory metrics.Factory
// MemoryStore is the memory store (as reader and writer) that will be used if required
MemoryStore *memory.Store
// CassandraSessionBuilder is the cassandra session builder
CassandraSessionBuilder cascfg.SessionBuilder
// ElasticClientBuilder is the elasticsearch client builder
ElasticClientBuilder escfg.ClientBuilder
}

// Option is a function that sets some option on StorageBuilder.
Expand All @@ -57,27 +49,6 @@ func (BasicOptions) MetricsFactoryOption(metricsFactory metrics.Factory) Option
}
}

// CassandraSessionOption creates an Option that adds Cassandra session builder.
func (BasicOptions) CassandraSessionOption(sessionBuilder cascfg.SessionBuilder) Option {
return func(b *BasicOptions) {
b.CassandraSessionBuilder = sessionBuilder
}
}

// ElasticClientOption creates an Option that adds ElasticSearch client builder.
func (BasicOptions) ElasticClientOption(clientBuilder escfg.ClientBuilder) Option {
return func(b *BasicOptions) {
b.ElasticClientBuilder = clientBuilder
}
}

// MemoryStoreOption creates an Option that adds a memory store
func (BasicOptions) MemoryStoreOption(memoryStore *memory.Store) Option {
return func(b *BasicOptions) {
b.MemoryStore = memoryStore
}
}

// ApplyOptions takes a set of options and creates a populated BasicOptions struct
func ApplyOptions(opts ...Option) BasicOptions {
o := BasicOptions{}
Expand Down
10 changes: 0 additions & 10 deletions cmd/builder/builder_options_test.go
Expand Up @@ -20,24 +20,14 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
escfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore/memory"
"github.com/uber/jaeger-lib/metrics"
)

func TestApplyOptions(t *testing.T) {
opts := ApplyOptions(
Options.CassandraSessionOption(&cascfg.Configuration{}),
Options.LoggerOption(zap.NewNop()),
Options.MetricsFactoryOption(metrics.NullFactory),
Options.MemoryStoreOption(memory.NewStore()),
Options.ElasticClientOption(&escfg.Configuration{
Servers: []string{"127.0.0.1"},
}),
)
assert.NotNil(t, opts.CassandraSessionBuilder)
assert.NotNil(t, opts.ElasticClientBuilder)
assert.NotNil(t, opts.Logger)
assert.NotNil(t, opts.MetricsFactory)
}
Expand Down
5 changes: 0 additions & 5 deletions cmd/collector/app/builder/builder_flags.go
Expand Up @@ -16,7 +16,6 @@ package builder

import (
"flag"
"time"

"github.com/spf13/viper"

Expand All @@ -39,8 +38,6 @@ type CollectorOptions struct {
QueueSize int
// NumWorkers is the number of internal workers in a collector
NumWorkers int
// WriteCacheTTL denotes how often to check and re-write a service or operation name
WriteCacheTTL time.Duration
// CollectorPort is the port that the collector service listens in on for tchannel requests
CollectorPort int
// CollectorHTTPPort is the port that the collector service listens in on for http requests
Expand All @@ -55,7 +52,6 @@ type CollectorOptions struct {
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorQueueSize, app.DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorNumWorkers, app.DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Duration(collectorWriteCacheTTL, time.Hour*12, "The duration to wait before rewriting an existing service or operation name")
flags.Int(collectorPort, 14267, "The tchannel port for the collector service")
flags.Int(collectorHTTPPort, 14268, "The http port for the collector service")
flags.Int(collectorZipkinHTTPort, 0, "The http port for the Zipkin collector service e.g. 9411")
Expand All @@ -66,7 +62,6 @@ func AddFlags(flags *flag.FlagSet) {
func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.QueueSize = v.GetInt(collectorQueueSize)
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.WriteCacheTTL = v.GetDuration(collectorWriteCacheTTL)
cOpts.CollectorPort = v.GetInt(collectorPort)
cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort)
Expand Down
61 changes: 2 additions & 59 deletions cmd/collector/app/builder/span_handler_builder.go
Expand Up @@ -24,12 +24,7 @@ import (
basicB "github.com/jaegertracing/jaeger/cmd/builder"
"github.com/jaegertracing/jaeger/cmd/collector/app"
zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/model"
cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
escfg "github.com/jaegertracing/jaeger/pkg/es/config"
casSpanstore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
esSpanstore "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -48,71 +43,19 @@ type SpanHandlerBuilder struct {
}

// NewSpanHandlerBuilder returns new SpanHandlerBuilder with configured span storage.
func NewSpanHandlerBuilder(cOpts *CollectorOptions, sFlags *flags.SharedFlags, opts ...basicB.Option) (*SpanHandlerBuilder, error) {
func NewSpanHandlerBuilder(cOpts *CollectorOptions, spanWriter spanstore.Writer, opts ...basicB.Option) (*SpanHandlerBuilder, error) {
options := basicB.ApplyOptions(opts...)

spanHb := &SpanHandlerBuilder{
collectorOpts: cOpts,
logger: options.Logger,
metricsFactory: options.MetricsFactory,
}

var err error
if sFlags.SpanStorage.Type == flags.CassandraStorageType {
if options.CassandraSessionBuilder == nil {
return nil, errMissingCassandraConfig
}
spanHb.spanWriter, err = spanHb.initCassStore(options.CassandraSessionBuilder)
} else if sFlags.SpanStorage.Type == flags.MemoryStorageType {
if options.MemoryStore == nil {
return nil, errMissingMemoryStore
}
spanHb.spanWriter = options.MemoryStore
} else if sFlags.SpanStorage.Type == flags.ESStorageType {
if options.ElasticClientBuilder == nil {
return nil, errMissingElasticSearchConfig
}
spanHb.spanWriter, err = spanHb.initElasticStore(options.ElasticClientBuilder)
} else {
return nil, flags.ErrUnsupportedStorageType
}

if err != nil {
return nil, err
spanWriter: spanWriter,
}

return spanHb, nil
}

func (spanHb *SpanHandlerBuilder) initCassStore(builder cascfg.SessionBuilder) (spanstore.Writer, error) {
session, err := builder.NewSession()
if err != nil {
return nil, err
}

return casSpanstore.NewSpanWriter(
session,
spanHb.collectorOpts.WriteCacheTTL,
spanHb.metricsFactory,
spanHb.logger,
), nil
}

func (spanHb *SpanHandlerBuilder) initElasticStore(esBuilder escfg.ClientBuilder) (spanstore.Writer, error) {
client, err := esBuilder.NewClient()
if err != nil {
return nil, err
}

return esSpanstore.NewSpanWriter(
client,
spanHb.logger,
spanHb.metricsFactory,
esBuilder.GetNumShards(),
esBuilder.GetNumReplicas(),
), nil
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (spanHb *SpanHandlerBuilder) BuildHandlers() (app.ZipkinSpansHandler, app.JaegerBatchesHandler) {
hostname, _ := os.Hostname()
Expand Down
138 changes: 4 additions & 134 deletions cmd/collector/app/builder/span_handler_builder_test.go
Expand Up @@ -24,127 +24,23 @@ import (

"github.com/jaegertracing/jaeger/cmd/builder"
"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/cassandra"
cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/es"
escfg "github.com/jaegertracing/jaeger/pkg/es/config"
esMocks "github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore/memory"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

type mockSessionBuilder struct {
}

func (*mockSessionBuilder) NewSession() (cassandra.Session, error) {
return &mocks.Session{}, nil
}

type mockEsBuilder struct {
escfg.Configuration
}

func (mck *mockEsBuilder) NewClient() (es.Client, error) {
return &esMocks.Client{}, nil
}

func TestNewSpanHandlerBuilder(t *testing.T) {
v, command := config.Viperize(flags.AddFlags)
v, command := config.Viperize(flags.AddFlags, AddFlags)

command.ParseFlags([]string{})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(
cOpts,
sFlags,
builder.Options.LoggerOption(zap.NewNop()),
builder.Options.MetricsFactoryOption(metrics.NullFactory),
builder.Options.CassandraSessionOption(&mockSessionBuilder{}),
)
require.NoError(t, err)
assert.NotNil(t, handler)
zipkin, jaeger := handler.BuildHandlers()
assert.NotNil(t, zipkin)
assert.NotNil(t, jaeger)
}

func TestNewSpanHandlerBuilderCassandraNoSession(t *testing.T) {
v, command := config.Viperize(flags.AddFlags)

command.ParseFlags([]string{})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)
spanWriter := memory.NewStore()

handler, err := NewSpanHandlerBuilder(
cOpts,
sFlags,
spanWriter,
builder.Options.LoggerOption(zap.NewNop()),
builder.Options.MetricsFactoryOption(metrics.NullFactory),
builder.Options.CassandraSessionOption(&cascfg.Configuration{}),
)
require.Error(t, err)
assert.Nil(t, handler)
}

func TestNewSpanHandlerBuilderCassandraNotConfigured(t *testing.T) {
v, _ := config.Viperize(AddFlags, flags.AddFlags)
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(cOpts, sFlags)
assert.Error(t, err)
assert.Nil(t, handler)
}

func TestNewSpanHandlerBuilderBadStorageTypeFailure(t *testing.T) {
v, command := config.Viperize(AddFlags, flags.AddFlags)
command.ParseFlags([]string{"test", "--span-storage.type=sneh"})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(cOpts, sFlags)
assert.Error(t, err)
assert.Nil(t, handler)
}

func TestNewSpanHandlerBuilderMemoryNotSet(t *testing.T) {
v, command := config.Viperize(AddFlags, flags.AddFlags)
command.ParseFlags([]string{"test", "--span-storage.type=memory"})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(cOpts, sFlags)
assert.Error(t, err)
assert.Nil(t, handler)
}

func TestNewSpanHandlerBuilderMemorySet(t *testing.T) {
v, command := config.Viperize(AddFlags, flags.AddFlags)
command.ParseFlags([]string{"test", "--span-storage.type=memory"})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(cOpts, sFlags, builder.Options.MemoryStoreOption(memory.NewStore()))
require.NoError(t, err)
assert.NotNil(t, handler)
jHandler, zHandler := handler.BuildHandlers()
assert.NotNil(t, jHandler)
assert.NotNil(t, zHandler)
}

func TestNewSpanHandlerBuilderElasticSearch(t *testing.T) {
v, command := config.Viperize(AddFlags, flags.AddFlags)
command.ParseFlags([]string{"test", "--span-storage.type=elasticsearch"})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(
cOpts,
sFlags,
builder.Options.LoggerOption(zap.NewNop()),
builder.Options.ElasticClientOption(&mockEsBuilder{}),
)
require.NoError(t, err)
assert.NotNil(t, handler)
Expand All @@ -153,32 +49,6 @@ func TestNewSpanHandlerBuilderElasticSearch(t *testing.T) {
assert.NotNil(t, jaeger)
}

func TestNewSpanHandlerBuilderElasticSearchNoClient(t *testing.T) {
v, command := config.Viperize(AddFlags, flags.AddFlags)
command.ParseFlags([]string{"test", "--span-storage.type=elasticsearch"})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)

handler, err := NewSpanHandlerBuilder(
cOpts,
sFlags,
builder.Options.LoggerOption(zap.NewNop()),
builder.Options.ElasticClientOption(&escfg.Configuration{}),
)
require.Error(t, err)
assert.Nil(t, handler)
}

func TestNewSpanHandlerBuilderElasticSearchFailure(t *testing.T) {
v, command := config.Viperize(AddFlags, flags.AddFlags)
command.ParseFlags([]string{"test", "--span-storage.type=elasticsearch"})
sFlags := new(flags.SharedFlags).InitFromViper(v)
cOpts := new(CollectorOptions).InitFromViper(v)
handler, err := NewSpanHandlerBuilder(cOpts, sFlags)
assert.EqualError(t, err, "ElasticSearch not configured")
assert.Nil(t, handler)
}

func TestDefaultSpanFilter(t *testing.T) {
assert.True(t, defaultSpanFilter(nil))
}

0 comments on commit 8db7a11

Please sign in to comment.