Permalink
Browse files

Health check for bidir pooled channels (OOIION-693)

  • Loading branch information...
1 parent 18040ff commit 42509ad9777606aa7fc056fadce79ab8b70c100d @daf daf committed Jan 25, 2013
Showing with 97 additions and 12 deletions.
  1. +62 −12 pyon/net/messaging.py
  2. +35 −0 pyon/net/test/test_messaging.py
View
@@ -144,6 +144,7 @@ def __init__(self):
self._pool = IDPool()
self._bidir_pool = {} # maps our pool ids (from self._pool), whether active or inactive, to channels
self._pool_map = {} # maps active pika channel numbers to our numbers (from self._pool)
+ self._dead_pool = [] # channels removed from pool for failing health test, for later forensics
BaseNode.__init__(self)
@@ -184,6 +185,23 @@ def _new_transport(self, ch_number=None):
transport = AMQPTransport(amq_chan)
return transport
+ def _check_pooled_channel_health(self, ch):
+ """
+ Returns True if the channel has the proper callbacks in pika for delivery.
+
+ We're seeing an issue where channels are considered open and consuming by pika, RabbitMQ,
+ and our layer, but the "callbacks" mechanism in pika does not have any entries for
+ delivering messages to our layer, therefore messages are being dropped. Rabbit is happily
+ sending messages along, resulting in large numbers of UNACKED messages.
+
+ If this method returns False, the channel should be discarded and a new one created.
+ """
+ # NOTE(review): reaches into pika's private callback registry (_callbacks) — this is an
+ # internal structure and may change between pika versions; confirm against the pinned release.
+ cbs = self.client.callbacks._callbacks
+ # presumably every open channel id has an entry in cbs; a missing id would raise
+ # KeyError here rather than returning False — TODO confirm that is the desired behavior
+ if "_on_basic_deliver" not in cbs[ch.get_channel_id()].iterkeys():
+ return False
+
+ return True
+
def channel(self, ch_type, transport=None):
"""
Creates a Channel object with an underlying transport callback and returns it.
@@ -194,18 +212,50 @@ def channel(self, ch_type, transport=None):
with self._lock:
# having _queue_auto_delete on is a pre-req to being able to pool.
if ch_type == channel.BidirClientChannel and not ch_type._queue_auto_delete:
- chid = self._pool.get_id()
- if chid in self._bidir_pool:
- log.debug("BidirClientChannel requested, pulling from pool (%d)", chid)
- assert not chid in self._pool_map.values()
- ch = self._bidir_pool[chid]
- self._pool_map[ch.get_channel_id()] = chid
- else:
- log.debug("BidirClientChannel requested, no pool items available, creating new (%d)", chid)
- ch = self._new_channel(ch_type, transport=transport)
- ch.set_close_callback(self.on_channel_request_close)
- self._bidir_pool[chid] = ch
- self._pool_map[ch.get_channel_id()] = chid
+
+ # only attempt this 5 times - somewhat arbitrary but we can't have an infinite loop here
+ attempts = 5
+ while attempts > 0:
+ attempts -= 1
+
+ chid = self._pool.get_id()
+ if chid in self._bidir_pool:
+ log.debug("BidirClientChannel requested, pulling from pool (%d)", chid)
+ assert not chid in self._pool_map.values()
+
+ # we need to check the health of this bidir channel
+ ch = self._bidir_pool[chid]
+ if not self._check_pooled_channel_health(ch):
+ log.warning("Channel (%d) failed health check, removing from pool", ch.get_channel_id())
+
+ # return chid to the id pool
+ self._pool.release_id(chid)
+
+ # remove this channel from the pool, put into dead pool
+ self._dead_pool.append(ch)
+ del self._bidir_pool[chid]
+
+ # now close the channel (must remove our close callback which returns it to the pool)
+ assert ch._close_callback == self.on_channel_request_close
+ ch._close_callback = None
+ ch.close()
+
+ # resume the loop to attempt to get one again
+ continue
+
+ self._pool_map[ch.get_channel_id()] = chid
+ else:
+ log.debug("BidirClientChannel requested, no pool items available, creating new (%d)", chid)
+ ch = self._new_channel(ch_type, transport=transport)
+ ch.set_close_callback(self.on_channel_request_close)
+ self._bidir_pool[chid] = ch
+ self._pool_map[ch.get_channel_id()] = chid
+
+ # channel here is valid, exit out of attempts loop
+ break
+ else: # while loop didn't get a valid channel in X attempts
+ raise StandardError("Could not get a valid channel")
+
else:
ch = self._new_channel(ch_type, transport=transport)
assert ch
@@ -16,6 +16,7 @@
import time
from pyon.util.containers import DotDict
from pika.exceptions import NoFreeChannels
+from interface.services.icontainer_agent import ContainerAgentClient
@attr('UNIT')
class TestNodeB(PyonTestCase):
@@ -355,3 +356,37 @@ def test__next_channel_number_all_used_either_channels_or_bad(self):
self.conn.mark_bad_channel(x)
self.assertRaises(NoFreeChannels, self.conn._next_channel_number)
+
+@attr('INT')
+class TestNodeBInt(IonIntegrationTestCase):
+ def setUp(self):
+ # Integration-level setup: start a real container so self.container.node has a
+ # live broker connection, and a client to make RPC-style requests through it.
+ self._start_container()
+ self.ccc = ContainerAgentClient(to_name=self.container.name)
+ self.node = self.container.node
+
+ # Pooling of bidir channels requires _queue_auto_delete to be False (see
+ # NodeB.channel); patch it off for the duration of the test.
+ patcher = patch('pyon.net.channel.RecvChannel._queue_auto_delete', False)
+ patcher.start()
+ self.addCleanup(patcher.stop)
+
+ def test_pool_health_check(self):
+
+ # make a request, thus making a bidir item
+ self.ccc.status()
+ self.assertEquals(1, len(self.node._bidir_pool))
+ curpoolchids = [o.get_channel_id() for o in self.node._bidir_pool.itervalues()]
+
+ # fake that this channel has been corrupted in pika
+ # NOTE(review): deletes the '_on_basic_deliver' entry from pika's private
+ # callback registry to simulate the observed corruption — depends on pika internals.
+ ch = self.node._bidir_pool.values()[0]
+ chnum = ch.get_channel_id()
+ del self.node.client.callbacks._callbacks[chnum]['_on_basic_deliver']
+
+ # make another request
+ self.ccc.status()
+
+ # should have killed our last channel, gotten a new one
+ # the failed channel must be out of the live pool and retained in _dead_pool for forensics
+ self.assertEquals(1, len(self.node._bidir_pool))
+ self.assertNotEquals(curpoolchids, [o.get_channel_id() for o in self.node._bidir_pool.itervalues()])
+ self.assertNotIn(ch, self.node._bidir_pool.itervalues())
+ self.assertIn(ch, self.node._dead_pool)
+
+

0 comments on commit 42509ad

Please sign in to comment.