You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Originally posted by Antxl May 21, 2023
I'm making a custom cloud-stream-binder for a jms middleware, and I'm using JmsMessageDrivenEndpoint as MessageProducer.
It requires a ChannelPublishingJmsMessageListener where I had difficulties in applying the RetryTemplate built by AbstractBinder to that I can make maxAttempts property take effect.
Let's have a look at method public void onMessage(javax.jms.Message jmsMessage, Session session) in it.
Message<?> requestMessage;
try {
finalObjectresult;
if (this.extractRequestPayload) {
result = this.messageConverter.fromMessage(jmsMessage);
this.logger.debug(...);
}
else {
result = jmsMessage;
}
Map<String, Object> headers = this.headerMapper.toHeaders(jmsMessage);
requestMessage = // Here we get the integration message
(resultinstanceofMessage<?>) ?
this.messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
this.messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
}
catch (RuntimeExceptione) {
...
return;
}
if (!this.expectReply) { // Send to downstream directly herethis.gatewayDelegate.send(requestMessage);
}
else { ... // Won't go here
As you can see, it's impossible to capture the requestMessage from outside and put it into the ReteyContext unless repeating the whole process to build an Integration Message totally for error handling purpose. (And messageConverter, headerMapper, etc. cannot be accessed from sub-classes)
Maybe I should write a new MessageListener which means I must give up JmsMessageDrivenEndpoint as well (obviously not an option for me). Now I "solved" it by producing a identical message in the subclass, like code attached below:
@OverridepublicvoidonMessage(javax.jms.MessagejmsMessage, Sessionsession) throwsJMSException {
Message<?> requestMessage = getIntegrationMessage(jmsMessage);
if (retryOp != null)
retryOp.execute(context -> {
context.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, requestMessage);
super.onMessage(jmsMessage, session);
returnnull;
}, recoveryCallback);
elsesuper.onMessage(jmsMessage, session);
}
protectedMessage<?> getIntegrationMessage(javax.jms.MessagejmsMessage) throwsJMSException {
try {
Objectresult = messageConverter.fromMessage(jmsMessage);
Map<String, Object> headers = headerMapper.toHeaders(jmsMessage);
returnresultinstanceofMessage<?> ?
messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
} catch (RuntimeExceptione) {
returnnull; // It doesn't matter here since it will fail anyway later
}
}
Would it possible to extract the send process to a method like sendAndReceiveReply or allow a custom MessageGateway? Thanks.
The text was updated successfully, but these errors were encountered:
if you are interested, you can look into this.
If I remember correct, we need to expose RetryTemplate and RecoveryCallback options need to be exposed on the ChannelPublishingJmsMessageListener.
See more info in the mentioned discussion.
Discussed in #8629
Originally posted by Antxl May 21, 2023
I'm making a custom cloud-stream-binder for a jms middleware, and I'm using
JmsMessageDrivenEndpoint
asMessageProducer
.It requires a
ChannelPublishingJmsMessageListener
where I had difficulties in applying theRetryTemplate
built byAbstractBinder
to that I can makemaxAttempts
property take effect.Let's have a look at method
public void onMessage(javax.jms.Message jmsMessage, Session session)
in it.As you can see, it's impossible to capture the
requestMessage
from outside and put it into theReteyContext
unless repeating the whole process to build an Integration Message totally for error handling purpose. (AndmessageConverter
,headerMapper
, etc. cannot be accessed from sub-classes)Maybe I should write a new
MessageListener
which means I must give upJmsMessageDrivenEndpoint
as well (obviously not an option for me). Now I "solved" it by producing a identical message in the subclass, like code attached below:Would it possible to extract the send process to a method like
sendAndReceiveReply
or allow a customMessageGateway
? Thanks.The text was updated successfully, but these errors were encountered: