Skip to content

Commit

Permalink
Fix a file descriptor and thread leak in wptserver
Browse files Browse the repository at this point in the history
Always ensure that wptserve closes file descriptors that it opens for
HTTP2 requests. For the WebSocket handler, use Python contexts. This
isn't possible for non-WebSocket requests so make sure to close them
manually. In addition, when frame.stream_ended always end the stream's
thread. The code which spawns the thread assumes this is the case and
will forget about the thread as soon as it sees this property.

Probably fixes #23905.
  • Loading branch information
mrobinson authored and pull[bot] committed Oct 20, 2023
1 parent 221e533 commit 1051292
Showing 1 changed file with 74 additions and 65 deletions.
139 changes: 74 additions & 65 deletions tools/wptserve/wptserve/server.py
Expand Up @@ -414,7 +414,7 @@ def handle_one_request(self):
stream_queues[frame.stream_id] = (self.start_stream_thread(frame, queue), queue)
stream_queues[frame.stream_id][1].put(frame)

if isinstance(frame, StreamEnded) or (hasattr(frame, "stream_ended") and frame.stream_ended):
if isinstance(frame, StreamEnded) or getattr(frame, "stream_ended", False):
del stream_queues[frame.stream_id]

except OSError as e:
Expand Down Expand Up @@ -470,71 +470,71 @@ def _stream_ws_thread(self, stream_id, queue):
if frame is None:
return

# Needs to be unbuffered for websockets.
rfile, wfile = os.pipe()
rfile, wfile = os.fdopen(rfile, 'rb'), os.fdopen(wfile, 'wb', 0) # needs to be unbuffer for websockets
stream_handler = H2HandlerCopy(self, frame, rfile)
with os.fdopen(rfile, 'rb') as rfile, os.fdopen(wfile, 'wb', 0) as wfile:
stream_handler = H2HandlerCopy(self, frame, rfile)

h2request = H2Request(stream_handler)
h2response = H2Response(stream_handler, h2request)
h2request = H2Request(stream_handler)
h2response = H2Response(stream_handler, h2request)

dispatcher = dispatch.Dispatcher(self.server.ws_doc_root, None, False)
if not dispatcher.get_handler_suite(stream_handler.path):
h2response.set_error(404)
h2response.write()
return

request_wrapper = _WebSocketRequest(stream_handler, h2response)
dispatcher = dispatch.Dispatcher(self.server.ws_doc_root, None, False)
if not dispatcher.get_handler_suite(stream_handler.path):
h2response.set_error(404)
h2response.write()
return

handshaker = WsH2Handshaker(request_wrapper, dispatcher)
try:
handshaker.do_handshake()
except HandshakeException as e:
self.logger.info('Handshake failed for error: %s' % e)
h2response.set_error(e.status)
h2response.write()
return
except AbortedByUserException:
h2response.write()
return
request_wrapper = _WebSocketRequest(stream_handler, h2response)

# h2 Handshaker prepares the headers but does not send them down the
# wire. Flush the headers here.
try:
h2response.write_status_headers()
except StreamClosedError:
# work around https://github.com/web-platform-tests/wpt/issues/27786
# The stream was already closed.
return
handshaker = WsH2Handshaker(request_wrapper, dispatcher)
try:
handshaker.do_handshake()
except HandshakeException as e:
self.logger.info('Handshake failed for error: %s' % e)
h2response.set_error(e.status)
h2response.write()
return
except AbortedByUserException:
h2response.write()
return

request_wrapper._dispatcher = dispatcher
# h2 Handshaker prepares the headers but does not send them down the
# wire. Flush the headers here.
try:
h2response.write_status_headers()
except StreamClosedError:
# work around https://github.com/web-platform-tests/wpt/issues/27786
# The stream was already closed.
return

# we need two threads:
# - one to handle the frame queue
# - one to handle the request (dispatcher.transfer_data is blocking)
# the alternative is to have only one (blocking) thread. That thread
# will call transfer_data. That would require a special case in
# handle_one_request, to bypass the queue and write data to wfile
# directly.
t = threading.Thread(
target=Http2WebTestRequestHandler._stream_ws_sub_thread,
args=(self, request_wrapper, stream_handler, queue)
)
t.start()
request_wrapper._dispatcher = dispatcher

# we need two threads:
# - one to handle the frame queue
# - one to handle the request (dispatcher.transfer_data is blocking)
# the alternative is to have only one (blocking) thread. That thread
# will call transfer_data. That would require a special case in
# handle_one_request, to bypass the queue and write data to wfile
# directly.
t = threading.Thread(
target=Http2WebTestRequestHandler._stream_ws_sub_thread,
args=(self, request_wrapper, stream_handler, queue)
)
t.start()

while not self.close_connection:
try:
frame = queue.get(True, 1)
except Empty:
continue
while not self.close_connection:
try:
frame = queue.get(True, 1)
except Empty:
continue

if isinstance(frame, DataReceived):
wfile.write(frame.data)
if frame.stream_ended:
raise NotImplementedError("frame.stream_ended")
wfile.close()
elif frame is None or isinstance(frame, (StreamReset, StreamEnded, ConnectionTerminated)):
self.logger.debug(f'({self.uid} - {stream_id}) Stream Reset, Thread Closing')
break
if isinstance(frame, DataReceived):
wfile.write(frame.data)
if frame.stream_ended:
raise NotImplementedError("frame.stream_ended")
elif frame is None or isinstance(frame, (StreamReset, StreamEnded, ConnectionTerminated)):
self.logger.error(f'({self.uid} - {stream_id}) Stream Reset, Thread Closing')
break

t.join()

Expand Down Expand Up @@ -568,9 +568,11 @@ def _stream_thread(self, stream_id, queue):

# The file-like pipe object that will be used to share data to request object if data is received
wfile = None
rfile = None
request = None
response = None
req_handler = None

while not self.close_connection:
try:
frame = queue.get(True, 1)
Expand All @@ -579,10 +581,14 @@ def _stream_thread(self, stream_id, queue):
continue

self.logger.debug(f'({self.uid} - {stream_id}) {str(frame)}')

if isinstance(frame, RequestReceived):
rfile, wfile = os.pipe()
rfile, wfile = os.fdopen(rfile, 'rb'), os.fdopen(wfile, 'wb')
if rfile:
rfile.close()
if wfile:
wfile.close()

pipe_rfile, pipe_wfile = os.pipe()
(rfile, wfile) = os.fdopen(pipe_rfile, 'rb'), os.fdopen(pipe_wfile, 'wb')

stream_handler = H2HandlerCopy(self, frame, rfile)

Expand All @@ -605,22 +611,25 @@ def _stream_thread(self, stream_id, queue):
if hasattr(req_handler, 'handle_data'):
req_handler.handle_data(frame, request, response)

if frame.stream_ended:
wfile.close()
elif frame is None or isinstance(frame, (StreamReset, StreamEnded, ConnectionTerminated)):
self.logger.debug(f'({self.uid} - {stream_id}) Stream Reset, Thread Closing')
break

if request is not None:
request.frames.append(frame)

if hasattr(frame, "stream_ended") and frame.stream_ended:
if getattr(frame, "stream_ended", False):
try:
self.finish_handling(request, response, req_handler)
except StreamClosedError:
self.logger.debug('(%s - %s) Unable to write response; stream closed' %
(self.uid, stream_id))
break
(self.uid, stream_id))
break

if rfile:
rfile.close()
if wfile:
wfile.close()

def frame_handler(self, request, response, handler):
try:
Expand Down

0 comments on commit 1051292

Please sign in to comment.