diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index a80faaef976..9b0fbfcd9a0 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -33,6 +33,7 @@ const ( suffixServers = ".servers" suffixPort = ".port" suffixKeyspace = ".keyspace" + suffixConsistency = ".consistency" suffixProtoVer = ".proto-version" suffixSocketKeepAlive = ".socket-keep-alive" suffixUsername = ".username" @@ -147,6 +148,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixKeyspace, nsConfig.Keyspace, "The Cassandra keyspace for Jaeger data") + flagSet.String( + nsConfig.namespace+suffixConsistency, + nsConfig.Consistency, + "The Cassandra consistency level, e.g. ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (default LOCAL_ONE)") flagSet.Int( nsConfig.namespace+suffixProtoVer, nsConfig.ProtoVersion, @@ -209,6 +214,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { cfg.servers = v.GetString(cfg.namespace + suffixServers) cfg.Port = v.GetInt(cfg.namespace + suffixPort) cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) + cfg.Consistency = v.GetString(cfg.namespace + suffixConsistency) cfg.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) cfg.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) cfg.Authenticator.Basic.Username = v.GetString(cfg.namespace + suffixUsername) diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 8bc68010a64..9e817808ad0 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -53,6 +53,7 @@ func TestOptionsWithFlags(t *testing.T) { "--cas.max-retry-attempts=42", "--cas.timeout=42s", "--cas.port=4242", + "--cas.consistency=ONE", "--cas.proto-version=3", "--cas.socket-keep-alive=42s", // enable aux with a couple overrides @@ -65,6 +66,7 @@ func TestOptionsWithFlags(t *testing.T) { primary := opts.GetPrimary() assert.Equal(t, "jaeger", primary.Keyspace) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) + assert.Equal(t, "ONE", primary.Consistency) aux := opts.Get("cas-aux") require.NotNil(t, aux) @@ -74,6 +76,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, 42, aux.MaxRetryAttempts) assert.Equal(t, 42*time.Second, aux.Timeout) assert.Equal(t, 4242, aux.Port) + assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary") assert.Equal(t, 3, aux.ProtoVersion) assert.Equal(t, 42*time.Second, aux.SocketKeepAlive) } diff --git a/storage/spanstore/multiplex.go b/storage/spanstore/composite.go similarity index 78% rename from storage/spanstore/multiplex.go rename to storage/spanstore/composite.go index 4e8042e7829..4c4d7caefda 100644 --- a/storage/spanstore/multiplex.go +++ b/storage/spanstore/composite.go @@ -19,20 +19,20 @@ import ( "github.com/jaegertracing/jaeger/pkg/multierror" ) -// MultiplexWriter is a span Writer that tries to save spans into several underlying span Writers -type MultiplexWriter struct { +// CompositeWriter is a span Writer that tries to save spans into several underlying span Writers +type CompositeWriter struct { spanWriters []Writer } -// NewMultiplexWriter creates a MultiplexWriter -func NewMultiplexWriter(spanWriters ...Writer) *MultiplexWriter { - return &MultiplexWriter{ +// NewCompositeWriter creates a CompositeWriter +func NewCompositeWriter(spanWriters ...Writer) *CompositeWriter { + return &CompositeWriter{ spanWriters: spanWriters, } } // WriteSpan calls WriteSpan on each span writer. It will sum up failures, it is not transactional -func (c *MultiplexWriter) WriteSpan(span *model.Span) error { +func (c *CompositeWriter) WriteSpan(span *model.Span) error { var errors []error for _, writer := range c.spanWriters { if err := writer.WriteSpan(span); err != nil { diff --git a/storage/spanstore/multiplex_test.go b/storage/spanstore/composite_test.go similarity index 88% rename from storage/spanstore/multiplex_test.go rename to storage/spanstore/composite_test.go index b158df1af44..dada4c768e5 100644 --- a/storage/spanstore/multiplex_test.go +++ b/storage/spanstore/composite_test.go @@ -40,16 +40,16 @@ func (n *noopWriteSpanStore) WriteSpan(span *model.Span) error { } func TestCompositeWriteSpanStoreSuccess(t *testing.T) { - c := NewMultiplexWriter(&noopWriteSpanStore{}, &noopWriteSpanStore{}) + c := NewCompositeWriter(&noopWriteSpanStore{}, &noopWriteSpanStore{}) assert.NoError(t, c.WriteSpan(nil)) } func TestCompositeWriteSpanStoreSecondFailure(t *testing.T) { - c := NewMultiplexWriter(&errProneWriteSpanStore{}, &errProneWriteSpanStore{}) + c := NewCompositeWriter(&errProneWriteSpanStore{}, &errProneWriteSpanStore{}) assert.EqualError(t, c.WriteSpan(nil), fmt.Sprintf("[%s, %s]", errIWillAlwaysFail, errIWillAlwaysFail)) } func TestCompositeWriteSpanStoreFirstFailure(t *testing.T) { - c := NewMultiplexWriter(&errProneWriteSpanStore{}, &noopWriteSpanStore{}) + c := NewCompositeWriter(&errProneWriteSpanStore{}, &noopWriteSpanStore{}) assert.Equal(t, errIWillAlwaysFail, c.WriteSpan(nil)) }