Skip to content

Commit

Permalink
Fix Request/Reply with ConsumerRecord<?, ?>
Browse files Browse the repository at this point in the history
Message conversion is bypassed when consuming the raw `ConsumerRecord`.
However, the request message is needed when returning a result (for
reply topic determination, correlation, etc).

**I will do the backports; I expect conflicts in the test.**
  • Loading branch information
garyrussell committed Aug 27, 2020
1 parent 72fddc1 commit 18edc93
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
}
}

if (notConvertibleParameters == method.getParameterCount()) {
if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) {
this.conversionNeeded = false;
}
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@
ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST })
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -152,6 +153,10 @@ public class ReplyingKafkaTemplateTests {

public static final String J_REQUEST = "jRequest";

public static final String K_REPLY = "kReply";

public static final String K_REQUEST = "kRequest";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -198,6 +203,24 @@ public void testGood() throws Exception {
}
}

@Test
void testConsumerRecord() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(K_REPLY);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
ProducerRecord<Integer, String> record = new ProducerRecord<>(K_REQUEST, null, null, null, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testBadDeserialize() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
Expand Down Expand Up @@ -730,6 +753,12 @@ public String handleJ(String in) throws InterruptedException {
return in.toUpperCase();
}

@KafkaListener(id = K_REQUEST, topics = { K_REQUEST })
@SendTo
public String handleK(ConsumerRecord<String, String> in) {
return in.value().toUpperCase();
}

}

@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
Expand Down

0 comments on commit 18edc93

Please sign in to comment.