Skip to content

Commit

Permalink
Merge 34f9cb8 into 991c85f
Browse files Browse the repository at this point in the history
  • Loading branch information
hluk committed Feb 20, 2024
2 parents 991c85f + 34f9cb8 commit dfcebd4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
23 changes: 15 additions & 8 deletions resultsdb/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import abc
import json
from threading import Lock

import pkg_resources
import stomp
Expand Down Expand Up @@ -216,6 +217,9 @@ def __init__(self, **kwargs):
if getattr(self, attr, None) is None:
raise ValueError(f"Missing {attr!r} option for STOMP messaging plugin")

self.conn = None
self.conn_lock = Lock()

def publish(self, msg):
# Add telemetry information. This includes an extra key
# traceparent.
Expand All @@ -227,17 +231,20 @@ def publish(self, msg):

@retry(stop=STOMP_RETRY_STOP, wait=STOMP_RETRY_WAIT, reraise=True)
def _publish_with_retry(self, **kwargs):
conn = stomp.connect.StompConnection11(**self.connection)
with self.conn_lock:
if self.conn is None or not self.conn.is_connected():
log.info("Connecting to message bus")
self.conn = stomp.connect.StompConnection11(**self.connection)

if self.use_ssl:
conn.set_ssl(**self.ssl_args)
if self.use_ssl:
self.conn.set_ssl(**self.ssl_args)

conn.connect(wait=True)
try:
conn.send(**kwargs)
# Connection will be forcefully closed after the app exits.
# This is not ideal but WSGI apps have no suitable handler.
self.conn.connect(wait=True)

self.conn.send(**kwargs)
log.debug("Published message through stomp: %s", kwargs["body"])
finally:
conn.disconnect()


def load_messaging_plugin(name, plugin_args):
Expand Down
3 changes: 0 additions & 3 deletions testing/test_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def test_stomp_publish(self, mock_stomp):
plugin.publish({})
mock_stomp().connect.assert_called_once()
mock_stomp().send.assert_called_once()
mock_stomp().disconnect.assert_called_once()

def test_stomp_publish_connect_failed(self, mock_stomp):
plugin = messaging.load_messaging_plugin("stomp", MESSAGE_BUS_KWARGS)
Expand All @@ -214,7 +213,6 @@ def test_stomp_publish_connect_failed(self, mock_stomp):

assert mock_stomp().connect.call_count == 3
mock_stomp().send.assert_not_called()
mock_stomp().disconnect.assert_not_called()

def test_stomp_publish_send_failed(self, mock_stomp):
plugin = messaging.load_messaging_plugin("stomp", MESSAGE_BUS_KWARGS)
Expand All @@ -225,7 +223,6 @@ def test_stomp_publish_send_failed(self, mock_stomp):

assert mock_stomp().connect.call_count == 3
assert mock_stomp().send.call_count == 3
assert mock_stomp().disconnect.call_count == 3


class TestGetResultsParseArgs:
Expand Down

0 comments on commit dfcebd4

Please sign in to comment.