Skip to content

Commit

Permalink
GH-1683: Improve Missing Ack Argument Exception
Browse files Browse the repository at this point in the history
Resolves #1683

Error message was unclear when a non-MANUAL ack mode was used with an
`Acknowledgment` parameter.

(cherry picked from commit e2cd1a1)
  • Loading branch information
garyrussell authored and artembilan committed Jan 19, 2021
1 parent f736bd9 commit 73ad11e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,6 +59,7 @@
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -341,6 +342,16 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
"be invoked with the incoming message", message.getPayload()),
new MessageConversionException("Cannot handle message", ex));
}
catch (MethodArgumentNotValidException ex) {
if (this.hasAckParameter && acknowledgment == null) {
throw new ListenerExecutionFailedException("invokeHandler Failed",
new IllegalStateException("No Acknowledgment available as an argument, "
+ "the listener container must have a MANUAL AckMode to populate the Acknowledgment.",
ex));
}
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
"be invoked with the incoming message", message.getPayload()), ex);
}
catch (MessagingException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
"be invoked with the incoming message", message.getPayload()), ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,19 @@

package org.springframework.kafka.listener.adapter;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.lang.reflect.Method;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.support.GenericMessage;
Expand All @@ -36,7 +41,7 @@
public class MessagingMessageListenerAdapterTests {

@Test
public void testFallbackType() {
void testFallbackType() {
final class MyAdapter extends MessagingMessageListenerAdapter<String, String>
implements AcknowledgingMessageListener<String, String> {

Expand All @@ -61,4 +66,22 @@ public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknow
verify(converter).toMessage(cr, ack, null, String.class);
}

@Test
void testMissingAck() throws NoSuchMethodException, SecurityException {
KafkaListenerAnnotationBeanPostProcessor<String, String> bpp = new KafkaListenerAnnotationBeanPostProcessor<>();
Method method = getClass().getDeclaredMethod("test", Acknowledgment.class);
RecordMessagingMessageListenerAdapter<String, String> adapter =
new RecordMessagingMessageListenerAdapter<>(this, method);
adapter.setHandlerMethod(
new HandlerAdapter(bpp.getMessageHandlerMethodFactory().createInvocableHandlerMethod(this, method)));
assertThatExceptionOfType(ListenerExecutionFailedException.class).isThrownBy(() -> adapter.onMessage(
new ConsumerRecord<>("foo", 0, 0L, null, "foo"), null, null))
.withCauseExactlyInstanceOf(IllegalStateException.class)
.withMessageContaining("MANUAL");
}

public void test(Acknowledgment ack) {

}

}

0 comments on commit 73ad11e

Please sign in to comment.