Skip to content

Commit

Permalink
Merge 7fdec0d into 2fd5487
Browse files Browse the repository at this point in the history
  • Loading branch information
CarlosPanarello committed May 4, 2019
2 parents 2fd5487 + 7fdec0d commit 9567b08
Showing 1 changed file with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,43 @@
package io.opentracing.contrib.kafka;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import static org.junit.Assert.*;

public class TracingKafkaTest {

@ClassRule
Expand Down Expand Up @@ -213,6 +213,56 @@ public void nullKey() throws Exception {
producer.close();
}

@Test
public void testSeekInConsumerAndCloseInProducer() throws InterruptedException{

Producer<Integer, String> producer = createTracingProducer();

// Send 1
producer.send(new ProducerRecord<>("messages-for-seek", 1, "test"));

producer.close(Duration.ofSeconds(40));

final CountDownLatch latch = new CountDownLatch(1);
Integer key = 1;

ExecutorService executorService = Executors.newSingleThreadExecutor();

final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");

executorService.execute(() -> {
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
Consumer<Integer, String> consumer;

consumer = new TracingKafkaConsumer<>(kafkaConsumer, mockTracer, null);

TopicPartition tp = new TopicPartition("messages-for-seek",0);
consumer.assign(Collections.singletonList(tp));

consumer.seek(tp, new OffsetAndMetadata(0));

while (latch.getCount() > 0) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Integer, String> record : records) {
SpanContext spanContext = TracingKafkaUtils
.extractSpanContext(record.headers(), mockTracer);
assertNotNull(spanContext);
assertEquals("test", record.value());
assertEquals(key, record.key());

consumer.commitSync();
latch.countDown();
}
}
kafkaConsumer.close();
});

assertTrue(latch.await(30, TimeUnit.SECONDS));
}


private Producer<Integer, String> createTracingProducer() {
return new TracingKafkaProducer<>(createProducer(), mockTracer);
}
Expand Down

0 comments on commit 9567b08

Please sign in to comment.