Skip to content

Commit

Permalink
[AMBARI-24679] Fix race condition in agent during registration and to…
Browse files Browse the repository at this point in the history
…pology updates. (apache#2368)
  • Loading branch information
avijayanhwx authored and Vishal Suvagia committed Oct 1, 2018
1 parent 8436744 commit 217a5d8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
6 changes: 5 additions & 1 deletion ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ def register(self):
logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
raise
finally:
listener.enabled = True
with listener.event_queue_lock:
logger.info("Enabling events for listener {0}".format(listener))
listener.enabled = True
# Process queued messages if any
listener.dequeue_unprocessed_events()

self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)

Expand Down
38 changes: 35 additions & 3 deletions ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,40 @@
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
from ambari_agent import Constants
from ambari_agent.Utils import Utils
from Queue import Queue
import threading

logger = logging.getLogger(__name__)

class EventListener(ambari_stomp.ConnectionListener):

unprocessed_messages_queue = Queue(100)

"""
Base abstract class for event listeners on specific topics.
"""
def __init__(self, initializer_module):
self.initializer_module = initializer_module
self.enabled = True
self.event_queue_lock = threading.RLock()

def dequeue_unprocessed_events(self):
while not self.unprocessed_messages_queue.empty():
payload = self.unprocessed_messages_queue.get_nowait()
if payload:
logger.info("Processing event from unprocessed queue {0} {1}".format(payload[0], payload[1]))
destination = payload[0]
headers = payload[1]
message_json = payload[2]
message = payload[3]
try:
self.on_event(headers, message_json)
except Exception as ex:
logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
self.report_status_to_sender(headers, message, ex)
else:
self.report_status_to_sender(headers, message)


def on_message(self, headers, message):
"""
Expand All @@ -58,13 +82,21 @@ def on_message(self, headers, message):
logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json))))

if not self.enabled:
logger.info("Ignoring event to {0} since event listener is disabled".format(destination))
return
with self.event_queue_lock:
if not self.enabled:
logger.info("Queuing event as unprocessed {0} since event "
"listener is disabled".format(destination))
try:
self.unprocessed_messages_queue.put_nowait((destination, headers, message_json, message))
except Exception as ex:
logger.warning("Cannot queue any more unprocessed events since "
"queue is full! {0} {1}".format(destination, message))
return

try:
self.on_event(headers, message_json)
except Exception as ex:
logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
self.report_status_to_sender(headers, message, ex)
else:
self.report_status_to_sender(headers, message)
Expand Down

0 comments on commit 217a5d8

Please sign in to comment.