Skip to content

Commit

Permalink
AMQP-633: Ensure Resources are Closed
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-633

Close connection for non-transactional template.
  • Loading branch information
garyrussell committed Aug 20, 2016
1 parent b25a3d2 commit a71fff0
Showing 1 changed file with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1374,37 +1374,38 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
Assert.notNull(action, "Callback object must not be null");
Channel channel;
RabbitResourceHolder resourceHolder = null;
Connection connection = null;
if (isChannelTransacted()) {
resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(connectionFactory, true);
channel = resourceHolder.getChannel();
if (channel == null) {
ConnectionFactoryUtils.releaseResources(resourceHolder);
throw new IllegalStateException("Resource holder returned a null channel");
}
}
else {
Connection connection = connectionFactory.createConnection();
connection = connectionFactory.createConnection(); // NOSONAR - RabbitUtils
if (connection == null) {
throw new IllegalStateException("Connection factory returned a null connection");
}
channel = connection.createChannel(false);
if (channel == null) {
throw new IllegalStateException("Connection returned a null channel");
}
}
if (this.confirmsOrReturnsCapable == null) {
if (connectionFactory instanceof PublisherCallbackChannelConnectionFactory) {
PublisherCallbackChannelConnectionFactory pcccf =
(PublisherCallbackChannelConnectionFactory) connectionFactory;
this.confirmsOrReturnsCapable = pcccf.isPublisherConfirms() || pcccf.isPublisherReturns();
try {
channel = connection.createChannel(false);
if (channel == null) {
throw new IllegalStateException("Connection returned a null channel");
}
}
else {
this.confirmsOrReturnsCapable = Boolean.FALSE;
catch (RuntimeException e) {
RabbitUtils.closeConnection(connection);
throw e;
}
}
if (this.confirmsOrReturnsCapable) {
addListener(channel);
}
try {
if (this.confirmsOrReturnsCapable == null) {
determineConfirmsReturnsCapability(connectionFactory);
}
if (this.confirmsOrReturnsCapable) {
addListener(channel);
}
if (logger.isDebugEnabled()) {
logger.debug("Executing callback on RabbitMQ Channel: " + channel);
}
Expand All @@ -1422,10 +1423,22 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
}
else {
RabbitUtils.closeChannel(channel);
RabbitUtils.closeConnection(connection);
}
}
}

public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) {
if (connectionFactory instanceof PublisherCallbackChannelConnectionFactory) {
PublisherCallbackChannelConnectionFactory pcccf =
(PublisherCallbackChannelConnectionFactory) connectionFactory;
this.confirmsOrReturnsCapable = pcccf.isPublisherConfirms() || pcccf.isPublisherReturns();
}
else {
this.confirmsOrReturnsCapable = Boolean.FALSE;
}
}

/**
* Send the given message to the specified exchange.
*
Expand Down

0 comments on commit a71fff0

Please sign in to comment.