diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py index 0c01bdfb1c69ed..b5c635a77b5c08 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py @@ -236,7 +236,7 @@ def expect_lldb_gdbserver_replay( if sequence_entry.is_output_matcher(): try: # Grab next entry from the output queue. - content = pump_queues.output_queue().get(True, timeout_seconds) + content = pump.get_output(timeout_seconds) except queue.Empty: if logger: logger.warning( @@ -247,7 +247,7 @@ def expect_lldb_gdbserver_replay( pump.get_accumulated_output())) else: try: - content = pump_queues.packet_queue().get(True, timeout_seconds) + content = pump.get_packet(timeout_seconds) except queue.Empty: if logger: logger.warning( diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py index 3de76345896dc3..6c41ed473b45cc 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py @@ -5,6 +5,7 @@ import re import select import threading +import time import traceback from six.moves import queue @@ -74,8 +75,6 @@ def __init__(self, pump_socket, pump_queues, logger=None): if not pump_socket: raise Exception("pump_socket cannot be None") - self._thread = None - self._stop_thread = False self._socket = pump_socket self._logger = logger self._receive_buffer = "" @@ -83,29 +82,42 @@ def __init__(self, pump_socket, pump_queues, logger=None): self._pump_queues = pump_queues def __enter__(self): - """Support the python 'with' statement. - - Start the pump thread.""" - self.start_pump_thread() + self._receive_buffer = "" + self._accumulated_output = "" return self def __exit__(self, exit_type, value, the_traceback): - """Support the python 'with' statement. - - Shut down the pump thread.""" - self.stop_pump_thread() + pass + + def _read(self, timeout_seconds, q): + now = time.monotonic() + deadline = now + timeout_seconds + while q.empty() and now <= deadline: + can_read, _, _ = select.select([self._socket], [], [], deadline-now) + now = time.monotonic() + if can_read and self._socket in can_read: + try: + new_bytes = seven.bitcast_to_string(self._socket.recv(4096)) + if self._logger and new_bytes and len(new_bytes) > 0: + self._logger.debug( + "pump received bytes: {}".format(new_bytes)) + except: + # Likely a closed socket. Done with the pump thread. + if self._logger: + self._logger.debug( + "socket read failed, stopping pump read thread\n" + + traceback.format_exc(3)) + break + self._process_new_bytes(new_bytes) + if q.empty(): + raise queue.Empty() + return q.get(True) - def start_pump_thread(self): - if self._thread: - raise Exception("pump thread is already running") - self._stop_thread = False - self._thread = threading.Thread(target=self._run_method) - self._thread.start() + def get_output(self, timeout_seconds): + return self._read(timeout_seconds, self._pump_queues.output_queue()) - def stop_pump_thread(self): - self._stop_thread = True - if self._thread: - self._thread.join() + def get_packet(self, timeout_seconds): + return self._read(timeout_seconds, self._pump_queues.packet_queue()) def _process_new_bytes(self, new_bytes): if not new_bytes: @@ -162,34 +174,6 @@ def _process_new_bytes(self, new_bytes): # packet. Stop trying until we read more. has_more = False - def _run_method(self): - self._receive_buffer = "" - self._accumulated_output = "" - - if self._logger: - self._logger.info("socket pump starting") - - # Keep looping around until we're asked to stop the thread. - while not self._stop_thread: - can_read, _, _ = select.select([self._socket], [], [], 0) - if can_read and self._socket in can_read: - try: - new_bytes = seven.bitcast_to_string(self._socket.recv(4096)) - if self._logger and new_bytes and len(new_bytes) > 0: - self._logger.debug( - "pump received bytes: {}".format(new_bytes)) - except: - # Likely a closed socket. Done with the pump thread. - if self._logger: - self._logger.debug( - "socket read failed, stopping pump read thread\n" + - traceback.format_exc(3)) - break - self._process_new_bytes(new_bytes) - - if self._logger: - self._logger.info("socket pump exiting") - def get_accumulated_output(self): return self._accumulated_output