diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java index a737513644..7001c4d1e5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,15 +27,18 @@ import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import org.springframework.kafka.support.converter.MessagingMessageConverter; @@ -56,6 +59,7 @@ /** * @author Mark Norkin * @author Gary Russell + * @author Will Kennedy * * @since 2.3.0 */ @@ -281,4 +285,82 @@ public void shouldSendOffsetsToTransaction() { .verify(DEFAULT_VERIFY_TIMEOUT); } + @Test + public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnceWithException() { + ProducerRecord producerRecord = + new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY, + DEFAULT_VALUE); + + StepVerifier.create(reactiveKafkaProducerTemplate + .sendTransactionally(SenderRecord.create(producerRecord, null)) + .then()) + .expectComplete() + .verify(); + + StepVerifier.create(reactiveKafkaConsumerTemplate + .receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager()) + .concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, true)) + .onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error))) + ) + .expectErrorMatches(throwable -> throwable instanceof KafkaException && + throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " + + "attempted from state READY to state ABORTING_TRANSACTION")) + .verify(); + + StepVerifier.create(reactiveKafkaConsumerTemplate + .receive().doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge())) + .assertNext(receiverRecord -> assertThat(receiverRecord.value()).isEqualTo(DEFAULT_VALUE)) + .thenCancel() + .verify(DEFAULT_VERIFY_TIMEOUT); + } + + @Test + public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnce() { + ProducerRecord producerRecord = + new ProducerRecord<>(REACTIVE_INT_KEY_TOPIC, DEFAULT_PARTITION, DEFAULT_TIMESTAMP, DEFAULT_KEY, + DEFAULT_VALUE); + + StepVerifier.create(reactiveKafkaProducerTemplate.sendTransactionally(SenderRecord.create(producerRecord, null)) + .then()) + .expectComplete() + .verify(); + + StepVerifier.create(reactiveKafkaConsumerTemplate + .receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager()) + .concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, false)) + .onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error))) + ) + .assertNext(senderResult -> { + assertThat(senderResult.correlationMetadata().intValue()).isEqualTo(DEFAULT_KEY); + assertThat(senderResult.recordMetadata().offset()).isGreaterThan(0); + }) + .thenCancel() + .verify(DEFAULT_VERIFY_TIMEOUT); + + StepVerifier.create(reactiveKafkaConsumerTemplate + .receive().doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge())) + .assertNext(receiverRecord -> { + assertThat(receiverRecord.value()).isEqualTo(DEFAULT_VALUE + "xyz"); + assertThat(receiverRecord.offset()).isGreaterThan(0); + }) + .thenCancel() + .verify(DEFAULT_VERIFY_TIMEOUT); + } + + private Flux> sendAndCommit(Flux> fluxConsumerRecord, boolean failCommit) { + return reactiveKafkaProducerTemplate + .send(fluxConsumerRecord.map(this::toSenderRecord) + .concatWith(failCommit ? + doThrowKafkaException() : + reactiveKafkaProducerTemplate.transactionManager().commit())); + } + + private Publisher> doThrowKafkaException() { + throw new KafkaException(); + } + + private SenderRecord toSenderRecord(ConsumerRecord record) { + return SenderRecord.create(REACTIVE_INT_KEY_TOPIC, record.partition(), null, record.key(), record.value() + "xyz", record.key()); + } + }