Skip to content

Commit

Permalink
Fix MessagingGatewaySupport implementors
Browse files Browse the repository at this point in the history
The `AmqpInboundGateway` and `JmsInboundGateway` don't delegate to `super`
in their `doStart()/doStop()` implementations causing the problem with
an internal `replyMessageCorrelator` missed the proper lifecycle
management

**Cherry-pick to 5.1.x & 5.0.x**
  • Loading branch information
artembilan authored and garyrussell committed May 8, 2019
1 parent bcf5929 commit 0492889
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
*/
public class AmqpInboundGateway extends MessagingGatewaySupport {

private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<AttributeAccessor>();
private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();

private final AbstractMessageListenerContainer messageListenerContainer;

Expand Down Expand Up @@ -203,11 +203,13 @@ protected void onInit() {

@Override
protected void doStart() {
super.doStart();
this.messageListenerContainer.start();
}

@Override
protected void doStop() {
super.doStop();
this.messageListenerContainer.stop();
}

Expand Down Expand Up @@ -269,18 +271,18 @@ public void onMessage(final Message message, final Channel channel) throws Excep
org.springframework.messaging.Message<Object> converted = convert(message, channel);
if (converted != null) {
AmqpInboundGateway.this.retryTemplate.execute(context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(converted).incrementAndGet();
process(message, converted);
return null;
},
(RecoveryCallback<Object>) AmqpInboundGateway.this.recoveryCallback);
StaticMessageHeaderAccessor.getDeliveryAttempt(converted).incrementAndGet();
process(message, converted);
return null;
},
(RecoveryCallback<Object>) AmqpInboundGateway.this.recoveryCallback);
}
}
}

private org.springframework.messaging.Message<Object> convert(Message message, Channel channel) {
Map<String, Object> headers = null;
Object payload = null;
Map<String, Object> headers;
Object payload;
boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer
.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
try {
Expand All @@ -299,17 +301,17 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
if (errorChannel != null) {
setAttributesIfNecessary(message, null);
AmqpInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null,
EndpointUtils.errorMessagePayload(message, channel, isManualAck, e)));
EndpointUtils.errorMessagePayload(message, channel, isManualAck, e)));
}
else {
throw e;
}
return null;
}
return getMessageBuilderFactory()
.withPayload(payload)
.copyHeaders(headers)
.build();
.withPayload(payload)
.copyHeaders(headers)
.build();
}

private void process(Message message, org.springframework.messaging.Message<Object> messagingMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class JmsInboundGateway extends MessagingGatewaySupport implements Dispos

public JmsInboundGateway(AbstractMessageListenerContainer listenerContainer,
ChannelPublishingJmsMessageListener listener) {

this.endpoint = new JmsMessageDrivenEndpoint(listenerContainer, listener);
}

Expand Down Expand Up @@ -138,11 +139,13 @@ public ChannelPublishingJmsMessageListener getListener() {

@Override
protected void doStart() {
super.doStart();
this.endpoint.start();
}

@Override
protected void doStop() {
super.doStop();
this.endpoint.stop();
}

Expand Down

0 comments on commit 0492889

Please sign in to comment.