Skip to content

Commit

Permalink
Merge 7d98f69 into 4106c29
Browse files Browse the repository at this point in the history
  • Loading branch information
vprithvi committed May 7, 2018
2 parents 4106c29 + 7d98f69 commit 27ee3c5
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 7 deletions.
21 changes: 17 additions & 4 deletions plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package dependencystore

import (
"fmt"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -27,8 +29,10 @@ import (
)

const (
// NOTE(review): this view contains two depsInsertStmt declarations plus both
// depsSelectStmt and depsSelectFmt. Presumably the ts_index pair is the
// pre-change side of the diff and only the date_bucket pair survives — confirm.
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
// depsInsertStmt inserts one dependencies row; its placeholders are, in order,
// ts, date_bucket and the dependencies list.
// NOTE(review): WriteDependencies binds date_bucket as ts.Format(dateFmt), i.e. a
// string, while the schema declares the column as bigint — confirm the driver
// accepts that conversion.
depsInsertStmt = "INSERT INTO dependencies(ts, date_bucket, dependencies) VALUES (?, ?, ?)"
// depsSelectFmt is a format string, not a ready-to-run statement: %s receives the
// comma-separated date_bucket list built by getDepSelectString, and the two ?
// placeholders bound the half-open ts range (ts >= start AND ts < end).
depsSelectFmt = "SELECT ts, dependencies FROM dependencies WHERE date_bucket IN (%s) AND ts >= ? AND ts < ?"
// dateFmt is the Go reference-time layout that renders a timestamp as YYYYMMDD;
// it is used to produce date_bucket values.
dateFmt = "20060102"
// day is the step used when enumerating date buckets.
day = 24 * time.Hour
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
Expand Down Expand Up @@ -64,13 +68,14 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
CallCount: int64(d.CallCount),
}
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)
query := s.session.Query(depsInsertStmt, ts, ts.Format(dateFmt), deps)
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
query := s.session.Query(getDepSelectString(startTs, endTs), startTs, endTs)
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
Expand All @@ -93,6 +98,14 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
return mDependency, nil
}

func getDepSelectString(startTs time.Time, endTs time.Time) string {
var dateBuckets []string
for ts := startTs.Truncate(day); ts.Before(endTs); ts = ts.Add(day) {
dateBuckets = append(dateBuckets, ts.Format(dateFmt))
}
return fmt.Sprintf(depsSelectFmt, strings.Join(dateBuckets, ","))
}

func (s *DependencyStore) timeIntervalToPoints(endTs time.Time, lookback time.Duration) []time.Time {
startTs := endTs.Add(-lookback)
var days []time.Time
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/cassandra/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func TestDependencyStoreWrite(t *testing.T) {
} else {
assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args)
}
if d, ok := args[1].(time.Time); ok {
assert.Equal(t, ts, d)
if d, ok := args[1].(string); ok {
assert.Equal(t, ts.Format(dateFmt), d)
} else {
assert.Fail(t, "expecting second arg as time.Time", "received: %+v", args)
assert.Fail(t, "expecting second arg as string", "received: %+v", args)
}
if d, ok := args[2].([]Dependency); ok {
assert.Equal(t, []Dependency{
Expand Down
78 changes: 78 additions & 0 deletions plugin/storage/cassandra/schema/migration/v001tov002.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env bash

# usage reports an error together with the invocation help on stderr and
# terminates the script with status 1.
#   $1 - error message to show after "Error: "
usage() {
    printf '%s\n' \
        "Error: $1" \
        "" \
        "Usage: KEYSPACE={keyspace} $0" \
        "" \
        "The following parameters can be set via environment:" \
        " KEYSPACE - keyspace " \
        "" >&2
    exit 1
}

# confirm asks the operator a yes/no question and only lets the script go on
# after an affirmative reply.
#   $1 - optional prompt text; defaults to "Are you sure? [y/N]"
# Returns 0 when the reply is "y" or "yes" in any letter case. Any other reply,
# including an empty one, terminates the script with status 1.
# The reply is left in the global variable "response".
confirm() {
    local prompt="${1:-Are you sure? [y/N]} "
    read -r -p "$prompt" response
    if [[ "$response" =~ ^([yY][eE][sS]|[yY])$ ]]; then
        return 0
    fi
    exit 1
}

# Migration flow: rebuild $KEYSPACE.dependencies so that rows are partitioned by
# date_bucket (YYYYMMDD) instead of relying on the ts_index secondary index.
#   1. export the existing rows to dependencies.csv
#   2. prefix every row with its date bucket -> dependencies_datebucket.csv
#   3. drop the old index/table, create the new table, import the converted rows
# Every input to the destructive step 3 is validated first, because once the
# table is dropped the CSV files are the only copy of the data.

keyspace=${KEYSPACE}

if [[ -z ${keyspace} ]]; then
    usage "missing KEYSPACE parameter"
fi

# The keyspace is interpolated into CQL statements below, so restrict it to identifier characters.
if [[ ${keyspace} =~ [^a-zA-Z0-9_] ]]; then
    usage "invalid characters in KEYSPACE=$keyspace parameter, please use letters, digits or underscores"
fi

# The value is taken from the 4th line of the cqlsh output.
row_count=$(cqlsh -e "select count(*) from $keyspace.dependencies;" | head -4 | tail -1 | tr -d ' ')

# A non-numeric value means the count query failed. Without this guard the
# numeric comparison further down would error out, evaluate as "false" and let
# the script carry on to the DROP statements.
if ! [[ ${row_count} =~ ^[0-9]+$ ]]; then
    >&2 echo "Could not determine the number of rows in $keyspace.dependencies (got '$row_count')"
    exit 1
fi

echo "About to copy $row_count rows."
confirm

if ! cqlsh -e "COPY $keyspace.dependencies (ts, dependencies ) to 'dependencies.csv'"; then
    >&2 echo "Exporting $keyspace.dependencies to dependencies.csv failed"
    exit 1
fi

if [ ! -f dependencies.csv ]; then
    echo "Could not find dependencies.csv. Backup from cassandra was probably not successful"
    exit 1
fi

# Reading from stdin keeps the file name out of the wc output; tr removes the
# padding some wc implementations add.
exported_rows=$(wc -l < dependencies.csv | tr -d ' ')
if [ "${row_count}" -ne "${exported_rows}" ]; then
    echo "Number of rows in file is not equal to number of rows in cassandra"
    exit 1
fi

# Prefix each exported row with its date bucket.
# -r keeps backslash escapes in the CSV payload intact.
# The bucket is computed on the UTC calendar day so that it does not depend on
# the timezone of the machine running the migration.
# NOTE: "date -d" is the GNU date syntax.
while IFS="," read -r ts dependency; do
    if ! bucket=$(date -u +"%Y%m%d" -d "$ts"); then
        >&2 echo "Could not derive a date bucket from timestamp '$ts'"
        exit 1
    fi
    printf '%s\n' "$bucket,$ts,$dependency"
done < dependencies.csv > dependencies_datebucket.csv

# The converted file is what gets re-imported, so it must hold every row.
converted_rows=$(wc -l < dependencies_datebucket.csv | tr -d ' ')
if [ "${row_count}" -ne "${converted_rows}" ]; then
    >&2 echo "Number of rows in dependencies_datebucket.csv is not equal to number of rows in cassandra"
    exit 1
fi

dependencies_ttl=$(cqlsh -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='dependencies';" | head -4 | tail -1 | tr -d ' ')

# The TTL is interpolated into CREATE TABLE; an empty or garbled value would make
# the CREATE fail after the old table is already gone.
if ! [[ ${dependencies_ttl} =~ ^[0-9]+$ ]]; then
    >&2 echo "Could not determine default_time_to_live of $keyspace.dependencies (got '$dependencies_ttl')"
    exit 1
fi

echo "Setting dependencies_ttl to $dependencies_ttl"

# Destructive part: stop at the first failure instead of continuing on a half-migrated schema.
if ! cqlsh -e "DROP INDEX IF EXISTS $keyspace.ts_index"; then
    >&2 echo "Dropping index $keyspace.ts_index failed"
    exit 1
fi

if ! cqlsh -e "DROP TABLE IF EXISTS $keyspace.dependencies"; then
    >&2 echo "Dropping table $keyspace.dependencies failed"
    exit 1
fi

if ! cqlsh -e "CREATE TABLE $keyspace.dependencies (
ts timestamp,
date_bucket bigint,
dependencies list<frozen<dependency>>,
PRIMARY KEY (date_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = $dependencies_ttl
"; then
    >&2 echo "Creating table $keyspace.dependencies failed; the exported rows are in dependencies_datebucket.csv"
    exit 1
fi

if ! cqlsh -e "COPY $keyspace.dependencies (date_bucket, ts, dependencies) FROM 'dependencies_datebucket.csv'"; then
    >&2 echo "Importing dependencies_datebucket.csv failed; the exported rows are still in that file"
    exit 1
fi
202 changes: 202 additions & 0 deletions plugin/storage/cassandra/schema/v002.cql.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
--
-- Creates Cassandra keyspace with tables for traces and dependencies.
--
-- Required parameters:
--
-- keyspace
-- name of the keyspace
-- replication
-- replication strategy for the keyspace, such as
-- for prod environments
-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' }
-- for test environments
-- {'class': 'SimpleStrategy', 'replication_factor': '1'}
-- trace_ttl
-- default time to live for trace data, in seconds
-- dependencies_ttl
-- default time to live for dependencies data, in seconds (0 for no TTL)
--
-- Non-configurable settings:
-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/
-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html

-- Create the keyspace if it does not exist yet; ${keyspace} and ${replication}
-- are template parameters.
CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};

-- Typed key/value pair; used as the element type of traces.tags, log.fields and
-- process.tags. There is one value_* field per supported value type.
-- NOTE(review): presumably value_type names which value_* field is populated -- confirm.
CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue (
key text,
value_type text,
value_string text,
value_bool boolean,
value_long bigint,
value_double double,
value_binary blob,
);

-- A log entry: a ts value plus a list of keyvalue fields; element type of traces.logs.
-- NOTE(review): the unit of ts is not stated here -- confirm.
CREATE TYPE IF NOT EXISTS ${keyspace}.log (
ts bigint,
fields list<frozen<keyvalue>>,
);

-- Reference to another span, identified by trace_id and span_id, with ref_type
-- naming the kind of reference; element type of traces.refs.
CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref (
ref_type text,
trace_id blob,
span_id bigint,
);

-- Process information stored with each span row (traces.process): a service name
-- and a list of keyvalue tags.
CREATE TYPE IF NOT EXISTS ${keyspace}.process (
service_name text,
tags list<frozen<keyvalue>>,
);

-- Span storage: one row per span, partitioned by trace_id with span_id and
-- span_hash as clustering columns.
-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID.
-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks different in CQLSH "describe table".
-- start_time is bigint instead of timestamp as we require microsecond precision
CREATE TABLE IF NOT EXISTS ${keyspace}.traces (
trace_id blob,
span_id bigint,
span_hash bigint,
parent_id bigint,
operation_name text,
flags int,
start_time bigint,
duration bigint,
tags list<frozen<keyvalue>>,
logs list<frozen<log>>,
refs list<frozen<span_ref>>,
process frozen<process>,
PRIMARY KEY (trace_id, span_id, span_hash)
)
WITH compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- Set of service names; the name itself is the primary key. Rows expire with the
-- trace TTL.
CREATE TABLE IF NOT EXISTS ${keyspace}.service_names (
service_name text,
PRIMARY KEY (service_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- Operation names per service: partitioned by service_name, with operation_name
-- as the clustering column. Rows expire with the trace TTL.
CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names (
service_name text,
operation_name text,
PRIMARY KEY ((service_name), operation_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- index of trace IDs by service + operation names, sorted by span start_time.
-- The partition key is (service_name, operation_name); rows within a partition
-- are ordered newest start_time first.
CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index (
service_name text,
operation_name text,
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, operation_name), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- Index of trace IDs by service name. The partition key is (service_name, bucket);
-- rows within a partition are ordered newest start_time first.
-- NOTE(review): how bucket is computed is not visible in this file -- confirm against the writer.
CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index (
service_name text,
bucket int,
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, bucket), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- Index of trace IDs by span duration. The partition key is
-- (service_name, operation_name, bucket); rows within a partition are ordered by
-- longest duration first, then newest start_time.
CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index (
service_name text, // service name
operation_name text, // operation name, or blank for queries without span name
bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour
duration bigint, // span duration, in microseconds
start_time bigint,
trace_id blob,
PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id)
) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- Index of spans by tag. The partition key is (service_name, tag_key, tag_value);
-- rows within a partition are ordered newest start_time first.
-- a bucketing strategy may have to be added for tag queries
-- we can make this table even better by adding a timestamp to it
CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index (
service_name text,
tag_key text,
tag_value text,
start_time bigint,
trace_id blob,
span_id bigint,
PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id)
)
WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- A single dependency link: parent, child and the call count recorded for the
-- pair; element type of dependencies.dependencies.
CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
parent text,
child text,
call_count bigint,
);

-- Dependency snapshots, partitioned by date_bucket with ts as a descending
-- clustering column, so a time-range read only touches the listed buckets.
-- NOTE(review): the Go writer binds date_bucket as a "20060102"-formatted string
-- while the column is bigint -- confirm the driver accepts that conversion.
-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data
CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies (
ts timestamp,
date_bucket bigint,
dependencies list<frozen<dependency>>,
PRIMARY KEY (date_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${dependencies_ttl};

0 comments on commit 27ee3c5

Please sign in to comment.