Skip to content

Commit

Permalink
Fix Request/Reply with ConsumerRecord<?, ?>
Browse files Browse the repository at this point in the history
Message conversion is bypassed when consuming the raw `ConsumerRecord`.
However, the request message is needed when returning a result (for
reply topic determination, correlation, etc).

**I will do the backports; I expect conflicts in the test.**
  • Loading branch information
garyrussell committed Aug 27, 2020
1 parent bc086de commit 2f048be
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Expand Up @@ -579,7 +579,7 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
}
}

if (notConvertibleParameters == method.getParameterCount()) {
if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) {
this.conversionNeeded = false;
}
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;
Expand Down
Expand Up @@ -106,7 +106,8 @@
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST })
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -141,6 +142,10 @@ public class ReplyingKafkaTemplateTests {

public static final String J_REQUEST = "jRequest";

public static final String K_REPLY = "kReply";

public static final String K_REQUEST = "kRequest";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -187,6 +192,24 @@ public void testGood() throws Exception {
}
}

@Test
void testConsumerRecord() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(K_REPLY);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
ProducerRecord<Integer, String> record = new ProducerRecord<>(K_REQUEST, null, null, null, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testBadDeserialize() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
Expand Down Expand Up @@ -669,6 +692,12 @@ public void gListener(Message<String> in) {
public String handleJ(String in) throws InterruptedException {
return in.toUpperCase();
}
@KafkaListener(id = K_REQUEST, topics = { K_REQUEST })

@SendTo
public String handleK(ConsumerRecord<String, String> in) {
return in.value().toUpperCase();
}

}

Expand Down

0 comments on commit 2f048be

Please sign in to comment.