Skip to content

Commit

Permalink
Fix issue where listener startup failure can cause extra exceptions.
Browse files Browse the repository at this point in the history
Sets the callback thread to daemon so that in all cases the failure to
start the server stops this thread and creates a specific method
for the callback thread shutdown so that it is shutdown in case
of http/https server exception during startup.

This fixes issue #3157 where the failure of the http server
in pywbemcli startup causes the subprocess to hang because the callback
thread was already created and was not shutdown in the exception.
  • Loading branch information
kschopmeyer authored and andy-maier committed Apr 18, 2024
1 parent 9986832 commit 55e2415
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 24 deletions.
3 changes: 3 additions & 0 deletions docs/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ Released: not yet

**Bug fixes:**

* Fix issue where pywbemlistener that fails startup can cause thread
exception. See issue #3157

**Enhancements:**

**Cleanup:**
Expand Down
67 changes: 43 additions & 24 deletions pywbem/_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,11 +855,9 @@ def __init__(self, host, http_port=None, https_port=None,

self._callbacks = [] # Registered callback functions

# Set up callback queue and callback thread.
self.rcvd_indication_queue = queue.Queue()
self.callback_thread = StoppableThread(
target=self.deliver_indications_from_queue,
args=(self.rcvd_indication_queue,))
# define callback queue and callback thread.
self.rcvd_indication_queue = None
self.callback_thread = None

def __str__(self):
"""
Expand Down Expand Up @@ -1001,6 +999,22 @@ def logger(self):
"""
return self._logger

def stop_callback_thread(self):
"""
Stop the indication delivery thread and queue and eliminate them. This
allows the delivery of indications already in the queue. and then
closes the thread and queue.
"""
assert self.callback_thread
assert self.rcvd_indication_queue
self.logger.info("Stopping queued delivery")
self.callback_thread.stop()
self.logger.info("Joining queued delivery")
self.callback_thread.join()
self.logger.info("Join finished threaded Deliver queue")
self.callback_thread = None
self.rcvd_indication_queue = None

def start(self):
"""
Start the WBEM listener threads, if they are not yet running.
Expand Down Expand Up @@ -1038,9 +1052,16 @@ def start(self):
:exc:`py:OSError`: Other error
:exc:`py:IOError`: Other error (Python 2.7 only)
"""
# Start delivery queue
self.callback_thread.start()
self.logger.info("Callback queue thread started")
# Start delivery queue and thread if not started
if not self.callback_thread:
self.rcvd_indication_queue = queue.Queue()
self.callback_thread = StoppableThread(
target=self.deliver_indications_from_queue,
args=(self.rcvd_indication_queue,))
# This insures that thread stops when process stopped
self.callback_thread.daemon = True
self.callback_thread.start()
self.logger.info("Callback queue thread started")

if self._http_port:
if not self._http_server:
Expand All @@ -1050,6 +1071,7 @@ def start(self):
server = ThreadedHTTPServer((self._host, self._http_port),
ListenerRequestHandler)
except (IOError, OSError) as exc:
self.stop_callback_thread()
# Linux/macOS on py2: socket.error (derived from IOError);
# Linux/macOS on py3: OSError;
# Windows does not raise any exception if port is used
Expand Down Expand Up @@ -1087,6 +1109,7 @@ def start(self):
server = ThreadedHTTPServer((self._host, self._https_port),
ListenerRequestHandler)
except (IOError, OSError) as exc:
self.stop_callback_thread()
# Linux/macOS on py2: socket.error (derived from IOError);
# Linux/macOS on py3: OSError;
# Windows does not raise any exception if port is used
Expand Down Expand Up @@ -1174,7 +1197,7 @@ def password_prompt():

def stop(self):
"""
Stop the WBEM listener threads, if they are running.
Stop the WBEM listener threads and callback thread, if they are running.
"""

# Stopping the server will cause its `serve_forever()` method
Expand All @@ -1196,37 +1219,33 @@ def stop(self):
self._https_thread = None
self.logger.info("Stopped threaded Queue")

self.logger.info("Stopping queued delivery")
self.callback_thread.stop()
self.logger.info("Joining queued delivery")
self.callback_thread.join()
self.logger.info("Join finished threaded Deliver queue")
if self.callback_thread:
self.stop_callback_thread()

def deliver_indications_from_queue(self, delivery_queue):
"""
Deliver indications from delivery_queue to the defined consumer. This
function runs a loop in its own thread and only exits the thread when
the listener is shut down (i.e. when the stop_event is set).
function runs a loop in its own thread and only returns when the
stop_event is set.
It delivers remaining indications in the queue before it returns.
"""
# queue get wait timeout in seconds. This is short because it impacts
# Queue get wait timeout in seconds. This is short because it impacts
# time to stop the thread.
queue_timeout = 2
queue_get_timeout = 2
self.logger.debug("Start deliver indications from queue.")
# NOTE: Loop continues to deliver queued indications until empty
# even if the stop_event is set.
# Continues to deliver queued indications until empty after stop_event
# set.
while True:
try:
self.logger.debug("Get from queue. %d in queue",
delivery_queue.qsize())
indication_tuple = delivery_queue.get(block=True,
timeout=queue_timeout)
timeout=queue_get_timeout)
# self.logger.debug("Got from queue")
self.deliver_indication_to_callback(indication_tuple[0],
indication_tuple[1])
# FUTURE: Should we be able to turn off this thread if there are
# no indications received for a defined time? Now just continues
# the receive process with wait loop.
# FUTURE: add test for excessive indications in queue.
# Probably stop and get out with some indication of problem.
except queue.Empty:
Expand Down

0 comments on commit 55e2415

Please sign in to comment.