AmqpConnector.receiver is not able to retrieve the connection when RabbitMQ goes down and comes back up. #53

Closed
radhika123456789 opened this Issue Jan 24, 2014 · 5 comments

@radhika123456789

I have a Mule flow that uses an AMQP connection to interact with RabbitMQ. The AMQP connector is configured to retry the connection every 5 seconds if the RabbitMQ connection is lost.
I was testing the following scenario:
Start the RabbitMQ server.
Start the Mule flow. Once the flow has started, stop the RabbitMQ server and start it again. In the logs I can see that the AMQP connector retried and re-established the connection. However, when I then submit a message to the Mule flow, I see the exception below repeatedly.

"java.lang.IllegalStateException: This MuleWorkManager .amqpConnector.receiver' is stopped".
com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; reason: #method(reply-code=200, reply-text=Closed due to exception from Consumer org.mule.transport.amqp.AmqpMessageReceiver$AmqpConsumer@7bbcc611 (amq.ctag-K5GVFb_36ery2dm-QL7lmQ) method handleDelivery for channel AMQChannel(amqp://admin@127.0.0.1:5672/orchestration-integration,24), class-id=0, method-id=0)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:542)

@chaserb
chaserb commented Feb 3, 2014

I am seeing this issue as well, but only with the released 3.4.1 version of mule-transport-amqp. Oddly enough the current 3.4.2-SNAPSHOT seems to reconnect and consume messages as expected, but as far as I can tell from the commits, there are no changes between 3.4.1 and the current 3.4.2-SNAPSHOT.

Here is the flow I've used to test this, using a RabbitMQ broker (v 3.2.3, Erlang R16B02) on localhost:

<amqp:connector name="amqpConnector" host="localhost" port="5672" virtualHost="/reconnecting" 
        username="test" password="test" ackMode="MANUAL">
    <reconnect-forever frequency="1000"/>
</amqp:connector>

<flow name="ReconnectingConsumerFlow">
    <amqp:inbound-endpoint connector-ref="amqpConnector"
        exchangeName="reconnectExchange" exchangeType="direct" exchangeAutoDelete="false" exchangeDurable="true"
        queueName="reconnectQueue" queueDurable="true" queueExclusive="false" queueAutoDelete="false"
        routingKey="reconnectKey"/>
    <object-to-string-transformer />
    <logger level="INFO" message="Received a message: #[payload]" category="com.issinc.amqp-consumer-reconnect"/>
    <amqp:acknowledge-message/>
</flow>

With 3.4.2-SNAPSHOT, messages are consumed normally after stopping and restarting the broker. With 3.4.1, I receive the following in an endless loop if the above flow is the only consumer on the queue. It appears I still have a channel, and that my flow client attempts to receive the message, but fails repeatedly and endlessly due to the channel being closed:

WARN  2014-02-03 08:43:40,637 [pool-17-thread-1] org.mule.transport.amqp.AmqpMessageReceiver: Received shutdown signal for consumer tag: amq.ctag-B5Y0bUPHxYGyM_v4-jRKaQ, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; reason: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer org.mule.transport.amqp.AmqpMessageReceiver$AmqpConsumer@6d0fdc04 (amq.ctag-B5Y0bUPHxYGyM_v4-jRKaQ) method handleDelivery for channel AMQChannel(amqp://test@127.0.0.1:5672/reconnecting,2), class-id=0, method-id=0)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:542)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:501)
    at com.rabbitmq.client.impl.DefaultExceptionHandler.handleChannelKiller(DefaultExceptionHandler.java:72)
    at com.rabbitmq.client.impl.DefaultExceptionHandler.handleConsumerException(DefaultExceptionHandler.java:60)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:145)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:76)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
WARN  2014-02-03 08:43:40,637 [pool-17-thread-1] org.mule.transport.amqp.AmqpConnector: Failed to close channel: AMQChannel(amqp://test@127.0.0.1:5672/reconnecting,2)
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:561)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:501)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:494)
    at org.mule.transport.amqp.AmqpConnector.closeChannel(AmqpConnector.java:784)
    at org.mule.transport.amqp.AmqpMessageReceiver.doStop(AmqpMessageReceiver.java:123)
    at org.mule.transport.amqp.AmqpMessageReceiver.restart(AmqpMessageReceiver.java:147)
    at org.mule.transport.amqp.AmqpMessageReceiver$AmqpConsumer.handleShutdownSignal(AmqpMessageReceiver.java:210)
    at com.rabbitmq.client.impl.ConsumerDispatcher.notifyConsumerOfShutdown(ConsumerDispatcher.java:187)
    at com.rabbitmq.client.impl.ConsumerDispatcher.notifyConsumersOfShutdown(ConsumerDispatcher.java:179)
    at com.rabbitmq.client.impl.ConsumerDispatcher.access$200(ConsumerDispatcher.java:36)
    at com.rabbitmq.client.impl.ConsumerDispatcher$6.run(ConsumerDispatcher.java:166)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:76)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
INFO  2014-02-03 08:43:40,637 [pool-17-thread-1] org.mule.transport.amqp.AmqpEndpointUtil: Declared exchange: reconnectExchange of type: direct, durable: true, autoDelete: false, arguments: {}
INFO  2014-02-03 08:43:40,638 [pool-17-thread-1] org.mule.transport.amqp.AmqpEndpointUtil: Declared queue: reconnectQueue, durable: true, exclusive: false, autoDelete: false, arguments: {}
INFO  2014-02-03 08:43:40,638 [pool-17-thread-1] org.mule.transport.amqp.AmqpEndpointUtil: Bound queue: reconnectQueue to exchange: reconnectExchange with routing key: reconnectKey
INFO  2014-02-03 08:43:40,639 [pool-17-thread-1] org.mule.transport.amqp.AmqpMessageReceiver: Started subscription: amq.ctag-QUBr9RJDkgrQdKJSS9FZZQ on channel: AMQChannel(amqp://test@127.0.0.1:5672/reconnecting,2)
DefaultExceptionHandler: Consumer org.mule.transport.amqp.AmqpMessageReceiver$AmqpConsumer@1f335f58 (amq.ctag-QUBr9RJDkgrQdKJSS9FZZQ) method handleDelivery for channel AMQChannel(amqp://test@127.0.0.1:5672/reconnecting,2) threw an exception for channel AMQChannel(amqp://test@127.0.0.1:5672/reconnecting,2):
java.lang.IllegalStateException: This MuleWorkManager '[mule-transport-amqp-consumer-reconnect].amqpConnector.receiver' is stopped
    at org.mule.work.MuleWorkManager.executeWork(MuleWorkManager.java:244)
    at org.mule.work.MuleWorkManager.scheduleWork(MuleWorkManager.java:208)
    at org.mule.transport.TrackingWorkManager.scheduleWork(TrackingWorkManager.java:192)
    at org.mule.transport.amqp.AmqpMessageReceiver.deliverAmqpMessage(AmqpMessageReceiver.java:179)
    at org.mule.transport.amqp.AmqpMessageReceiver.access$500(AmqpMessageReceiver.java:47)
    at org.mule.transport.amqp.AmqpMessageReceiver$AmqpConsumer.handleDelivery(AmqpMessageReceiver.java:226)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:140)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:76)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

@ddossot
ddossot commented Feb 3, 2014

These issues have been fixed for 3.4.1 so it's hard to understand how they can still be present with 3.4.1 but gone with 3.4.2-SNAP, with no code change in between.

When you package your Mule app, can you confirm that both the AMQP transport 3.4.1 and the AMQP client 3.2.1 are in the /lib directory, and that no other versions of these two JARs are present there?
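
One way to run this check is a small standalone program that lists the matching JARs in the app's lib directory. This is only a sketch: the class name `LibJarCheck` is hypothetical, and the artifact prefixes `mule-transport-amqp` and `amqp-client` are assumed file-name prefixes for the transport and the RabbitMQ client, so adjust them if your JARs are named differently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Mule or the AMQP transport.
public class LibJarCheck {

    /** Returns the sorted file names in libDir that look like "<artifactPrefix>-<version>.jar". */
    public static List<String> findJars(Path libDir, String artifactPrefix) throws IOException {
        try (Stream<Path> files = Files.list(libDir)) {
            return files
                    .map(p -> p.getFileName().toString())
                    .filter(n -> n.startsWith(artifactPrefix + "-") && n.endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java LibJarCheck <path-to-app-lib-dir>");
            System.exit(2);
        }
        Path libDir = Paths.get(args[0]);
        // Assumed artifact name prefixes; change them to match your packaging.
        String[] artifacts = {"mule-transport-amqp", "amqp-client"};
        for (String artifact : artifacts) {
            List<String> jars = findJars(libDir, artifact);
            // Exactly one JAR per artifact is what we hope to see.
            String status = jars.size() == 1 ? "OK" : "CHECK";
            System.out.println(status + " " + artifact + ": " + jars);
        }
    }
}
```

Anything other than a single JAR per artifact (none, or several versions side by side) is worth a closer look before digging further into the reconnection behaviour.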

@radhika123456789

I have both the AMQP transport 3.4.1 and the AMQP client 3.2.1 in the lib directory.

@chaserb
chaserb commented Feb 26, 2014

Thanks David, you are correct. I've verified the issue is resolved with the released 3.4.1 of mule-transport-amqp.

The SHA signatures of the pom and jar in my .m2 repository did not match the values hosted on the MuleForge repository, and I realized we had an invalid artifact in our Artifactory. Rebuilt without the aid of Artifactory and the mule app reconnected correctly.
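
For anyone who wants to repeat that comparison, here is a minimal sketch that computes the SHA-1 of a local file and compares it with the value published next to the artifact. It assumes the repository publishes SHA-1 checksums (the usual Maven `.sha1` convention) and that you paste the expected hex string in yourself. The class name `ChecksumCheck` is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for comparing a local artifact against a published checksum.
public class ChecksumCheck {

    /** Returns the lowercase hex SHA-1 digest of the given file. */
    public static String sha1Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-1");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** True if the file's SHA-1 equals the expected hex string (case and surrounding whitespace ignored). */
    public static boolean matches(Path file, String expectedSha1)
            throws IOException, NoSuchAlgorithmException {
        return sha1Hex(file).equalsIgnoreCase(expectedSha1.trim());
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: java ChecksumCheck <file> <expected-sha1-hex>");
            System.exit(2);
        }
        Path file = Paths.get(args[0]);
        System.out.println("actual:   " + sha1Hex(file));
        System.out.println("expected: " + args[1].trim().toLowerCase());
        System.out.println(matches(file, args[1]) ? "MATCH" : "MISMATCH");
    }
}
```

Run it once for the jar and once for the pom in the local .m2 repository; a mismatch on either suggests the artifact served by the proxy is not the released one.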

@ddossot
ddossot commented Feb 26, 2014

Glad to read this, thanks for taking the time to report back.

@ddossot ddossot closed this Feb 26, 2014