From 48a8cbc2b9031bd6f23eda8b85b99ff648590764 Mon Sep 17 00:00:00 2001
From: krassowski
Date: Fri, 19 Mar 2021 19:40:10 +0000
Subject: [PATCH] WIP test recovery from BrokenPipeError

---
 .../jupyter_lsp/jupyter_lsp/session.py        | 48 ++++++++++++++++-
 .../jupyter_lsp/jupyter_lsp/stdio.py          |  6 ++-
 .../jupyter_lsp/jupyter_lsp/tests/conftest.py | 18 +++++++
 .../jupyter_lsp/tests/test_session.py         | 51 +++++++++++++++++++
 4 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py
index 4b9a6aea7..fc4247eec 100644
--- a/python_packages/jupyter_lsp/jupyter_lsp/session.py
+++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py
@@ -6,7 +6,7 @@
 import string
 import subprocess
 from copy import copy
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 
 from tornado.ioloop import IOLoop
 from tornado.queues import Queue
@@ -49,8 +49,12 @@ class LanguageServerSession(LoggingConfigurable):
     status = UseEnum(SessionStatus, default_value=SessionStatus.NOT_STARTED)
     last_handler_message_at = Instance(datetime, allow_none=True)
     last_server_message_at = Instance(datetime, allow_none=True)
+    allow_server_failure_not_more_often_than = Instance(
+        timedelta, allow_none=False, default_value=timedelta(minutes=20)
+    )
 
     _tasks = None
+    _last_failure = None
 
     _skip_serialize = ["argv", "debug_argv"]
 
@@ -169,7 +173,11 @@ async def _read_lsp(self):
         await self.reader.read()
 
     async def _write_lsp(self):
-        await self.writer.write()
+        try:
+            await self.writer.write()
+        except BrokenPipeError as error:
+            # the server process went away: restart it (or give up)
+            self._handle_server_failure(error)
 
     async def _broadcast_from_lsp(self):
         """loop for reading messages from the queue of messages from the language
@@ -179,3 +187,39 @@ async def _broadcast_from_lsp(self):
             self.last_server_message_at = self.now()
             await self.parent.on_server_message(message, self)
             self.from_lsp.task_done()
+
+    def _handle_server_failure(self, error):
+        """Restart the language server after a failure, or give up.
+
+        The session is restarted unless the previous failure happened less than
+        `allow_server_failure_not_more_often_than` ago, in which case `error`
+        is re-raised to avoid an endless restart loop.
+        """
+        now = self.now()
+        allowed = self.allow_server_failure_not_more_often_than
+        previous_failure = self._last_failure
+        # remember this failure so that the next one can be rate-limited
+        self._last_failure = now
+
+        give_up = previous_failure is not None and now - previous_failure < allowed
+        if give_up:
+            delta = now - previous_failure
+            description = (
+                f"giving up as the previous failure was {delta} ago"
+                f" which is less than the minimum allowed interval ({allowed})"
+            )
+        else:
+            description = "restarting session..."
+
+        self.log.warning(
+            f"Encountered {self.language_server} language server failure;"
+            f" {description}"
+            f" (exception: {error})"
+            f" (faulty process: {self.process})"
+        )
+
+        if give_up:
+            raise error
+
+        self.stop()
+        self.initialize()
diff --git a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/stdio.py
index d9984f31a..b8878ca47 100644
--- a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py
+++ b/python_packages/jupyter_lsp/jupyter_lsp/stdio.py
@@ -139,7 +139,7 @@ async def _read_content(
             if len(raw) != length:  # pragma: no cover
                 self.log.warning(
                     f"Readout and content-length mismatch: {len(raw)} vs {length};"
-                    f"remaining empties: {max_empties}; remaining parts: {max_parts}"
+                    f" remaining empties: {max_empties}; remaining parts: {max_parts}"
                 )
 
         return raw
@@ -191,7 +191,11 @@ async def write(self) -> None:
                 body = message.encode("utf-8")
                 response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
                 await convert_yielded(self._write_one(response.encode("utf-8")))
+            except BrokenPipeError:
+                # propagate broken pipe errors; `finally` marks the task as done
+                raise
             except Exception:  # pragma: no cover
+                # catch other (hopefully mild) exceptions
                 self.log.exception("%s couldn't write message: %s", self, response)
             finally:
                 self.queue.task_done()
diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py
index c70c27c6e..ccaf2c421 100644
--- a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py
+++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py
@@ -109,6 +109,24 @@ def jsonrpc_init_msg():
     )
 
 
+@fixture
+def did_open_message():
+    return json.dumps(
+        {
+            "jsonrpc": "2.0",
+            "method": "textDocument/didOpen",
+            "params": {
+                "textDocument": {
+                    "uri": pathlib.Path(__file__).as_uri(),
+                    "languageId": "python",
+                    "version": 0,
+                    "text": "",
+                }
+            },
+        }
+    )
+
+
 @fixture
 def app():
     return MockServerApp()
diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py
index 3e5f3a599..c1af07d4a 100644
--- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py
+++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py
@@ -1,8 +1,11 @@
 import asyncio
+import logging
+import subprocess
 
 import pytest
 
 from ..schema import SERVERS_RESPONSE
+from ..session import LanguageServerSession
 
 
 def assert_status_set(handler, expected_statuses, language_server=None):
@@ -100,3 +103,51 @@ async def test_ping(handlers):
     assert ws_handler._ping_sent is True
 
     ws_handler.on_close()
+
+
+@pytest.mark.asyncio
+async def test_broken_pipe(handlers, jsonrpc_init_msg, did_open_message, caplog):
+    """If the pipe breaks (server dies), can we recover by restarting the server?"""
+    a_server = "pyls"
+
+    # use real handler in this test rather than a mock
+    # -> testing broken pipe requires that here
+    handler, ws_handler = handlers
+    manager = handler.manager
+
+    manager.initialize()
+
+    assert_status_set(handler, {"not_started"}, a_server)
+
+    ws_handler.open(a_server)
+
+    await ws_handler.on_message(jsonrpc_init_msg)
+    assert_status_set(handler, {"started"}, a_server)
+
+    session: LanguageServerSession = manager.sessions[a_server]
+    process: subprocess.Popen = session.process
+    process.kill()
+
+    with caplog.at_level(logging.WARNING):
+        # an attempt to write should raise BrokenPipeError
+        await ws_handler.on_message(did_open_message)
+        await asyncio.sleep(1)
+
+    # which should be caught
+    assert "Encountered pyls language server failure" in caplog.text
+    assert "exception: [Errno 32] Broken pipe" in caplog.text
+
+    # and the server should get restarted
+    assert "restarting session..." in caplog.text
+
+    assert_status_set(handler, {"started"}, a_server)
+
+    # forget the warnings about the (expected) failure logged so far
+    caplog.clear()
+
+    with caplog.at_level(logging.WARNING):
+        # we should be able to send a message now
+        await ws_handler.on_message(did_open_message)
+    assert caplog.text == ""
+
+    ws_handler.on_close()