Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Some files had DOS Newlines (CRLF) instead of Unix (LF). Use 'git log -p -b' to verify only white space changes.
- Loading branch information
1 parent
50f4288
commit 662ea5c
Showing
18 changed files
with
2,819 additions
and
2,819 deletions.
There are no files selected for viewing
84 changes: 42 additions & 42 deletions
84
spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -1,42 +1,42 @@ | |||
/** | /** | ||
* | * | ||
*/ | */ | ||
package org.springframework.amqp.rabbit.connection; | package org.springframework.amqp.rabbit.connection; | ||
|
|
||
import org.springframework.amqp.AmqpException; | import org.springframework.amqp.AmqpException; | ||
|
|
||
import com.rabbitmq.client.Channel; | import com.rabbitmq.client.Channel; | ||
|
|
||
/** | /** | ||
* @author Dave Syer | * @author Dave Syer | ||
* | * | ||
*/ | */ | ||
public interface Connection { | public interface Connection { | ||
|
|
||
/** | /** | ||
* Create a new channel, using an internally allocated channel number. | * Create a new channel, using an internally allocated channel number. | ||
* @param transactional true if the channel should support transactions | * @param transactional true if the channel should support transactions | ||
* @return a new channel descriptor, or null if none is available | * @return a new channel descriptor, or null if none is available | ||
* @throws AmqpException if an I/O problem is encountered | * @throws AmqpException if an I/O problem is encountered | ||
*/ | */ | ||
Channel createChannel(boolean transactional) throws AmqpException; | Channel createChannel(boolean transactional) throws AmqpException; | ||
|
|
||
/** | /** | ||
* Close this connection and all its channels | * Close this connection and all its channels | ||
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code | * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code | ||
* and message 'OK'. | * and message 'OK'. | ||
* | * | ||
* Waits for all the close operations to complete. | * Waits for all the close operations to complete. | ||
* | * | ||
* @throws AmqpException if an I/O problem is encountered | * @throws AmqpException if an I/O problem is encountered | ||
*/ | */ | ||
void close() throws AmqpException; | void close() throws AmqpException; | ||
|
|
||
/** | /** | ||
* Flag to indicate the status of the connection. | * Flag to indicate the status of the connection. | ||
* | * | ||
* @return true if the connection is open | * @return true if the connection is open | ||
*/ | */ | ||
boolean isOpen(); | boolean isOpen(); | ||
|
|
||
} | } |
104 changes: 52 additions & 52 deletions
104
spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -1,52 +1,52 @@ | |||
/* | /* | ||
* Copyright 2002-2010 the original author or authors. | * Copyright 2002-2010 the original author or authors. | ||
* | * | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | * the License. You may obtain a copy of the License at | ||
* | * | ||
* http://www.apache.org/licenses/LICENSE-2.0 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
* | * | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | * specific language governing permissions and limitations under the License. | ||
*/ | */ | ||
package org.springframework.amqp.rabbit.connection; | package org.springframework.amqp.rabbit.connection; | ||
|
|
||
import java.io.IOException; | import java.io.IOException; | ||
|
|
||
import com.rabbitmq.client.Channel; | import com.rabbitmq.client.Channel; | ||
|
|
||
public class SimpleConnection implements Connection { | public class SimpleConnection implements Connection { | ||
|
|
||
private final com.rabbitmq.client.Connection delegate; | private final com.rabbitmq.client.Connection delegate; | ||
|
|
||
public SimpleConnection(com.rabbitmq.client.Connection delegate) { | public SimpleConnection(com.rabbitmq.client.Connection delegate) { | ||
this.delegate = delegate; | this.delegate = delegate; | ||
} | } | ||
|
|
||
public Channel createChannel(boolean transactional) { | public Channel createChannel(boolean transactional) { | ||
try { | try { | ||
Channel channel = delegate.createChannel(); | Channel channel = delegate.createChannel(); | ||
if (transactional) { | if (transactional) { | ||
// Just created so we want to start the transaction | // Just created so we want to start the transaction | ||
channel.txSelect(); | channel.txSelect(); | ||
} | } | ||
return channel; | return channel; | ||
} catch (IOException e) { | } catch (IOException e) { | ||
throw RabbitUtils.convertRabbitAccessException(e); | throw RabbitUtils.convertRabbitAccessException(e); | ||
} | } | ||
} | } | ||
|
|
||
public void close() { | public void close() { | ||
try { | try { | ||
delegate.close(); | delegate.close(); | ||
} catch (IOException e) { | } catch (IOException e) { | ||
throw RabbitUtils.convertRabbitAccessException(e); | throw RabbitUtils.convertRabbitAccessException(e); | ||
} | } | ||
} | } | ||
|
|
||
public boolean isOpen() { | public boolean isOpen() { | ||
return delegate != null && delegate.isOpen(); | return delegate != null && delegate.isOpen(); | ||
} | } | ||
|
|
||
} | } |
100 changes: 50 additions & 50 deletions
100
.../java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumerIntegrationTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -1,50 +1,50 @@ | |||
package org.springframework.amqp.rabbit.listener; | package org.springframework.amqp.rabbit.listener; | ||
|
|
||
import static org.junit.Assert.assertNull; | import static org.junit.Assert.assertNull; | ||
|
|
||
import org.apache.log4j.Level; | import org.apache.log4j.Level; | ||
import org.junit.Rule; | import org.junit.Rule; | ||
import org.junit.Test; | import org.junit.Test; | ||
import org.springframework.amqp.core.AcknowledgeMode; | import org.springframework.amqp.core.AcknowledgeMode; | ||
import org.springframework.amqp.core.Queue; | import org.springframework.amqp.core.Queue; | ||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; | import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; | ||
import org.springframework.amqp.rabbit.core.RabbitTemplate; | import org.springframework.amqp.rabbit.core.RabbitTemplate; | ||
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; | import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; | ||
import org.springframework.amqp.rabbit.test.BrokerRunning; | import org.springframework.amqp.rabbit.test.BrokerRunning; | ||
import org.springframework.amqp.rabbit.test.BrokerTestUtils; | import org.springframework.amqp.rabbit.test.BrokerTestUtils; | ||
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster; | import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster; | ||
|
|
||
public class BlockingQueueConsumerIntegrationTests { | public class BlockingQueueConsumerIntegrationTests { | ||
|
|
||
private static Queue queue = new Queue("test.queue"); | private static Queue queue = new Queue("test.queue"); | ||
|
|
||
@Rule | @Rule | ||
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); | public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue); | ||
|
|
||
@Rule | @Rule | ||
public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, | public Log4jLevelAdjuster logLevels = new Log4jLevelAdjuster(Level.INFO, RabbitTemplate.class, | ||
SimpleMessageListenerContainer.class, BlockingQueueConsumer.class, | SimpleMessageListenerContainer.class, BlockingQueueConsumer.class, | ||
BlockingQueueConsumerIntegrationTests.class); | BlockingQueueConsumerIntegrationTests.class); | ||
|
|
||
@Test | @Test | ||
public void testTransactionalLowLevel() throws Exception { | public void testTransactionalLowLevel() throws Exception { | ||
|
|
||
RabbitTemplate template = new RabbitTemplate(); | RabbitTemplate template = new RabbitTemplate(); | ||
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); | CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); | ||
connectionFactory.setPort(BrokerTestUtils.getPort()); | connectionFactory.setPort(BrokerTestUtils.getPort()); | ||
template.setConnectionFactory(connectionFactory); | template.setConnectionFactory(connectionFactory); | ||
|
|
||
BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(connectionFactory, | BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(connectionFactory, | ||
new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<BlockingQueueConsumer>(), | new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<BlockingQueueConsumer>(), | ||
AcknowledgeMode.AUTO, true, 1, queue.getName()); | AcknowledgeMode.AUTO, true, 1, queue.getName()); | ||
blockingQueueConsumer.start(); | blockingQueueConsumer.start(); | ||
connectionFactory.destroy(); | connectionFactory.destroy(); | ||
|
|
||
// TODO: make this into a proper assertion. An exception can be thrown here by the Rabbit client and printed to | // TODO: make this into a proper assertion. An exception can be thrown here by the Rabbit client and printed to | ||
// stderr without being rethrown (so hard to make a test fail). | // stderr without being rethrown (so hard to make a test fail). | ||
blockingQueueConsumer.stop(); | blockingQueueConsumer.stop(); | ||
assertNull(template.receiveAndConvert(queue.getName())); | assertNull(template.receiveAndConvert(queue.getName())); | ||
|
|
||
} | } | ||
|
|
||
} | } |
Oops, something went wrong.