Skip to content

Commit

Permalink
Adding logic to ensure teardown is called before run_forever exits, o…
Browse files Browse the repository at this point in the history
…therwise resources like the ping thread are leaked. (#918)

* Adding logic to ensure teardown is called before run_forever exits, otherwise resources like the ping thread are leaked.

* Minor edit.

* Updating the select logic to make it more efficient.
  • Loading branch information
QuinnDamerell committed Jun 17, 2023
1 parent eda6724 commit 323baea
Showing 1 changed file with 44 additions and 28 deletions.
72 changes: 44 additions & 28 deletions websocket/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ class Dispatcher(DispatcherBase):
Dispatcher
"""
def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
while self.app.keep_running:
sel = selectors.DefaultSelector()
sel.register(self.app.sock.sock, selectors.EVENT_READ)

r = sel.select(self.ping_timeout)
if r:
if not read_callback():
break
check_callback()
sel = selectors.DefaultSelector()
sel.register(self.app.sock.sock, selectors.EVENT_READ)
try:
while self.app.keep_running:
r = sel.select(self.ping_timeout)
if r:
if not read_callback():
break
check_callback()
finally:
sel.close()


Expand All @@ -87,23 +88,25 @@ class SSLDispatcher(DispatcherBase):
SSLDispatcher
"""
def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
while self.app.keep_running:
r = self.select()
if r:
if not read_callback():
break
check_callback()

def select(self) -> list:
sock = self.app.sock.sock
if sock.pending():
return [sock,]

sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ)
try:
while self.app.keep_running:
r = self.select(sock, sel)
if r:
if not read_callback():
break
check_callback()
finally:
sel.close()

def select(self, sock, sel:selectors.DefaultSelector):
sock = self.app.sock.sock
if sock.pending():
return [sock,]

r = sel.select(self.ping_timeout)
sel.close()

if len(r) > 0:
return r[0][0]
Expand Down Expand Up @@ -226,6 +229,8 @@ def __init__(self, url: str, header: list or dict = None,
self.subprotocols = subprotocols
self.prepared_socket = socket
self.has_errored = False
self.has_done_teardown = False
self.has_done_teardown_lock = threading.Lock()

def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None:
"""
Expand Down Expand Up @@ -268,9 +273,9 @@ def _stop_ping_thread(self) -> None:
self.last_ping_tm = self.last_pong_tm = 0

def _send_ping(self) -> None:
if self.stop_ping.wait(self.ping_interval):
if self.stop_ping.wait(self.ping_interval) or self.keep_running is False:
return
while not self.stop_ping.wait(self.ping_interval):
while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True:
if self.sock:
self.last_ping_tm = time.time()
try:
Expand Down Expand Up @@ -373,6 +378,13 @@ def teardown(close_frame: ABNF = None):
with the statusCode and reason from the provided frame.
"""

# teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
# To ensure the work is only done once, we use this bool and lock.
with self.has_done_teardown_lock:
if self.has_done_teardown:
return
self.has_done_teardown = True

self._stop_ping_thread()
self.keep_running = False
if self.sock:
Expand Down Expand Up @@ -484,11 +496,15 @@ def handleDisconnect(e: Exception, reconnecting: bool = False) -> bool:
custom_dispatcher = bool(dispatcher)
dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3])

setSock()
if not custom_dispatcher and reconnect:
while self.keep_running:
_logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
dispatcher.reconnect(reconnect, setSock)
try:
setSock()
if not custom_dispatcher and reconnect:
while self.keep_running:
_logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
dispatcher.reconnect(reconnect, setSock)
finally:
# Ensure teardown was called before returning from run_forever
teardown()

return self.has_errored

Expand Down

0 comments on commit 323baea

Please sign in to comment.