Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SASI indices #1328

Merged
merged 12 commits into from Feb 14, 2019

actually address comments

Signed-off-by: Won Jun Jang <wjang@uber.com>
  • Loading branch information...
black-adder committed Feb 12, 2019
commit 24da0160ad23e4ac9aac83796983a12eed4913eb
@@ -27,21 +27,21 @@ import (

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `validate:"nonzero"`
Keyspace string `validate:"nonzero"`
LocalDC string `yaml:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Consistency string `yaml:"consistency"`
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
DependencySASIDisabled bool `yaml:"dependency_sasi_disabled"`
TLS TLS
Servers []string `validate:"nonzero"`
Keyspace string `validate:"nonzero"`
LocalDC string `yaml:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Consistency string `yaml:"consistency"`
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"`
TLS TLS
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
@@ -26,58 +26,58 @@ import (
casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics"
)

// IndexMode determines how the dependency data is indexed.
type IndexMode int
// Version determines which version of the dependencies table to use.
type Version int

// IsValid returns true if the IndexMode is a valid one.
func (i IndexMode) IsValid() bool {
// IsValid returns true if the Version is a valid one.
func (i Version) IsValid() bool {
return i < end

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 13, 2019

Member

I would prefer a less confusing name than 'end', e.g. 'versionEnumEnd'

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 13, 2019

Member

also, Version(-1) is valid according to this function

}

const (
// SASIEnabled is used when the dependency table is SASI indexed.
SASIEnabled IndexMode = iota
// V1 is used when the dependency table is SASI indexed.
V1 Version = iota

// SASIDisabled is used when the dependency table is NOT SASI indexed.
SASIDisabled
// V2 is used when the dependency table is NOT SASI indexed.
V2
end

depsInsertStmtSASI = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmt = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtSASI = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"
depsInsertStmtV1 = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmtV2 = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtV1 = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmtV2 = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"

// TODO: Make this customizable.
tsBucket = 24 * time.Hour
)

var (
errInvalidIndexMode = errors.New("invalid index mode")
errInvalidVersion = errors.New("invalid version")
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
session cassandra.Session
dependenciesTableMetrics *casMetrics.Table
logger *zap.Logger
indexMode IndexMode
version Version
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
indexMode IndexMode,
version Version,
) (*DependencyStore, error) {
if !indexMode.IsValid() {
return nil, errInvalidIndexMode
if !version.IsValid() {
return nil, errInvalidVersion
}
return &DependencyStore{
session: session,
dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"),
logger: logger,
indexMode: indexMode,
version: version,
}, nil
}

@@ -90,18 +90,18 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
Child: d.Child,
CallCount: int64(d.CallCount),
}
if s.indexMode == SASIDisabled {
if s.version == V2 {
dep.Source = string(d.Source)

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 12, 2019

Member

Does this assignment need to be conditional? (Also - shouldn't this be set for SASIDisabled?)

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 12, 2019

Author Collaborator

yes, if you attempt to add the source field to a schema that doesn't support it, c* will error. And good catch, it should be for disabled

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 12, 2019

Member

Yes - but that doesn't answer my question; the query string for SASIDisabled doesn't include this field anyways

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 12, 2019

Author Collaborator

i'll test it and let you know

}
deps[i] = dep
}

var query cassandra.Query
switch s.indexMode {
case SASIDisabled:
query = s.session.Query(depsInsertStmt, ts, ts.Truncate(tsBucket), deps)
case SASIEnabled:
query = s.session.Query(depsInsertStmtSASI, ts, ts, deps)
switch s.version {
case V1:
query = s.session.Query(depsInsertStmtV1, ts, ts, deps)
case V2:
query = s.session.Query(depsInsertStmtV2, ts, ts.Truncate(tsBucket), deps)
}
return s.dependenciesTableMetrics.Exec(query, s.logger)
}
@@ -110,11 +110,11 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.indexMode {
case SASIDisabled:
query = s.session.Query(depsSelectStmt, getBuckets(startTs, endTs), startTs, endTs)
case SASIEnabled:
query = s.session.Query(depsSelectStmtSASI, startTs, endTs)
switch s.version {
case V1:
query = s.session.Query(depsSelectStmtV1, startTs, endTs)
case V2:
query = s.session.Query(depsSelectStmtV2, getBuckets(startTs, endTs), startTs, endTs)
}
iter := query.Consistency(cassandra.One).Iter()

@@ -40,12 +40,12 @@ type depStorageTest struct {
storage *DependencyStore
}

func withDepStore(indexMode IndexMode, fn func(s *depStorageTest)) {
func withDepStore(version Version, fn func(s *depStorageTest)) {
session := &mocks.Session{}
logger, logBuffer := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(time.Second)
defer metricsFactory.Stop()
store, _ := NewDependencyStore(session, metricsFactory, logger, indexMode)
store, _ := NewDependencyStore(session, metricsFactory, logger, version)
s := &depStorageTest{
session: session,
logger: logger,
@@ -58,35 +58,35 @@ func withDepStore(indexMode IndexMode, fn func(s *depStorageTest)) {
var _ dependencystore.Reader = &DependencyStore{} // check API conformance
var _ dependencystore.Writer = &DependencyStore{} // check API conformance

func TestIndexModeIsValid(t *testing.T) {
assert.True(t, SASIEnabled.IsValid())
assert.True(t, SASIDisabled.IsValid())
func TestVersionIsValid(t *testing.T) {
assert.True(t, V1.IsValid())
assert.True(t, V2.IsValid())
assert.False(t, end.IsValid())
}

func TestInvalidIndexMode(t *testing.T) {
func TestInvalidVersion(t *testing.T) {
_, err := NewDependencyStore(&mocks.Session{}, metrics.NullFactory, zap.NewNop(), end)
assert.Error(t, err)
}

func TestDependencyStoreWrite(t *testing.T) {
testCases := []struct {
caption string
indexMode IndexMode
caption string
version Version
}{
{
caption: "SASI enabled",
indexMode: SASIEnabled,
caption: "V1",
version: V1,
},
{
caption: "SASI disabled",
indexMode: SASIDisabled,
caption: "V2",
version: V2,
},
}
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.caption, func(t *testing.T) {
withDepStore(testCase.indexMode, func(s *depStorageTest) {
withDepStore(testCase.version, func(s *depStorageTest) {
query := &mocks.Query{}
query.On("Exec").Return(nil)

@@ -116,7 +116,7 @@ func TestDependencyStoreWrite(t *testing.T) {
} else {
assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args)
}
if testCase.indexMode == SASIDisabled {
if testCase.version == V2 {
if d, ok := args[1].(time.Time); ok {
assert.Equal(t, time.Date(2017, time.January, 24, 0, 0, 0, 0, time.UTC), d)
} else {
@@ -130,7 +130,7 @@ func TestDependencyStoreWrite(t *testing.T) {
}
}
if d, ok := args[2].([]Dependency); ok {
if testCase.indexMode == SASIDisabled {
if testCase.version == V2 {
assert.Equal(t, []Dependency{
{
Parent: "a",
@@ -162,39 +162,39 @@ func TestDependencyStoreGetDependencies(t *testing.T) {
queryError error
expectedError string
expectedLogs []string
indexMode IndexMode
version Version
}{
{
caption: "success SASI enabled",
indexMode: SASIEnabled,
caption: "success V1",
version: V1,
},
{
caption: "success SASI disabled",
indexMode: SASIDisabled,
caption: "success V2",
version: V2,
},
{
caption: "failure SASI enabled",
caption: "failure V1",
queryError: errors.New("query error"),
expectedError: "Error reading dependencies from storage: query error",
expectedLogs: []string{
"Failure to read Dependencies",
},
indexMode: SASIEnabled,
version: V1,
},
{
caption: "failure SASI disabled",
caption: "failure V2",
queryError: errors.New("query error"),
expectedError: "Error reading dependencies from storage: query error",
expectedLogs: []string{
"Failure to read Dependencies",
},
indexMode: SASIDisabled,
version: V2,
},
}
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.caption, func(t *testing.T) {
withDepStore(testCase.indexMode, func(s *depStorageTest) {
withDepStore(testCase.version, func(s *depStorageTest) {
scanMatcher := func() interface{} {
deps := [][]Dependency{
{
@@ -106,11 +106,11 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
indexMode := cDepStore.SASIEnabled
if f.Options.GetPrimary().DependencySASIDisabled {
indexMode = cDepStore.SASIDisabled
version := cDepStore.V1
if f.Options.GetPrimary().EnableDependenciesV2 {
version = cDepStore.V2
}
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, indexMode)
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
@@ -47,7 +47,7 @@ func TestCassandraFactory(t *testing.T) {
logger, logBuf := testutils.NewLogger()
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--cassandra-archive.enabled=true", "--cassandra.sasi-disabled=true"})
command.ParseFlags([]string{"--cassandra-archive.enabled=true", "--cassandra.enable-dependencies-v2=true"})
f.InitFromViper(v)

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
@@ -26,27 +26,27 @@ import (

const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixReconnectInterval = ".reconnect-interval"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixDC = ".local-dc"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"
suffixSASIDisabled = ".sasi-disabled"
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixReconnectInterval = ".reconnect-interval"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixDC = ".local-dc"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"
suffixEnableDependenciesV2 = ".enable-dependencies-v2"

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
@@ -199,9 +199,9 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.TLS.EnableHostVerification,
"Enable (or disable) host key verification")
flagSet.Bool(
nsConfig.namespace+suffixSASIDisabled,
nsConfig.DependencySASIDisabled,
"Disable (or enable) SASI indexes for the dependencies table. Only set this to true if you've migrated the dependencies table to NOT use SASI indexes")
nsConfig.namespace+suffixEnableDependenciesV2,
nsConfig.EnableDependenciesV2,
"Disable (or enable) the dependencies v2 table. Only set this to true if you've migrated the dependencies table to v2")
}

// InitFromViper initializes Options with properties from viper
@@ -236,7 +236,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA)
cfg.TLS.ServerName = v.GetString(cfg.namespace + suffixServerName)
cfg.TLS.EnableHostVerification = v.GetBool(cfg.namespace + suffixVerifyHost)
cfg.DependencySASIDisabled = v.GetBool(cfg.namespace + suffixSASIDisabled)
cfg.EnableDependenciesV2 = v.GetBool(cfg.namespace + suffixEnableDependenciesV2)
}

// GetPrimary returns primary configuration.
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.