Skip to content

Commit

Permalink
Added tracing for TopicConnection for JMS
Browse files Browse the repository at this point in the history
fixes gh-1324
  • Loading branch information
marcingrzejszczak committed Apr 5, 2019
1 parent 58b2115 commit 1b493e6
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 82 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Expand Up @@ -244,12 +244,6 @@
<version>3.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<version>1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
5 changes: 0 additions & 5 deletions spring-cloud-sleuth-core/pom.xml
Expand Up @@ -336,11 +336,6 @@
<version>20.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Expand Up @@ -272,8 +272,8 @@ public Message<?> postReceive(Message<?> message, MessageChannel channel) {
headers.setImmutable();
if (message instanceof ErrorMessage) {
ErrorMessage errorMessage = (ErrorMessage) message;
return new ErrorMessage(errorMessage.getPayload(), headers.getMessageHeaders(),
errorMessage.getOriginalMessage());
return new ErrorMessage(errorMessage.getPayload(),
headers.getMessageHeaders(), errorMessage.getOriginalMessage());
}
return new GenericMessage<>(message.getPayload(), headers.getMessageHeaders());
}
Expand Down
Expand Up @@ -22,6 +22,8 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XAJMSContext;
Expand Down Expand Up @@ -74,6 +76,10 @@ public Object postProcessAfterInitialization(Object bean, String beanName)
return new LazyXAConnectionFactory(this.beanFactory,
(XAConnectionFactory) bean);
}
else if (bean instanceof TopicConnectionFactory) {
return new LazyTopicConnectionFactory(this.beanFactory,
(TopicConnectionFactory) bean);
}
else if (bean instanceof ConnectionFactory) {
return new LazyConnectionFactory(this.beanFactory, (ConnectionFactory) bean);
}
Expand Down Expand Up @@ -135,6 +141,73 @@ private XAConnectionFactory wrappedDelegate() {

}

class LazyTopicConnectionFactory implements TopicConnectionFactory {

private final BeanFactory beanFactory;

private final TopicConnectionFactory delegate;

private final LazyConnectionFactory factory;

private JmsTracing jmsTracing;

LazyTopicConnectionFactory(BeanFactory beanFactory, TopicConnectionFactory delegate) {
this.beanFactory = beanFactory;
this.delegate = delegate;
this.factory = new LazyConnectionFactory(beanFactory, delegate);
}

@Override
public TopicConnection createTopicConnection() throws JMSException {
return jmsTracing().topicConnection(this.delegate.createTopicConnection());
}

@Override
public TopicConnection createTopicConnection(String s, String s1)
throws JMSException {
return jmsTracing().topicConnection(this.delegate.createTopicConnection(s, s1));
}

@Override
public Connection createConnection() throws JMSException {
return this.factory.createConnection();
}

@Override
public Connection createConnection(String s, String s1) throws JMSException {
return this.factory.createConnection(s, s1);
}

@Override
public JMSContext createContext() {
return this.factory.createContext();
}

@Override
public JMSContext createContext(String s, String s1) {
return this.factory.createContext(s, s1);
}

@Override
public JMSContext createContext(String s, String s1, int i) {
return this.factory.createContext(s, s1, i);
}

@Override
public JMSContext createContext(int i) {
return this.factory.createContext(i);
}

private JmsTracing jmsTracing() {
if (this.jmsTracing != null) {
return this.jmsTracing;
}
this.jmsTracing = this.beanFactory.getBean(JmsTracing.class);
return this.jmsTracing;
}

}

class LazyConnectionFactory implements ConnectionFactory {

private final BeanFactory beanFactory;
Expand Down
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.stream.Collectors;

import javax.annotation.concurrent.NotThreadSafe;

import brave.Span;
import brave.Tracer;
import brave.sampler.Sampler;
Expand Down Expand Up @@ -50,6 +52,7 @@
@SpringBootTest(classes = SleuthSpanCreatorAspectMonoTests.TestConfiguration.class)
@RunWith(SpringRunner.class)
@DirtiesContext(methodMode = BEFORE_METHOD)
@NotThreadSafe
public class SleuthSpanCreatorAspectMonoTests {

@Autowired
Expand Down
Expand Up @@ -69,15 +69,6 @@ private void whenHystrixCommandAnnotatedMethodGetsExecuted() {
this.catcher.invokeLogicWrappedInHystrixCommand();
}

private void thenSpanInHystrixThreadIsContinued(final Span span) {
then(span).isNotNull();
Awaitility.await().atMost(5, SECONDS).untilAsserted(() -> {
then(HystrixAnnotationsIntegrationTests.this.catcher).isNotNull();
then(span.context().traceId()).isEqualTo(
HystrixAnnotationsIntegrationTests.this.catcher.getTraceId());
});
}

private void thenSpanInHystrixThreadIsCreated() {
Awaitility.await().atMost(5, SECONDS).untilAsserted(() -> {
then(HystrixAnnotationsIntegrationTests.this.catcher.getSpan()).isNotNull();
Expand Down Expand Up @@ -113,8 +104,10 @@ public HystrixCommandInvocationSpanCatcher(Tracing tracing) {

@HystrixCommand
public void invokeLogicWrappedInHystrixCommand() {
System.out.println("FOOO");
this.spanCaughtFromHystrixThread = new AtomicReference<>(
this.tracing.tracer().currentSpan());
System.out.println("aksdhkasd: " + this.spanCaughtFromHystrixThread);
}

public Long getTraceId() {
Expand Down
Expand Up @@ -27,6 +27,8 @@
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.resource.spi.ResourceAdapter;
Expand Down Expand Up @@ -110,6 +112,22 @@ static void checkXAConnection(AssertableApplicationContext ctx) throws JMSExcept
}
}

static void checkTopicConnection(AssertableApplicationContext ctx)
throws JMSException {
// Not using try-with-resources as that doesn't exist in JMS 1.1
TopicConnection con = ctx.getBean(TopicConnectionFactory.class)
.createTopicConnection();
try {
con.setExceptionListener(exception -> {
});
assertThat(con.getExceptionListener().getClass().getName())
.startsWith("brave.jms.TracingExceptionListener");
}
finally {
con.close();
}
}

@Test
public void tracesConnectionFactory() {
this.contextRunner.run(JmsTracingConfigurationTest::checkConnection);
Expand All @@ -124,6 +142,15 @@ public void tracesXAConnectionFactories() {
});
}

@Test
public void tracesTopicConnectionFactories() {
this.contextRunner.withUserConfiguration(XAConfiguration.class).run(ctx -> {
clearSpans(ctx);
checkConnection(ctx);
checkTopicConnection(ctx);
});
}

@Test
public void tracesListener_jmsMessageListener() {
this.contextRunner.withUserConfiguration(SimpleJmsListenerConfiguration.class)
Expand Down
Expand Up @@ -296,8 +296,8 @@ public void errorMessageOriginalMessageRetained() {
this.message = this.channel.receive();

assertThat(this.message).isNotNull();
assertThat(this.message)
.isInstanceOfSatisfying(ErrorMessage.class, errorMessage -> {
assertThat(this.message).isInstanceOfSatisfying(ErrorMessage.class,
errorMessage -> {
assertThat(errorMessage.getOriginalMessage())
.isSameAs(originalMessage);
assertThat(errorMessage.getHeaders().get("header"))
Expand Down
Expand Up @@ -19,10 +19,11 @@
import java.util.AbstractMap;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.NotThreadSafe;

import brave.Span;
import brave.Tracing;
import brave.sampler.Sampler;
import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
Expand Down Expand Up @@ -185,11 +186,9 @@ class TestBeanWithScheduledMethod {
this.tracing = tracing;
}

@Scheduled(fixedDelay = 1L)
@Scheduled(fixedDelay = 1000L)
public void scheduledMethod() {
log.info("Running the scheduled method");
this.span = this.tracing.tracer().currentSpan();
log.info("Stored the span " + this.span + " as current span");
this.executed.set(true);
}

Expand Down Expand Up @@ -258,7 +257,7 @@ class TestBeanWithScheduledMethodToBeIgnored {
this.tracing = tracing;
}

@Scheduled(fixedDelay = 1L)
@Scheduled(fixedDelay = 1000L)
public void scheduledMethodToIgnore() {
this.span = this.tracing.tracer().currentSpan();
this.executed.set(true);
Expand Down

0 comments on commit 1b493e6

Please sign in to comment.