diff --git a/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanConsumer.java b/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanConsumer.java index bf1e1bf2456..bc4ef5ee3ab 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanConsumer.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanConsumer.java @@ -31,7 +31,9 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import zipkin.Annotation; import zipkin.Codec; +import zipkin.Constants; import zipkin.Span; import zipkin.internal.Nullable; import zipkin.internal.Pair; @@ -112,9 +114,12 @@ public ListenableFuture accept(List rawSpans) { Long timestamp = guessTimestamp(span); spans.add(span); + boolean isServerRecvSpan = isServerRecvSpan(span); + futures.add(storeSpan( span.traceId, timestamp != null ? timestamp : 0L, + isServerRecvSpan, String.format("%s%d_%d_%d", span.traceIdHigh == 0 ? "" : span.traceIdHigh + "_", span.id, @@ -136,12 +141,24 @@ public ListenableFuture accept(List rawSpans) { return transform(Futures.allAsList(futures.build()), TO_VOID); } + private static boolean isServerRecvSpan(Span span) { + for (int i = 0, length = span.annotations.size(); i < length; i++) { + Annotation annotation = span.annotations.get(i); + if (annotation.value.equals(Constants.SERVER_RECV)) { + return true; + } + } + return false; + } + /** * Store the span in the underlying storage for later retrieval. */ - ListenableFuture storeSpan(long traceId, long timestamp, String key, ByteBuffer span) { + ListenableFuture storeSpan(long traceId, long timestamp, boolean isServerRecvSpan, String key, ByteBuffer span) { try { - if (0 == timestamp && metadata.compactionClass.contains("DateTieredCompactionStrategy")) { + // If we couldn't guess the timestamp, that probably means that there was a missing timestamp. + // However, tracers are supposed to put a timestamp *only* on the span originator (not on SR annotation) + if (0 == timestamp && !isServerRecvSpan && metadata.compactionClass.contains("DateTieredCompactionStrategy")) { LOG.warn("Span {} in trace {} had no timestamp. " + "If this happens a lot consider switching back to SizeTieredCompactionStrategy for " + "{}.traces", key, traceId, session.getLoggedKeyspace()); diff --git a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanConsumerTest.java b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanConsumerTest.java index 65252e408fd..d294591f0bc 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanConsumerTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanConsumerTest.java @@ -13,11 +13,18 @@ */ package zipkin.storage.cassandra; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.LoggingEvent; +import ch.qos.logback.core.Appender; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import java.util.stream.IntStream; + +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.slf4j.LoggerFactory; import zipkin.Annotation; import zipkin.Constants; import zipkin.Span; @@ -25,10 +32,19 @@ import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static zipkin.Constants.CLIENT_RECV; +import static zipkin.Constants.CLIENT_SEND; +import static zipkin.TestObjects.APP_ENDPOINT; public class CassandraSpanConsumerTest { private final CassandraStorage storage; + private final Appender mockAppender = mock(Appender.class); public CassandraSpanConsumerTest() { this.storage = CassandraTestGraph.INSTANCE.storage.get(); @@ -37,6 +53,15 @@ public CassandraSpanConsumerTest() { @Before public void clear() { storage.clear(); + Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + when(mockAppender.getName()).thenReturn(CassandraSpanConsumerTest.class.getName()); + root.addAppender(mockAppender); + } + + @After + public void tearDown() { + Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + root.detachAppender(mockAppender); } /** @@ -61,6 +86,31 @@ public void doesntIndexCoreOrNonStringAnnotations() { assertThat(rowCount(Tables.ANNOTATIONS_INDEX)).isZero(); } + @Test + public void logTimestampMissingOnClientSend() { + Span span = Span.builder().traceId(1L).parentId(1L).id(2L).name("query") + .addAnnotation(Annotation.create(0L, CLIENT_SEND, APP_ENDPOINT)) + .addAnnotation(Annotation.create(0L, CLIENT_RECV, APP_ENDPOINT)).build(); + accept(span); + verify(mockAppender).doAppend(considerSwitchStrategyLog()); + } + + @Test + public void dontLogTimestampMissingOnMidTierServerSpan() { + Span span = TestObjects.TRACE.get(0); + accept(span); + verify(mockAppender, never()).doAppend(considerSwitchStrategyLog()); + } + + private static Object considerSwitchStrategyLog() { + return argThat(new ArgumentMatcher() { + @Override + public boolean matches(final Object argument) { + return ((LoggingEvent)argument).getFormattedMessage().contains("If this happens a lot consider switching back to SizeTieredCompactionStrategy"); + } + }); + } + /** * Simulates a trace with a step pattern, where each span starts a millisecond after the prior * one. The consumer code optimizes index inserts to only represent the interval represented by diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java index b40bb3bdb5e..2f19036e82a 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import zipkin.Annotation; import zipkin.BinaryAnnotation; +import zipkin.Constants; import zipkin.Span; import zipkin.storage.cassandra3.Schema.AnnotationUDT; import zipkin.storage.cassandra3.Schema.BinaryAnnotationUDT; @@ -102,7 +103,8 @@ public ListenableFuture accept(List rawSpans) { // indexing occurs by timestamp, so derive one if not present. Long timestamp = guessTimestamp(span); TraceIdUDT traceId = new TraceIdUDT(span.traceIdHigh, span.traceId); - futures.add(storeSpan(span, traceId, timestamp)); + boolean isServerRecvSpan = isServerRecvSpan(span); + futures.add(storeSpan(span, traceId, isServerRecvSpan, timestamp)); for (String serviceName : span.serviceNames()) { // QueryRequest.min/maxDuration @@ -121,12 +123,23 @@ public ListenableFuture accept(List rawSpans) { return transform(Futures.allAsList(futures.build()), TO_VOID); } + private static boolean isServerRecvSpan(Span span) { + for (int i = 0, length = span.annotations.size(); i < length; i++) { + Annotation annotation = span.annotations.get(i); + if (annotation.value.equals(Constants.SERVER_RECV)) { + return true; + } + } + return false; + } + /** * Store the span in the underlying storage for later retrieval. */ - ListenableFuture storeSpan(Span span, TraceIdUDT traceId, Long timestamp) { + ListenableFuture storeSpan(Span span, TraceIdUDT traceId, boolean isServerRecvSpan, Long timestamp) { try { if ((null == timestamp || 0 == timestamp) + && !isServerRecvSpan && metadata.compactionClass.contains("TimeWindowCompactionStrategy")) { LOG.warn("Span {} in trace {} had no timestamp. " @@ -145,7 +158,7 @@ ListenableFuture storeSpan(Span span, TraceIdUDT traceId, Long timestamp) { Set annotationKeys = CassandraUtil.annotationKeys(span); if (!strictTraceId && traceId.getHigh() != 0L) { - storeSpan(span, new TraceIdUDT(0L, traceId.getLow()), timestamp); + storeSpan(span, new TraceIdUDT(0L, traceId.getLow()), isServerRecvSpan, timestamp); } BoundStatement bound = bindWithName(insertSpan, "insert-span") diff --git a/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/CassandraSpanConsumerTest.java b/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/CassandraSpanConsumerTest.java index 481cfe27be0..e1e0413fd96 100644 --- a/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/CassandraSpanConsumerTest.java +++ b/zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/CassandraSpanConsumerTest.java @@ -13,21 +13,37 @@ */ package zipkin.storage.cassandra3; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.LoggingEvent; +import ch.qos.logback.core.Appender; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import java.util.stream.IntStream; + +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.slf4j.LoggerFactory; import zipkin.Annotation; import zipkin.Span; import zipkin.TestObjects; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static zipkin.Constants.CLIENT_RECV; +import static zipkin.Constants.CLIENT_SEND; +import static zipkin.TestObjects.APP_ENDPOINT; public class CassandraSpanConsumerTest { private final Cassandra3Storage storage; + private final Appender mockAppender = mock(Appender.class); public CassandraSpanConsumerTest() { this.storage = Cassandra3TestGraph.INSTANCE.storage.get(); @@ -36,6 +52,15 @@ public CassandraSpanConsumerTest() { @Before public void clear() { storage.clear(); + Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + when(mockAppender.getName()).thenReturn(CassandraSpanConsumerTest.class.getName()); + root.addAppender(mockAppender); + } + + @After + public void tearDown() { + Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + root.detachAppender(mockAppender); } /** @@ -51,6 +76,31 @@ public void doesntIndexSpansMissingDuration() { assertThat(rowCount(Schema.TABLE_TRACE_BY_SERVICE_SPAN)).isZero(); } + @Test + public void logTimestampMissingOnClientSend() { + Span span = Span.builder().traceId(1L).parentId(1L).id(2L).name("query") + .addAnnotation(Annotation.create(0L, CLIENT_SEND, APP_ENDPOINT)) + .addAnnotation(Annotation.create(0L, CLIENT_RECV, APP_ENDPOINT)).build(); + accept(span); + verify(mockAppender).doAppend(considerSwitchStrategyLog()); + } + + @Test + public void dontLogTimestampMissingOnMidTierServerSpan() { + Span span = TestObjects.TRACE.get(0); + accept(span); + verify(mockAppender, never()).doAppend(considerSwitchStrategyLog()); + } + + private static Object considerSwitchStrategyLog() { + return argThat(new ArgumentMatcher() { + @Override + public boolean matches(final Object argument) { + return ((LoggingEvent)argument).getFormattedMessage().contains("If this happens a lot consider switching back to SizeTieredCompactionStrategy"); + } + }); + } + /** * Simulates a trace with a step pattern, where each span starts a millisecond after the prior * one. The consumer code optimizes index inserts to only represent the interval represented by