From a19184fcf8dd083f84bd2e13c28918b612c98d45 Mon Sep 17 00:00:00 2001
From: Prithvi Raj
Date: Fri, 27 Apr 2018 15:25:23 -0400
Subject: [PATCH] Remove SASI indexes
- Use traditional indexes and a date bucket field instead
- This allows people to use older versions of cassandra
Signed-off-by: Prithvi Raj
---
.../cassandra/dependencystore/storage.go | 21 +-
.../cassandra/dependencystore/storage_test.go | 6 +-
.../cassandra/schema/migration/v001tov002.sh | 78 +++++++
plugin/storage/cassandra/schema/v002.cql.tmpl | 202 ++++++++++++++++++
4 files changed, 300 insertions(+), 7 deletions(-)
create mode 100755 plugin/storage/cassandra/schema/migration/v001tov002.sh
create mode 100644 plugin/storage/cassandra/schema/v002.cql.tmpl
diff --git a/plugin/storage/cassandra/dependencystore/storage.go b/plugin/storage/cassandra/dependencystore/storage.go
index 72c3c470548..44bd110cec0 100644
--- a/plugin/storage/cassandra/dependencystore/storage.go
+++ b/plugin/storage/cassandra/dependencystore/storage.go
@@ -15,6 +15,8 @@
package dependencystore
import (
+ "fmt"
+ "strings"
"time"
"github.com/pkg/errors"
@@ -27,8 +29,10 @@ import (
)
const (
- depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
- depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
+ depsInsertStmt = "INSERT INTO dependencies(ts, date_bucket, dependencies) VALUES (?, ?, ?)"
+ depsSelectFmt = "SELECT ts, dependencies FROM dependencies WHERE date_bucket IN (%s) AND ts >= ? AND ts < ?"
+ dateFmt = "20060102"
+ day = 24 * time.Hour
)
// DependencyStore handles all queries and insertions to Cassandra dependencies
@@ -64,13 +68,14 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
CallCount: int64(d.CallCount),
}
}
- query := s.session.Query(depsInsertStmt, ts, ts, deps)
+ query := s.session.Query(depsInsertStmt, ts, ts.Format(dateFmt), deps)
return s.dependenciesTableMetrics.Exec(query, s.logger)
}
// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
- query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
+ startTs := endTs.Add(-1 * lookback)
+ query := s.session.Query(getDepSelectString(startTs, endTs), startTs, endTs)
iter := query.Consistency(cassandra.One).Iter()
var mDependency []model.DependencyLink
@@ -93,6 +98,14 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
return mDependency, nil
}
+func getDepSelectString(startTs time.Time, endTs time.Time) string {
+ var dateBuckets []string
+ for ts := startTs.Truncate(day); ts.Before(endTs); ts = ts.Add(day) {
+ dateBuckets = append(dateBuckets, ts.Format(dateFmt))
+ }
+ return fmt.Sprintf(depsSelectFmt, strings.Join(dateBuckets, ","))
+}
+
func (s *DependencyStore) timeIntervalToPoints(endTs time.Time, lookback time.Duration) []time.Time {
startTs := endTs.Add(-lookback)
var days []time.Time
diff --git a/plugin/storage/cassandra/dependencystore/storage_test.go b/plugin/storage/cassandra/dependencystore/storage_test.go
index 168f499029b..788f666ea0f 100644
--- a/plugin/storage/cassandra/dependencystore/storage_test.go
+++ b/plugin/storage/cassandra/dependencystore/storage_test.go
@@ -86,10 +86,10 @@ func TestDependencyStoreWrite(t *testing.T) {
} else {
assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args)
}
- if d, ok := args[1].(time.Time); ok {
- assert.Equal(t, ts, d)
+ if d, ok := args[1].(string); ok {
+ assert.Equal(t, ts.Format(dateFmt), d)
} else {
- assert.Fail(t, "expecting second arg as time.Time", "received: %+v", args)
+ assert.Fail(t, "expecting second arg as string", "received: %+v", args)
}
if d, ok := args[2].([]Dependency); ok {
assert.Equal(t, []Dependency{
diff --git a/plugin/storage/cassandra/schema/migration/v001tov002.sh b/plugin/storage/cassandra/schema/migration/v001tov002.sh
new file mode 100755
index 00000000000..b825c7312a0
--- /dev/null
+++ b/plugin/storage/cassandra/schema/migration/v001tov002.sh
@@ -0,0 +1,78 @@
+#!/usr/bin/env bash
+
+function usage {
+ >&2 echo "Error: $1"
+ >&2 echo ""
+ >&2 echo "Usage: KEYSPACE={keyspace} $0"
+ >&2 echo ""
+ >&2 echo "The following parameters can be set via environment:"
+ >&2 echo " KEYSPACE - keyspace "
+ >&2 echo ""
+ exit 1
+}
+
+confirm() {
+ read -r -p "${1:-Are you sure? [y/N]} " response
+ case "$response" in
+ [yY][eE][sS]|[yY])
+ true
+ ;;
+ *)
+ exit 1
+ ;;
+ esac
+}
+
+keyspace=${KEYSPACE}
+
+if [[ ${keyspace} == "" ]]; then
+ usage "missing KEYSPACE parameter"
+fi
+
+if [[ ${keyspace} =~ [^a-zA-Z0-9_] ]]; then
+ usage "invalid characters in KEYSPACE=$keyspace parameter, please use letters, digits or underscores"
+fi
+
+
+row_count=$(cqlsh -e "select count(*) from $keyspace.dependencies;"|head -4|tail -1| tr -d ' ')
+
+echo "About to copy $row_count rows."
+confirm
+
+cqlsh -e "COPY $keyspace.dependencies (ts, dependencies ) to 'dependencies.csv'"
+
+if [ ! -f dependencies.csv ]; then
+ echo "Could not find dependencies.csv. Backup from cassandra was probably not successful"
+ exit 1
+fi
+
+if [ ${row_count} -ne $(wc -l dependencies.csv | cut -f 1 -d ' ') ]; then
+ echo "Number of rows in file is not equal to number of rows in cassandra"
+ exit 1
+fi
+
+
+while IFS="," read ts dependency; do bucket=`date +"%Y%m%d" -d "$ts"`; echo "$bucket,$ts,$dependency"; done < dependencies.csv > dependencies_datebucket.csv
+
+dependencies_ttl=$(cqlsh -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='dependencies';"|head -4|tail -1|tr -d ' ')
+
+echo "Setting dependencies_ttl to $dependencies_ttl"
+
+cqlsh -e "DROP INDEX IF EXISTS $keyspace.ts_index"
+cqlsh -e "DROP TABLE IF EXISTS $keyspace.dependencies"
+
+cqlsh -e "CREATE TABLE $keyspace.dependencies (
+ ts timestamp,
+ date_bucket bigint,
+ dependencies list<frozen<dependency>>,
+ PRIMARY KEY (date_bucket, ts)
+) WITH CLUSTERING ORDER BY (ts DESC)
+ AND compaction = {
+ 'min_threshold': '4',
+ 'max_threshold': '32',
+ 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+ }
+ AND default_time_to_live = $dependencies_ttl
+"
+
+cqlsh -e "COPY $keyspace.dependencies (date_bucket, ts, dependencies) FROM 'dependencies_datebucket.csv'"
diff --git a/plugin/storage/cassandra/schema/v002.cql.tmpl b/plugin/storage/cassandra/schema/v002.cql.tmpl
new file mode 100644
index 00000000000..4f09b93bc07
--- /dev/null
+++ b/plugin/storage/cassandra/schema/v002.cql.tmpl
@@ -0,0 +1,202 @@
+--
+-- Creates Cassandra keyspace with tables for traces and dependencies.
+--
+-- Required parameters:
+--
+-- keyspace
+-- name of the keyspace
+-- replication
+-- replication strategy for the keyspace, such as
+-- for prod environments
+-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' }
+-- for test environments
+-- {'class': 'SimpleStrategy', 'replication_factor': '1'}
+-- trace_ttl
+-- default time to live for trace data, in seconds
+-- dependencies_ttl
+-- default time to live for dependencies data, in seconds (0 for no TTL)
+--
+-- Non-configurable settings:
+-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/
+-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html
+
+CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue (
+ key text,
+ value_type text,
+ value_string text,
+ value_bool boolean,
+ value_long bigint,
+ value_double double,
+ value_binary blob,
+);
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.log (
+ ts bigint,
+ fields list<frozen<keyvalue>>,
+);
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref (
+ ref_type text,
+ trace_id blob,
+ span_id bigint,
+);
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.process (
+ service_name text,
+ tags list<frozen<keyvalue>>,
+);
+
+-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID.
+-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table".
+-- start_time is bigint instead of timestamp as we require microsecond precision
+CREATE TABLE IF NOT EXISTS ${keyspace}.traces (
+ trace_id blob,
+ span_id bigint,
+ span_hash bigint,
+ parent_id bigint,
+ operation_name text,
+ flags int,
+ start_time bigint,
+ duration bigint,
+ tags list<frozen<keyvalue>>,
+ logs list<frozen<log>>,
+ refs list<frozen<span_ref>>,
+ process frozen<process>,
+ PRIMARY KEY (trace_id, span_id, span_hash)
+)
+ WITH compaction = {
+ 'compaction_window_size': '1',
+ 'compaction_window_unit': 'HOURS',
+ 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_names (
+ service_name text,
+ PRIMARY KEY (service_name)
+)
+ WITH compaction = {
+ 'min_threshold': '4',
+ 'max_threshold': '32',
+ 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names (
+ service_name text,
+ operation_name text,
+ PRIMARY KEY ((service_name), operation_name)
+)
+ WITH compaction = {
+ 'min_threshold': '4',
+ 'max_threshold': '32',
+ 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+-- index of trace IDs by service + operation names, sorted by span start_time.
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index (
+ service_name text,
+ operation_name text,
+ start_time bigint,
+ trace_id blob,
+ PRIMARY KEY ((service_name, operation_name), start_time)
+) WITH CLUSTERING ORDER BY (start_time DESC)
+ AND compaction = {
+ 'compaction_window_size': '1',
+ 'compaction_window_unit': 'HOURS',
+ 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index (
+ service_name text,
+ bucket int,
+ start_time bigint,
+ trace_id blob,
+ PRIMARY KEY ((service_name, bucket), start_time)
+) WITH CLUSTERING ORDER BY (start_time DESC)
+ AND compaction = {
+ 'compaction_window_size': '1',
+ 'compaction_window_unit': 'HOURS',
+ 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index (
+ service_name text, // service name
+ operation_name text, // operation name, or blank for queries without span name
+ bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour
+ duration bigint, // span duration, in microseconds
+ start_time bigint,
+ trace_id blob,
+ PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id)
+) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC)
+ AND compaction = {
+ 'compaction_window_size': '1',
+ 'compaction_window_unit': 'HOURS',
+ 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+-- a bucketing strategy may have to be added for tag queries
+-- we can make this table even better by adding a timestamp to it
+CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index (
+ service_name text,
+ tag_key text,
+ tag_value text,
+ start_time bigint,
+ trace_id blob,
+ span_id bigint,
+ PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id)
+)
+ WITH CLUSTERING ORDER BY (start_time DESC)
+ AND compaction = {
+ 'compaction_window_size': '1',
+ 'compaction_window_unit': 'HOURS',
+ 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+ }
+ AND dclocal_read_repair_chance = 0.0
+ AND default_time_to_live = ${trace_ttl}
+ AND speculative_retry = 'NONE'
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
+ parent text,
+ child text,
+ call_count bigint,
+);
+
+-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data
+CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies (
+ ts timestamp,
+ date_bucket bigint,
+ dependencies list<frozen<dependency>>,
+ PRIMARY KEY (date_bucket, ts)
+) WITH CLUSTERING ORDER BY (ts DESC)
+ AND compaction = {
+ 'min_threshold': '4',
+ 'max_threshold': '32',
+ 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+ }
+ AND default_time_to_live = ${dependencies_ttl};