Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP-283/284 Upgrade to RabbitMQ 3.0.0; AMQP-287 ConnectionTimeout #78

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -41,7 +41,7 @@ subprojects { subproject ->
junitVersion = '4.8.2'
log4jVersion = '1.2.15'
mockitoVersion = '1.8.4'
rabbitmqVersion = '2.8.4'
rabbitmqVersion = '3.0.0'

springVersion = '3.0.7.RELEASE'
}
Expand Down
Expand Up @@ -34,6 +34,10 @@ public abstract class ExchangeTypes {

public static final String SYSTEM = "system";

/**
* @deprecated
*/
@Deprecated
public static final String FEDERATED = "x-federation";

}
Expand Up @@ -21,9 +21,11 @@
/**
*
* @see AmqpAdmin
* @deprecated RabbitMQ no longer supports 'x-federation' exchanges.
*
* @author Gary Russell
*/
@Deprecated
public class FederatedExchange extends AbstractExchange {

public static final FederatedExchange DEFAULT = new FederatedExchange("");
Expand Down
Expand Up @@ -26,6 +26,7 @@
* @author Gary Russell
*
*/
@SuppressWarnings("deprecation")
public class FederatedExchangeParser extends AbstractExchangeParser {

private final static String BACKING_TYPE_ATTRIBUTE = "backing-type";
Expand Down
Expand Up @@ -278,8 +278,16 @@ public void setMandatory(boolean mandatory) {
this.mandatory = mandatory;
}

/**
* @deprecated - RabbitMQ no longer supports this option
* when publishing messages.
*/
@Deprecated
public void setImmediate(boolean immediate) {
this.immediate = immediate;
if (logger.isWarnEnabled()) {
logger.warn("RabbitMQ 3.0.0 and above no longer supports 'immediate'.");
}
}

/**
Expand Down
Expand Up @@ -501,16 +501,16 @@ public AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {

/**
* Retrieve the fatal startup exception if this processor completely failed to locate the broker resources it
* needed. Blocks up to 60 seconds waiting (but should always return promptly in normal circumstances).
* needed. Blocks up to 60 seconds waiting for an exception to occur
* (but should always return promptly in normal circumstances).
* No longer fatal if the processor does not start up in 60 seconds.
*
* @return a startup exception if there was one
* @throws TimeoutException if the consumer hasn't started
* @throws InterruptedException if the consumer startup is interrupted
*/
public FatalListenerStartupException getStartupException() throws TimeoutException, InterruptedException {
if (!start.await(60000L, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for startup");
}
start.await(60000L, TimeUnit.MILLISECONDS);
return startupException;
}

Expand Down
Expand Up @@ -174,6 +174,12 @@ public void basicPublish(String exchange, String routingKey,
props, body);
}

public void basicPublish(String exchange, String routingKey,
boolean mandatory, BasicProperties props, byte[] body)
throws IOException {
this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}

public DeclareOk exchangeDeclare(String exchange, String type)
throws IOException {
return this.delegate.exchangeDeclare(exchange, type);
Expand Down