Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SASI indices #1328

Merged
merged 12 commits into from Feb 14, 2019
Next

Remove SASI indices

Signed-off-by: Won Jun Jang <wjang@uber.com>
  • Loading branch information...
black-adder committed Feb 11, 2019
commit 11c2cce340c890e6016f8d78ec8c84e88df21727
@@ -14,9 +14,26 @@

package model

// DependencyLinkSource is the source of data used to generate the dependencies.
type DependencyLinkSource string

const (
// JaegerDependencyLinkSource describes a dependency diagram that was generated from Jaeger traces.
JaegerDependencyLinkSource = DependencyLinkSource("jaeger")
)

// DependencyLink shows dependencies between services
type DependencyLink struct {
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Source DependencyLinkSource `json:"source"`
}

// Sanitize sanitizes the DependencyLink.
func (d DependencyLink) Sanitize() DependencyLink {

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

ApplyDefaults

if d.Source == "" {
d.Source = JaegerDependencyLinkSource
}
return d
}
@@ -0,0 +1,30 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDependencyLinkSanitize(t *testing.T) {
dl := DependencyLink{}.Sanitize()
assert.Equal(t, JaegerDependencyLinkSource, dl.Source)

networkSource := DependencyLinkSource("network")
dl = DependencyLink{Source: networkSource}.Sanitize()
assert.Equal(t, networkSource, dl.Source)
}
@@ -27,20 +27,21 @@ import (

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `validate:"nonzero"`
Keyspace string `validate:"nonzero"`
LocalDC string `yaml:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Consistency string `yaml:"consistency"`
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
TLS TLS
Servers []string `validate:"nonzero"`
Keyspace string `validate:"nonzero"`
LocalDC string `yaml:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Consistency string `yaml:"consistency"`
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
DependencySASIDisabled bool `yaml:"dependency_sasi_disabled"`

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 12, 2019

Member

Could this be simplified to SASI disabled?

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 12, 2019

Author Collaborator

i'd rather it be more explicit but I can simplify

TLS TLS
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
@@ -25,6 +25,7 @@ type Dependency struct {
Parent string `cql:"parent"`
Child string `cql:"child"`
CallCount int64 `cql:"call_count"` // always unsigned, but we cannot explicitly read uint64 from Cassandra
Source string `cql:"source"`
}

// MarshalUDT handles marshalling a Dependency.
@@ -36,6 +37,8 @@ func (d *Dependency) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error
return gocql.Marshal(info, d.Child)
case "call_count":
return gocql.Marshal(info, d.CallCount)
case "source":

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

what happens if you run this code against the schema where UDT was not upgraded? Will gocql simply not invoke this function for "source"?

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 13, 2019

Author Collaborator

yes

return gocql.Marshal(info, d.Source)
default:
return nil, fmt.Errorf("unknown column for position: %q", name)
}
@@ -50,6 +53,8 @@ func (d *Dependency) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte)
return gocql.Unmarshal(info, data, &d.Child)
case "call_count":
return gocql.Unmarshal(info, data, &d.CallCount)
case "source":
return gocql.Unmarshal(info, data, &d.Source)
default:
return fmt.Errorf("unknown column for position: %q", name)
}
@@ -24,19 +24,21 @@ import (

func TestDependencyUDT(t *testing.T) {
dependency := &Dependency{
Parent: "goo",
Child: "gle",
Parent: "bi",
Child: "ng",

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

quit raising entropy

CallCount: 123,
Source: "jaeger",
}

testCase := testutils.UDTTestCase{
Obj: dependency,
New: func() gocql.UDTUnmarshaler { return &Dependency{} },
ObjName: "Dependency",
Fields: []testutils.UDTField{
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("goo"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("gle"), Err: false},
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("bi"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("ng"), Err: false},
{Name: "call_count", Type: gocql.TypeBigInt, ValIn: []byte{0, 0, 0, 0, 0, 0, 0, 123}, Err: false},
{Name: "source", Type: gocql.TypeAscii, ValIn: []byte("jaeger"), Err: false},
{Name: "wrong-field", Err: true},
},
}
@@ -15,6 +15,8 @@
package dependencystore

import (
"fmt"
"strings"
"time"

"github.com/pkg/errors"
@@ -26,60 +28,96 @@ import (
casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics"
)

// IndexMode determines how the dependency data is indexed.
type IndexMode int

const (
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
// SASIEnabled is used when the dependency table is SASI indexed.
SASIEnabled IndexMode = iota

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

would it not make sense to simply refer to this as v2 throughout, including constants, variables, the CLI flags? SASI then becomes a side effect that the user doesn't really need to know about.


// SASIDisabled is used when the dependency table is NOT SASI indexed.
SASIDisabled

depsInsertStmtSASI = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmt = "INSERT INTO dependenciesv2(ts, date_bucket, dependencies) VALUES (?, ?, ?)"

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

can we do dependencies_v2

depsSelectStmtSASI = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectFmt = "SELECT ts, dependencies FROM dependenciesv2 WHERE date_bucket IN (%s) AND ts >= ? AND ts < ?"
dateFmt = "20060102"
day = 24 * time.Hour
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
session cassandra.Session
dependenciesTableMetrics *casMetrics.Table
logger *zap.Logger
indexMode IndexMode
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
indexMode IndexMode,

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 12, 2019

Member

Is there anything preventing someone from passing in an undefined IndexMode, like IndexMode(10) for e.g.?

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 12, 2019

Author Collaborator

ill validate here

) *DependencyStore {
return &DependencyStore{
session: session,
dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"),
logger: logger,
indexMode: indexMode,
}
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
deps := make([]Dependency, len(dependencies))
for i, d := range dependencies {
deps[i] = Dependency{
dep := Dependency{
Parent: d.Parent,
Child: d.Child,
CallCount: int64(d.CallCount),
}
if s.indexMode == SASIEnabled {
dep.Source = string(d.Source)

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 12, 2019

Member

Does this assignment need to be conditional? (Also - shouldn't this be set for SASIDisabled?)

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 12, 2019

Author Collaborator

yes, if you attempt to add the source field to a schema that doesn't support it, c* will error. And good catch, it should be for disabled

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 12, 2019

Member

Yes - but that doesn't answer my question; the query string for SASIDisabled doesn't include this field anyways

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 12, 2019

Author Collaborator

i'll test it and let you know

}
deps[i] = dep
}

var query cassandra.Query
switch s.indexMode {
case SASIDisabled:
query = s.session.Query(depsInsertStmt, ts, ts.Format(dateFmt), deps)
case SASIEnabled:
query = s.session.Query(depsInsertStmtSASI, ts, ts, deps)
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.indexMode {

This comment has been minimized.

Copy link
@vprithvi

vprithvi Feb 11, 2019

Member

Would GetDependencies be simplified by omitting this switch? Instead we might read from SASIDisabled first, and if that fails we can read from SASIEnabled?
When migrating, this would mean that users don't need to redeploy query after moving over the dependency table

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 11, 2019

Author Collaborator

I'm not a fan of adding the extra roundtrip time. Additionally, if the user has opted in to migrating to the new table, redeploying the query service will not be the most strenuous thing.

case SASIDisabled:
query = s.session.Query(getDepSelectString(startTs, endTs), startTs, endTs)
case SASIEnabled:
query = s.session.Query(depsSelectStmtSASI, startTs, endTs)
}
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
var dependencies []Dependency
var ts time.Time
for iter.Scan(&ts, &dependencies) {
for _, dependency := range dependencies {
mDependency = append(mDependency, model.DependencyLink{
dl := model.DependencyLink{
Parent: dependency.Parent,
Child: dependency.Child,
CallCount: uint64(dependency.CallCount),
})
Source: model.DependencyLinkSource(dependency.Source),
}.Sanitize()
mDependency = append(mDependency, dl)
}
}

@@ -89,3 +127,11 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
}
return mDependency, nil
}

func getDepSelectString(startTs time.Time, endTs time.Time) string {
var dateBuckets []string
for ts := startTs.Truncate(day); ts.Before(endTs); ts = ts.Add(day) {
dateBuckets = append(dateBuckets, ts.Format(dateFmt))
}
return fmt.Sprintf(depsSelectFmt, strings.Join(dateBuckets, ","))
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.