Skip to content

Commit

Permalink
Updates, just need to test the schema upgrade thing
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Dec 18, 2018
1 parent 32fff9a commit 109c40f
Show file tree
Hide file tree
Showing 22 changed files with 219 additions and 636 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ StorageComponent storage(
ZipkinCassandraStorageProperties properties,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocompleteKeys:}") List<String> autocompleteKeys) {
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys) {
CassandraStorage.Builder builder = properties.toBuilder()
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ public void strictTraceId_canSetToFalse() {
}

@Test
public void tags_list() {
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:cassandra",
"zipkin.storage.autocompleteKeys:environment")
"zipkin.storage.autocomplete-keys:environment")
.applyTo(context);
Access.registerCassandra(context);
context.refresh();

assertThat(context.getBean(CassandraStorage.class).autocompleteKeys)
.containsOnly("environment");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ StorageComponent storage(
ZipkinCassandra3StorageProperties properties,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocompleteKeys:}") List<String> autocompleteKeys) {
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys) {
CassandraStorage.Builder builder = properties.toBuilder()
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ public void searchEnabled_canSetToFalse() {
}

@Test
public void tags_list() {
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:cassandra3",
"zipkin.storage.autocompleteKeys:environment",
"zipkin.storage.search-enabled:true")
"zipkin.storage.autocomplete-keys:environment")
.applyTo(context);
Access.registerCassandra3(context);
context.refresh();

assertThat(context.getBean(CassandraStorage.class).autocompleteKeys())
.containsOnly("environment");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@
final class CassandraSpanConsumer implements SpanConsumer {
static final int WRITTEN_NAMES_TTL =
Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);
static final long WRITTEN_TAGS_TTL =
Long.getLong("zipkin2.storage.cassandra.internal.writeTagsTtl", 60 * 60 * 1000);
final InsertTrace.Factory insertTrace;
final InsertServiceName.Factory insertServiceName;
final InsertSpanName.Factory insertSpanName;
final Schema.Metadata metadata;
final CompositeIndexer indexer;
final InsertAutocompleteValue.Factory insertTags;
final InsertAutocompleteValue.Factory insertAutocompleteValue;
final Set<String> autocompleteKeys;

CassandraSpanConsumer(CassandraStorage storage, CacheBuilderSpec indexCacheSpec) {
Expand All @@ -53,8 +51,8 @@ final class CassandraSpanConsumer implements SpanConsumer {
insertTrace = new InsertTrace.Factory(session, metadata, spanTtl);
insertServiceName = new InsertServiceName.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
insertSpanName = new InsertSpanName.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
insertAutocompleteValue = new InsertAutocompleteValue.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
indexer = new CompositeIndexer(session, indexCacheSpec, storage.bucketCount, indexTtl);
insertTags = new InsertAutocompleteValue.Factory(session, WRITTEN_TAGS_TTL);
autocompleteKeys = new LinkedHashSet<>(storage.autocompleteKeys);
}

Expand Down Expand Up @@ -105,7 +103,7 @@ public Call<Void> accept(List<Span> rawSpans) {
calls.add(insertSpanName.create(insert));
}
for (Map.Entry<String, String> entry : autocompleteTags) {
calls.add(insertTags.create(entry));
calls.add(insertAutocompleteValue.create(entry));
}
indexer.index(spansToIndex.build(), calls);
if (calls.size() == 1) return calls.get(0).map(r -> null);
Expand All @@ -115,9 +113,9 @@ public Call<Void> accept(List<Span> rawSpans) {
/** Clears any caches */
@VisibleForTesting
void clear() {
insertServiceName.cache.clear();
insertSpanName.cache.clear();
insertTags.cache.clear();
insertServiceName.clear();
insertSpanName.clear();
insertAutocompleteValue.clear();
indexer.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public final class CassandraSpanStore implements SpanStore {
private final SelectDependencies.Factory dependencies;
private final Call<List<String>> serviceNames;
private final SelectSpanNames.Factory spanNames;

private final SelectTraceIdTimestampFromServiceName.Factory selectTraceIdsByServiceName;
private final SelectTraceIdTimestampFromServiceNames.Factory selectTraceIdsByServiceNames;
private final SelectTraceIdTimestampFromServiceSpanName.Factory selectTraceIdsBySpanName;
Expand All @@ -70,6 +69,7 @@ public final class CassandraSpanStore implements SpanStore {
dependencies = new SelectDependencies.Factory(session);
spanNames = new SelectSpanNames.Factory(session);
serviceNames = new SelectServiceNames.Factory(session).create();

selectTraceIdsByServiceName =
new SelectTraceIdTimestampFromServiceName.Factory(session, timestampCodec, buckets);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@

/**
* Inserts index rows into Cassandra according to {@link IndexSupport} of a table. This skips
* entries that don't improve results based on {@link QueryRequest#endTs} and {@link
* QueryRequest#lookback}. For example, it doesn't insert rows that only vary on timestamp and exist
* between timestamps of existing rows.
* entries that don't improve results based on {@link QueryRequest#endTs()} and {@link
* QueryRequest#lookback()}. For example, it doesn't insert rows that only vary on timestamp and
* exist between timestamps of existing rows.
*/
final class Indexer {
static final Logger LOG = LoggerFactory.getLogger(Indexer.class);
Expand Down Expand Up @@ -118,7 +118,7 @@ public IndexCall clone() {
}

void index(List<V1Span> spans, List<Call<ResultSet>> calls) {
// First parse each span into partition getKeys used to support query requests
// First parse each span into partition keys used to support query requests
Builder<PartitionKeyToTraceId, Long> parsed = ImmutableSetMultimap.builder();
for (V1Span span : spans) {
Long timestamp = guessTimestamp(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,112 +18,57 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall;
import zipkin2.storage.cassandra.internal.call.DeduplicatingCall;

import static zipkin2.storage.cassandra.v1.Tables.TABLE_AUTOCOMPLETE_TAGS;

final class InsertAutocompleteValue extends ResultSetFutureCall {
final class InsertAutocompleteValue extends DeduplicatingCall<Map.Entry<String, String>> {

static class Factory {
static class Factory
extends DeduplicatingCall.Factory<Map.Entry<String, String>, InsertAutocompleteValue> {
final Session session;
final PreparedStatement preparedStatement;
final ConcurrentMap<Map.Entry<String, String>, InsertAutocompleteValue> cache;

Factory(Session session, long ttl) {
/**
* @param indexTtl how long cassandra will persist the rows
* @param redundantCallTtl how long in milliseconds to obviate redundant calls
*/
Factory(Session session, int indexTtl, int redundantCallTtl) {
super(redundantCallTtl);
this.session = session;
this.preparedStatement =
session.prepare(
QueryBuilder.insertInto(TABLE_AUTOCOMPLETE_TAGS)
.value("key", QueryBuilder.bindMarker("key"))
.value("value", QueryBuilder.bindMarker("value")));
this.cache =
CacheBuilder.newBuilder()
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.ticker(
new Ticker() {
@Override
public long read() {
return nanoTime();
}
})
// TODO: maximum size or weight
.<Map.Entry<String, String>, InsertAutocompleteValue>build()
.asMap();
Insert insertQuery = QueryBuilder.insertInto(TABLE_AUTOCOMPLETE_TAGS)
.value("key", QueryBuilder.bindMarker("key"))
.value("value", QueryBuilder.bindMarker("value"));
if (indexTtl > 0) insertQuery.using(QueryBuilder.ttl(indexTtl));
this.preparedStatement = session.prepare(insertQuery);
}

// visible for testing, since nanoTime is weird and can return negative
long nanoTime() {
return System.nanoTime();
}

Call<ResultSet> create(Map.Entry<String, String> input) {
if (input == null) throw new NullPointerException("input == null");
if (cache.containsKey(input)) return Call.create(null);
InsertAutocompleteValue realCall = new InsertAutocompleteValue(this, input);
if (cache.putIfAbsent(input, realCall) != null) return Call.create(null);
return realCall;
@Override protected InsertAutocompleteValue newCall(Map.Entry<String, String> input) {
return new InsertAutocompleteValue(this, input);
}
}

final Factory factory;
final Map.Entry<String, String> input;

InsertAutocompleteValue(Factory factory, Map.Entry<String, String> input) {
super(factory, input);
this.factory = factory;
this.input = input;
}

@Override
protected ResultSetFuture newFuture() {
@Override protected ResultSetFuture newFuture() {
return factory.session.executeAsync(factory.preparedStatement.bind()
.setString("key", input.getKey())
.setString("value", input.getValue()));
}

@Override
protected ResultSet doExecute() throws IOException {
try {
return super.doExecute();
} catch (IOException | RuntimeException | Error e) {
factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate
throw e;
}
}

@Override
protected void doEnqueue(Callback<ResultSet> callback) {
super.doEnqueue(
new Callback<ResultSet>() {
@Override
public void onSuccess(ResultSet value) {
callback.onSuccess(value);
}

@Override
public void onError(Throwable t) {
factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate
callback.onError(t);
}
});
}

@Override public String toString() {
return input.toString().replace("Input", "InsertAutocompleteValue");
}

@Override
protected void doCancel() {
factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate
super.doCancel();
return "InsertAutocompleteValue(" + input + ")";
}

@Override public Call<ResultSet> clone() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import zipkin2.storage.cassandra.internal.call.DeduplicatingCall;

final class InsertServiceName extends DeduplicatingCall<String> {

Expand All @@ -32,15 +33,13 @@ static class Factory extends DeduplicatingCall.Factory<String, InsertServiceName
Factory(Session session, int indexTtl, int redundantCallTtl) {
super(redundantCallTtl);
this.session = session;
Insert insertQuery =
QueryBuilder.insertInto(Tables.SERVICE_NAMES)
.value("service_name", QueryBuilder.bindMarker("service_name"));
Insert insertQuery = QueryBuilder.insertInto(Tables.SERVICE_NAMES)
.value("service_name", QueryBuilder.bindMarker("service_name"));
if (indexTtl > 0) insertQuery.using(QueryBuilder.ttl(indexTtl));
this.preparedStatement = session.prepare(insertQuery);
}

@Override
InsertServiceName newCall(String input) {
@Override protected InsertServiceName newCall(String input) {
return new InsertServiceName(this, input);
}
}
Expand All @@ -54,19 +53,16 @@ InsertServiceName newCall(String input) {
this.service_name = service_name;
}

@Override
protected ResultSetFuture newFuture() {
@Override protected ResultSetFuture newFuture() {
return factory.session.executeAsync(
factory.preparedStatement.bind().setString("service_name", service_name));
factory.preparedStatement.bind().setString("service_name", service_name));
}

@Override
public String toString() {
@Override public String toString() {
return "InsertServiceName(" + service_name + ")";
}

@Override
public InsertServiceName clone() {
@Override public InsertServiceName clone() {
return new InsertServiceName(factory, service_name);
}
}

0 comments on commit 109c40f

Please sign in to comment.