Skip to content

Commit

Permalink
Merge 50741ff into 20c62aa
Browse files Browse the repository at this point in the history
  • Loading branch information
orange-buffalo committed Oct 27, 2020
2 parents 20c62aa + 50741ff commit 922f8d9
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 19 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -208,6 +208,11 @@ public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

// Use an aspect to decorate @KafkaListeners
@Bean
public TracingKafkaAspect tracingKafkaAspect() {
return new TracingKafkaAspect(tracer());
}
```

##### Custom Span Names for Spring Kafka
Expand Down
9 changes: 8 additions & 1 deletion opentracing-kafka-spring/pom.xml
Expand Up @@ -34,6 +34,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
Expand All @@ -54,4 +61,4 @@
</dependency>

</dependencies>
</project>
</project>
@@ -0,0 +1,71 @@
/*
* Copyright 2017-2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.kafka.spring;

import io.opentracing.*;
import io.opentracing.contrib.kafka.TracingKafkaUtils;
import io.opentracing.tag.Tags;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class MessageListenerMethodInterceptor implements MethodInterceptor {

private static final String SPAN_PREFIX = "KafkaListener_";

private final Tracer tracer;

MessageListenerMethodInterceptor(Tracer tracer) {
this.tracer = tracer;
}

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (!"onMessage".equals(invocation.getMethod().getName())) {
return invocation.proceed();
}
Object[] arguments = invocation.getArguments();
ConsumerRecord<?, ?> record = getConsumerRecord(arguments);
if (record == null) {
return invocation.proceed();
}

Tracer.SpanBuilder spanBuilder = tracer.buildSpan(SPAN_PREFIX + record.topic())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER);

SpanContext parentContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
if (parentContext != null) {
spanBuilder.addReference(References.FOLLOWS_FROM, parentContext);
}
Span span = spanBuilder.start();
try (Scope ignored = tracer.activateSpan(span)) {
return invocation.proceed();
} catch (Exception e) {
Tags.ERROR.set(span, Boolean.TRUE);
throw e;
} finally {
span.finish();
}
}

private ConsumerRecord<?, ?> getConsumerRecord(Object[] arguments) {
for (Object object : arguments) {
if (object instanceof ConsumerRecord) {
return (ConsumerRecord<?, ?>) object;
}
}
return null;
}

}
@@ -0,0 +1,69 @@
/*
* Copyright 2017-2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.contrib.kafka.spring;

import io.opentracing.Tracer;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;

/**
* Wraps a {@link MessageListener} into a tracing proxy,
* to support {@link org.springframework.kafka.annotation.KafkaListener} beans.
* <p>
* A port of Spring Sleuth implementation.
*/
@Aspect
public class TracingKafkaAspect {
private final Tracer tracer;

public TracingKafkaAspect(Tracer tracer) {
this.tracer = tracer;
}

@Pointcut("execution(public * org.springframework.kafka.config.KafkaListenerContainerFactory.createListenerContainer(..))")
private void anyCreateListenerContainer() {
}

@Pointcut("execution(public * org.springframework.kafka.config.KafkaListenerContainerFactory.createContainer(..))")
private void anyCreateContainer() {
}

@Around("anyCreateListenerContainer() || anyCreateContainer()")
public Object wrapListenerContainerCreation(ProceedingJoinPoint pjp) throws Throwable {
MessageListenerContainer listener = (MessageListenerContainer) pjp.proceed();
if (listener instanceof AbstractMessageListenerContainer) {
AbstractMessageListenerContainer<?, ?> container = (AbstractMessageListenerContainer<?, ?>) listener;
Object someMessageListener = container.getContainerProperties().getMessageListener();
if (someMessageListener instanceof MessageListener) {
container.setupMessageListener(createProxy(someMessageListener));
}
}
return listener;
}

Object createProxy(Object bean) {
ProxyFactoryBean factory = new ProxyFactoryBean();
factory.setProxyTargetClass(true);
factory.addAdvice(new MessageListenerMethodInterceptor(this.tracer));
factory.setTarget(bean);
return factory.getObject();
}

}
Expand Up @@ -26,7 +26,6 @@ public class Listener {

@KafkaListener(topics = "spring")
public void listen(String message) {
// TODO: no active span
System.out.println("active span: " + tracer.activeSpan());
System.out.println(message);
}
Expand Down
Expand Up @@ -13,25 +13,24 @@
*/
package io.opentracing.contrib.kafka.spring;

import static io.opentracing.contrib.kafka.spring.TracingSpringKafkaTest.embeddedKafka;

import io.opentracing.mock.MockTracer;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.util.Map;

import static io.opentracing.contrib.kafka.spring.TracingSpringKafkaTest.embeddedKafka;

@Configuration
@EnableKafka
@ComponentScan
@EnableAspectJAutoProxy
public class TestConfiguration {

@Bean
Expand Down Expand Up @@ -68,4 +67,9 @@ public ProducerFactory<Integer, String> producerFactory() {
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public TracingKafkaAspect tracingKafkaAspect() {
return new TracingKafkaAspect(tracer());
}
}
Expand Up @@ -13,15 +13,10 @@
*/
package io.opentracing.contrib.kafka.spring;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;

import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -32,6 +27,15 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {TestConfiguration.class})
public class TracingSpringKafkaTest {
Expand All @@ -54,13 +58,35 @@ public void before() {
public void test() {
kafkaTemplate.send("spring", "message");

await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), equalTo(2));
await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), greaterThanOrEqualTo(3));

List<MockSpan> spans = mockTracer.finishedSpans();
assertEquals(2, spans.size());
assertThat(spans, contains(
new SpanMatcher("To_spring"),
new SpanMatcher("From_spring"),
new SpanMatcher("KafkaListener_spring")));
}

private Callable<Integer> reportedSpansSize() {
return () -> mockTracer.finishedSpans().size();
}

private static class SpanMatcher extends BaseMatcher<MockSpan> {

private final String operationName;

private SpanMatcher(String operationName) {
this.operationName = operationName;
}

@Override
public boolean matches(Object actual) {
return actual instanceof MockSpan && operationName.equals(((MockSpan) actual).operationName());
}

@Override
public void describeTo(Description description) {
description.appendText(operationName);
}
}
}
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -69,6 +69,7 @@
<opentracing.version>0.33.0</opentracing.version>
<kafka.version>2.5.0</kafka.version>
<spring.kafka.version>2.5.0.RELEASE</spring.kafka.version>
<spring.version>5.2.7.RELEASE</spring.version>
<coveralls-maven-plugin.version>4.3.0</coveralls-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
</properties>
Expand Down

0 comments on commit 922f8d9

Please sign in to comment.