Skip to content

Commit

Permalink
GH-723: Add AmqpResourceNotAvailableException (#737)
Browse files Browse the repository at this point in the history
* GH-723: Add AmqpResourceNotAvailableException

Fixes #723

To avoid an `NPE` when connection returns `null` for the
`createChannel()` in case of `channelMax` is reached, throw newly
introduced `AmqpResourceNotAvailableException`.
This exception can be used in the `RetryPolicy` to retry the original
operation after some back-off - the channel permit may be released in
between

**Cherry-pick to 2.0.x and 1.7.x**

* * Fix `AbstractConnectionFactoryTests` for proper mock
* Fix `amqp.adoc` according PR comments
  • Loading branch information
artembilan authored and garyrussell committed Apr 3, 2018
1 parent 5423233 commit 6f72cf6
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 3 deletions.
@@ -0,0 +1,42 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp;

/**
* The {@link AmqpException} thrown when some resource can't be accessed.
* For example when {@code channelMax} limit is reached and connect can't
* create a new channel at the moment.
*
* @author Artem Bilan
*
* @since 1.7.7
*/
public class AmqpResourceNotAvailableException extends AmqpException {

public AmqpResourceNotAvailableException(String message) {
super(message);
}

public AmqpResourceNotAvailableException(Throwable cause) {
super(cause);
}

public AmqpResourceNotAvailableException(String message, Throwable cause) {
super(message, cause);
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.InetAddress;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.util.ObjectUtils;

Expand Down Expand Up @@ -54,6 +55,9 @@ public SimpleConnection(com.rabbitmq.client.Connection delegate,
public Channel createChannel(boolean transactional) {
try {
Channel channel = this.delegate.createChannel();
if (channel == null) {
throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later.");
}
if (transactional) {
// Just created so we want to start the transaction
channel.txSelect();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2017 the original author or authors.
* Copyright 2010-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,7 @@
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;

/**
Expand Down Expand Up @@ -176,6 +177,7 @@ public void testCloseInvalidConnection() throws Exception {
.thenReturn(mockConnection1, mockConnection2);
// simulate a dead connection
when(mockConnection1.isOpen()).thenReturn(false);
when(mockConnection2.createChannel()).thenReturn(mock(Channel.class));

AbstractConnectionFactory connectionFactory = createConnectionFactory(mockConnectionFactory);

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,6 +64,7 @@
import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
Expand All @@ -89,6 +90,7 @@
* @author Gunnar Hillert
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.0
*
*/
Expand Down Expand Up @@ -571,6 +573,14 @@ public void hangOnClose() throws Exception {
factory.destroy();
}

@Test(expected = AmqpResourceNotAvailableException.class)
public void testChannelMax() {
this.connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(1);
Connection connection = this.connectionFactory.createConnection();
connection.createChannel(true);
connection.createChannel(false);
}

private Log spyOnLogger(CachingConnectionFactory connectionFactory2) {
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory2);
Log logger = spy((Log) dfa.getPropertyValue("logger"));
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -377,6 +377,9 @@ Starting with _version 2.0.2_, the `RabbitTemplate` has a configuration option t
See <<separate-connection>> for more information.
The `ConnectionNameStrategy` for the publisher connection is the same as the primary strategy with `.publisher` appended to the result of calling the method.

Starting with _version 1.7.7_, an `AmqpResourceNotAvailableException` is provided, which is thrown now when `SimpleConnection.createChannel()` can't create a `Channel`, for example, because the `channelMax` limit is reached and there are no available channels in the cache.
This exception can be used in the `RetryPolicy` to recover the operation after some back-off.

[[connection-factory]]
===== Configuring the Underlying Client Connection Factory

Expand Down

0 comments on commit 6f72cf6

Please sign in to comment.