Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cassandra] Remove SASI index from dependencies table #793

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package dependencystore

import (
"fmt"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -27,8 +29,10 @@ import (
)

const (
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsInsertStmt = "INSERT INTO dependencies(ts, date_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectFmt = "SELECT ts, dependencies FROM dependencies WHERE date_bucket IN (%s) AND ts >= ? AND ts < ?"
dateFmt = "20060102"
day = 24 * time.Hour
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
Expand Down Expand Up @@ -64,13 +68,14 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
CallCount: int64(d.CallCount),
}
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)
query := s.session.Query(depsInsertStmt, ts, ts.Format(dateFmt), deps)
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
query := s.session.Query(getDepSelectString(startTs, endTs), startTs, endTs)
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
Expand All @@ -93,6 +98,14 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
return mDependency, nil
}

func getDepSelectString(startTs time.Time, endTs time.Time) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cassandra has no date range support?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cassandra does not support range queries on partition keys

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK that explains it.

var dateBuckets []string
for ts := startTs.Truncate(day); ts.Before(endTs); ts = ts.Add(day) {
dateBuckets = append(dateBuckets, ts.Format(dateFmt))
}
return fmt.Sprintf(depsSelectFmt, strings.Join(dateBuckets, ","))
}

func (s *DependencyStore) timeIntervalToPoints(endTs time.Time, lookback time.Duration) []time.Time {
startTs := endTs.Add(-lookback)
var days []time.Time
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/cassandra/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func TestDependencyStoreWrite(t *testing.T) {
} else {
assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args)
}
if d, ok := args[1].(time.Time); ok {
assert.Equal(t, ts, d)
if d, ok := args[1].(string); ok {
assert.Equal(t, ts.Format(dateFmt), d)
} else {
assert.Fail(t, "expecting second arg as time.Time", "received: %+v", args)
assert.Fail(t, "expecting second arg as string", "received: %+v", args)
}
if d, ok := args[2].([]Dependency); ok {
assert.Equal(t, []Dependency{
Expand Down
78 changes: 78 additions & 0 deletions plugin/storage/cassandra/schema/migration/v001tov002.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env bash

function usage {
>&2 echo "Error: $1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use a heredoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, let me try it out

>&2 echo ""
>&2 echo "Usage: KEYSPACE={keyspace} $0"
>&2 echo ""
>&2 echo "The following parameters can be set via environment:"
>&2 echo " KEYSPACE - keyspace "
>&2 echo ""
exit 1
}

confirm() {
read -r -p "${1:-Are you sure? [y/N]} " response
case "$response" in
[yY][eE][sS]|[yY])
Copy link
Contributor

@isaachier isaachier May 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably just do if [[ "$(echo $response | tr 'a-z' 'A-Z')" =~ "Y(ES)?" ]].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that would work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not saying it's necessarily easier, but why wouldn't it work?

Copy link
Contributor Author

@vprithvi vprithvi May 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, I didn't spend time debugging it. Does it work for you?

bash-3.2$ response=Y && if [[ "$(echo $response | tr 'a-z' 'A-Z')" =~ "Y(ES)?" ]]; then echo "passed"; fi
bash-3.2$

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I think bash regex might just be awful.

true
;;
*)
exit 1
;;
esac
}

keyspace=${KEYSPACE}

if [[ ${keyspace} == "" ]]; then
usage "missing KEYSPACE parameter"
fi

if [[ ${keyspace} =~ [^a-zA-Z0-9_] ]]; then
usage "invalid characters in KEYSPACE=$keyspace parameter, please use letters, digits or underscores"
fi


row_count=$(cqlsh -e "select count(*) from $keyspace.dependencies;"|head -4|tail -1| tr -d ' ')

echo "About to copy $row_count rows."
confirm

cqlsh -e "COPY $keyspace.dependencies (ts, dependencies ) to 'dependencies.csv'"

if [ ! -f dependencies.csv ]; then
echo "Could not find dependencies.csv. Backup from cassandra was probably not successful"
exit 1
fi

if [ ${row_count} -ne $(wc -l dependencies.csv | cut -f 1 -d ' ') ]; then
echo "Number of rows in file is not equal to number of rows in cassandra"
exit 1
fi


while IFS="," read ts dependency; do bucket=`date +"%Y%m%d" -d "$ts"`; echo "$bucket,$ts,$dependency"; done < dependencies.csv > dependencies_datebucket.csv

dependencies_ttl=$(cqlsh -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='dependencies';"|head -4|tail -1|tr -d ' ')

echo "Setting dependencies_ttl to $dependencies_ttl"

cqlsh -e "DROP INDEX IF EXISTS $keyspace.ts_index"
cqlsh -e "DROP TABLE IF EXISTS $keyspace.dependencies"

cqlsh -e "CREATE TABLE $keyspace.dependencies (
ts timestamp,
date_bucket bigint,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partition key first

dependencies list<frozen<dependency>>,
PRIMARY KEY (date_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = $dependencies_ttl
"

cqlsh -e "COPY $keyspace.dependencies (date_bucket, ts, dependencies) FROM 'dependencies_datebucket.csv'"
202 changes: 202 additions & 0 deletions plugin/storage/cassandra/schema/v002.cql.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
--
-- Creates Cassandra keyspace with tables for traces and dependencies.
--
-- Required parameters:
--
-- keyspace
-- name of the keyspace
-- replication
-- replication strategy for the keyspace, such as
-- for prod environments
-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' }
-- for test environments
-- {'class': 'SimpleStrategy', 'replication_factor': '1'}
-- trace_ttl
-- default time to live for trace data, in seconds
-- dependencies_ttl
-- default time to live for dependencies data, in seconds (0 for no TTL)
--
-- Non-configurable settings:
-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/
-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html

CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};

CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue (
key text,
value_type text,
value_string text,
value_bool boolean,
value_long bigint,
value_double double,
value_binary blob,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.log (
ts bigint,
fields list<frozen<keyvalue>>,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref (
ref_type text,
trace_id blob,
span_id bigint,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.process (
service_name text,
tags list<frozen<keyvalue>>,
);

-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID.
-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table".
-- start_time is bigint instead of timestamp as we require microsecond precision
CREATE TABLE IF NOT EXISTS ${keyspace}.traces (
trace_id blob,
span_id bigint,
span_hash bigint,
parent_id bigint,
operation_name text,
flags int,
start_time bigint,
duration bigint,
tags list<frozen<keyvalue>>,
logs list<frozen<log>>,
refs list<frozen<span_ref>>,
process frozen<process>,
PRIMARY KEY (trace_id, span_id, span_hash)
)
WITH compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.service_names (
service_name text,
PRIMARY KEY (service_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names (
service_name text,
operation_name text,
PRIMARY KEY ((service_name), operation_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- index of trace IDs by service + operation names, sorted by span start_time.
CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index (
service_name text,
operation_name text,
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, operation_name), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index (
service_name text,
bucket int,
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, bucket), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index (
service_name text, // service name
operation_name text, // operation name, or blank for queries without span name
bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour
duration bigint, // span duration, in microseconds
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id)
) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- a bucketing strategy may have to be added for tag queries
-- we can make this table even better by adding a timestamp to it
CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index (
service_name text,
tag_key text,
tag_value text,
start_time bigint,
trace_id blob,
span_id bigint,
PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id)
)
WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
parent text,
child text,
call_count bigint,
);

-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data
CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies (
ts timestamp,
date_bucket bigint,
dependencies list<frozen<dependency>>,
PRIMARY KEY (date_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${dependencies_ttl};