diff --git a/spring-amqp/src/main/java/org/springframework/amqp/AmqpResourceNotAvailableException.java b/spring-amqp/src/main/java/org/springframework/amqp/AmqpResourceNotAvailableException.java new file mode 100644 index 0000000000..845687671b --- /dev/null +++ b/spring-amqp/src/main/java/org/springframework/amqp/AmqpResourceNotAvailableException.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp; + +/** + * The {@link AmqpException} thrown when some resource can't be accessed. + * For example when {@code channelMax} limit is reached and connect can't + * create a new channel at the moment. + * + * @author Artem Bilan + * + * @since 1.7.7 + */ +public class AmqpResourceNotAvailableException extends AmqpException { + + public AmqpResourceNotAvailableException(String message) { + super(message); + } + + public AmqpResourceNotAvailableException(Throwable cause) { + super(cause); + } + + public AmqpResourceNotAvailableException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java index 69907c5211..edfa437e1d 100755 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.InetAddress; +import org.springframework.amqp.AmqpResourceNotAvailableException; import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator; import org.springframework.util.ObjectUtils; @@ -54,6 +55,9 @@ public SimpleConnection(com.rabbitmq.client.Connection delegate, public Channel createChannel(boolean transactional) { try { Channel channel = this.delegate.createChannel(); + if (channel == null) { + throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later."); + } if (transactional) { // Just created so we want to start the transaction channel.txSelect(); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java index 548b295b8b..9f3af78b3e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2017 the original author or authors. + * Copyright 2010-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; /** @@ -176,6 +177,7 @@ public void testCloseInvalidConnection() throws Exception { .thenReturn(mockConnection1, mockConnection2); // simulate a dead connection when(mockConnection1.isOpen()).thenReturn(false); + when(mockConnection2.createChannel()).thenReturn(mock(Channel.class)); AbstractConnectionFactory connectionFactory = createConnectionFactory(mockConnectionFactory); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java index 9e7c6fd5d4..ce6c79f8d4 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,6 +64,7 @@ import org.springframework.amqp.AmqpAuthenticationException; import org.springframework.amqp.AmqpException; import org.springframework.amqp.AmqpIOException; +import org.springframework.amqp.AmqpResourceNotAvailableException; import org.springframework.amqp.AmqpTimeoutException; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode; @@ -89,6 +90,7 @@ * @author Gunnar Hillert * @author Gary Russell * @author Artem Bilan + * * @since 1.0 * */ @@ -571,6 +573,14 @@ public void hangOnClose() throws Exception { factory.destroy(); } + @Test(expected = AmqpResourceNotAvailableException.class) + public void testChannelMax() { + this.connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(1); + Connection connection = this.connectionFactory.createConnection(); + connection.createChannel(true); + connection.createChannel(false); + } + private Log spyOnLogger(CachingConnectionFactory connectionFactory2) { DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory2); Log logger = spy((Log) dfa.getPropertyValue("logger")); diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 76bf7547e9..df0fad3250 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -377,6 +377,9 @@ Starting with _version 2.0.2_, the `RabbitTemplate` has a configuration option t See <> for more information. The `ConnectionNameStrategy` for the publisher connection is the same as the primary strategy with `.publisher` appended to the result of calling the method. +Starting with _version 1.7.7_, an `AmqpResourceNotAvailableException` is provided, which is thrown now when `SimpleConnection.createChannel()` can't create a `Channel`, for example, because the `channelMax` limit is reached and there are no available channels in the cache. +This exception can be used in the `RetryPolicy` to recover the operation after some back-off. + [[connection-factory]] ===== Configuring the Underlying Client Connection Factory