Skip to content

Commit

Permalink
[VOTR-185] autodelete component queues (#94)
Browse files Browse the repository at this point in the history
* Revert "[Inf 250] feat(queue clean up): unbind component at shutdown (#86)"

This reverts commit 7099422.

* autodelete component queues
  • Loading branch information
zachschubert committed Aug 11, 2022
1 parent 3cc25fa commit b9e537f
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 7 deletions.
8 changes: 2 additions & 6 deletions ergo/amqp_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ def __init__(self, invocable: FunctionInvocable) -> None:
component_queue_name = f"{self._invocable.config.func}".replace("/", ":")
if component_queue_name.startswith(":"):
component_queue_name = component_queue_name[1:]
self._component_queue = kombu.Queue(name=component_queue_name, exchange=self._exchange, routing_key=str(SubTopic(self._invocable.config.subtopic)), durable=False, channel=self._connection.channel())
self._component_queue = kombu.Queue(name=component_queue_name, exchange=self._exchange, routing_key=str(SubTopic(self._invocable.config.subtopic)), durable=False, autodelete=True)
instance_queue_name = f"{component_queue_name}:{instance_id()}"
self._instance_queue = kombu.Queue(name=instance_queue_name, exchange=self._exchange, routing_key=str(SubTopic(instance_id())), auto_delete=True)
error_queue_name = f"{component_queue_name}:error"
self._error_queue = kombu.Queue(name=error_queue_name, exchange=self._exchange, routing_key=error_queue_name, durable=False)

self._terminating = threading.Event()
self._pending_invocations = threading.Semaphore()
self._handler_lock = threading.Lock()
Expand All @@ -78,12 +79,8 @@ def start(self) -> int:
consumer: kombu.Consumer = conn.Consumer(queues=[self._component_queue, self._instance_queue], prefetch_count=PREFETCH_COUNT, accept=["json"])
consumer.register_callback(self._start_handle_message_thread)
consumer.consume()

while not self._terminating.is_set():
try:
# lazy load binding
# to prevent unbinding all consumers w same subtopic at shutdown
self._component_queue.queue_bind(channel=self._connection.channel())
# wait up to 1s for the next message before sending a heartbeat
conn.drain_events(timeout=1)
except socket.timeout:
Expand Down Expand Up @@ -149,6 +146,5 @@ def _producer(self) -> kombu.Producer:
def _shutdown(self, signum: int, *_: Any) -> None:
self._terminating.set()
self._pending_invocations.acquire(blocking=True, timeout=TERMINATION_GRACE_PERIOD)
self._component_queue.queue_unbind()
self._connection.close()
os.kill(os.getpid(), 0)
2 changes: 1 addition & 1 deletion ergo/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import subprocess
import sys

VERSION = '0.9.2'
VERSION = '0.10.0'


def get_version() -> str:
Expand Down

0 comments on commit b9e537f

Please sign in to comment.