Skip to content

Commit

Permalink
Use a separate exception type NoMoreMessages to indicate end-of-strea…
Browse files Browse the repository at this point in the history
…m for JSON messaging, to avoid leaking EOFError from disk I/O or from another stream not being properly reported as errors.
  • Loading branch information
int19h committed Aug 1, 2019
1 parent 0ea5046 commit 768386d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/ptvsd/adapter/channels.py
Expand Up @@ -29,7 +29,7 @@ def server(self):
Created when handling the "attach" or "launch" request.
When the server disconnects, the channel remains, but is closed, and will raise
EOFError on writes.
NoMoreMessages on writes.
"""
return self._server

Expand Down
76 changes: 58 additions & 18 deletions src/ptvsd/common/messaging.py
Expand Up @@ -23,6 +23,25 @@
from ptvsd.common._util import new_hidden_thread


class NoMoreMessages(EOFError):
"""Indicates that there are no more messages to be read from the stream.
"""

def __init__(self, *args, **kwargs):
stream = kwargs.pop("stream", None)
args = args if len(args) else ["No more messages"]
super(NoMoreMessages, self).__init__(*args, **kwargs)

self.stream = stream
"""The stream that doesn't have any more messages.
Set by JsonIOStream.read_json().
JsonMessageChannel relies on this value to decide whether a NoMoreMessages
instance that bubbles up to the message loop is related to that loop.
"""


class JsonIOStream(object):
"""Implements a JSON value stream over two byte streams (input and output).
Expand Down Expand Up @@ -116,25 +135,24 @@ def _log_message(self, dir, data, logger=log.debug):
)
return logger(format_string, self.name, dir, data)

@staticmethod
def _read_line(reader):
def _read_line(self, reader):
line = b""
while True:
try:
line += reader.readline()
except Exception as ex:
raise EOFError(str(ex))
raise NoMoreMessages(str(ex), stream=self)
if not line:
raise EOFError("No more data")
raise NoMoreMessages(stream=self)
if line.endswith(b"\r\n"):
line = line[0:-2]
return line

def read_json(self, decoder=None):
"""Read a single JSON value from reader.
Returns JSON value as parsed by decoder.decode(), or raises EOFError if
there are no more values to be read.
Returns JSON value as parsed by decoder.decode(), or raises NoMoreMessages
if there are no more values to be read.
"""

decoder = decoder if decoder is not None else self.json_decoder_factory()
Expand Down Expand Up @@ -166,7 +184,7 @@ def log_message_and_exception(format_string="", *args, **kwargs):
# Only log it if we have already read some headers, and are looking
# for a blank line terminating them. If this is the very first read,
# there's no message data to log in any case, and the caller might
# be anticipating the error - e.g. EOFError on disconnect.
# be anticipating the error - e.g. NoMoreMessages on disconnect.
if headers:
raise log_message_and_exception(
"Error while reading message headers:"
Expand Down Expand Up @@ -197,10 +215,10 @@ def log_message_and_exception(format_string="", *args, **kwargs):
try:
chunk = reader.read(body_remaining)
if not chunk:
raise EOFError("No more data")
except Exception:
if self._is_closing:
raise EOFError
except Exception as exc:
if self._is_closing:
raise NoMoreMessages(str(exc), stream=self)
else:
raise log_message_and_exception(
"Couldn't read the expected {0} bytes of body:", length
Expand Down Expand Up @@ -518,7 +536,7 @@ def wait_for_response(self, raise_if_failed=True):
object for it in self.response, and returns response.body.
If no response was received from the other party before the channel closed,
self.response is a synthesized Response, which has EOFError() as its body.
self.response is a synthesized Response, which has NoMoreMessages() as its body.
If raise_if_failed=True and response.success is False, raises response.body
instead of returning.
Expand All @@ -537,7 +555,7 @@ def on_response(self, callback):
It is guaranteed that self.response is set before the callback is invoked.
If no response was received from the other party before the channel closed,
a Response with body=EOFError() is synthesized.
a Response with body=NoMoreMessages() is synthesized.
The callback may be invoked on an unspecified background thread that performs
processing of incoming messages; in that case, no further message processing
Expand All @@ -561,7 +579,12 @@ def no_response(self):
Synthesizes the appopriate dummy Response, and invokes the callback with it.
"""
response = Response(self.channel, None, self, EOFError("No response"))
response = Response(
self.channel,
None,
self,
NoMoreMessages("No response", stream=self.channel.stream),
)
self._handle_response(response)


Expand All @@ -584,7 +607,7 @@ def __init__(self, channel, seq, request, body):
the InvalidMessageError specifically, and that prefix is stripped.
If no response was received from the other party before the channel closed,
it is an instance of EOFError.
it is an instance of NoMoreMessages.
"""

@property
Expand Down Expand Up @@ -1062,6 +1085,10 @@ def _on_request(self, seq, command, arguments):
that applies_to() the Request object it was handling. Use Message.isnt_valid
to report invalid requests, and Message.cant_handle to report valid requests
that could not be processed.
The handler can raise an instance of NoMoreMessages with either stream=None or
stream=self.stream to indicate that this is the last incoming message, and no
further messages should be read and processed from the stream.
"""

handler = self._get_handler_for("request", command)
Expand Down Expand Up @@ -1141,6 +1168,10 @@ def _on_event(self, seq, event, body):
If report_unhandled_events is True, then failure to handle the event will be
reported to the sender as an "event_not_handled" event. Otherwise, the sender
does not receive any notifications.
The handler can raise an instance of NoMoreMessages with either stream=None or
stream=self.stream to indicate that this is the last incoming message, and no
further messages should be read and processed from the stream.
"""

handler = self._get_handler_for("event", event)
Expand Down Expand Up @@ -1189,6 +1220,10 @@ def _on_response(self, seq, request_seq, success, command, error_message, body):
high-level response handling facilities.
No further incoming messages are processed until the handler returns.
The handler can raise an instance of NoMoreMessages with either stream=None or
stream=self.stream to indicate that this is the last incoming message, and no
further messages should be read and processed from the stream.
"""

# Synthetic Request that only has seq and command as specified in response JSON.
Expand Down Expand Up @@ -1274,7 +1309,7 @@ def associate_with(message):

try:
return self._on_message(message)
except EOFError:
except NoMoreMessages:
raise
except Exception:
raise log.exception(
Expand All @@ -1289,9 +1324,14 @@ def _process_incoming_messages(self):
while True:
try:
self._process_incoming_message()
except EOFError as ex:
log.debug("Exiting message loop for {0}: {1}", self.name, str(ex))
return False
except NoMoreMessages as exc:
if exc.stream is None or exc.stream is self.stream:
log.debug(
"Exiting message loop for {0}: {1}", self.name, str(exc)
)
return False
else:
raise
finally:
try:
self.on_disconnect()
Expand Down
2 changes: 1 addition & 1 deletion tests/debug.py
Expand Up @@ -801,7 +801,7 @@ def _process_event(self, event):
'Received "terminated" event from {0}; stopping message processing.',
self,
)
raise EOFError(fmt("{0} terminated", self))
raise messaging.NoMoreMessages(fmt("{0} terminated", self))

def _process_request(self, request):
self.timeline.record_request(request, block=False)
Expand Down
8 changes: 5 additions & 3 deletions tests/ptvsd/common/test_messaging.py
Expand Up @@ -49,7 +49,7 @@ def read_json(self, decoder=None):
try:
value = next(self.input)
except StopIteration:
raise EOFError
raise messaging.NoMoreMessages(stream=self)
return decoder.decode(json.dumps(value))

def write_json(self, value, encoder=None):
Expand Down Expand Up @@ -79,8 +79,9 @@ def test_read(self):
for expected_message in self.MESSAGES:
message = stream.read_json()
assert message == expected_message
with pytest.raises(EOFError):
with pytest.raises(messaging.NoMoreMessages) as exc_info:
stream.read_json()
assert exc_info.value.stream is stream

def test_write(self):
data = io.BytesIO()
Expand All @@ -102,8 +103,9 @@ def test_read(self):
for expected_message in self.MESSAGES:
message = stream.read_json()
assert message == expected_message
with pytest.raises(EOFError):
with pytest.raises(messaging.NoMoreMessages) as exc_info:
stream.read_json()
assert exc_info.value.stream is stream

def test_write(self):
messages = []
Expand Down
4 changes: 3 additions & 1 deletion tests/ptvsd/server/test_multiproc.py
Expand Up @@ -8,6 +8,7 @@
import pytest
import sys

from ptvsd.common import messaging
from tests import debug
from tests.patterns import some
from tests.timeline import Event, Request
Expand Down Expand Up @@ -256,7 +257,8 @@ def parent():
parent_session.expected_returncode = some.int
try:
parent_session.request("disconnect")
except EOFError:
except messaging.NoMoreMessages:
# Can happen if ptvsd drops connection before sending the response.
pass
parent_session.wait_for_disconnect()
else:
Expand Down

0 comments on commit 768386d

Please sign in to comment.