Channel creation - error handling during publishing support #41

Closed
pmackowski opened this issue Dec 21, 2018 · 5 comments
Labels
effort-low type/enhancement A general enhancement

@pmackowski
Contributor

ExceptionHandler assumes that a channel already exists, whereas Sender#send or Sender#sendWithPublishConfirms might fail earlier, during channel creation (with AlreadyClosedException or ShutdownSignalException). In that case publishing will not be retried.

Currently we can only customize the retrying of channel.basicPublish, through SendOptions.
It would be great to have such an option for channel creation as well. Are you planning to add it in a future release?

A simple workaround is to use the Reactor retry or retryWhen operators, so this is not a must-have, but the current behaviour can be misleading, especially to new users who expect a retry on ShutdownSignalException.

@acogoluegnes acogoluegnes added type/enhancement A general enhancement effort-low labels Dec 21, 2018
@acogoluegnes acogoluegnes added this to the 1.1.0 milestone Dec 21, 2018
@acogoluegnes
Contributor

You'd like to be able to customize the channel creation, right? This is certainly doable by adding a property to SenderOptions and even a higher-priority one in SendOptions. Feel free to submit a PR or even share your workaround in this thread.

@pmackowski
Contributor Author

Yes, I would like to customize the channel creation retry policy.

My workaround is to replace the default SendOptions#exceptionHandler with one that rethrows the exception:

SendOptions sendOptions = new SendOptions().exceptionHandler((sendContext, e) -> {
    throw new RabbitFluxException(e);
});

and use the following connectionFailure method as a substitute for RetrySendingExceptionHandler. The Retry class comes from the reactor-extra library.

    private Retry<?> connectionFailure() {
        // retry every 200 milliseconds for 10 seconds, same as default behaviour in reactor-rabbitmq
        return Retry.onlyIf(tRetryContext -> ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE.test(tRetryContext.exception()) ||
                    ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE.test(tRetryContext.exception().getCause()))
                .doOnRetry(retryContext -> log.error("Retrying due to exception...", retryContext.exception()))
                .timeout(Duration.ofSeconds(10))
                .fixedBackoff(Duration.ofMillis(200));
    }
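
For readers who do not use reactor-extra, the policy in connectionFailure() can be sketched in plain blocking Java: retry only when a predicate matches the exception or its direct cause, wait a fixed interval between attempts, and give up once a time budget is spent. This is only an illustration of the policy, not the reactor-extra or reactor-rabbitmq implementation; the class FixedBackoffRetry, its methods, and the IOException-based predicate are hypothetical names chosen for the example.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical helper, not part of reactor-rabbitmq or reactor-extra.
class FixedBackoffRetry {

    /** True if the predicate matches the exception itself or its direct cause. */
    static boolean matchesSelfOrCause(Predicate<Throwable> predicate, Throwable e) {
        return predicate.test(e) || (e.getCause() != null && predicate.test(e.getCause()));
    }

    /**
     * Runs the action, retrying at a fixed interval while the failure is retryable
     * and another backoff still fits into the time budget.
     */
    static <T> T call(Callable<T> action, Predicate<Throwable> retryable,
                      Duration backoff, Duration timeout) throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                boolean timeLeft = deadline - System.nanoTime() > backoff.toNanos();
                if (!matchesSelfOrCause(retryable, e) || !timeLeft) {
                    throw e; // not retryable, or the time budget is spent
                }
                Thread.sleep(backoff.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Assumed stand-in for a connection failure: an IOException wrapped in a RuntimeException.
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException(new IOException("channel closed"));
            }
            return "published";
        }, t -> t instanceof IOException, Duration.ofMillis(10), Duration.ofSeconds(1));
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

The check on the cause matters here because the custom exception handler above wraps the original exception in a RabbitFluxException before it reaches the retry logic.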

Putting it all together:

Sender sender = RabbitFlux.createSender();
sender.sendWithPublishConfirms(msgFlux, sendOptions)
           .retryWhen(connectionFailure())
           .subscribe();

This workaround works only for Sender#sendWithPublishConfirms. It does not work for Sender#send because of the cache() operator. It also does not work if the first subscription is unsuccessful, because the connection is cached.

I would love to create a PR, but before that I need some clarifications (I will ask in the next thread).

@acogoluegnes
Contributor

Thanks for the workaround, this is very informative. Can you please elaborate on the reason the workaround doesn't work because of caching?

Note that connection caching can be disabled by passing in a custom Mono<Connection>.

Note also that Sender#send and Sender#sendWithPublishConfirms should behave the same. I don't remember any reason to cache the channel in one case and not in the other, so this is likely an unfortunate bug. It will be fixed if we introduce a Mono<Channel> in SendOptions for 1.1.0.

@pmackowski
Contributor Author

When caching is enabled for the Mono<Connection> or for channel creation, retry always receives the same signal from upstream, which might be an error signal. In that case retrying does not help, even when RabbitMQ is up and running again.
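
The effect can be illustrated with a plain-Java analogy that does not use Reactor at all: a memoizing Supplier plays the role of a cached Mono, and a simple loop plays the role of retry. All names here (CachedFailureDemo, flakyConnect, cached, retry) are hypothetical and exist only for this sketch; the point is that once the first failure is memoized, every later attempt replays it instead of trying again.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy only; not Reactor and not reactor-rabbitmq code.
class CachedFailureDemo {

    /** Hypothetical stand-in for a broker that is down for the first `failures` attempts. */
    static Supplier<String> flakyConnect(int failures, AtomicInteger attempts) {
        return () -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IllegalStateException("broker unavailable");
            }
            return "connection";
        };
    }

    /** Memoizes the first outcome, success or failure, and replays it to every caller. */
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private boolean done;
            private T value;
            private RuntimeException error;

            @Override
            public synchronized T get() {
                if (!done) {
                    try {
                        value = source.get();
                    } catch (RuntimeException e) {
                        error = e; // the failure is remembered as well
                    }
                    done = true;
                }
                if (error != null) {
                    throw error;
                }
                return value;
            }
        };
    }

    /** Calls the supplier up to maxAttempts times; returns null if every attempt fails. */
    static <T> T retry(Supplier<T> supplier, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                // ignore and try again
            }
        }
        return null;
    }

    public static void main(String[] args) {
        AtomicInteger plainAttempts = new AtomicInteger();
        String plain = retry(flakyConnect(2, plainAttempts), 5);
        // prints "uncached: connection after 3 attempts"
        System.out.println("uncached: " + plain + " after " + plainAttempts.get() + " attempts");

        AtomicInteger cachedAttempts = new AtomicInteger();
        String viaCache = retry(cached(flakyConnect(2, cachedAttempts)), 5);
        // prints "cached: null after 1 attempts"
        System.out.println("cached: " + viaCache + " after " + cachedAttempts.get() + " attempts");
    }
}
```

In the uncached case each retry triggers a fresh attempt, so the third one succeeds. In the cached case the source is invoked only once and all five tries see the same stored failure.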

I hadn't noticed that we can customize the Mono<Connection> through SenderOptions. Thanks for the information.

Regarding Sender#send and Sender#sendWithPublishConfirms, the difference is here (send) and here (sendWithPublishConfirms). It seems that the latter implementation, without caching, is better.

@pmackowski pmackowski mentioned this issue Dec 27, 2018
@acogoluegnes
Contributor

Closed by #42.
