This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

AMQP Message Receiver broken in a Cluster #15

Closed
johndemic opened this issue Feb 22, 2013 · 10 comments

Comments

@johndemic

AMQP message reception doesn't work when deployed to a cluster. See the stack trace below. The same flow works fine in standalone mode.

ERROR 2013-02-21 20:54:27,671 [[poc-1.0.0-SNAPSHOT-v20130221-21+52].amqpConnector.receiver.01] com.hazelcast.nio.AbstractSerializer: java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
com.hazelcast.nio.HazelcastSerializationException: java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
at com.hazelcast.nio.AbstractSerializer.toByte(AbstractSerializer.java:110)
at com.hazelcast.nio.AbstractSerializer.toByteArray(AbstractSerializer.java:131)
at com.hazelcast.nio.Serializer.writeObject(Serializer.java:52)
at com.hazelcast.impl.ThreadContext.toData(ThreadContext.java:110)
at com.hazelcast.nio.IOUtil.toData(IOUtil.java:136)
at com.hazelcast.impl.BaseManager$RequestBasedCall.setLocal(BaseManager.java:474)
at com.hazelcast.impl.BaseManager$ResponseQueueCall.setLocal(BaseManager.java:503)
at com.hazelcast.impl.ConcurrentMapManager$MPut.txnalPut(ConcurrentMapManager.java:1744)
at com.hazelcast.impl.ConcurrentMapManager$MPut.txnalPut(ConcurrentMapManager.java:1699)
at com.hazelcast.impl.ConcurrentMapManager$MPut.put(ConcurrentMapManager.java:1572)
at com.hazelcast.impl.MProxyImpl$MProxyReal.put(MProxyImpl.java:623)
at com.hazelcast.impl.MProxyImpl$MProxyReal.put(MProxyImpl.java:602)
at com.hazelcast.impl.MProxyImpl.put(MProxyImpl.java:173)
at com.hazelcast.impl.MProxyImpl.put(MProxyImpl.java:124)
at com.mulesoft.mule.cluster.hazelcast.HazelcastQueueInfoDelegate.storeObject(HazelcastQueueInfoDelegate.java:348)
at com.mulesoft.mule.cluster.hazelcast.HazelcastObjectStore.store(HazelcastObjectStore.java:157)
at com.mulesoft.mule.cluster.hazelcast.HazecastQueueObjectStoreAdapter.store(HazecastQueueObjectStoreAdapter.java:35)
at org.mule.util.queue.TransactionalQueueManager.doStore(TransactionalQueueManager.java:198)
at org.mule.util.queue.TransactionalQueueSession$QueueImpl.offer(TransactionalQueueSession.java:72)
at org.mule.util.queue.TransactionalQueueSession$QueueImpl.put(TransactionalQueueSession.java:58)
at org.mule.processor.SedaStageInterceptingMessageProcessor.enqueue(SedaStageInterceptingMessageProcessor.java:118)
at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:92)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:93)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:93)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:44)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:102)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:93)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:57)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.access$001(InterceptingChainLifecycleWrapper.java:29)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper$1.process(InterceptingChainLifecycleWrapper.java:90)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:85)
at org.mule.construct.AbstractPipeline$3.process(AbstractPipeline.java:194)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.SimpleMessageProcessorChain.doProcess(SimpleMessageProcessorChain.java:47)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:57)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.access$001(InterceptingChainLifecycleWrapper.java:29)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper$1.process(InterceptingChainLifecycleWrapper.java:90)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:85)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:220)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:202)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:194)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:181)
at org.mule.transport.amqp.AmqpMessageReceiver$AmqpMessageRouterWork.run(AmqpMessageReceiver.java:243)
at org.mule.work.WorkerContext.run(WorkerContext.java:311)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
at org.apache.commons.collections.map.AbstractHashedMap.doWriteObject(AbstractHashedMap.java:1182)
at org.mule.util.CaseInsensitiveHashMap.writeObject(CaseInsensitiveHashMap.java:145)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
at org.mule.DefaultMuleEvent.writeObject(DefaultMuleEvent.java:881)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeUnshared(ObjectOutputStream.java:413)
at com.hazelcast.nio.DefaultSerializer$ObjectSerializer.writeNormal(DefaultSerializer.java:433)
at com.hazelcast.nio.DefaultSerializer$ObjectSerializer.write(DefaultSerializer.java:387)
at com.hazelcast.nio.DefaultSerializer.write(DefaultSerializer.java:128)
at com.hazelcast.nio.CustomSerializerAdapter.write(CustomSerializerAdapter.java:28)
at com.hazelcast.nio.AbstractSerializer.toByte(AbstractSerializer.java:107)
... 77 more
ERROR 2013-02-21 20:54:27,690 [[poc-1.0.0-SNAPSHOT-v20130221-21+52].amqpConnector.receiver.01] org.mule.transport.amqp.AmqpMessageReceiver$AmqpMessageRouterWork: Impossible to route: org.mule.transport.amqp.AmqpMessage@196f403a[consumerTag=amq.ctag-A-9g_PpjZlJcAEQ__uA4_A,envelope=com.rabbitmq.client.Envelope@559552e4,body={60,102,111,111,50,47,62},properties=#contentHeader(content-type=text/plain;charset=UTF-8, content-encoding=UTF-8, headers={MULE_SESSION=rO0ABXNyACNvcmcubXVsZS5zZXNzaW9uLkRlZmF1bHRNdWxlU2Vzc2lvbi7rdtEW7GGKAwAEWgAFdmFsaWRMAA1mbG93Q29uc3RydWN0dAAmTG9yZy9tdWxlL2FwaS9jb25zdHJ1Y3QvRmxvd0NvbnN0cnVjdDtMAAJpZHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAD3NlY3VyaXR5Q29udGV4dHQAJ0xvcmcvbXVsZS9hcGkvc2VjdXJpdHkvU2VjdXJpdHlDb250ZXh0O3hwAXB0ACQyYmY2ZGRkNy03YzliLTExZTItOTBiYi04YjhiMmIxNzRjZjNwc3IAJWphdmEudXRpbC5Db2xsZWN0aW9ucyRTeW5jaHJvbml6ZWRNYXAbc/kJS0s5ewMAAkwAAW10AA9MamF2YS91dGlsL01hcDtMAAVtdXRleHQAEkxqYXZhL2xhbmcvT2JqZWN0O3hwc3IAJG9yZy5tdWxlLnV0aWwuQ2FzZUluc2Vuc2l0aXZlSGFzaE1hcJ3R2e9nRc4AAwAAeHB3DD9AAAAAAAAQAAAAAnQAFVJFREJPWF9UUkFOU0FDVElPTl9JRHQAJGQwYWEwODdjLTRkYTctNDBiMy1iYWVhLWU4OWZjNzZkMWFlYnQAE1JFREJPWF9NRVNTQUdFX05BTUV0ABJDdXN0b21lclN1YnNjcmliZWR4cQB+AAl4eA==, MULE_CORRELATION_GROUP_SIZE=3, MULE_ENCODING=UTF-8, MULE_CORRELATION_ID=d0aa087c-4da7-40b3-baea-e89fc76d1aeb, MULE_ROOT_MESSAGE_ID=2bf493e5-7c9b-11e2-90bb-8b8b2b174cf3, MULE_ENDPOINT=amqp://Subscription-V1_0, Content-Type=text/plain;charset=UTF-8}, delivery-mode=2, priority=0, correlation-id=d0aa087c-4da7-40b3-baea-e89fc76d1aeb, reply-to=null, expiration=null, message-id=2b45408a-7c9b-11e2-a67c-a10b3cf87a8b, timestamp=Thu Feb 21 20:54:26 CST 2013, type=null, user-id=null, app-id=null, cluster-id=null)]
org.mule.api.service.FailedToQueueEventException: Interrupted while queueing event for "SEDA Stage SubscriptionStatus.BC.stage1". Message payload is of type: byte[]
at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:96)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:93)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:93)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:55)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:44)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.AbstractInterceptingMessageProcessorBase.processNext(AbstractInterceptingMessageProcessorBase.java:105)
at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:102)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.DefaultMessageProcessorChain.doProcess(DefaultMessageProcessorChain.java:93)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:57)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.access$001(InterceptingChainLifecycleWrapper.java:29)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper$1.process(InterceptingChainLifecycleWrapper.java:90)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:85)
at org.mule.construct.AbstractPipeline$3.process(AbstractPipeline.java:194)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.SimpleMessageProcessorChain.doProcess(SimpleMessageProcessorChain.java:47)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.doProcess(InterceptingChainLifecycleWrapper.java:57)
at org.mule.processor.chain.AbstractMessageProcessorChain.process(AbstractMessageProcessorChain.java:66)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.access$001(InterceptingChainLifecycleWrapper.java:29)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper$1.process(InterceptingChainLifecycleWrapper.java:90)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:46)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:43)
at org.mule.processor.chain.InterceptingChainLifecycleWrapper.process(InterceptingChainLifecycleWrapper.java:85)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:220)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:202)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:194)
at org.mule.transport.AbstractMessageReceiver.routeMessage(AbstractMessageReceiver.java:181)
at org.mule.transport.amqp.AmqpMessageReceiver$AmqpMessageRouterWork.run(AmqpMessageReceiver.java:243)
at org.mule.work.WorkerContext.run(WorkerContext.java:311)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.RuntimeException: com.hazelcast.nio.HazelcastSerializationException: java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
at com.hazelcast.nio.AbstractSerializer.toByteArray(AbstractSerializer.java:139)
at com.hazelcast.nio.Serializer.writeObject(Serializer.java:52)
at com.hazelcast.impl.ThreadContext.toData(ThreadContext.java:110)
at com.hazelcast.nio.IOUtil.toData(IOUtil.java:136)
at com.hazelcast.impl.BaseManager$RequestBasedCall.setLocal(BaseManager.java:474)
at com.hazelcast.impl.BaseManager$ResponseQueueCall.setLocal(BaseManager.java:503)
at com.hazelcast.impl.ConcurrentMapManager$MPut.txnalPut(ConcurrentMapManager.java:1744)
at com.hazelcast.impl.ConcurrentMapManager$MPut.txnalPut(ConcurrentMapManager.java:1699)
at com.hazelcast.impl.ConcurrentMapManager$MPut.put(ConcurrentMapManager.java:1572)
at com.hazelcast.impl.MProxyImpl$MProxyReal.put(MProxyImpl.java:623)
at com.hazelcast.impl.MProxyImpl$MProxyReal.put(MProxyImpl.java:602)
at com.hazelcast.impl.MProxyImpl.put(MProxyImpl.java:173)
at com.hazelcast.impl.MProxyImpl.put(MProxyImpl.java:124)
at com.mulesoft.mule.cluster.hazelcast.HazelcastQueueInfoDelegate.storeObject(HazelcastQueueInfoDelegate.java:348)
at com.mulesoft.mule.cluster.hazelcast.HazelcastObjectStore.store(HazelcastObjectStore.java:157)
at com.mulesoft.mule.cluster.hazelcast.HazecastQueueObjectStoreAdapter.store(HazecastQueueObjectStoreAdapter.java:35)
at org.mule.util.queue.TransactionalQueueManager.doStore(TransactionalQueueManager.java:198)
at org.mule.util.queue.TransactionalQueueSession$QueueImpl.offer(TransactionalQueueSession.java:72)
at org.mule.util.queue.TransactionalQueueSession$QueueImpl.put(TransactionalQueueSession.java:58)
at org.mule.processor.SedaStageInterceptingMessageProcessor.enqueue(SedaStageInterceptingMessageProcessor.java:118)
at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:92)
... 56 more
Caused by: com.hazelcast.nio.HazelcastSerializationException: java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
at com.hazelcast.nio.AbstractSerializer.toByte(AbstractSerializer.java:110)
at com.hazelcast.nio.AbstractSerializer.toByteArray(AbstractSerializer.java:131)
... 76 more
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.ChannelN
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
at org.apache.commons.collections.map.AbstractHashedMap.doWriteObject(AbstractHashedMap.java:1182)
at org.mule.util.CaseInsensitiveHashMap.writeObject(CaseInsensitiveHashMap.java:145)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
at org.mule.DefaultMuleEvent.writeObject(DefaultMuleEvent.java:881)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeUnshared(ObjectOutputStream.java:413)
at com.hazelcast.nio.DefaultSerializer$ObjectSerializer.writeNormal(DefaultSerializer.java:433)
at com.hazelcast.nio.DefaultSerializer$ObjectSerializer.write(DefaultSerializer.java:387)
at com.hazelcast.nio.DefaultSerializer.write(DefaultSerializer.java:128)
at com.hazelcast.nio.CustomSerializerAdapter.write(CustomSerializerAdapter.java:28)
at com.hazelcast.nio.AbstractSerializer.toByte(AbstractSerializer.java:107)
... 77 more

@ddossot
Contributor

ddossot commented Feb 22, 2013

So in clustering there is message serialization that kicks in, which fails because of the channel that's added to the message event by the receiver.
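The failure mode described here can be reproduced outside Mule. The following is a minimal sketch, not the transport's code: `FakeChannel` and `FakeEvent` are hypothetical stand-ins for `com.rabbitmq.client.impl.ChannelN` and the Mule event, and the only real API used is standard Java serialization. It shows that a serializable event still fails to serialize as soon as one of its property values is not `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the RabbitMQ channel: a live resource that does
// not implement Serializable.
class FakeChannel {
}

// Hypothetical stand-in for a Mule event: Serializable itself, but carrying a
// variable map whose values are not guaranteed to be.
class FakeEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    final Map<String, Object> flowVariables = new HashMap<>();
}

class ChannelSerializationDemo {
    // Returns the name of the class that blocked serialization, or null if
    // the value was written successfully.
    static String serializationBlocker(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeEvent event = new FakeEvent();
        event.flowVariables.put("amqp.channel", new FakeChannel());
        System.out.println("Blocked by: " + serializationBlocker(event));
    }
}
```

In the trace above, the same pattern appears as `DefaultMuleEvent.writeObject` reaching `CaseInsensitiveHashMap.writeObject`, which then hits the channel.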

When does this serialization occur? Can you show the flow and point where serialization occurs?

@johndemic
Author

Here's the flow. I'm assuming the serialization is happening on the AMQP inbound-endpoint. Note that I'm using the byte-array-to-string-transformer to work around an issue with streaming payloads not being supported in the cluster.

https://gist.github.com/johndemic/dea6405df93e93d5f5c9

@ddossot
Contributor

ddossot commented Feb 22, 2013

Before trying to figure out a potential fix, I'm wondering if the serialization is a side effect of the thread hand-off that occurs between one-way inbound endpoints and the rest of the flow when the processing strategy is not set (default).

Do you mind trying to set the processing strategy of the flow to synchronous? The idea is to try piggybacking on the thread of the inbound endpoint across the flow and see if it prevents serialization.

@johndemic
Author

Thanks David, setting the processing strategy of the flow to synchronous fixes the issue. Your additional suggestion to remove the amqp.channel invocation property prior to the message aggregator was also on the money; it fixed the similar java.io.NotSerializableException there.

Thanks!

@rhoegg

rhoegg commented Jun 5, 2013

Removing the amqp.channel flow variable resolves this issue for me as well, and I don't see any negative consequences so far. Perhaps we can simply not store the Channel as a message property? Requiring flow developers to <remove-variable variableName="amqp.channel" /> when switching exchange patterns in a flow seems sub-optimal.
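As a rough illustration of why removing the variable helps, here is a small sketch in plain Java. It assumes the flow variables behave like an ordinary map; `StubChannel` and `RemoveChannelBeforeHop` are hypothetical names, and only the `amqp.channel` key comes from this thread. Once the non-serializable entry is gone, the remaining variables serialize without complaint.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the RabbitMQ Channel: deliberately not Serializable.
class StubChannel {
}

class RemoveChannelBeforeHop {
    static final String CHANNEL_KEY = "amqp.channel";

    // True if Java serialization accepts the value, false if writing fails.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Copies the variables without the channel entry, leaving the input untouched.
    static HashMap<String, Object> withoutChannel(Map<String, Object> flowVars) {
        HashMap<String, Object> copy = new HashMap<>(flowVars);
        copy.remove(CHANNEL_KEY);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> flowVars = new HashMap<>();
        flowVars.put(CHANNEL_KEY, new StubChannel());
        flowVars.put("MULE_ENCODING", "UTF-8");
        System.out.println("with channel:    " + canSerialize(flowVars));
        System.out.println("without channel: " + canSerialize(withoutChannel(flowVars)));
    }
}
```

The cost, as the replies discuss, is that anything needing the channel later in the flow no longer has it.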

@ddossot
Contributor

ddossot commented Jun 5, 2013

The channel is stored in a flow variable to allow constructs like acknowledge-message, reject-message and recover to work, as they need a Channel instance to do so.

I would be very happy to change this, i.e. to stop storing the channel in a flow variable, but I haven't yet found a way to do so. Any suggestions?

@rhoegg

rhoegg commented Jun 5, 2013

Your point is good: the Channel that the consumer started with needs to be the one we use to ack, nack, and recover. My current thinking is that we won't be able to use that channel after a SEDA queue hop (because we are potentially on another node, or on the same node after a restart). So perhaps we need to require a synchronous processing strategy in order to do manual acknowledgement.
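One way to picture the problem with a queue hop is the sketch below. It is an assumption for illustration only: `PendingDelivery` is a hypothetical class, and marking the channel `transient` is not something the transport does. Even if the event were made serializable by skipping the channel, the copy that comes out on the other side of the hop has the delivery tag but no channel to acknowledge on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical record of a delivery awaiting manual acknowledgement.
class PendingDelivery implements Serializable {
    private static final long serialVersionUID = 1L;

    final long deliveryTag;
    // Skipped by serialization; stands in for the consumer's live channel.
    transient Object channel;

    PendingDelivery(long deliveryTag, Object channel) {
        this.deliveryTag = deliveryTag;
        this.channel = channel;
    }
}

class QueueHopDemo {
    // Simulates a hop through a store that serializes its entries.
    static PendingDelivery queueHop(PendingDelivery delivery) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(delivery);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PendingDelivery) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PendingDelivery before = new PendingDelivery(42L, new Object());
        PendingDelivery after = queueHop(before);
        System.out.println("tag kept: " + (after.deliveryTag == 42L));
        System.out.println("channel kept: " + (after.channel != null));
    }
}
```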

Is there a way for a message processor to force the flow construct to a synchronous processing strategy?

@ddossot
Contributor

ddossot commented Jun 5, 2013

I guess a message processor can programmatically change the processing strategy of its enclosing flow, but I don't think this would be advisable. I think it's better to keep the developer in command.

That said, why not make the AMQP inbound endpoint detect that it is in a flow whose processing strategy will clash with storing the channel in a flow variable? In that case, it could simply not store it. On top of that, if a developer uses a construct that requires the channel to be present, the error message could be tailored to the flow's processing strategy to be as informative as possible.
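The idea could be sketched roughly as follows. Everything here is hypothetical: the `Strategy` enum is a simplification invented for the example, and none of these names are Mule or AMQP transport APIs. The sketch only shows the two decisions being proposed: whether to store the channel, and what to tell the developer when a channel-dependent construct finds none.

```java
class ChannelStoragePolicy {
    // Hypothetical simplification of a flow's processing strategy.
    enum Strategy { SYNCHRONOUS, QUEUED_ASYNCHRONOUS }

    // Store the channel only when the event stays on the receiver's thread
    // and therefore never goes through a serializing queue.
    static boolean shouldStoreChannel(Strategy strategy) {
        return strategy == Strategy.SYNCHRONOUS;
    }

    // Builds an error message for a construct (e.g. acknowledge-message)
    // that needs the channel but did not find it.
    static String missingChannelMessage(String operation, Strategy strategy) {
        if (strategy == Strategy.SYNCHRONOUS) {
            return operation + " requires an AMQP channel, but none was found in the flow variables.";
        }
        return operation + " requires an AMQP channel, which is not stored when the flow uses the "
                + strategy + " processing strategy; consider a synchronous strategy.";
    }

    public static void main(String[] args) {
        Strategy strategy = Strategy.QUEUED_ASYNCHRONOUS;
        System.out.println("store channel: " + shouldStoreChannel(strategy));
        System.out.println(missingChannelMessage("acknowledge-message", strategy));
    }
}
```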

@ddossot
Contributor

ddossot commented Jun 6, 2013

Re-opening in order to give the approach I just suggested a shot.

@ddossot ddossot reopened this Jun 6, 2013
@ddossot
Contributor

ddossot commented Jun 7, 2013

It is harder than I thought to reliably detect the condition where the channel shouldn't be added as a flow variable, which is basically when the object store used to store messages between SEDA stages requires objects to be serialized.

So I'm closing this again with the following recommendation:

When using SEDA flows in a cluster environment, configure the connector with ackMode="AMQP_AUTO" or ackMode="MULE_AUTO"

@ddossot ddossot closed this as completed Jun 7, 2013