diff --git a/.vscode/launch.json b/.vscode/launch.json index 3c7b51326..dbab85629 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -21,8 +21,8 @@ "type": "python", "request": "launch", "debugServer": 8765, - "console": "internalConsole", - //"console": "integratedTerminal", + //"console": "internalConsole", + "console": "integratedTerminal", //"console": "externalTerminal", "consoleTitle": "ptvsd.server", //"program": "${file}", diff --git a/src/ptvsd/adapter/contract.py b/src/ptvsd/adapter/contract.py index ab262fa2c..26202d1a7 100644 --- a/src/ptvsd/adapter/contract.py +++ b/src/ptvsd/adapter/contract.py @@ -41,7 +41,7 @@ def __init__(self, message): try: value = validate(value) except Exception as exc: - message.isnt_valid("{0!j} {1}", name, exc) + raise message.isnt_valid("{0!j} {1}", name, exc) assert value != (), fmt( "{0!j} must provide a default value for missing properties.", validate diff --git a/src/ptvsd/adapter/debuggee.py b/src/ptvsd/adapter/debuggee.py index 8c21451c1..11930e21d 100644 --- a/src/ptvsd/adapter/debuggee.py +++ b/src/ptvsd/adapter/debuggee.py @@ -34,6 +34,10 @@ pid = None """Debuggee process ID.""" +_got_pid = threading.Event() +"""A threading.Event that is set when pid is set. +""" + _exited = None """A threading.Event that is set when the debuggee process exits. @@ -110,7 +114,7 @@ def property_or_debug_option(prop_name, flag_name): assert prop_name[0].islower() and flag_name[0].isupper() value = request(prop_name, json.default(flag_name in debug_options)) if value is False and flag_name in debug_options: - request.isnt_valid( + raise request.isnt_valid( '{0!r}:false and "debugOptions":[{1!r}] are mutually exclusive', prop_name, flag_name, @@ -125,7 +129,7 @@ def property_or_debug_option(prop_name, flag_name): ) if console != "internalConsole": if not contract.ide.capabilities["supportsRunInTerminalRequest"]: - request.cant_handle( + raise request.cant_handle( 'Unable to launch via "console":{0!j}, because the IDE is does not ' 'have the "supportsRunInTerminalRequest" capability', console, @@ -136,7 +140,7 @@ def property_or_debug_option(prop_name, flag_name): cmdline = [] if property_or_debug_option("sudo", "Sudo"): if platform.system() == "Windows": - request.cant_handle('"sudo":true is not supported on Windows.') + raise request.cant_handle('"sudo":true is not supported on Windows.') else: cmdline += ["sudo"] @@ -145,7 +149,9 @@ def property_or_debug_option(prop_name, flag_name): python_key = "python" if python_key in request: if "pythonPath" in request: - request.isnt_valid('"pythonPath" is not valid if "python" is specified') + raise request.isnt_valid( + '"pythonPath" is not valid if "python" is specified' + ) elif "pythonPath" in request: python_key = "pythonPath" python = request(python_key, json.array(unicode, vectorize=True, size=(1,))) @@ -185,9 +191,13 @@ def property_or_debug_option(prop_name, flag_name): num_targets = len([x for x in (program, module, code) if x != ()]) if num_targets == 0: - request.isnt_valid('either "program", "module", or "code" must be specified') + raise request.isnt_valid( + 'either "program", "module", or "code" must be specified' + ) elif num_targets != 1: - request.isnt_valid('"program", "module", and "code" are mutually exclusive') + raise request.isnt_valid( + '"program", "module", and "code" are mutually exclusive' + ) cmdline += request("args", json.array(unicode)) @@ -209,7 +219,7 @@ def _spawn_popen(request, spawn_info): try: proc = subprocess.Popen(spawn_info.cmdline, cwd=spawn_info.cwd, env=env) except Exception as exc: - request.cant_handle( + raise request.cant_handle( "Error launching process: {0}\n\nCommand line:{1!r}", exc, spawn_info.cmdline, @@ -220,6 +230,7 @@ def _spawn_popen(request, spawn_info): global pid try: pid = proc.pid + _got_pid.set() ProcessTracker().track(pid) except Exception: # If we can't track it, we won't be able to terminate it if asked; but aside @@ -258,19 +269,60 @@ def _spawn_terminal(request, spawn_info): } try: - result = channels.Channels().ide().request("runInTerminal", body) + channels.Channels().ide().request("runInTerminal", body) except messaging.MessageHandlingError as exc: exc.propagate(request) + # Although "runInTerminal" response has "processId", it's optional, and in practice + # it is not used by VSCode: https://github.com/microsoft/vscode/issues/61640. + # Thus, we can only retrieve the PID via the "process" event, and only then we can + # start tracking it. Until then, nothing else to do. + pass + + +def parse_pid(process_event): + assert process_event.is_event("process") + global pid - pid = result("processId", int) + sys_pid = process_event("systemProcessId", int) - try: - ProcessTracker().track(pid, after_exit=lambda: _exited.set()) - except Exception as exc: - # If we can't track it, we won't be able to detect if it exited or retrieve - # the exit code, so fail immediately. - request.cant_handle("Couldn't get debuggee process handle: {0}", str(exc)) + if not _got_pid.is_set(): + # Launched with "runInTerminal", so we had no way to get the PID before. + # Now that we do, start tracking it as usual. + + def after_exit(code): + global exit_code + exit_code = code + _exited.set() + + try: + pid = sys_pid + _got_pid.set() + ProcessTracker().track(pid, after_exit=after_exit) + except Exception as exc: + # If we can't track it, we won't be able to detect if it exited or retrieve + # the exit code, so fail immediately. + raise process_event.cant_handle( + "Couldn't get debuggee process handle: {0}", str(exc) + ) + + elif pid != sys_pid: + # Launched directly, so we already have the PID, but it doesn't match what + # the debug server says it is - something is very wrong. + raise process_event.isnt_valid( + '"systemProcessId":{0!j} in "process" event does not match actual PID={1}', + sys_pid, + pid, + ) + + +def wait_for_pid(timeout=None): + """Waits for debuggee PID to be determined. + + Returns True if PID was determined, False if the wait timed out. If it returned + True, then pid is guaranteed to be set. + """ + return _got_pid.wait(timeout) def wait_for_exit(timeout=None): @@ -280,6 +332,13 @@ def wait_for_exit(timeout=None): True, then exit_code is guaranteed to be set. """ + if pid is None: + # Debugee was launched with "runInTerminal", but the debug session fell apart + # before we got a "process" event and found out what its PID is. It's not a + # fatal error, but there's nothing to wait on. Debuggee process should have + # exited (or crashed) by now in any case. + return + assert _exited is not None timed_out = not _exited.wait(timeout) if not timed_out: diff --git a/src/ptvsd/adapter/messages.py b/src/ptvsd/adapter/messages.py index 447e320df..53fe73704 100644 --- a/src/ptvsd/adapter/messages.py +++ b/src/ptvsd/adapter/messages.py @@ -46,7 +46,7 @@ def _server(self): """ server = _channels.server() if server is None: - messaging.Message.isnt_valid( + raise messaging.Message.isnt_valid( "Connection to debug server is not established yet" ) return server @@ -62,12 +62,11 @@ def handle_if_allowed(self, message): current_state = state.current() if current_state in states: return handler(self, message) - if isinstance(message, messaging.Request): - message.isnt_valid( - "Request {0!r} is not allowed in adapter state {1!r}.", - message.command, - current_state, - ) + raise message.isnt_valid( + "{0} is not allowed in adapter state {1!r}.", + message.describe(), + current_state, + ) return handle_if_allowed @@ -116,6 +115,9 @@ class IDEMessages(Messages): # so store them here until then. After all messages are replayed, it is set to None. _initial_messages = [] + # "launch" or "attach" request that started debugging. + _start_request = None + # A decorator to add the message to initial_messages if needed before handling it. # Must be applied to the handler for every message that can be received before # connection to the debug server can be established while handling attach/launch, @@ -151,7 +153,7 @@ def initialize_request(self, request): # Handles various attributes common to both "launch" and "attach". def _debug_config(self, request): - assert request.command in ("launch", "attach") + assert request.is_request("launch", "attach") self._shared.start_method = request.command _Shared.readonly_attrs.add("start_method") @@ -170,7 +172,7 @@ def launch_request(self, request): # TODO: nodebug debuggee.spawn_and_connect(request) - return self._configure() + return self._configure(request) @_replay_to_server @_only_allowed_while("initializing") @@ -179,16 +181,16 @@ def attach_request(self, request): _Shared.readonly_attrs.add("terminate_on_disconnect") self._debug_config(request) - options.host = request.arguments.get("host", options.host) - options.port = int(request.arguments.get("port", options.port)) + options.host = request("host", options.host) + options.port = request("port", options.port) _channels.connect_to_server(address=(options.host, options.port)) - return self._configure() + return self._configure(request) # Handles the configuration request sequence for "launch" or "attach", from when # the "initialized" event is sent, to when "configurationDone" is received; see # https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522 - def _configure(self): + def _configure(self, request): log.debug("Replaying previously received messages to server.") assert len(self._initial_messages) @@ -209,28 +211,42 @@ def _configure(self): self._server.propagate(msg) log.debug("Finished replaying messages to server.") - self.initial_messages = None + self._initial_messages = None + self._start_request = request + + # Wait until we have the debuggee PID - we either know it already because we + # have launched it directly, or we'll find out eventually from the "process" + # server event. Either way, we need to know the PID before we can tell the + # server to start debugging, because we need to be able to kill the debuggee + # process if anything goes wrong. + # + # However, we can't block forever, because the debug server can also crash + # before it had a chance to send the event - so wake up periodically, and + # check whether server channel is still alive. + while not debuggee.wait_for_pid(1): + if _channels.server() is None: + raise request.cant_handle("Debug server disconnected unexpectedly.") # Let the IDE know that it can begin configuring the adapter. state.change("configuring") self._ide.send_event("initialized") - - # Process further incoming messages, until we get "configurationDone". - while state.current() == "configuring": - yield + return messaging.NO_RESPONSE # will respond on "configurationDone" @_only_allowed_while("configuring") def configurationDone_request(self, request): - ret = self._server.delegate(request) + assert self._start_request is not None + + result = self._server.delegate(request) state.change("running") ServerMessages().release_events() - return ret + request.respond(result) + self._start_request.respond({}) def _disconnect_or_terminate_request(self, request): assert request.is_request("disconnect") or request.is_request("terminate") if request("restart", json.default(False)): - request.isnt_valid("Restart is not supported") + raise request.isnt_valid("Restart is not supported") terminate = (request.command == "terminate") or request( "terminateDebuggee", json.default(self._shared.terminate_on_disconnect) @@ -364,7 +380,9 @@ def request(self, request): # requests sent over that boundary, since they may contain arbitrary code # that the IDE will execute - e.g. "runInTerminal". The adapter must only # propagate requests that it knows are safe. - request.isnt_valid("Requests from the debug server to the IDE are not allowed.") + raise request.isnt_valid( + "Requests from the debug server to the IDE are not allowed." + ) # Generic event handler, used if there's no specific handler below. def event(self, event): @@ -383,13 +401,24 @@ def initialized_event(self, event): # also remove the 'initialized' event sent from IDE messages. pass + @_only_allowed_while("initializing") + def process_event(self, event): + try: + debuggee.parse_pid(event) + except Exception: + # If we couldn't retrieve or validate PID, we can't safely continue + # debugging, so shut everything down. + self.disconnect() + else: + self._ide.propagate(event) + @_only_allowed_while("running") def ptvsd_subprocess_event(self, event): sub_pid = event("processId", int) try: debuggee.register_subprocess(sub_pid) except Exception as exc: - event.cant_handle("{0}", exc) + raise event.cant_handle("{0}", exc) self._ide.propagate(event) def terminated_event(self, event): @@ -426,7 +455,10 @@ def disconnect(self): # The debuggee process should exit shortly after it has disconnected, but just # in case it gets stuck, don't wait forever, and force-kill it if needed. debuggee.terminate(after=5) - self._ide.send_event("exited", {"exitCode": debuggee.exit_code}) + exit_code = debuggee.exit_code + self._ide.send_event( + "exited", {"exitCode": -1 if exit_code is None else exit_code} + ) self._ide.send_event("terminated") diff --git a/src/ptvsd/common/json.py b/src/ptvsd/common/json.py index d050cee25..142d098a3 100644 --- a/src/ptvsd/common/json.py +++ b/src/ptvsd/common/json.py @@ -103,6 +103,8 @@ def validate(value): if (optional and value == ()) or isinstance(value, classinfo): return value else: + if not optional and value == (): + raise ValueError("must be specified") raise TypeError("must be " + " or ".join(t.__name__ for t in classinfo)) return validate diff --git a/src/ptvsd/common/messaging.py b/src/ptvsd/common/messaging.py index 80a11b789..770ff572d 100644 --- a/src/ptvsd/common/messaging.py +++ b/src/ptvsd/common/messaging.py @@ -14,13 +14,13 @@ import collections import contextlib import functools -import inspect import itertools +import os import sys import threading from ptvsd.common import compat, fmt, json, log -from ptvsd.common._util import new_hidden_thread +from ptvsd.common.compat import unicode class NoMoreMessages(EOFError): @@ -28,7 +28,7 @@ class NoMoreMessages(EOFError): """ def __init__(self, *args, **kwargs): - stream = kwargs.pop("stream", None) + stream = kwargs.pop("stream") args = args if len(args) else ["No more messages"] super(NoMoreMessages, self).__init__(*args, **kwargs) @@ -314,8 +314,7 @@ def __init__(self, message, items=None): super(MessageDict, self).__init__(items) self.message = message - """The Message object that owns this dict. If None, then MessageDict behaves - like a regular dict - i.e. raises KeyError. + """The Message object that owns this dict. For any instance exposed via a Message object corresponding to some incoming message, it is guaranteed to reference that Message object. There is no similar @@ -323,7 +322,7 @@ def __init__(self, message, items=None): """ def __repr__(self): - return dict.__repr__(self) + return fmt("{0!j}", self) def __call__(self, key, validate, optional=False): """Like get(), but with validation. @@ -343,9 +342,8 @@ def __call__(self, key, validate, optional=False): it returns - thus, the validator can e.g. replace () with a suitable default value for the property. - If validate() raises TypeError or ValueError, and self.message is not None, - __call__ raises InvalidMessageError that applies_to(self.message) with the - same text. If self.message is None, the exception is propagated as is. + If validate() raises TypeError or ValueError, raises InvalidMessageError with + the same text that applies_to(self.messages). See ptvsd.common.json for reusable validators. """ @@ -359,10 +357,8 @@ def __call__(self, key, validate, optional=False): try: value = validate(value) except (TypeError, ValueError) as exc: - if self.message is None: - raise - else: - self.message.isnt_valid("{0!r} {1}", key, exc) + message = Message if self.message is None else self.message + raise message.isnt_valid("{0!r} {1}", key, exc) return value def _invalid_if_no_key(func): @@ -370,10 +366,8 @@ def wrap(self, key, *args, **kwargs): try: return func(self, key, *args, **kwargs) except KeyError: - if self.message is None: - raise - else: - self.message.isnt_valid("missing property {0!r}", key) + message = Message if self.message is None else self.message + raise message.isnt_valid("missing property {0!r}", key) return wrap @@ -384,11 +378,36 @@ def wrap(self, key, *args, **kwargs): del _invalid_if_no_key +def _payload(value): + """JSON validator for message payload. + + If that value is missing or null, it is treated as if it were {}. + """ + + if value is not None and value != (): + if isinstance(value, dict): # can be int, str, list... + assert isinstance(value, MessageDict) + return value + + # Missing payload. Construct a dummy MessageDict, and make it look like it was + # deserialized. See JsonMessageChannel._parse_incoming_message for why it needs + # to have associate_with(). + + def associate_with(message): + value.message = message + + value = MessageDict(None) + value.associate_with = associate_with + return value + + class Message(object): """Represents a fully parsed incoming or outgoing message. + + https://microsoft.github.io/debug-adapter-protocol/specification#protocolmessage """ - def __init__(self, channel, seq): + def __init__(self, channel, seq, json=None): self.channel = channel self.seq = seq @@ -397,6 +416,22 @@ def __init__(self, channel, seq): This can be None for synthesized Responses. """ + self.json = json + """For incoming messages, the MessageDict containing raw JSON from which + this message was originally parsed. + """ + + def __str__(self): + return fmt("{0!j}", self.json) if self.json is not None else repr(self) + + def describe(self): + """A brief description of the message that is enough to identify its handler, + but does not include its payload or metadata that uniquely identifies it. + + Examples: 'request "launch"', 'response to request "launch"'. + """ + raise NotImplementedError + @property def payload(self): """Payload of the message - self.body or self.arguments, depending on the @@ -412,31 +447,39 @@ def __contains__(self, key): """Same as (key in self.payload).""" return key in self.payload - def is_event(self, event=None): + def is_event(self, *event): + """Returns True if this message is an Event of one of the specified types. + """ if not isinstance(self, Event): return False - return event is None or self.event == event + return event == () or self.event in event - def is_request(self, command=None): + def is_request(self, *command): + """Returns True if this message is a Request of one of the specified types. + """ if not isinstance(self, Request): return False - return command is None or self.command == command + return command == () or self.command in command - def is_response(self, command=None): + def is_response(self, *command): + """Returns True if this message is a Response to a request of one of the + specified types. + """ if not isinstance(self, Response): return False - return command is None or self.request.command == command + return command == () or self.request.command in command @staticmethod - def raise_error(*args, **kwargs): - """raise_error([self], exc_type, format_string, *args, **kwargs) + def error(*args, **kwargs): + """error([self], exc_type, format_string, *args, **kwargs) - Raises a new exception of the specified type from the point at which it is + Returns a new exception of the specified type from the point at which it is invoked, with the specified formatted message as the reason. This method can be used either as a static method, or as an instance method. If invoked as an instance method, the resulting exception will have its cause - set to the Message object on which raise_error() was called. + set to the Message object on which error() was called. Additionally, if the + message is a Request, a failure response is immediately sent. """ if isinstance(args[0], Message): @@ -449,27 +492,119 @@ def raise_error(*args, **kwargs): assert issubclass(exc_type, MessageHandlingError) reason = fmt(format_string, *args, **kwargs) - raise exc_type(reason, cause) # will log it + exc = exc_type(reason, cause) # will log it + + if isinstance(cause, Request): + cause.respond(exc) + return exc def isnt_valid(*args, **kwargs): """isnt_valid([self], format_string, *args, **kwargs) - Same as raise_error(InvalidMessageError, ...). + Same as error(InvalidMessageError, ...). """ if isinstance(args[0], Message): - args[0].raise_error(InvalidMessageError, *args[1:], **kwargs) + return args[0].error(InvalidMessageError, *args[1:], **kwargs) else: - Message.raise_error(InvalidMessageError, *args, **kwargs) + return Message.error(InvalidMessageError, *args, **kwargs) def cant_handle(*args, **kwargs): """cant_handle([self], format_string, *args, **kwargs) - Same as raise_error(MessageHandlingError, ...). + Same as error(MessageHandlingError, ...). """ if isinstance(args[0], Message): - args[0].raise_error(MessageHandlingError, *args[1:], **kwargs) + return args[0].error(MessageHandlingError, *args[1:], **kwargs) else: - Message.raise_error(MessageHandlingError, *args, **kwargs) + return Message.error(MessageHandlingError, *args, **kwargs) + + +class Event(Message): + """Represents an incoming event. + + https://microsoft.github.io/debug-adapter-protocol/specification#event + + It is guaranteed that body is a MessageDict associated with this Event, and so + are all the nested dicts in it. If "body" was missing or null in JSON, body is + an empty dict. + + To handle the event, JsonMessageChannel tries to find a handler for this event in + JsonMessageChannel.handlers. Given event="X", if handlers.X_event exists, then it + is the specific handler for this event. Otherwise, handlers.event must exist, and + it is the generic handler for this event. A missing handler is a fatal error. + + No further incoming messages are processed until the handler returns, except for + responses to requests that have wait_for_response() invoked on them. + + To report failure to handle the event, the handler must raise an instance of + MessageHandlingError that applies_to() the Event object it was handling. Any such + failure is logged, after which the message loop moves on to the next message. + + Helper methods Message.isnt_valid() and Message.cant_handle() can be used to raise + the appropriate exception type that applies_to() the Event object. + """ + + def __init__(self, channel, seq, event, body, json=None): + super(Event, self).__init__(channel, seq, json) + + self.event = event + + if isinstance(body, MessageDict) and hasattr(body, "associate_with"): + body.associate_with(self) + self.body = body + + def describe(self): + return fmt("event {0!j}", self.event) + + @property + def payload(self): + return self.body + + @staticmethod + def _parse(channel, message_dict): + seq = message_dict("seq", int) + event = message_dict("event", unicode) + body = message_dict("body", _payload) + message = Event(channel, seq, event, body, json=message_dict) + channel._enqueue_handlers(message, message._handle) + + def _handle(self): + channel = self.channel + handler = channel._get_handler_for("event", self.event) + try: + result = handler(self) + assert result is None, fmt( + "Handler {0} tried to respond to {1}.", + compat.srcnameof(handler), + self.describe(), + ) + except MessageHandlingError as exc: + if not exc.applies_to(self): + raise + log.error( + "Handler {0} couldn't handle {1} in channel {2}: {3}\n\n{4}", + compat.srcnameof(handler), + self.describe(), + self.channel, + str(exc), + self, + ) + except Exception as exc: + raise log.exception( + "Handler {0} couldn't handle {1} in channel {2}:\n\n{3}\n\n", + compat.srcnameof(handler), + self.describe(), + self.channel, + self, + ) + + +NO_RESPONSE = object() +"""Can be returned from a request handler in lieu of the response body, to indicate +that no response is to be sent. + +Request.respond() must be invoked explicitly at some later point to provide a response. +""" class Request(Message): @@ -477,126 +612,262 @@ class Request(Message): Incoming requests are represented directly by instances of this class. - Outgoing requests are represented by instances of OutgoingRequest, which - provides additional functionality to handle responses. + Outgoing requests are represented by instances of OutgoingRequest, which provides + additional functionality to handle responses. + + For incoming requests, it is guaranteed that arguments is a MessageDict associated + with this Request, and so are all the nested dicts in it. If "arguments" was missing + or null in JSON, arguments is an empty dict. + + To handle the request, JsonMessageChannel tries to find a handler for this request + in JsonMessageChannel.handlers. Given command="X", if handlers.X_request exists, + then it is the specific handler for this request. Otherwise, handlers.request must + exist, and it is the generic handler for this request. A missing handler is a fatal + error. + + The handler is then invoked with the Request object as its sole argument. + + If the handler itself invokes respond() on the Request at any point, then it must + not return any value. + + Otherwise, if the handler returns NO_RESPONSE, no response to the request is sent. + It must be sent manually at some later point via respond(). + + Otherwise, a response to the request is sent with the returned value as the body. + + To fail the request, the handler can return an instance of MessageHandlingError, + or respond() with one, or raise one such that it applies_to() the Request object + being handled. + + Helper methods Message.isnt_valid() and Message.cant_handle() can be used to raise + the appropriate exception type that applies_to() the Request object. """ - def __init__(self, channel, seq, command, arguments): - super(Request, self).__init__(channel, seq) + def __init__(self, channel, seq, command, arguments, json=None): + super(Request, self).__init__(channel, seq, json) self.command = command + if isinstance(arguments, MessageDict) and hasattr(arguments, "associate_with"): + arguments.associate_with(self) self.arguments = arguments - """Request arguments. - - For incoming requests, it is guaranteed that this is a MessageDict, and that - any nested dicts are also MessageDict instances. If "arguments" was missing - or null in JSON, arguments is an empty MessageDict - it is never None. - """ self.response = None - """Set to Response object for the corresponding response, once the request - is handled. + """Response to this request. For incoming requests, it is set as soon as the request handler returns. For outgoing requests, it is set as soon as the response is received, and - before Response.on_request is invoked. + before self._handle_response is invoked. """ + def describe(self): + return fmt("request {0!j}", self.command) + @property def payload(self): return self.arguments + def respond(self, body): + assert self.response is None + d = {"type": "response", "request_seq": self.seq, "command": self.command} + + if isinstance(body, Exception): + d["success"] = False + err_text = str(body) + try: + err_text = compat.force_unicode(err_text, "utf-8") + except Exception: + # On Python 2, the error message might not be Unicode, and we don't + # really know what encoding it is. So if treating it as UTF-8 failed, + # use repr() as a fallback - it should escape all non-ASCII chars in + # the string. + err_text = compat.force_unicode(repr(body), "ascii", errors="replace") + d["message"] = err_text + else: + d["success"] = True + if body is not None and body != {}: + d["body"] = body + + with self.channel._send_message(d) as seq: + pass + self.response = Response(self.channel, seq, self, body) + + @staticmethod + def _parse(channel, message_dict): + seq = message_dict("seq", int) + command = message_dict("command", unicode) + arguments = message_dict("arguments", _payload) + message = Request(channel, seq, command, arguments, json=message_dict) + channel._enqueue_handlers(message, message._handle) + + def _handle(self): + channel = self.channel + handler = channel._get_handler_for("request", self.command) + try: + try: + result = handler(self) + except MessageHandlingError as exc: + if not exc.applies_to(self): + raise + result = exc + log.error( + "Handler {0} couldn't handle {1} in channel {2}: {3}\n\n{4}", + compat.srcnameof(handler), + self.describe(), + self.channel, + str(exc), + self, + ) + + if result is NO_RESPONSE: + assert self.response is None, fmt( + "Handler {0} for {1} must not return NO_RESPONSE if it has already " + "invoked request.respond().", + compat.srcnameof(handler), + self.describe(), + ) + elif self.response is not None: + assert result is None or result is self.response.body, fmt( + "Handler {0} for {1} must not return a response body if it has " + "already invoked request.respond().", + compat.srcnameof(handler), + self.describe(), + ) + else: + assert result is not None, fmt( + "Handler {0} for {1} must either call request.respond() before it " + "returns, or return the response body, or return NO_RESPONSE.", + compat.srcnameof(handler), + self.describe(), + ) + self.respond(result) + + except Exception as exc: + raise log.exception( + "Handler {0} couldn't handle {1} in channel {2}:\n\n{3}\n\n", + compat.srcnameof(handler), + self.describe(), + self.channel, + self, + ) + class OutgoingRequest(Request): """Represents an outgoing request, for which it is possible to wait for a - response to be received, and register a response callback. + response to be received, and register a response handler. """ + _parse = _handle = None + def __init__(self, channel, seq, command, arguments): super(OutgoingRequest, self).__init__(channel, seq, command, arguments) - self._got_response = threading.Event() - self._callback = lambda _: None - - def _handle_response(self, response): - assert self is response.request - assert self.response is None - assert self.channel is response.channel - - with self.channel: - self.response = response - callback = self._callback - - callback(response) - self._got_response.set() + self._response_handlers = [] def wait_for_response(self, raise_if_failed=True): """Waits until a response is received for this request, records the Response object for it in self.response, and returns response.body. If no response was received from the other party before the channel closed, - self.response is a synthesized Response, which has NoMoreMessages() as its body. + self.response is a synthesized Response with body=NoMoreMessages(). If raise_if_failed=True and response.success is False, raises response.body instead of returning. """ - self._got_response.wait() + + with self.channel: + while self.response is None: + self.channel._handlers_enqueued.wait() + if raise_if_failed and not self.response.success: raise self.response.body return self.response.body - def on_response(self, callback): - """Registers a callback to invoke when a response is received for this request. - The callback is invoked with Response as its sole argument. - - If response has already been received, invokes the callback immediately. + def on_response(self, response_handler): + """Registers a handler to invoke when a response is received for this request. + The handler is invoked with Response as its sole argument. - It is guaranteed that self.response is set before the callback is invoked. + If response has already been received, invokes the handler immediately. + It is guaranteed that self.response is set before the handler is invoked. If no response was received from the other party before the channel closed, - a Response with body=NoMoreMessages() is synthesized. + self.response is a dummy Response with body=NoMoreMessages(). + + The handler is always invoked asynchronously on an unspecified background + thread - thus, the caller of on_response() can never be blocked or deadlocked + by the handler. - The callback may be invoked on an unspecified background thread that performs - processing of incoming messages; in that case, no further message processing - on the same channel will be performed until the callback returns. + No further incoming messages are processed until the handler returns, except for + responses to requests that have wait_for_response() invoked on them. """ - # Locking the channel ensures that there's no race condition with disconnect - # calling no_response(). Either we already have the synthesized response from - # there, in which case we will invoke it below; or we don't, in which case - # no_response() is yet to be called, and will invoke the callback. with self.channel: - response = self.response - if response is None: - self._callback = callback - return + self._response_handlers.append(response_handler) + self._enqueue_response_handlers() - callback(response) + def _enqueue_response_handlers(self): + response = self.response + if response is None: + # Response._parse() will submit the handlers when response is received. + return - def no_response(self): - """Indicates that this request is never going to receive a proper response. + def run_handlers(): + for handler in handlers: + try: + handler(response) + except MessageHandlingError as exc: + if not exc.applies_to(self): + raise + # Detailed exception info was already logged by its constructor. + log.error( + "Handler {0} couldn't handle {1}: {2}", + compat.srcnameof(handler), + self.describe(), + str(exc), + ) - Synthesizes the appopriate dummy Response, and invokes the callback with it. - """ - response = Response( - self.channel, - None, - self, - NoMoreMessages("No response", stream=self.channel.stream), - ) - self._handle_response(response) + handlers = self._response_handlers[:] + self.channel._enqueue_handlers(response, run_handlers) + del self._response_handlers[:] class Response(Message): """Represents an incoming or an outgoing response to a Request. + + https://microsoft.github.io/debug-adapter-protocol/specification#response + + error_message corresponds to "message" in JSON, and is renamed for clarity. + + If success is False, body is None. Otherwise, it is a MessageDict associated + with this Response, and so are all the nested dicts in it. If "body" was missing + or null in JSON, body is an empty dict. + + If this is a response to an outgoing request, it will be handled by the handler + registered via self.request.on_response(), if any. + + Regardless of whether there is such a handler, OutgoingRequest.wait_for_response() + can also be used to retrieve and handle the response. If there is a handler, it is + executed before wait_for_response() returns. + + No further incoming messages are processed until the handler returns, except for + responses to requests that have wait_for_response() invoked on them. + + To report failure to handle the event, the handler must raise an instance of + MessageHandlingError that applies_to() the Response object it was handling. Any + such failure is logged, after which the message loop moves on to the next message. + + Helper methods Message.isnt_valid() and Message.cant_handle() can be used to raise + the appropriate exception type that applies_to() the Response object. """ - def __init__(self, channel, seq, request, body): - super(Response, self).__init__(channel, seq) + def __init__(self, channel, seq, request, body, json=None): + super(Response, self).__init__(channel, seq, json) self.request = request + """The request to which this is the response.""" + if isinstance(body, MessageDict) and hasattr(body, "associate_with"): + body.associate_with(self) self.body = body """Body of the response if the request was successful, or an instance of some class derived from Exception it it was not. @@ -610,6 +881,9 @@ def __init__(self, channel, seq, request, body): it is an instance of NoMoreMessages. """ + def describe(self): + return fmt("response to request {0!j}", self.request.command) + @property def payload(self): return self.body @@ -630,19 +904,47 @@ def result(self): else: raise self.body + @staticmethod + def _parse(channel, message_dict): + seq = message_dict("seq", int) + request_seq = message_dict("request_seq", int) + command = message_dict("command", unicode) + success = message_dict("success", bool) + if success: + body = message_dict("body", _payload) + else: + error_message = message_dict("message", unicode) + exc_type = MessageHandlingError + if error_message.startswith(InvalidMessageError.PREFIX): + error_message = error_message[len(InvalidMessageError.PREFIX) :] + exc_type = InvalidMessageError + body = exc_type(error_message, silent=True) -class Event(Message): - """Represents an incoming event. - """ + try: + with channel: + request = channel._sent_requests.pop(request_seq) + known_request = True + except KeyError: + # Synthetic Request that only has seq and command as specified in response + # JSON, for error reporting purposes. + request = OutgoingRequest(channel, request_seq, command, "") + known_request = False - def __init__(self, channel, seq, event, body): - super(Event, self).__init__(channel, seq) - self.event = event - self.body = body + if not success: + body.cause = request - @property - def payload(self): - return self.body + response = Response(channel, seq, request, body, json=message_dict) + + with channel: + request.response = response + request._enqueue_response_handlers() + + if known_request: + return response + else: + raise response.isnt_valid( + "request_seq={0} does not match any known request", request_seq + ) class MessageHandlingError(Exception): @@ -673,11 +975,12 @@ class MessageHandlingError(Exception): by that time they have already been logged by their __init__ (when instantiated). """ - def __init__(self, reason, cause=None): + def __init__(self, reason, cause=None, silent=False): """Creates a new instance of this class, and immediately logs the exception. - Message handling errors are logged immediately, so that the precise context - in which they occured can be determined from the surrounding log entries. + Message handling errors are logged immediately unless silent=True, so that the + precise context in which they occured can be determined from the surrounding + log entries. """ self.reason = reason @@ -691,11 +994,11 @@ def __init__(self, reason, cause=None): to unknown requests, this is a synthetic Request. """ - try: - raise self - except MessageHandlingError: - # TODO: change to E after unifying logging with tests - log.exception(level="info") + if not silent: + try: + raise self + except MessageHandlingError: + log.exception() def __hash__(self): return hash((self.reason, id(self.cause))) @@ -780,25 +1083,21 @@ class JsonMessageChannel(object): channel.send_event(...) """ - report_unhandled_events = True - """If True, any event that couldn't be handled successfully will be reported - by sending a corresponding "event_not_handled" event in response. Can be set - per-instance. - - This helps diagnose why important events are seemingly ignored, when the only - message log that is available is the one for the other end of the channel. - """ - def __init__(self, stream, handlers=None, name=None): self.stream = stream self.handlers = handlers self.name = name if name is not None else stream.name self._lock = threading.RLock() - self._stop = threading.Event() + self._closed = False self._seq_iter = itertools.count(1) - self._requests = {} - self._worker = new_hidden_thread(repr(self), self._process_incoming_messages) - self._worker.daemon = True + self._sent_requests = {} # {seq: Request} + self._handler_queue = [] # [(what, handler)] + self._handlers_enqueued = threading.Condition(self._lock) + self._handler_thread = None + self._parser_thread = None + + def __str__(self): + return self.name def __repr__(self): return fmt("{0}({1!r})", type(self).__name__, self.name) @@ -813,46 +1112,65 @@ def __exit__(self, exc_type, exc_value, exc_tb): def close(self): """Closes the underlying stream. - This does not immediately terminate any handlers that were already running, - but they will be unable to respond. + This does not immediately terminate any handlers that are already executing, + but they will be unable to respond. No new request or event handlers will + execute after this method is called, even for messages that have already been + received. However, response handlers will continue to executed for any request + that is still pending, as will any handlers registered via on_response(). """ - self.stream.close() + with self: + if not self._closed: + self._closed = True + self.stream.close() def start(self): - """Starts a message loop on a background thread, which invokes on_message - for every new incoming message, until the channel is closed. + """Starts a message loop which parses incoming messages and invokes handlers + for them on a background thread, until the channel is closed. - Incoming messages will not be processed at all until this is invoked. + Incoming messages, including responses to requests, will not be processed at + all until this is invoked. """ - self._worker.start() + self._parser_thread = threading.Thread( + target=self._parse_incoming_messages, name=fmt("{0} message parser", self) + ) + self._parser_thread.pydev_do_not_trace = True + self._parser_thread.is_pydev_daemon_thread = True + self._parser_thread.daemon = True + self._parser_thread.start() def wait(self): - """Waits until the message loop terminates. + """Waits for the message loop to terminate, and for all enqueued Response + message handlers to finish executing. """ - self._worker.join() - - @staticmethod - def _prettify(message_dict): + parser_thread = self._parser_thread + if parser_thread is not None: + parser_thread.join() + handler_thread = self._handler_thread + if handler_thread is not None: + handler_thread.join() + + # Order of keys for _prettify() - follows the order of properties in + # https://microsoft.github.io/debug-adapter-protocol/specification + _prettify_order = ( + "seq", + "type", + "request_seq", + "success", + "command", + "event", + "message", + "arguments", + "body", + "error", + ) + + def _prettify(self, message_dict): """Reorders items in a MessageDict such that it is more readable. """ - # https://microsoft.github.io/debug-adapter-protocol/specification - keys = ( - "seq", - "type", - "request_seq", - "success", - "command", - "event", - "message", - "arguments", - "body", - "error", - ) - for key in keys: - try: - value = message_dict[key] - except KeyError: + for key in self._prettify_order: + if key not in message_dict: continue + value = message_dict[key] del message_dict[key] message_dict[key] = value @@ -904,7 +1222,7 @@ def send_request(self, command, arguments=None, on_before_send=None): request = OutgoingRequest(self, seq, command, arguments) if on_before_send is not None: on_before_send(request) - self._requests[seq] = request + self._sent_requests[seq] = request return request def send_event(self, event, body=None): @@ -932,351 +1250,75 @@ def propagate(self, message): If it was a request, returns the new OutgoingRequest object for it. """ - if isinstance(message, Request): + assert message.is_request() or message.is_event() + if message.is_request(): return self.send_request(message.command, message.arguments) else: self.send_event(message.event, message.body) - def delegate(self, request): - """Like propagate(request).wait_for_response(), but will also propagate + def delegate(self, message): + """Like propagate(message).wait_for_response(), but will also propagate any resulting MessageHandlingError back. """ - assert isinstance(request, Request) try: - return self.propagate(request).wait_for_response() + result = self.propagate(message) + if result.is_request(): + result = result.wait_for_response() + return result except MessageHandlingError as exc: - exc.propagate(request) - - def _send_response(self, request, body): - d = {"type": "response", "request_seq": request.seq, "command": request.command} - - if isinstance(body, Exception): - d["success"] = False - d["message"] = str(body) - else: - d["success"] = True - if body != {}: - d["body"] = body - - with self._send_message(d) as seq: - pass - - response = Response(self, seq, request.seq, body) - response.request = request - return response - - @staticmethod - def _get_payload(message, name): - """Retrieves payload from a deserialized message. - - Same as message[name], but if that value is missing or null, it is treated - as if it were {}. - """ - - payload = message.get(name, None) - if payload is not None: - if isinstance(payload, dict): # can be int, str, list... - assert isinstance(payload, MessageDict) - return payload - - # Missing payload. Construct a dummy MessageDict, and make it look like - # it was deserialized. See _process_incoming_message for why it needs to - # have associate_with(). - - def associate_with(message): - payload.message = message - - payload = MessageDict(None) - payload.associate_with = associate_with - return payload - - def _on_message(self, message): - """Invoked for every incoming message after deserialization, but before any - further processing. - - The default implementation invokes _on_request, _on_response or _on_event, - according to the type of the message. - """ - - seq = message["seq"] - typ = message["type"] - if typ == "request": - command = message["command"] - arguments = self._get_payload(message, "arguments") - return self._on_request(seq, command, arguments) - elif typ == "event": - event = message["event"] - body = self._get_payload(message, "body") - return self._on_event(seq, event, body) - elif typ == "response": - request_seq = message["request_seq"] - success = message["success"] - command = message["command"] - error_message = message.get("message", None) - body = self._get_payload(message, "body") if success else None - return self._on_response( - seq, request_seq, success, command, error_message, body - ) - else: - message.isnt_valid('invalid "type": {0!r}', message.type) - - def _get_handler_for(self, type, name): - for handler_name in (name + "_" + type, type): - try: - return getattr(self.handlers, handler_name) - except AttributeError: - continue - raise AttributeError( - fmt( - "{0} has no {1} handler for {2!r}", - compat.srcnameof(self.handlers), - type, - name, - ) - ) - - def _on_request(self, seq, command, arguments): - """Invoked for every incoming request after deserialization and parsing, but - before handling. - - It is guaranteed that arguments is a MessageDict, and all nested dicts in it are - also MessageDict instances. If "arguments" was missing or null in JSON, this - method receives an empty MessageDict. All dicts have owner=None, but it can be - changed with arguments.associate_with(). - - The default implementation tries to find a handler for command in self.handlers, - and invoke it. Given command=X, if handlers.X_request exists, then it is the - specific handler for this request. Otherwise, handlers.request must exist, and - it is the generic handler for this request. A missing handler is a fatal error. - - The handler is then invoked with the Request object as its sole argument. It can - either be a simple function that returns a value directly, or a generator that - yields. - - If the handler returns a value directly, the response is sent immediately, with - Response.body as the returned value. If the value is None, it is a fatal error. - No further incoming messages are processed until the handler returns. - - If the handler returns a generator object, it will be iterated until it yields - a non-None value. Every yield of None is treated as request to process another - pending message recursively (which may cause re-entrancy in the handler), after - which the generator is resumed with the Message object for that message. - - Once a non-None value is yielded from the generator, it is treated the same as - in non-generator case. It is a fatal error for the generator to not yield such - a value before it stops. - - Thus, when a request handler needs to wait until another request or event is - handled before it can respond, it should yield in a loop, so that any other - messages can be processed until that happens:: - - while True: - msg = yield - if msg.is_event('party'): - break - - or when it's waiting for some change in state: - - self.ready = False - while not self.ready: - yield # some other handler must set self.ready = True - - To fail the request, the handler must raise an instance of MessageHandlingError - that applies_to() the Request object it was handling. Use Message.isnt_valid - to report invalid requests, and Message.cant_handle to report valid requests - that could not be processed. - - The handler can raise an instance of NoMoreMessages with either stream=None or - stream=self.stream to indicate that this is the last incoming message, and no - further messages should be read and processed from the stream. - """ - - handler = self._get_handler_for("request", command) - request = Request(self, seq, command, arguments) - - if isinstance(arguments, dict): - arguments.associate_with(request) - - def _assert_response(result): - assert result is not None, fmt( - "Request handler {0} must provide a response for {1!r}.", - compat.srcnameof(handler), - command, - ) - - try: - result = handler(request) - except MessageHandlingError as exc: - if not exc.applies_to(request): - raise - result = exc - _assert_response(result) - - if inspect.isgenerator(result): - gen = result - else: - # Wrap a non-generator return into a generator, to unify processing below. - def gen(): - yield result - - gen = gen() - - # Process messages recursively until generator yields the response. - last_message = None - while True: - try: - response_body = gen.send(last_message) - except MessageHandlingError as exc: - if not exc.applies_to(request): - raise - response_body = exc - break - except StopIteration: - response_body = {} - - if response_body is not None: - gen.close() - break - - last_message = self._process_incoming_message() # re-entrant - - _assert_response(response_body) - request.response = self._send_response(request, response_body) - return request - - def _on_event(self, seq, event, body): - """Invoked for every incoming event after deserialization and parsing, but - before handling. - - It is guaranteed that body is a MessageDict, and all nested dicts in it are - also MessageDict instances. If "body" was missing or null in JSON, this method - receives an empty MessageDict. All dicts have owner=None, but it can be changed - with body.associate_with(). - - The default implementation tries to find a handler for event in self.handlers, - and invoke it. Given event=X, if handlers.X_event exists, then it is the - specific handler for this event. Otherwise, handlers.event must exist, and - it is the generic handler for this event. A missing handler is a fatal error. - - No further incoming messages are processed until the handler returns. - - To report failure to handle the event, the handler must raise an instance of - MessageHandlingError that applies_to() the Event object it was handling. Use - Message.isnt_valid to report invalid events, and Message.cant_handle to report - valid events that could not be processed. - - If report_unhandled_events is True, then failure to handle the event will be - reported to the sender as an "event_not_handled" event. Otherwise, the sender - does not receive any notifications. - - The handler can raise an instance of NoMoreMessages with either stream=None or - stream=self.stream to indicate that this is the last incoming message, and no - further messages should be read and processed from the stream. - """ - - handler = self._get_handler_for("event", event) - event = Event(self, seq, event, body) - - if isinstance(body, dict): - body.associate_with(event) + exc.propagate(message) + def _parse_incoming_messages(self): + log.debug("Starting message loop for channel {0}", self) try: - result = handler(event) - except MessageHandlingError as exc: - if not exc.applies_to(event): - raise - if self.report_unhandled_events: - message = exc.reason - if isinstance(exc, InvalidMessageError): - message = InvalidMessageError.PREFIX + message - self.send_event( - "event_not_handled", {"event_seq": seq, "message": message} - ) - - assert result is None, fmt( - "Event handler {0} tried to respond to {1!r}.", - compat.srcnameof(handler), - event.event, - ) - - return event - - def _on_response(self, seq, request_seq, success, command, error_message, body): - """Invoked for every incoming response after deserialization and parsing, but - before handling. - - error_message corresponds to "message" in JSON, and is renamed for clarity. - - If success is False, body is None. Otherwise, it is guaranteed that body is - a MessageDict, and all nested dicts in it are also MessageDict instances. If - "body" was missing or null in JSON, this method receives an empty MessageDict. - All dicts have owner=None, but it can be changed with body.associate_with(). - - The default implementation delegates to the OutgoingRequest object for the - request to which this is the response for further handling. If there is no - such object - i.e. it is an unknown request - the response logged and ignored. - - See OutgoingRequest.on_response and OutgoingRequest.wait_for_response for - high-level response handling facilities. - - No further incoming messages are processed until the handler returns. - - The handler can raise an instance of NoMoreMessages with either stream=None or - stream=self.stream to indicate that this is the last incoming message, and no - further messages should be read and processed from the stream. - """ - - # Synthetic Request that only has seq and command as specified in response JSON. - # It is replaced with the actual Request later, if we can find it. - request = OutgoingRequest(self, request_seq, command, "") - - if not success: - error_message = str(error_message) - exc_type = MessageHandlingError - if error_message.startswith(InvalidMessageError.PREFIX): - error_message = error_message[len(InvalidMessageError.PREFIX) :] - exc_type = InvalidMessageError - body = exc_type(error_message, request) - - response = Response(self, seq, request, body) - - if isinstance(body, dict): - body.associate_with(response) + while True: + self._parse_incoming_message() - try: + except NoMoreMessages as exc: + log.debug("Exiting message loop for channel {0}: {1}", self, exc) with self: - request = self._requests.pop(request_seq) - except KeyError: - response.isnt_valid( - "request_seq={0} does not match any known request", request_seq - ) + # Generate dummy responses for all outstanding requests. + err_message = compat.force_unicode(str(exc), "utf-8", errors="replace") + + # Response._parse() will remove items from _sent_requests, so + # make a snapshot before iterating. + sent_requests = list(self._sent_requests.values()) + + for request in sent_requests: + response_json = MessageDict( + None, + { + "seq": -1, + "request_seq": request.seq, + "command": request.command, + "success": False, + "message": err_message, + }, + ) - # Replace synthetic Request with real one. - response.request = request - if isinstance(response.body, MessageHandlingError): - response.body.request = request + response = Response._parse(self, response_json) + response.seq = None + response.body = exc - request._handle_response(response) + response_json.message = request.response = response + request._enqueue_response_handlers() - def on_disconnect(self): - """Invoked when the channel is closed. + assert not len(self._sent_requests) - No further message handlers will be invoked after this one returns. + self._enqueue_handlers("disconnect", self._handle_disconnect) + self.close() - The default implementation ensures that any requests that are still outstanding - automatically receive synthesized "no response" responses, and then invokes - handlers.disconnect with no arguments, if it exists. - """ + _message_parsers = { + "event": Event._parse, + "request": Request._parse, + "response": Response._parse, + } - # Lock the channel to properly synchronize with the instant callback logic - # in Request.on_response(). - with self: - for request in self._requests.values(): - request.no_response() - - getattr(self.handlers, "disconnect", lambda: None)() + def _parse_incoming_message(self): + """Reads incoming messages, parses them, and puts handlers into the queue + for _run_handlers() to invoke, until the channel is closed. + """ - def _process_incoming_message(self): # Set up a dedicated decoder for this message, to create MessageDict instances # for all JSON objects, and track them so that they can be later wired up to # the Message they belong to, once it is instantiated. @@ -1294,9 +1336,10 @@ def object_hook(d): # dicts are created during deserialization. # # So, upon deserialization, every dict in the message payload gets a method - # that can be called to set MessageDict.message for _all_ dicts in that message. - # Then, _on_request, _on_event, and _on_response can use it once they have parsed - # the dicts, and created the appropriate Request/Event/Response instance. + # that can be called to set MessageDict.message for *all* dicts belonging to + # that message. This method can then be invoked on the top-level dict by the + # parser, after it has parsed enough of the dict to create the appropriate + # instance of Event, Request, or Response for this message. def associate_with(message): for d in message_dicts: d.message = message @@ -1304,40 +1347,137 @@ def associate_with(message): message_dicts = [] decoder = self.stream.json_decoder_factory(object_hook=object_hook) - message = self.stream.read_json(decoder) - assert isinstance(message, MessageDict) # make sure stream used decoder + message_dict = self.stream.read_json(decoder) + assert isinstance(message_dict, MessageDict) # make sure stream used decoder + msg_type = message_dict("type", json.enum("event", "request", "response")) + parser = self._message_parsers[msg_type] try: - return self._on_message(message) - except NoMoreMessages: - raise - except Exception: - raise log.exception( - "Fatal error while processing message for {0}:\n\n{1!j}", - self.name, - message, + parser(self, message_dict) + except InvalidMessageError as exc: + log.error( + "Failed to parse message in channel {0}: {1} in:\n{2!j}", + self, + str(exc), + message_dict, ) + except Exception as exc: + if isinstance(exc, NoMoreMessages) and exc.stream is self.stream: + raise + log.exception( + "Fatal error in channel {0} while parsing:\n{1!j}", self, message_dict + ) + os._exit(1) + + def _enqueue_handlers(self, what, *handlers): + """Enqueues handlers for _run_handlers() to run. + + `what` describes what is being handled, and is used for logging purposes. + Normally it's a Message instance, but it can be anything printable. + + If the background thread with _run_handlers() isn't running yet, starts it. + """ + + with self: + self._handler_queue.extend((what, handler) for handler in handlers) + self._handlers_enqueued.notify_all() + + # If there is anything to handle, but there's no handler thread yet, + # spin it up. This will normally happen only once, on the first call + # to _enqueue_handlers(), and that thread will run all the handlers + # for parsed messages. However, this can also happen is somebody calls + # Request.on_response() - possibly concurrently from multiple threads - + # after the channel has already been closed, and the initial handler + # thread has exited. In this case, we spin up a new thread just to run + # the enqueued response handlers, and it will exit as soon as it's out + # of handlers to run. + if len(self._handler_queue) and self._handler_thread is None: + self._handler_thread = threading.Thread( + target=self._run_handlers, name=fmt("{0} message handler", self) + ) + self._handler_thread.pydev_do_not_trace = True + self._handler_thread.is_pydev_daemon_thread = True + self._handler_thread.start() + + def _run_handlers(self): + """Runs enqueued handlers until the channel is closed, or until the handler + queue is empty once the channel is closed. + """ + + while True: + with self: + closed = self._closed + if closed: + # Wait for the parser thread to wrap up and enqueue any remaining + # handlers, if it is still running. + self._parser_thread.join() + # From this point on, _enqueue_handlers() can only get called + # from Request.on_response(). + + with self: + if not closed and not len(self._handler_queue): + # Wait for something to process. + self._handlers_enqueued.wait() + + # Make a snapshot before releasing the lock. + handlers = self._handler_queue[:] + del self._handler_queue[:] + + if closed and not len(handlers): + # Nothing to process, channel is closed, and parser thread is + # not running anymore - time to quit! If Request.on_response() + # needs to call _enqueue_handlers() later, it will spin up + # a new handler thread. + self._handler_thread = None + return + + for what, handler in handlers: + # If the channel is closed, we don't want to process any more events + # or requests - only responses and the final disconnect handler. This + # is to guarantee that if a handler calls close() on its own channel, + # the corresponding request or event is the last thing to be processed. + if closed and handler in (Event._handle, Request._handle): + continue - def _process_incoming_messages(self): - try: - log.debug("Starting message loop for {0}", self.name) - while True: try: - self._process_incoming_message() - except NoMoreMessages as exc: - if exc.stream is None or exc.stream is self.stream: - log.debug( - "Exiting message loop for {0}: {1}", self.name, str(exc) - ) - return False - else: - raise - finally: + handler() + except Exception as exc: + log.exception( + "Fatal error in channel {0} while handling {1}:", self, what + ) + self.close() + os._exit(1) + + def _get_handler_for(self, type, name): + """Returns the handler for a message of a given type. + """ + + for handler_name in (name + "_" + type, type): try: - self.on_disconnect() - except Exception: - log.exception("Error while processing disconnect for {0}", self.name) - raise + return getattr(self.handlers, handler_name) + except AttributeError: + continue + + raise AttributeError( + fmt( + "channel {0} has no handler for {1} {2!r}", + compat.srcnameof(self.handlers), + type, + name, + ) + ) + + def _handle_disconnect(self): + handler = getattr(self.handlers, "disconnect", lambda: None) + try: + handler() + except Exception: + log.exception( + "Handler {0} couldn't handle disconnect in channel {1}:", + compat.srcnameof(handler), + self.channel, + ) + os._exit(1) class MessageHandlers(object): diff --git a/tests/debug.py b/tests/debug.py index eca8596c9..9066a5ded 100644 --- a/tests/debug.py +++ b/tests/debug.py @@ -801,7 +801,7 @@ def _process_event(self, event): 'Received "terminated" event from {0}; stopping message processing.', self, ) - raise messaging.NoMoreMessages(fmt("{0} terminated", self)) + self.channel.close() def _process_request(self, request): self.timeline.record_request(request, block=False) diff --git a/tests/ptvsd/common/test_messaging.py b/tests/ptvsd/common/test_messaging.py index c5059e518..8cef09704 100644 --- a/tests/ptvsd/common/test_messaging.py +++ b/tests/ptvsd/common/test_messaging.py @@ -8,6 +8,7 @@ """ import collections +import functools import json import io import pytest @@ -44,17 +45,26 @@ def __init__(self, input, output, name="memory"): def close(self): pass + def _log_message(self, dir, data): + format_string = "{0} {1} " + ( + "{2!j:indent=None}" if isinstance(data, list) else "{2!j}" + ) + return log.debug(format_string, self.name, dir, data) + def read_json(self, decoder=None): decoder = decoder if decoder is not None else self.json_decoder_factory() try: value = next(self.input) except StopIteration: raise messaging.NoMoreMessages(stream=self) - return decoder.decode(json.dumps(value)) + value = decoder.decode(json.dumps(value)) + self._log_message("-->", value) + return value def write_json(self, value, encoder=None): encoder = encoder if encoder is not None else self.json_encoder_factory() value = json.loads(encoder.encode(value)) + self._log_message("<--", value) self.output.append(value) @@ -67,7 +77,9 @@ class TestJsonIOStream(object): def setup_class(cls): for seq in range(0, 3): message_body = cls.MESSAGE_BODY_TEMPLATE % seq - message = json.loads(message_body, object_pairs_hook=collections.OrderedDict) + message = json.loads( + message_body, object_pairs_hook=collections.OrderedDict + ) message_body = message_body.encode("utf-8") cls.MESSAGES.append(message) message_header = "Content-Length: %d\r\n\r\n" % len(message_body) @@ -115,6 +127,48 @@ def test_write(self): assert messages == self.MESSAGES +class MessageHandlerRecorder(list): + def __call__(self, handler): + @functools.wraps(handler) + def record_and_handle(instance, message): + name = handler.__name__ + if isinstance(name, bytes): + name = name.decode("utf-8") + record = {"channel": message.channel, "handler": name} + + if isinstance(message, messaging.Event): + record.update( + {"type": "event", "event": message.event, "body": message.body} + ) + elif isinstance(message, messaging.Request): + record.update( + { + "type": "request", + "command": message.command, + "arguments": message.arguments, + } + ) + + self.append(record) + return handler(instance, message) + + return record_and_handle + + def expect(self, channel, inputs, handlers): + expected_records = [] + for input, handler in zip(inputs, handlers): + expected_record = {"channel": channel, "handler": handler} + expected_record.update( + { + key: value + for key, value in input.items() + if key in ("type", "event", "command", "body", "arguments") + } + ) + expected_records.append(expected_record) + assert expected_records == self + + class TestJsonMessageChannel(object): @staticmethod def iter_with_event(collection): @@ -146,26 +200,23 @@ def test_events(self): }, ] - events_received = [] + recorder = MessageHandlerRecorder() class Handlers(object): + @recorder def stopped_event(self, event): assert event.event == "stopped" - events_received.append((event.channel, event.body)) + @recorder def event(self, event): - events_received.append((event.channel, event.event, event.body)) + assert event.event == "unknown" - input, input_exhausted = self.iter_with_event(EVENTS) - stream = JsonMemoryStream(input, []) + stream = JsonMemoryStream(EVENTS, []) channel = messaging.JsonMessageChannel(stream, Handlers()) channel.start() - input_exhausted.wait() + channel.wait() - assert events_received == [ - (channel, EVENTS[0]["body"]), - (channel, "unknown", EVENTS[1]["body"]), - ] + recorder.expect(channel, EVENTS, ["stopped_event", "event"]) def test_requests(self): REQUESTS = [ @@ -178,50 +229,59 @@ def test_requests(self): { "seq": 2, "type": "request", + "command": "launch", + "arguments": {"program": "main.py"}, + }, + { + "seq": 3, + "type": "request", "command": "unknown", "arguments": {"answer": 42}, }, { - "seq": 3, + "seq": 4, "type": "request", "command": "pause", "arguments": {"threadId": 5}, }, ] - requests_received = [] + recorder = MessageHandlerRecorder() class Handlers(object): + @recorder def next_request(self, request): assert request.command == "next" - requests_received.append((request.channel, request.arguments)) return {"threadId": 7} + @recorder + def launch_request(self, request): + assert request.command == "launch" + self._launch = request + return messaging.NO_RESPONSE + + @recorder def request(self, request): - requests_received.append( - (request.channel, request.command, request.arguments) - ) - return {} + request.respond({}) + @recorder def pause_request(self, request): assert request.command == "pause" - requests_received.append((request.channel, request.arguments)) - request.cant_handle("pause error") + self._launch.respond({"processId": 9}) + raise request.cant_handle("pause error") - input, input_exhausted = self.iter_with_event(REQUESTS) - output = [] - stream = JsonMemoryStream(input, output) + stream = JsonMemoryStream(REQUESTS, []) channel = messaging.JsonMessageChannel(stream, Handlers()) channel.start() - input_exhausted.wait() + channel.wait() - assert requests_received == [ - (channel, REQUESTS[0]["arguments"]), - (channel, "unknown", REQUESTS[1]["arguments"]), - (channel, REQUESTS[2]["arguments"]), - ] + recorder.expect( + channel, + REQUESTS, + ["next_request", "launch_request", "request", "pause_request"], + ) - assert output == [ + assert stream.output == [ { "seq": 1, "type": "response", @@ -233,14 +293,22 @@ def pause_request(self, request): { "seq": 2, "type": "response", - "request_seq": 2, + "request_seq": 3, "command": "unknown", "success": True, }, { "seq": 3, "type": "response", - "request_seq": 3, + "request_seq": 2, + "command": "launch", + "success": True, + "body": {"processId": 9}, + }, + { + "seq": 4, + "type": "response", + "request_seq": 4, "command": "pause", "success": False, "message": "pause error", @@ -251,6 +319,7 @@ def test_responses(self): request1_sent = threading.Event() request2_sent = threading.Event() request3_sent = threading.Event() + request4_sent = threading.Event() def iter_responses(): request1_sent.wait() @@ -283,6 +352,8 @@ def iter_responses(): "body": {"threadId": 5}, } + request4_sent.wait() + stream = JsonMemoryStream(iter_responses(), []) channel = messaging.JsonMessageChannel(stream, None) channel.start() @@ -290,6 +361,7 @@ def iter_responses(): # Blocking wait. request1 = channel.send_request("next") request1_sent.set() + log.info("Waiting for response...") response1_body = request1.wait_for_response() response1 = request1.response @@ -307,8 +379,11 @@ def response2_handler(resp): response2.append(resp) response2_received.set() + log.info("Registering callback") request2.on_response(response2_handler) request2_sent.set() + + log.info("Waiting for callback...") response2_received.wait() response2, = response2 @@ -330,7 +405,10 @@ def response3_handler(resp): response3.append(resp) response3_received.set() + log.info("Registering callback") request3.on_response(response3_handler) + + log.info("Waiting for callback...") response3_received.wait() response3, = response3 @@ -339,229 +417,28 @@ def response3_handler(resp): assert response3 is request3.response assert response3.body == {"threadId": 5} - def test_yield(self): - REQUESTS = [ - { - "seq": 10, - "type": "request", - "command": "launch", - "arguments": {"noDebug": False}, - }, - { - "seq": 20, - "type": "request", - "command": "setBreakpoints", - "arguments": {"main.py": 1}, - }, - {"seq": 30, "type": "event", "event": "expected"}, - { - "seq": 40, - "type": "request", - "command": "launch", - "arguments": {"noDebug": True}, - }, # test re-entrancy - { - "seq": 50, - "type": "request", - "command": "setBreakpoints", - "arguments": {"main.py": 2}, - }, - {"seq": 60, "type": "event", "event": "unexpected"}, - {"seq": 80, "type": "request", "command": "configurationDone"}, - { - "seq": 90, - "type": "request", - "command": "launch", - }, # test handler yielding empty body - ] + # Async callback, registered after channel is closed. + request4 = channel.send_request("next") + request4_sent.set() + channel.wait() + response4 = [] + response4_received = threading.Event() - class Handlers(object): + def response4_handler(resp): + response4.append(resp) + response4_received.set() - received = { - "launch": 0, - "setBreakpoints": 0, - "configurationDone": 0, - "expected": 0, - "unexpected": 0, - } + log.info("Registering callback") + request4.on_response(response4_handler) - def launch_request(self, request): - assert request.seq in (10, 40, 90) - self.received["launch"] += 1 - - if request.seq == 10: # launch #1 - assert self.received == { - "launch": 1, - "setBreakpoints": 0, - "configurationDone": 0, - "expected": 0, - "unexpected": 0, - } + log.info("Waiting for callback...") + response4_received.wait() + response4, = response4 - msg = yield # setBreakpoints #1 - assert msg.seq == 20 - assert self.received == { - "launch": 1, - "setBreakpoints": 1, - "configurationDone": 0, - "expected": 0, - "unexpected": 0, - } - - msg = yield # expected - assert msg.seq == 30 - assert self.received == { - "launch": 1, - "setBreakpoints": 1, - "configurationDone": 0, - "expected": 1, - "unexpected": 0, - } - - msg = yield # launch #2 + nested messages - assert msg.seq == 40 - assert self.received == { - "launch": 2, - "setBreakpoints": 2, - "configurationDone": 0, - "expected": 1, - "unexpected": 1, - } - - # We should see that it failed, but no exception bubbling up here. - assert not msg.response.success - assert msg.response.body == messaging.MessageHandlingError( - "test failure", msg - ) - - msg = yield # configurationDone - assert msg.seq == 80 - assert self.received == { - "launch": 2, - "setBreakpoints": 2, - "configurationDone": 1, - "expected": 1, - "unexpected": 1, - } - - yield {"answer": 42} - - elif request.seq == 40: # launch #1 - assert self.received == { - "launch": 2, - "setBreakpoints": 1, - "configurationDone": 0, - "expected": 1, - "unexpected": 0, - } - - msg = yield # setBreakpoints #2 - assert msg.seq == 50 - assert self.received == { - "launch": 2, - "setBreakpoints": 2, - "configurationDone": 0, - "expected": 1, - "unexpected": 0, - } - - msg = yield # unexpected - assert msg.seq == 60 - assert self.received == { - "launch": 2, - "setBreakpoints": 2, - "configurationDone": 0, - "expected": 1, - "unexpected": 1, - } - - request.cant_handle("test failure") - - elif request.seq == 90: # launch #3 - assert self.received == { - "launch": 3, - "setBreakpoints": 2, - "configurationDone": 1, - "expected": 1, - "unexpected": 1, - } - # yield {} - - def setBreakpoints_request(self, request): - assert request.seq in (20, 50, 70) - self.received["setBreakpoints"] += 1 - return {"which": self.received["setBreakpoints"]} - - def request(self, request): - assert request.seq == 80 - assert request.command == "configurationDone" - self.received["configurationDone"] += 1 - return {} - - def expected_event(self, event): - assert event.seq == 30 - self.received["expected"] += 1 - - def event(self, event): - assert event.seq == 60 - assert event.event == "unexpected" - self.received["unexpected"] += 1 - - input, input_exhausted = self.iter_with_event(REQUESTS) - output = [] - stream = JsonMemoryStream(input, output) - channel = messaging.JsonMessageChannel(stream, Handlers()) - channel.start() - input_exhausted.wait() - - assert output == [ - { - "seq": 1, - "type": "response", - "request_seq": 20, - "command": "setBreakpoints", - "success": True, - "body": {"which": 1}, - }, - { - "seq": 2, - "type": "response", - "request_seq": 50, - "command": "setBreakpoints", - "success": True, - "body": {"which": 2}, - }, - { - "seq": 3, - "type": "response", - "request_seq": 40, - "command": "launch", - "success": False, - "message": "test failure", - }, - { - "seq": 4, - "type": "response", - "request_seq": 80, - "command": "configurationDone", - "success": True, - }, - { - "seq": 5, - "type": "response", - "request_seq": 10, - "command": "launch", - "success": True, - "body": {"answer": 42}, - }, - { - "seq": 6, - "type": "response", - "request_seq": 90, - "command": "launch", - "success": True, - }, - ] + assert not response4.success + assert response4.request is request4 + assert response4 is request4.response + assert isinstance(response4.body, messaging.NoMoreMessages) def test_invalid_request_handling(self): REQUESTS = [ @@ -587,12 +464,11 @@ def request(self, request): def pause_request(self, request): request.arguments["DDD"] - input, input_exhausted = self.iter_with_event(REQUESTS) output = [] - stream = JsonMemoryStream(input, output) + stream = JsonMemoryStream(REQUESTS, output) channel = messaging.JsonMessageChannel(stream, Handlers()) channel.start() - input_exhausted.wait() + channel.wait() def missing_property(name): return some.str.matching("Invalid message:.*" + re.escape(name) + ".*") diff --git a/tests/ptvsd/server/test_breakpoints.py b/tests/ptvsd/server/test_breakpoints.py index f1fa9dad3..b9b3d244d 100644 --- a/tests/ptvsd/server/test_breakpoints.py +++ b/tests/ptvsd/server/test_breakpoints.py @@ -266,6 +266,7 @@ def test_package_launch(): test_py = cwd / "pkg1" / "__main__.py" with debug.Session("launch") as session: + session.expected_returncode = 42 session.initialize(target=("module", "pkg1"), cwd=cwd) session.set_breakpoints(test_py, ["two"]) session.start_debugging() diff --git a/tests/test_data/testpkgs/pkg1/__main__.py b/tests/test_data/testpkgs/pkg1/__main__.py index e143ac5d3..9426f13eb 100644 --- a/tests/test_data/testpkgs/pkg1/__main__.py +++ b/tests/test_data/testpkgs/pkg1/__main__.py @@ -1,3 +1,5 @@ +import sys print('one') # @one print('two') # @two print('three') # @three +sys.exit(42) \ No newline at end of file