Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaTemplate Request/Reply #544

Merged
merged 12 commits into from
Feb 2, 2018
Merged

Conversation

garyrussell
Copy link
Contributor

  • Add AsyncKafkaTemplate with sendAndReceive()
  • In @KafkaListener, echo the correlationId header

Needs docs
I marked it @since 2.1.3 because we need to build 2.1.2 on Monday an I don't think this will be merged by then.

@artembilan
Copy link
Member

Something is not stable:

org.springframework.kafka.AsyncKafkaTemplateTests > testGood FAILED
    java.util.concurrent.TimeoutException at AsyncKafkaTemplateTests.java:94

See Travis report.

Why don't deffer this until 2.2 then though?

@garyrussell
Copy link
Contributor Author

We don't have any immediate plans for a 2.2 release. Why would we delay?

@artembilan
Copy link
Member

SF 5.1, Reactor 3.2? Might be Kafka 1.1...

OK. Never mind. I'm just a bit nervous to introduce new features in the point release. More versions support in the future when we face a bug in that feature.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's all.
Need JavaDocs and Docs.

GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) {
super(producerFactory, autoFlush);
this.replyContainer = replyContainer;
this.replyContainer.setupMessageListener(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert.notNull(replyContainer);


@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.scheduler, "A scheduler is required for timeouts");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just don't create an internal instance then?

It doesn't look correct to allow to create this AsyncKafkaTemplate without TaskScheduler, but then require it later.

public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
String correlationId = createCorrelationId();
record.headers()
.add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId.getBytes(StandardCharsets.UTF_8)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we need exactly byte[] for the value, so, let's make that createCorrelationId() to return byte[] for us!

The default one indeed can be what you do with UUID, but as this:

 ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
 bb.putLong(uuid.getMostSignificantBits());
 bb.putLong(uuid.getLeastSignificantBits());

return bb.array();

This way we will be free of the String to byte[] conversion here.

WDYT?

}
RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
this.futures.put(correlationId, future);
future.setSendFuture(send(record));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we couldn't send here?
Can't we perform this.futures.put() already after this send()?
Or do you mean there might be a race condition that reply will come earlier than we perform this.futures.put() ?

Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes (race condition).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. So, the next schedule must be in the final block for the case when we got an exception from the send(record). Or even better do not schedule at all, but just remove in the catch. Just because we haven't sent record therefore we won't anticipate any reply at all.

Does it make sense ?

@@ -4,6 +4,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n
log4j.category.org.springframework.kafka=WARN
log4j.category.org.springframework.kafka.AsyncKafkaTemplate=DEBUG
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope we will remove it during merge.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool feature, but I still have some concerns.

Thanks

}

protected void setTaskScheduler(TaskScheduler scheduler) {
this.scheduler = scheduler;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert.notNull()


@Override
public synchronized void start() {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no if (!this.running) ?

}
TemplateRequestReplyFuture<K, V, R> future = new TemplateRequestReplyFuture<>();
this.futures.put(correlationId, future);
future.setSendFuture(send(record));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why still no try...catch()?
In case the send(record) throws an exception we don't reach this.scheduler.schedule() and we have an orphan entry in the this.futures

RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
if (removed != null) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Reply timed out for: " + record + " with correlationId: " + correlationId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we call stop() we perform this.replyContainer.stop() therefore no any replies are going to come back to us.
And from here all the waiting this.futures will be discarded here with this misleading warning.

What I think we need to cancel the Future from this schedule() and clean up this.futures altogether in the stop() as well after stopping this.replyContainer.

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canceling the futures would mean we'd have to track them. I think we can just do

this.futures.replaceAll((k, v) -> null);

in stop() and then the timeouts will be no-ops.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually CHM doesn't support null, but clear() will do it.

}
}
if (correlationId == null) {
this.logger.error("No correlationId found in reply: " + record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this not enough. Not found. And? What now ? Did we do anything wrong?
I mean let's add some explanation into the log message what is wrong in this case.

private volatile Integer hashCode;

public CorrelationKey(byte[] bytes) {
this.correlationId = bytes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert.notNull() - this is public API, so end-user may decide to try to suffer of NPE 😄

@@ -44,7 +44,7 @@
* @author Artem Bilan
*/
public abstract class AbstractMessageListenerContainer<K, V>
implements MessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, SmartLifecycle {
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to SmartLifecycle here?

Hardly ever possible, but still breaking change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The super.super interface (MLC) extends SL. So it was always redundant here.

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
----

The result is a `ListenableFuture` that will asynchronously populated with the result (or an exception, for a timeout).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be populated ?

}
----

The `@KafkaListener` infrastructure echoes the correlation id and determines the reply topic
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Period in the end of sentence. Or some text is missed yet...

/*
* Needed if Jackson is not on the classpath since we can't map headers in that case.
*/
private byte[] extractNativeHeader(Object source, String header) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't merge yet; I have found a better solution for when Jackson is not on the CP.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More review after pulling locally.

* @since 2.1.3
*
*/
public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implements BatchMessageListener<K, R>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have to push this to the core package. Otherwise we have a package tangle between root and that core:

image

@Override
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
CorrelationKey correlationId = createCorrelationId();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about to propagate record to the createCorrelationId() and allow to do record-based correlation ?

if (correlationId == null) {
this.logger.error("No correlationId found in reply: " + record
+ " - to use request/reply semantics, the responding server must return the correlation id "
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we may think (or wait for request) about a correlationKey property when someone would like to correlate not by out-of-the-box KafkaHeaders.CORRELATION_ID. Seems for me we provide that Spring AMQP.

Right, we may consider that in the separate issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - enhancement.

* @since 2.1.3
*
*/
public class RequestReplyFuture<K, V, R> extends SettableListenableFuture<ConsumerRecord<K, R>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar package tangle we have through this class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class should go now to a new package as well.

However I would prefer to just have its inner variant and that's all.

super();
}

protected void setSendFuture(ListenableFuture<SendResult<K, V>> sendFuture) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just don't have it as public ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want the user to be able to set it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point! Why do we need this class on the top level at all?
What doesn't work keeping it as inner one?

Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you see a tangle? S101 doesn't see it. We can't use an inner class because then we can't have the ...Operations interface. I agree it should be moved though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... I think there was a tangle before, when we didn't have a new requestreply.

But that's good that it is fixed now !

*
* @deprecated in favor of {@link #sendResponse(Object, String, Object)}.
* @param result the result.
* @param topic the topic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoudn't @deprecated tag be in the end of JavaDocs?

<module name="AtclauseOrder">
		<property name="target" value="METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
		<property name="tagOrder" value="@param, @return, @throws, @since, @deprecated, @see"/>
</module>

Hm... That is our Checkstyle config here. How has your change bypassed it?

See <<annotation-send-to>> for more information about sending replies; in this case we use the default header `KafKaHeaders.REPLY_TOPIC` to indicate which topic the reply goes to.

IMPORTANT: If you have multiple client instances, each will need a dedicated reply topic for each instance.
An alternative is to set the `KafakHeaders.REPLY_PARTITION` and use a dedicated partition for each instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: s/KafakHeaders/KafkaHeaders


==== ReplyingKafkaTemplate

Starting with _version 2.1.3_ a subclass of `KafkaTemplate` is provided to support request/reply semantics.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comma before a ?

@garyrussell
Copy link
Contributor Author

I'll look at the tangles tomorrow - I had to move it up to root in Spring AMQP because there was a tangle between root and core and assumed I'd have the same problem here.

@artembilan
Copy link
Member

I'd have the same problem here

What I see here that we use KafkaException class in the core, but at the same time we use KafkaTemplate here in the ReplyingKafkaTemplate.

byte[] partitionBytes = source.getHeaders().get(KafkaHeaders.REPLY_PARTITION, byte[].class);
if (partitionBytes != null) {
String partition = new String(partitionBytes, StandardCharsets.UTF_8);
builder.setHeader(KafkaHeaders.PARTITION_ID, Integer.parseInt(partition));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteBuffer.wrap(partitionBytes).getInt() ?

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like that

* limitations under the License.
*/

package org.springframework.kafka.sendreceive;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not requestreply ?

* @since 2.1.3
*
*/
public class RequestReplyFuture<K, V, R> extends SettableListenableFuture<ConsumerRecord<K, R>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class should go now to a new package as well.

However I would prefer to just have its inner variant and that's all.

@@ -944,6 +1053,7 @@ The `#root` object for the evaluation has 3 properties:
- request - the inbound `ConsumerRecord` (or `ConsumerRecords` object for a batch listener))
- source - the `org.springframework.messaging.Message<?>` converted from the `request`.
- result - the method return result.
- `@SendTo` (no properties) - this is treated as `!{headers['kafka_replyTopic']}` (since version _2.1.3_).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source.headers['kafka_replyTopic'] ?

@@ -987,7 +1097,7 @@ public class MultiListenerSendTo {
----

When using `@SendTo`, the `ConcurrentKafkaListenerContainerFactory` must be configured with a `KafkaTemplate` in its `replyTemplate` property, to perform the send.
Note: only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition and/or key...
NOTE: unless you are using <<replying-template,request/reply semantics>> only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition and/or key...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we fix this sentence for the ellipsis in the end?

Thanks

*/
@RunWith(SpringRunner.class)
@DirtiesContext
public class ReplyingKafkaTemplateTests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it is a test class I think it should go to the new package for consistency.

@artembilan
Copy link
Member

Cool!

ReplyingKafkaTemplateTests.java:52:1: Redundant import from the same package - org.springframework.kafka.requestreply.ReplyingKafkaTemplate. [RedundantImport]

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several last questions.

Will do the last run in the morning!

* @since 2.1.3
*
*/
public interface ReplyingKafkaOperations<K, V, R> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we absorb this interface and, therefore, ReplyingKafkaTemplate just to the KafkaTemplate in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd rather keep it as a sub interface/class since it's configuration is more complex. Also, the RabbitTemplate has grown to 2400 LoC, which I don't want to repeat here.

* instead but this wrapper is less expensive. We do use a BigInteger in
* {@link #toString()} though.
*/
public static final class CorrelationKey {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we have this as top-level class?

* The header containing a partition number on which to send the reply.
* Type: binary (int) in byte[].
*/
public static final String REPLY_PARTITION = PREFIX + "replyPartition";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add @since 2.1.3 to all of these new headers ?

@artembilan
Copy link
Member

@garyrussell ,

ping.

I would like to hear answers to my latest comments before merging.
You might decide to fix anything else meanwhile, so I will wait.
Thanks.

@garyrussell
Copy link
Contributor Author

I was waiting for your "last run" before addressing those 😄

- Add `AsyncKafkaTemplate` with `sendAndReceive()`
- In `@KafkaListener`, echo the `correlationId` header
- increase timeout on good test
- add exception to future on timeout
When documenting this, we should recommend that users don't start sending
until the reply partitions are assigned.
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for misleading.

All good now.

Merging...

@artembilan artembilan merged commit 5adf2d1 into spring-projects:master Feb 2, 2018
@garyrussell garyrussell deleted the AsyncKT branch February 3, 2018 15:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants