Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add auto_delete to kwargs of EventSubscribers

  • Loading branch information...
commit 4a9ef16fa3f7d0ccc84f9a536dfd14e8340ec896 1 parent 80c521f
@daf daf authored
View
13 pyon/event/event.py
@@ -188,7 +188,7 @@ class EventSubscriber(Subscriber, BaseEventSubscriberMixin):
ALL_EVENTS = "#"
def __init__(self, xp_name=None, event_type=None, origin=None, queue_name=None, callback=None,
- sub_type=None, origin_type=None, pattern=None, *args, **kwargs):
+ sub_type=None, origin_type=None, pattern=None, auto_delete=None, *args, **kwargs):
"""
Initializer.
@@ -199,6 +199,7 @@ def __init__(self, xp_name=None, event_type=None, origin=None, queue_name=None,
Note: an EventSubscriber needs to be closed to free broker resources
"""
self._cbthread = None
+ self._auto_delete = auto_delete
BaseEventSubscriberMixin.__init__(self, xp_name=xp_name, event_type=event_type, origin=origin,
queue_name=queue_name, sub_type=sub_type, origin_type=origin_type, pattern=pattern)
@@ -229,6 +230,16 @@ def stop(self):
def __str__(self):
return "EventSubscriber at %s:\n\trecv_name: %s\n\tcb: %s" % (hex(id(self)), str(self._recv_name), str(self._callback))
+ def _create_channel(self, **kwargs):
+ """
+ Override to set the channel's queue_auto_delete property.
+ """
+ ch = Subscriber._create_channel(self, **kwargs)
+ if self._auto_delete is not None:
+ ch.queue_auto_delete = self._auto_delete
+
+ return ch
+
class EventRepository(object):
"""
Class that uses a data store to provide a persistent repository for ION events.
View
14 pyon/event/test/test_event.py
@@ -24,9 +24,21 @@
from pyon.core.exception import FilesystemError, StreamingError, CorruptionError
+@attr('UNIT',group='event')
+class TestEvents(IonUnitTestCase):
+ def test_event_subscriber_auto_delete(self):
+ mocknode = Mock()
+ ev = EventSubscriber(event_type="ProcessLifecycleEvent", callback=lambda *a,**kw: None, auto_delete=sentinel.auto_delete, node=mocknode)
+ self.assertEquals(ev._auto_delete, sentinel.auto_delete)
+
+ # we don't want to have to patch out everything here, so call initialize directly, which calls create_channel for us
+ ev._setup_listener = Mock()
+ ev.initialize(sentinel.binding)
+
+ self.assertEquals(ev._chan.queue_auto_delete, sentinel.auto_delete)
@attr('INT',group='event')
-class TestEvents(IonIntegrationTestCase):
+class TestEventsInt(IonIntegrationTestCase):
def setUp(self):
self._listens = []
View
14 pyon/ion/endpoint.py
@@ -411,7 +411,9 @@ def __str__(self):
#
class ProcessEventSubscriber(ProcessSubscriber, BaseEventSubscriberMixin):
def __init__(self, xp_name=None, event_type=None, origin=None, queue_name=None, callback=None,
- sub_type=None, origin_type=None, process=None, routing_call=None, *args, **kwargs):
+ sub_type=None, origin_type=None, process=None, routing_call=None, auto_delete=None, *args, **kwargs):
+
+ self._auto_delete = auto_delete
BaseEventSubscriberMixin.__init__(self, xp_name=xp_name, event_type=event_type, origin=origin,
queue_name=queue_name, sub_type=sub_type, origin_type=origin_type)
@@ -423,3 +425,13 @@ def __init__(self, xp_name=None, event_type=None, origin=None, queue_name=None,
def __str__(self):
return "ProcessEventSubscriber at %s:\n\trecv_name: %s\n\tprocess: %s\n\tcb: %s" % (hex(id(self)), str(self._recv_name), str(self._process), str(self._callback))
+ def _create_channel(self, **kwargs):
+ """
+ Override to set the channel's queue_auto_delete property.
+ """
+ ch = ProcessSubscriber._create_channel(self, **kwargs)
+ if self._auto_delete is not None:
+ ch.queue_auto_delete = self._auto_delete
+
+ return ch
+
Please sign in to comment.
Something went wrong with that request. Please try again.