Permalink
Browse files

Avoid using assert when parsing server response

To allow users to recover from bad server responses without
catching broad AssertionError, a new exception is introduced.

This new exception also replaces the undocumented `ParseError`
raised in parsing code.

Fixes #288
  • Loading branch information...
NicolasLM committed Sep 26, 2017
1 parent f02732d commit a309764fe27a54f69c8c99b2c3738c9c362aa793
View
@@ -21,6 +21,8 @@ Changed
are not used anymore.
- More precise exceptions available in `imapclient.exceptions` are raised when
an error happens
- `imapclient.exceptions.ProtocolError` is now raised when the reply from a
remote server violates the IMAP protocol.
- GMail labels are now strings instead of bytes in Python 3.
Fixed
View
@@ -36,3 +36,7 @@ class InvalidCriteriaError(IMAPClientError):
A command using a search criteria failed, probably due to a syntax
error in the criteria string.
"""
class ProtocolError(IMAPClientError):
"""The server replied with a response that violates the IMAP protocol."""
View
@@ -24,7 +24,7 @@
from .datetime_util import datetime_to_INTERNALDATE, format_criteria_date
from .imap_utf7 import encode as encode_utf7, decode as decode_utf7
from .response_parser import parse_response, parse_message_list, parse_fetch_response
from .util import to_bytes, to_unicode
from .util import to_bytes, to_unicode, assert_imap_protocol
xrange = moves.xrange
if PY3:
@@ -1526,7 +1526,7 @@ def _maybe_int_to_bytes(val):
def _parse_untagged_response(text):
assert text.startswith(b'* ')
assert_imap_protocol(text.startswith(b'* '))
text = text[2:]
if text.startswith((b'OK ', b'NO ')):
return tuple(text.split(b' ', 1))
@@ -13,6 +13,8 @@
import six
from .util import assert_imap_protocol
__all__ = ["TokenSource"]
CTRL_CHARS = frozenset(c for c in range(32))
@@ -97,7 +99,7 @@ def read_token_stream(self, stream_i):
if nextchar in whitespace:
yield token
elif nextchar == DOUBLE_QUOTE:
assert not token
assert_imap_protocol(not token)
token.append(nextchar)
token.extend(read_until(stream_i, nextchar))
yield token
@@ -138,7 +140,7 @@ def __init__(self, lexer, resp_record):
# A 'record' with a string which includes a literal marker, and
# the literal itself.
self.src_text = resp_record[0]
assert self.src_text.endswith(b"}"), self.src_text
assert_imap_protocol(self.src_text.endswith(b"}"), self.src_text)
self.literal = resp_record[1]
else:
# just a line with no literals.
@@ -22,14 +22,11 @@
from .datetime_util import parse_to_datetime
from .response_lexer import TokenSource
from .response_types import BodyData, Envelope, Address, SearchIds
from .exceptions import ProtocolError
xrange = six.moves.xrange
__all__ = ['parse_response', 'parse_message_list', 'ParseError']
class ParseError(ValueError):
pass
__all__ = ['parse_response', 'parse_message_list']
def parse_response(data):
@@ -93,11 +90,11 @@ def gen_parsed_response(text):
try:
for token in src:
yield atom(src, token)
except ParseError:
except ProtocolError:
raise
except ValueError:
_, err, _ = sys.exc_info()
raise ParseError("%s: %s" % (str(err), token))
raise ProtocolError("%s: %s" % (str(err), token))
def parse_fetch_response(text, normalise_times=True, uid_is_key=True):
@@ -121,12 +118,12 @@ def parse_fetch_response(text, normalise_times=True, uid_is_key=True):
try:
msg_response = six.next(response)
except StopIteration:
raise ParseError('unexpected EOF')
raise ProtocolError('unexpected EOF')
if not isinstance(msg_response, tuple):
raise ParseError('bad response type: %s' % repr(msg_response))
raise ProtocolError('bad response type: %s' % repr(msg_response))
if len(msg_response) % 2:
raise ParseError('uneven number of response items: %s' % repr(msg_response))
raise ProtocolError('uneven number of response items: %s' % repr(msg_response))
# always return the sequence of the message, so it is available
# even if we return keyed by UID.
@@ -159,7 +156,7 @@ def _int_or_error(value, error_text):
try:
return int(value)
except (TypeError, ValueError):
raise ParseError('%s: %s' % (error_text, repr(value)))
raise ProtocolError('%s: %s' % (error_text, repr(value)))
def _convert_INTERNALDATE(date_string, normalise_times=True):
@@ -212,9 +209,9 @@ def atom(src, token):
literal_len = int(token[1:-1])
literal_text = src.current_literal
if literal_text is None:
raise ParseError('No literal corresponds to %r' % token)
raise ProtocolError('No literal corresponds to %r' % token)
if len(literal_text) != literal_len:
raise ParseError('Expecting literal of size %d, got %d' % (
raise ProtocolError('Expecting literal of size %d, got %d' % (
literal_len, len(literal_text)))
return literal_text
elif len(token) >= 2 and (token[:1] == token[-1:] == b'"'):
@@ -232,7 +229,7 @@ def parse_tuple(src):
return tuple(out)
out.append(atom(src, token))
# no terminator
raise ParseError('Tuple incomplete before "(%s"' % _fmt_tuple(out))
raise ProtocolError('Tuple incomplete before "(%s"' % _fmt_tuple(out))
def _fmt_tuple(t):
View
@@ -7,6 +7,8 @@
import logging
from six import binary_type, text_type
from . import exceptions
logger = logging.getLogger(__name__)
@@ -27,3 +29,11 @@ def to_bytes(s, charset='ascii'):
if isinstance(s, text_type):
return s.encode(charset)
return s
def assert_imap_protocol(condition, message=None):
if not condition:
msg = "Server replied with a response that violates the IMAP protocol"
if message:
msg += "{}: {}".format(msg, message)
raise exceptions.ProtocolError(msg)
View
@@ -12,7 +12,9 @@
import six
from imapclient.exceptions import CapabilityError, IMAPClientError
from imapclient.exceptions import (
CapabilityError, IMAPClientError, ProtocolError
)
from imapclient.imapclient import IMAPlibLoggerAdapter
from imapclient.fixed_offset import FixedOffset
from imapclient.testable_imapclient import TestableIMAPClient as IMAPClient
@@ -651,3 +653,14 @@ def test_exception_inside_context_manager(self):
with self.assertRaises(ValueError):
with self.client as _:
raise ValueError("Error raised inside the context manager")
class TestProtocolError(IMAPClientTest):
def test_tagged_response_with_parse_error(self):
client = self.client
client._imap.tagged_commands = {sentinel.tag: None}
client._imap._get_response = lambda: b'NOT-A-STAR 99 EXISTS'
with self.assertRaises(ProtocolError):
client._consume_until_tagged_response(sentinel.tag, b'IDLE')
@@ -16,9 +16,9 @@
parse_response,
parse_message_list,
parse_fetch_response,
ParseError,
)
from imapclient.response_types import Envelope, Address
from imapclient.exceptions import ProtocolError
from tests.util import unittest
from .util import patch
@@ -160,7 +160,7 @@ def _test(self, to_parse, expected, wrap=True):
def _test_parse_error(self, to_parse, expected_msg):
if not isinstance(to_parse, list):
to_parse = [to_parse]
self.assertRaisesRegex(ParseError, expected_msg,
self.assertRaisesRegex(ProtocolError, expected_msg,
parse_response, to_parse)
@@ -200,13 +200,13 @@ def test_none_special_case(self):
self.assertEqual(parse_fetch_response([None]), {})
def test_bad_msgid(self):
self.assertRaises(ParseError, parse_fetch_response, [b'abc ()'])
self.assertRaises(ProtocolError, parse_fetch_response, [b'abc ()'])
def test_bad_data(self):
self.assertRaises(ParseError, parse_fetch_response, [b'2 WHAT'])
self.assertRaises(ProtocolError, parse_fetch_response, [b'2 WHAT'])
def test_missing_data(self):
self.assertRaises(ParseError, parse_fetch_response, [b'2'])
self.assertRaises(ProtocolError, parse_fetch_response, [b'2'])
def test_simple_pairs(self):
self.assertEqual(parse_fetch_response([b'23 (ABC 123 StUfF "hello")']),
@@ -215,8 +215,8 @@ def test_simple_pairs(self):
b'SEQ': 23}})
def test_odd_pairs(self):
self.assertRaises(ParseError, parse_fetch_response, [b'(ONE)'])
self.assertRaises(ParseError, parse_fetch_response, [b'(ONE TWO THREE)'])
self.assertRaises(ProtocolError, parse_fetch_response, [b'(ONE)'])
self.assertRaises(ProtocolError, parse_fetch_response, [b'(ONE TWO THREE)'])
def test_UID(self):
self.assertEqual(parse_fetch_response([b'23 (UID 76)']),
@@ -230,7 +230,7 @@ def test_not_uid_is_key(self):
b'SEQ': 23}})
def test_bad_UID(self):
self.assertRaises(ParseError, parse_fetch_response, [b'(UID X)'])
self.assertRaises(ProtocolError, parse_fetch_response, [b'(UID X)'])
def test_FLAGS(self):
self.assertEqual(parse_fetch_response([b'23 (FLAGS (\Seen Stuff))']),
@@ -4,15 +4,17 @@
from __future__ import unicode_literals
from imapclient.exceptions import InvalidCriteriaError
from imapclient.exceptions import InvalidCriteriaError, ProtocolError
from imapclient.imapclient import (
join_message_ids,
_normalise_search_criteria,
normalise_text_list,
seq_to_parenstr,
seq_to_parenstr_upper,
_quoted
_quoted,
_parse_untagged_response
)
from imapclient.util import assert_imap_protocol
from tests.util import unittest
@@ -166,3 +168,17 @@ def test_None(self):
def test_empty(self):
self.assertRaises(InvalidCriteriaError, _normalise_search_criteria, '', None)
class TestAssertIMAPProtocol(unittest.TestCase):
def test_assert_imap_protocol(self):
assert_imap_protocol(True)
with self.assertRaises(ProtocolError):
assert_imap_protocol(False)
def test_assert_imap_protocol_with_message(self):
assert_imap_protocol(True, 'foo')
with self.assertRaises(ProtocolError):
assert_imap_protocol(False, 'foo')

0 comments on commit a309764

Please sign in to comment.