Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SASI indices #1328

Merged
merged 12 commits into from Feb 14, 2019
@@ -6,6 +6,17 @@ Changes by Version

#### Backend Changes

- Remove cassandra SASI indices [#1328](https://github.com/jaegertracing/jaeger/pull/1328)

Migration Path:

1. Run `plugin/storage/cassandra/schema/migration/v001tov002part1.sh`, which will copy the dependencies into a CSV file, update the `dependency` UDT, create a new `dependencies_v2` table, and write the dependencies from the CSV file into the `dependencies_v2` table.
2. Run the collector and query services with the Cassandra flag `cassandra.enable-dependencies-v2=true`, which instructs Jaeger to write to and read from the new `dependencies_v2` table.
3. Update [spark job](https://github.com/jaegertracing/spark-dependencies) to write to the new `dependencies_v2` table.

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

this should rather say "update Spark job to version N"

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 13, 2019

Author Collaborator

I'll punt on this until I've actually made the change in spark.

4. Run `plugin/storage/cassandra/schema/migration/v001tov002part2.sh` which will DELETE the old dependency table and the SASI index.

Users who wish to continue to use the v1 table don't have to do anything, as the Cassandra flag `cassandra.enable-dependencies-v2` defaults to false. Users may migrate on their own timeline; however, new features will be built solely on the `dependencies_v2` table. In the future, we will remove support for v1 completely.

##### Breaking Changes

##### New Features
@@ -14,9 +14,26 @@

package model

// DependencyLinkSource names the source of the data that was used to generate
// the dependencies.
type DependencyLinkSource string

// JaegerDependencyLinkSource describes a dependency diagram that was generated
// from Jaeger traces.
const JaegerDependencyLinkSource DependencyLinkSource = "jaeger"

// DependencyLink shows dependencies between services. It records a parent
// service, a child service, a call count, and the source of the data the link
// was generated from.
type DependencyLink struct {
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Source DependencyLinkSource `json:"source"`
}

// ApplyDefaults returns the DependencyLink with defaults filled in: an empty
// Source is replaced by JaegerDependencyLinkSource, and a Source that is
// already set is left untouched. The receiver is a value, so the caller's
// copy is not modified.
func (d DependencyLink) ApplyDefaults() DependencyLink {
	if d.Source != "" {
		return d
	}
	d.Source = JaegerDependencyLinkSource
	return d
}
@@ -0,0 +1,30 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDependencyLinkApplyDefaults(t *testing.T) {
dl := DependencyLink{}.ApplyDefaults()
assert.Equal(t, JaegerDependencyLinkSource, dl.Source)

networkSource := DependencyLinkSource("network")
dl = DependencyLink{Source: networkSource}.ApplyDefaults()
assert.Equal(t, networkSource, dl.Source)
}
@@ -40,6 +40,7 @@ type Configuration struct {
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"`
TLS TLS
}

@@ -25,6 +25,7 @@ type Dependency struct {
Parent string `cql:"parent"`
Child string `cql:"child"`
CallCount int64 `cql:"call_count"` // always unsigned, but we cannot explicitly read uint64 from Cassandra
Source string `cql:"source"`
}

// MarshalUDT handles marshalling a Dependency.
@@ -36,6 +37,8 @@ func (d *Dependency) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error
return gocql.Marshal(info, d.Child)
case "call_count":
return gocql.Marshal(info, d.CallCount)
case "source":

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

what happens if you run this code against the schema where UDT was not upgraded? Will gocql simply not invoke this function for "source"?

This comment has been minimized.

Copy link
@black-adder

black-adder Feb 13, 2019

Author Collaborator

yes

return gocql.Marshal(info, d.Source)
default:
return nil, fmt.Errorf("unknown column for position: %q", name)
}
@@ -50,6 +53,8 @@ func (d *Dependency) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte)
return gocql.Unmarshal(info, data, &d.Child)
case "call_count":
return gocql.Unmarshal(info, data, &d.CallCount)
case "source":
return gocql.Unmarshal(info, data, &d.Source)
default:
return fmt.Errorf("unknown column for position: %q", name)
}
@@ -24,19 +24,21 @@ import (

func TestDependencyUDT(t *testing.T) {
dependency := &Dependency{
Parent: "goo",
Child: "gle",
Parent: "bi",
Child: "ng",

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 12, 2019

Member

quit raising entropy

CallCount: 123,
Source: "jaeger",
}

testCase := testutils.UDTTestCase{
Obj: dependency,
New: func() gocql.UDTUnmarshaler { return &Dependency{} },
ObjName: "Dependency",
Fields: []testutils.UDTField{
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("goo"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("gle"), Err: false},
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("bi"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("ng"), Err: false},
{Name: "call_count", Type: gocql.TypeBigInt, ValIn: []byte{0, 0, 0, 0, 0, 0, 0, 123}, Err: false},
{Name: "source", Type: gocql.TypeAscii, ValIn: []byte("jaeger"), Err: false},
{Name: "wrong-field", Err: true},
},
}
@@ -26,29 +26,59 @@ import (
casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics"
)

// Version determines which version of the dependencies table to use: V1 for
// the SASI-indexed table, V2 for the table that is not SASI indexed.
type Version int

// IsValid returns true if the Version is a valid one.
func (i Version) IsValid() bool {
return i < end

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 13, 2019

Member

I would prefer a less confusing name than 'end', e.g. 'versionEnumEnd'

This comment has been minimized.

Copy link
@yurishkuro

yurishkuro Feb 13, 2019

Member

also, Version(-1) is valid according to this function

}

const (
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
// V1 is used when the dependency table is SASI indexed.
V1 Version = iota

// V2 is used when the dependency table is NOT SASI indexed.
V2
// end is the exclusive upper bound that IsValid compares a Version against.
end

// Insert and select statements for the `dependencies` table (V1) and the
// `dependencies_v2` table (V2). V2 filters on ts_bucket and ts instead of
// ts_index.
depsInsertStmtV1 = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmtV2 = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtV1 = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmtV2 = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"

// tsBucket is the bucket width for the ts_bucket column: V2 writes truncate
// the timestamp to it, and V2 reads list one bucket per tsBucket step.
// TODO: Make this customizable.
tsBucket = 24 * time.Hour
)

var (
errInvalidVersion = errors.New("invalid version")
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
// session is the Cassandra session used to build the insert and select queries.
session cassandra.Session
// dependenciesTableMetrics executes the write query; it is created for the
// "dependencies" table name.
dependenciesTableMetrics *casMetrics.Table
// logger is handed to dependenciesTableMetrics.Exec when writing.
logger *zap.Logger
// version selects between the V1 and V2 statements.
version Version
}

// NewDependencyStore returns a DependencyStore that uses the given version of
// the dependencies table. It returns errInvalidVersion when version.IsValid()
// reports false.
func NewDependencyStore(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
) *DependencyStore {
version Version,
) (*DependencyStore, error) {
if !version.IsValid() {
return nil, errInvalidVersion
}
return &DependencyStore{
session: session,
dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"),
logger: logger,
}
version: version,
}, nil
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
@@ -59,27 +89,44 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
Parent: d.Parent,
Child: d.Child,
CallCount: int64(d.CallCount),
Source: string(d.Source),
}
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)

var query cassandra.Query
switch s.version {
case V1:
query = s.session.Query(depsInsertStmtV1, ts, ts, deps)
case V2:
query = s.session.Query(depsInsertStmtV2, ts, ts.Truncate(tsBucket), deps)
}
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.version {
case V1:
query = s.session.Query(depsSelectStmtV1, startTs, endTs)
case V2:
query = s.session.Query(depsSelectStmtV2, getBuckets(startTs, endTs), startTs, endTs)
}
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
var dependencies []Dependency
var ts time.Time
for iter.Scan(&ts, &dependencies) {
for _, dependency := range dependencies {
mDependency = append(mDependency, model.DependencyLink{
dl := model.DependencyLink{
Parent: dependency.Parent,
Child: dependency.Child,
CallCount: uint64(dependency.CallCount),
})
Source: model.DependencyLinkSource(dependency.Source),
}.ApplyDefaults()
mDependency = append(mDependency, dl)
}
}

@@ -89,3 +136,12 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
}
return mDependency, nil
}

// getBuckets lists the bucket timestamps to query for the range that starts at
// startTs and ends before endTs. The first bucket is startTs truncated to
// tsBucket; further buckets follow in tsBucket steps while they are still
// before endTs. The result is nil when no bucket qualifies.
func getBuckets(startTs time.Time, endTs time.Time) []time.Time {
	// TODO: Preallocate the array using some maths and maybe use a pool? This endpoint probably isn't used enough to warrant this.
	var buckets []time.Time
	cursor := startTs.Truncate(tsBucket)
	for cursor.Before(endTs) {
		buckets = append(buckets, cursor)
		cursor = cursor.Add(tsBucket)
	}
	return buckets
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.