GH-1184: Add Exactly Once Test Case
Resolves #1184

Initial commit of "exactly once" unit tests

Clean up new tests, verify that tests run quickly

Update imports to meet check styles
wkennedy authored and garyrussell committed Sep 8, 2020
1 parent 7dda5ec commit 625bee2
Showing 1 changed file with 83 additions and 1 deletion.
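
The new tests exercise reactor-kafka's transactional consume-transform-produce flow through the Spring reactive templates. The sketch below shows the pattern they verify, assuming pre-configured templates; the ExactlyOnceSketch class, the constructor wiring, and the outTopic parameter are illustrative, not part of the commit.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;

public class ExactlyOnceSketch {

    private final ReactiveKafkaConsumerTemplate<Integer, String> consumerTemplate;

    private final ReactiveKafkaProducerTemplate<Integer, String> producerTemplate;

    public ExactlyOnceSketch(ReactiveKafkaConsumerTemplate<Integer, String> consumerTemplate,
            ReactiveKafkaProducerTemplate<Integer, String> producerTemplate) {
        this.consumerTemplate = consumerTemplate;
        this.producerTemplate = producerTemplate;
    }

    public Flux<SenderResult<Integer>> consumeTransformProduce(String outTopic) {
        return this.consumerTemplate
                // Each inner Flux is one batch of records tied to a producer transaction;
                // the consumed offsets are committed with that transaction.
                .receiveExactlyOnce(this.producerTemplate.transactionManager())
                .concatMap(batch -> this.producerTemplate.send(
                        batch.map(rec -> SenderRecord.create(outTopic, rec.partition(), null,
                                rec.key(), rec.value(), rec.key()))
                                // Commit only after the whole batch is republished, making
                                // publish + offset commit atomic.
                                .concatWith(this.producerTemplate.transactionManager().commit())))
                // On any failure, abort so neither the records nor the offsets are committed.
                .onErrorResume(e -> this.producerTemplate.transactionManager().abort().then(Mono.error(e)));
    }

}

The two new tests cover, respectively, the abort path and the commit path of exactly this flow.
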
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 the original author or authors.
+ * Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,15 +27,18 @@
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.kafka.support.converter.MessagingMessageConverter;
@@ -56,6 +59,7 @@
/**
* @author Mark Norkin
* @author Gary Russell
* @author Will Kennedy
*
* @since 2.3.0
*/
@@ -281,4 +285,82 @@ public void shouldSendOffsetsToTransaction() {
.verify(DEFAULT_VERIFY_TIMEOUT);
}

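// Failure path: publishing the transformed records throws, the stream then attempts
// to abort the transaction, and the resulting KafkaException propagates; the original
// record is still readable afterwards, so nothing was lost.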
@Test
public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnceWithException() {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY,
DEFAULT_VALUE);

StepVerifier.create(reactiveKafkaProducerTemplate
.sendTransactionally(SenderRecord.create(producerRecord, null))
.then())
.expectComplete()
.verify();

StepVerifier.create(reactiveKafkaConsumerTemplate
.receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager())
.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, true))
.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error)))
)
.expectErrorMatches(throwable -> throwable instanceof KafkaException &&
throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
"attempted from state READY to state ABORTING_TRANSACTION"))
.verify();

StepVerifier.create(reactiveKafkaConsumerTemplate
.receive().doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge()))
.assertNext(receiverRecord -> assertThat(receiverRecord.value()).isEqualTo(DEFAULT_VALUE))
.thenCancel()
.verify(DEFAULT_VERIFY_TIMEOUT);
}

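// Happy path: the record is consumed, transformed, and republished inside a single
// transaction that also commits the consumer offset; the downstream consumer then
// sees the transformed value exactly once.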
@Test
public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnce() {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY,
DEFAULT_VALUE);

StepVerifier.create(reactiveKafkaProducerTemplate.sendTransactionally(SenderRecord.create(producerRecord, null))
.then())
.expectComplete()
.verify();

StepVerifier.create(reactiveKafkaConsumerTemplate
.receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager())
.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, false))
.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error)))
)
.assertNext(senderResult -> {
assertThat(senderResult.correlationMetadata().intValue()).isEqualTo(DEFAULT_KEY);
assertThat(senderResult.recordMetadata().offset()).isGreaterThan(0);
})
.thenCancel()
.verify(DEFAULT_VERIFY_TIMEOUT);

StepVerifier.create(reactiveKafkaConsumerTemplate
.receive().doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge()))
.assertNext(receiverRecord -> {
assertThat(receiverRecord.value()).isEqualTo(DEFAULT_VALUE + "xyz");
assertThat(receiverRecord.offset()).isGreaterThan(0);
})
.thenCancel()
.verify(DEFAULT_VERIFY_TIMEOUT);
}

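// Republishes each consumed record in the current transaction, then either commits
// or (when failCommit is true) raises a KafkaException so the caller aborts.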
private Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, String>> fluxConsumerRecord, boolean failCommit) {
return reactiveKafkaProducerTemplate
.send(fluxConsumerRecord.map(this::toSenderRecord)
.concatWith(failCommit ?
doThrowKafkaException() :
reactiveKafkaProducerTemplate.transactionManager().commit()));
}

private Publisher<? extends SenderRecord<Integer, String, Integer>> doThrowKafkaException() {
throw new KafkaException();
}

private SenderRecord<Integer, String, Integer> toSenderRecord(ConsumerRecord<Integer, String> record) {
return SenderRecord.create(REACTIVE_INT_KEY_TOPIC, record.partition(), null, record.key(), record.value() + "xyz", record.key());
}

}
