Skip to content

Commit

Permalink
AMQP-466: Move Direct reply-to Decoding to Address
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-466

1.4.1 introduced direct reply to and reply to address decoding required
using `AddressUtils`. The code should have been put in the `Address` ctor.
  • Loading branch information
garyrussell committed Jan 10, 2015
1 parent deaff00 commit b60f7e6
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -44,6 +44,8 @@ public class Address {

private final String routingKey;

public static final String AMQ_RABBITMQ_REPLY_TO = "amq.rabbitmq.reply-to";

/**
* Create an Address instance from a structured String in the form
*
Expand All @@ -58,6 +60,10 @@ public Address(String address) {
this.exchangeName = "";
this.routingKey = "";
}
else if (address.startsWith(AMQ_RABBITMQ_REPLY_TO)) {
this.routingKey = address;
this.exchangeName = "";
}
else if (address.lastIndexOf('/') <= 0) {
this.routingKey = address.replaceFirst("/", "");
this.exchangeName = "";
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,27 +22,14 @@
*/
public class AddressUtils {

public static final String AMQ_RABBITMQ_REPLY_TO = "amq.rabbitmq.reply-to";

/**
* Decodes the reply-to {@link Address} into exchange/key.
*
* @param request the inbound message.
* @return the Address.
*/
public static Address decodeReplyToAddress(Message request) {
Address replyTo;
String replyToString = request.getMessageProperties().getReplyTo();
if (replyToString == null) {
replyTo = null;
}
else if (replyToString.startsWith(AMQ_RABBITMQ_REPLY_TO)) {
replyTo = new Address("", replyToString);
}
else {
replyTo = new Address(replyToString);
}
return replyTo;
return request.getMessageProperties().getReplyToAddress();
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
* @author Mark Pollack
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
*/
public class AddressTests {

Expand Down Expand Up @@ -83,4 +84,18 @@ public void testEmpty() {
assertEquals("/", address.toString());
}

@Test
public void testDirectReplyTo() {
String replyTo = Address.AMQ_RABBITMQ_REPLY_TO + ".ab/cd/ef";
MessageProperties props = new MessageProperties();
props.setReplyTo(replyTo);
Message message = new Message("foo".getBytes(), props);
Address address = AddressUtils.decodeReplyToAddress(message);
assertEquals("", address.getExchangeName());
assertEquals(replyTo, address.getRoutingKey());
address = props.getReplyToAddress();
assertEquals("", address.getExchangeName());
assertEquals(replyTo, address.getRoutingKey());
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -491,13 +491,13 @@ public Collection<CorrelationData> getUnconfirmed(long age) {

private void evaluateFastReplyTo() {
this.usingFastReplyTo = false;
if (this.replyQueue == null || AddressUtils.AMQ_RABBITMQ_REPLY_TO.equals(this.replyQueue.getName())) {
if (this.replyQueue == null || Address.AMQ_RABBITMQ_REPLY_TO.equals(this.replyQueue.getName())) {
try {
execute(new ChannelCallback<Void>() {

@Override
public Void doInRabbit(Channel channel) throws Exception {
channel.queueDeclarePassive(AddressUtils.AMQ_RABBITMQ_REPLY_TO);
channel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO);
return null;
}
});
Expand Down Expand Up @@ -910,7 +910,7 @@ public Message doInRabbit(Channel channel) throws Exception {
"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
String replyTo;
if (RabbitTemplate.this.usingFastReplyTo) {
replyTo = AddressUtils.AMQ_RABBITMQ_REPLY_TO;
replyTo = Address.AMQ_RABBITMQ_REPLY_TO;
}
else {
DeclareOk queueDeclaration = channel.queueDeclare();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2014 the original author or authors.
* Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -59,7 +59,6 @@
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AddressUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
Expand Down Expand Up @@ -1074,7 +1073,7 @@ public void testSendAndReceiveFastImplicit() {

@Test
public void testSendAndReceiveFastExplicit() {
this.template.setReplyQueue(new Queue(AddressUtils.AMQ_RABBITMQ_REPLY_TO));
this.template.setReplyQueue(new Queue(Address.AMQ_RABBITMQ_REPLY_TO));
sendAndReceiveFastGuts();
}

Expand All @@ -1084,7 +1083,7 @@ private void sendAndReceiveFastGuts() {

@Override
public Void doInRabbit(Channel channel) throws Exception {
channel.queueDeclarePassive(AddressUtils.AMQ_RABBITMQ_REPLY_TO);
channel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO);
return null;
}
});
Expand All @@ -1109,7 +1108,7 @@ public Message handleMessage(Message message) {
Object result = this.template.convertSendAndReceive("foo");
container.stop();
assertEquals("FOO", result);
assertThat(replyToWas.get(), startsWith(AddressUtils.AMQ_RABBITMQ_REPLY_TO));
assertThat(replyToWas.get(), startsWith(Address.AMQ_RABBITMQ_REPLY_TO));
}
catch (Exception e) {
assertThat(e.getCause().getCause().getMessage(), containsString("404"));
Expand Down

0 comments on commit b60f7e6

Please sign in to comment.