Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixed issue with stomper adding a space in ACK message-id header - AM…

…Q 5.6 does not allow this
  • Loading branch information...
commit 968556e016ee79ab9c437c7eb9fda1d038c3af66 1 parent ea9cb08
Roger Hoover theduderog authored
8 stompest/async.py
View
@@ -153,7 +153,13 @@ def _ack(self, messageId):
"""Send ack command
"""
self.log.debug('Sending ack command for message: %s' % messageId)
- cmd = stomper.ack(messageId)
+ #NOTE: cannot use stomper.ack(messageId) with ActiveMQ 5.6.0
+ # b/c stomper adds a space to the header and AMQ no longer accepts it
+ frame = stomper.Frame()
+ frame.cmd = 'ACK'
+ frame.headers = {'message-id': messageId}
+ cmd = frame.pack()
+ #cmd = stomper.ack(messageId)
# self.log.debug('Writing cmd: %s' % cmd)
self.transport.write(cmd)
69 stompest/tests/async_stomp_client_integration_test.py
View
@@ -30,38 +30,12 @@
class StompestTestError(Exception):
pass
-def getClientAckMode():
- mode = 'client'
- if supportsClientIndividual():
- mode = 'client-individual'
- return mode
-
-def supportsClientIndividual():
- supported = False
- queue = '/queue/testClientAckMode'
- stomp = Stomp(HOST, PORT)
- stomp.connect()
- stomp.send(queue, 'test')
- stomp.subscribe(queue, {'ack': 'client-individual'})
- frame = stomp.receiveFrame()
- #Do not ACK. If client-individual mode is supported, the messages will still be on the broker
- stomp.disconnect()
- stomp.connect()
- stomp.subscribe(queue, {'ack': 'client'})
- if stomp.canRead(1):
- frame = stomp.receiveFrame()
- stomp.ack(frame)
- supported = True
- stomp.disconnect()
- return supported
-
class HandlerExceptionWithErrorQueueIntegrationTestCase(unittest.TestCase):
msg1 = 'choke on this'
msg1Hdrs = {'food': 'barf', 'persistent': 'true'}
msg2 = 'follow up message'
queue = '/queue/asyncStompestHandlerExceptionWithErrorQueueUnitTest'
errorQueue = '/queue/zzz.error.asyncStompestHandlerExceptionWithErrorQueueUnitTest'
- ackMode = getClientAckMode()
def cleanQueues(self):
self.cleanQueue(self.queue)
@@ -78,6 +52,8 @@ def cleanQueue(self, queue):
stomp.disconnect()
def setUp(self):
+ self.cleanQueues()
+
self.unhandledMsg = None
self.errQMsg = None
self.consumedMsg = None
@@ -85,8 +61,6 @@ def setUp(self):
@defer.inlineCallbacks
def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_disconnect(self):
- self.cleanQueues()
-
config = StompConfig(HOST, PORT)
creator = StompCreator(config, alwaysDisconnectOnUnhandledMsg=True)
@@ -99,7 +73,7 @@ def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_discon
#Barf on first message so it will get put in error queue
#Use selector to guarantee message order. AMQ doesn't not guarantee order by default
- stomp.subscribe(self.queue, self._saveMsgAndBarf, {'ack': self.ackMode, 'activemq.prefetchSize': 1, 'selector': "food = 'barf'"}, errorDestination=self.errorQueue)
+ stomp.subscribe(self.queue, self._saveMsgAndBarf, {'ack': 'client-individual', 'activemq.prefetchSize': 1, 'selector': "food = 'barf'"}, errorDestination=self.errorQueue)
#Client disconnected and returned error
try:
@@ -111,14 +85,14 @@ def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_discon
#Reconnect and subscribe again - consuming second message then disconnecting
stomp = yield creator.getConnection()
- stomp.subscribe(self.queue, self._eatOneMsgAndDisconnect, {'ack': self.ackMode, 'activemq.prefetchSize': 1}, errorDestination=self.errorQueue)
+ stomp.subscribe(self.queue, self._eatOneMsgAndDisconnect, {'ack': 'client-individual', 'activemq.prefetchSize': 1}, errorDestination=self.errorQueue)
#Client disconnects without error
yield stomp.getDisconnectedDeferred()
#Reconnect and subscribe to error queue
stomp = yield creator.getConnection()
- stomp.subscribe(self.errorQueue, self._saveErrMsgAndDisconnect, {'ack': self.ackMode, 'activemq.prefetchSize': 1})
+ stomp.subscribe(self.errorQueue, self._saveErrMsgAndDisconnect, {'ack': 'client-individual', 'activemq.prefetchSize': 1})
#Wait for disconnect
yield stomp.getDisconnectedDeferred()
@@ -133,8 +107,6 @@ def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_discon
@defer.inlineCallbacks
def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_no_disconnect(self):
- self.cleanQueues()
-
config = StompConfig(HOST, PORT)
creator = StompCreator(config)
@@ -146,14 +118,14 @@ def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_no_dis
stomp.send(self.queue, self.msg2)
#Barf on first msg, disconnect on second msg
- stomp.subscribe(self.queue, self._barfOneEatOneAndDisonnect, {'ack': self.ackMode, 'activemq.prefetchSize': 1}, errorDestination=self.errorQueue)
+ stomp.subscribe(self.queue, self._barfOneEatOneAndDisonnect, {'ack': 'client-individual', 'activemq.prefetchSize': 1}, errorDestination=self.errorQueue)
#Client disconnects without error
yield stomp.getDisconnectedDeferred()
#Reconnect and subscribe to error queue
stomp = yield creator.getConnection()
- stomp.subscribe(self.errorQueue, self._saveErrMsgAndDisconnect, {'ack': self.ackMode, 'activemq.prefetchSize': 1})
+ stomp.subscribe(self.errorQueue, self._saveErrMsgAndDisconnect, {'ack': 'client-individual', 'activemq.prefetchSize': 1})
#Wait for disconnect
yield stomp.getDisconnectedDeferred()
@@ -164,8 +136,6 @@ def test_onhandlerException_ackMessage_filterReservedHdrs_send2ErrorQ_and_no_dis
@defer.inlineCallbacks
def test_onhandlerException_disconnect(self):
- self.cleanQueues()
-
config = StompConfig(HOST, PORT)
creator = StompCreator(config)
@@ -176,7 +146,7 @@ def test_onhandlerException_disconnect(self):
stomp.send(self.queue, self.msg1, self.msg1Hdrs)
#Barf on first msg (implicit disconnect)
- stomp.subscribe(self.queue, self._saveMsgAndBarf, {'ack': self.ackMode, 'activemq.prefetchSize': 1})
+ stomp.subscribe(self.queue, self._saveMsgAndBarf, {'ack': 'client-individual', 'activemq.prefetchSize': 1})
#Client disconnected and returned error
try:
@@ -188,7 +158,7 @@ def test_onhandlerException_disconnect(self):
#Reconnect and subscribe again - consuming retried message and disconnecting
stomp = yield creator.getConnection()
- stomp.subscribe(self.queue, self._eatOneMsgAndDisconnect, {'ack': self.ackMode, 'activemq.prefetchSize': 1})
+ stomp.subscribe(self.queue, self._eatOneMsgAndDisconnect, {'ack': 'client-individual', 'activemq.prefetchSize': 1})
#Client disconnects without error
yield stomp.getDisconnectedDeferred()
@@ -224,8 +194,23 @@ class GracefulDisconnectTestCase(unittest.TestCase):
msgCount = 0
msg = 'test'
queue = '/queue/aysncStompestGracefulDisconnectUnitTest'
- ackMode = getClientAckMode()
+ def setUp(self):
+ self.cleanQueues()
+
+ def cleanQueues(self):
+ self.cleanQueue(self.queue)
+
+ def cleanQueue(self, queue):
+ stomp = Stomp(HOST, PORT)
+ stomp.connect()
+ stomp.subscribe(queue, {'ack': 'client'})
+ while stomp.canRead(1):
+ frame = stomp.receiveFrame()
+ stomp.ack(frame)
+ print "Dequeued old message: %s" % frame
+ stomp.disconnect()
+
@defer.inlineCallbacks
def test_onDisconnect_waitForOutstandingMessagesToFinish(self):
self.config = StompConfig(HOST, PORT)
@@ -239,7 +224,7 @@ def test_onDisconnect_waitForOutstandingMessagesToFinish(self):
for i in range(self.numMsgs):
stomp.send(self.queue, self.msg)
- stomp.subscribe(self.queue, self._msgHandler, {'ack': self.ackMode, 'activemq.prefetchSize': self.numMsgs})
+ stomp.subscribe(self.queue, self._msgHandler, {'ack': 'client-individual', 'activemq.prefetchSize': self.numMsgs})
#Wait for disconnect
yield stomp.getDisconnectedDeferred()
@@ -248,7 +233,7 @@ def test_onDisconnect_waitForOutstandingMessagesToFinish(self):
stomp = yield creator.getConnection()
self.timeExpired = False
self.timeoutDelayedCall = reactor.callLater(1, self._timesUp, stomp)
- stomp.subscribe(self.queue, self._eatOneMsgAndDisconnect, {'ack': self.ackMode, 'activemq.prefetchSize': self.numMsgs})
+ stomp.subscribe(self.queue, self._eatOneMsgAndDisconnect, {'ack': 'client-individual', 'activemq.prefetchSize': self.numMsgs})
#Wait for disconnect
yield stomp.getDisconnectedDeferred()
Please sign in to comment.
Something went wrong with that request. Please try again.