-
Notifications
You must be signed in to change notification settings - Fork 619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQP-797: Defer caching publisher callback channel #757
Conversation
JIRA: https://jira.spring.io/browse/AMQP-797 Defer caching publisher callback channels until acks received. - another user might perform an erroneous call that forces the channel closed and the confirmations will be lost. - if the factory is destroyed with channels in that state, force them closed to generate nacks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's very smart!
Just a couple concerns which may end up just with an answer.
Also are you going to add some docs on the matter?
if (CachingConnectionFactory.this.active && this.publisherConfirms | ||
&& proxy instanceof PublisherCallbackChannel) { | ||
this.theConnection.channelsAwaitingAcks.add(proxy); | ||
((PublisherCallbackChannel) proxy).setAfterAckCallback(this::returnToCache, proxy); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This a bit awkward, but I guess we don't have choice since PublisherCallbackChannelImpl
is not ChannelProxy
instance.
I mean we we might not be able just to use this
there in the processAck()
. Correct ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - this
doesn't work (that's what I had initially) - we need a reference to the proxy. We can look at reworking in 2.1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... or we need to keep a dictionary for PublisherCallbackChannel <-> ChannelProxy
cross-referencing.
* @param proxy the proxy. | ||
* @since 2.1 | ||
*/ | ||
void setAfterAckCallback(Consumer<ChannelProxy> callback, ChannelProxy proxy); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's hope we don't have any package tangles with this !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point - I'll check 😄
Yes; docs needed after review 😄 and before merge. |
* @since 2.1 | ||
*/ | ||
void setAfterAckCallback(Consumer<ChannelProxy> callback, ChannelProxy proxy); | ||
void setAfterAckCallback(Consumer<Channel> callback, Channel proxyForThis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... Still confusing: the PublisherCallbackChannel
is a Channel
per se.
Doesn't my idea about relationship Map
work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's all, but, yeah, we still need docs.
public PublisherCallbackChannelImpl(Channel delegate) { | ||
delegate.addShutdownListener(this); | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public synchronized void setAfterAckCallback(java.util.function.Consumer<Channel> callback) { | ||
if (getPendingConfirmsCount() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assert.notNull()
for the callback
?
@@ -839,8 +839,7 @@ public AmqpTemplate rabbitTemplate() { | |||
---- | |||
|
|||
Starting with _version 1.4_, in addition to the `retryTemplate` property, the `recoveryCallback` option is supported on the `RabbitTemplate`. | |||
It is used as a second argument for the `RetryTemplate.execute(RetryCallback<T, E> retryCallback, | |||
RecoveryCallback<T>recoveryCallback)`. | |||
It is used as a second argument for the `RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T>recoveryCallback)`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace before recoveryCallback
word.
Will fix on merge
Merged as 2e99acd. Looks like I combined the previous commit as well, but I don't think that there are too much destruction. |
JIRA: https://jira.spring.io/browse/AMQP-797
Defer caching publisher callback channels until acks received.
will be lost.