Skip to content

Commit

Permalink
Make most of headers logic python 3 compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Oct 16, 2017
1 parent d24d5ac commit d74f584
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 192 deletions.
5 changes: 5 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ else
nosetests --with-coverage --cover-package=flanker tests/mime/bounce_tests.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/threading_test.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/tokenizer_test.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/headers/encodedword_test.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/headers/headers_test.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/headers/parametrized_test.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/headers/parsing_test.py
nosetests --with-coverage --cover-package=flanker tests/mime/message/headers/wrappers_test.py
fi
46 changes: 25 additions & 21 deletions flanker/mime/message/charsets.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
import codecs

import six

from flanker.mime.message.utils import to_unicode

# Fallback charset names, keyed by lower-cased charset label. Consulted by
# _ensure_charset() only after codecs.lookup() has failed for the label.
_ALIASES = {
'sjis': 'shift_jis',
'windows-874': 'cp874',
'koi8-r': 'koi8_r'
}


def convert_to_unicode(charset, value):
#in case of unicode we have nothing to do
if isinstance(value, six.text_type):
return value

charset = _translate_charset(charset)
return to_unicode(value, charset=charset)

if six.PY2:
return value

def _translate_charset(charset):
"""Translates crappy charset into Python analogue (if supported).
value = value.encode('ascii')

Otherwise returns unmodified.
"""
# ev: (ticket #2819)
if "sjis" in charset.lower():
return 'shift_jis'
charset = _ensure_charset(charset)
value = to_unicode(value, charset)
return value

# cp874 looks to be an alias for windows-874
if "windows-874" == charset.lower():
return "cp874"

if 'koi8-r' in charset.lower():
return 'koi8_r'
def _ensure_charset(charset):
charset = charset.lower()
try:
codecs.lookup(charset)
return charset
except LookupError:
pass

if 'utf-8' in charset.lower() or charset.lower() == 'x-unknown':
return 'utf-8'
charset = _ALIASES.get(charset)
if charset:
return charset

return charset
return 'utf-8'
126 changes: 77 additions & 49 deletions flanker/mime/message/headers/encodedword.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,20 @@

from flanker.mime.message import charsets, errors

log = logging.getLogger(__name__)
_log = logging.getLogger(__name__)

#deal with unfolding
foldingWhiteSpace = re.compile(r"(\n\r?|\r\n?)(\s*)")
_RE_FOLDING_WHITE_SPACES = re.compile(r"(\n\r?|\r\n?)(\s*)")

# Matches a single RFC 2047 encoded-word of the form
# =?charset?encoding?encoded-text?= (see http://tools.ietf.org/html/rfc2047).
# The pattern is compiled with re.VERBOSE, so the `#` notes inside the
# literal are regex comments, and with re.IGNORECASE, so the encoding letter
# may be upper or lower case.
_RE_ENCODED_WORD = re.compile(r'''(?P<encodedWord>
=\? # literal =?
(?P<charset>[^?]*?) # non-greedy up to the next ? is the charset
\? # literal ?
(?P<encoding>[qb]) # either a "q" or a "b", case insensitive
\? # literal ?
(?P<encoded>.*?) # non-greedy up to the next ?= is the encoded string
\?= # literal ?=
)''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)


def unfold(value):
Expand All @@ -22,7 +32,7 @@ def unfold(value):
treated in its unfolded form for further syntactic and semantic
evaluation.
"""
return re.sub(foldingWhiteSpace, r'\2', value)
return re.sub(_RE_FOLDING_WHITE_SPACES, r'\2', value)


def decode(header):
Expand Down Expand Up @@ -51,26 +61,25 @@ def mime_to_unicode(header):
decoded = [] # decoded parts

while header:
match = encodedWord.search(header)
if match:
start = match.start()
if start != 0:
# decodes unencoded ascii part to unicode
value = charsets.convert_to_unicode(ascii, header[0:start])
if value.strip():
decoded.append(value)
# decode a header =?...?= of encoding
charset, value = decode_part(
match.group('charset').lower(),
match.group('encoding').lower(),
match.group('encoded'))
decoded.append(charsets.convert_to_unicode(charset, value))
header = header[match.end():]
else:
# no match? append the remainder
# of the string to the list of chunks
decoded.append(charsets.convert_to_unicode(ascii, header))
match = _RE_ENCODED_WORD.search(header)
if not match:
# Append the remainder of the string to the list of chunks.
decoded.append(charsets.convert_to_unicode('ascii', header))
break

start = match.start()
if start != 0:
# decodes unencoded ascii part to unicode
value = charsets.convert_to_unicode('ascii', header[0:start])
if value.strip():
decoded.append(value)
# decode a header =?...?= of encoding
charset, value = _decode_part(match.group('charset').lower(),
match.group('encoding').lower(),
match.group('encoded'))
decoded.append(charsets.convert_to_unicode(charset, value))
header = header[match.end():]

return u"".join(decoded)
except Exception:
try:
Expand All @@ -79,30 +88,15 @@ def mime_to_unicode(header):
logged_header = logged_header.encode('utf-8')
# encode header as utf-8 so all characters can be base64 encoded
logged_header = b64encode(logged_header)
log.warning(
_log.warning(
u"HEADER-DECODE-FAIL: ({0}) - b64encoded".format(
logged_header))
except Exception:
log.exception("Failed to log exception")
_log.exception("Failed to log exception")
return header


ascii = 'ascii'

#this spec refers to
#http://tools.ietf.org/html/rfc2047
encodedWord = re.compile(r'''(?P<encodedWord>
=\? # literal =?
(?P<charset>[^?]*?) # non-greedy up to the next ? is the charset
\? # literal ?
(?P<encoding>[qb]) # either a "q" or a "b", case insensitive
\? # literal ?
(?P<encoded>.*?) # non-greedy up to the next ?= is the encoded string
\?= # literal ?=
)''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)


def decode_part(charset, encoding, value):
def _decode_part(charset, encoding, value):
"""
Attempts to decode part, understands
'q' - quoted encoding
Expand All @@ -111,18 +105,52 @@ def decode_part(charset, encoding, value):
Returns (charset, decoded-string)
"""
if encoding == 'q':
return (charset, email.quoprimime.header_decode(str(value)))
return charset, _decode_quoted_printable(value)

elif encoding == 'b':
if encoding == 'b':
# Postel's law: add missing padding
paderr = len(value) % 4
if paderr:
value += '==='[:4 - paderr]
return (charset, email.base64mime.decode(value))

elif not encoding:
return (charset, value)
return charset, email.base64mime.decode(value)

if not encoding:
return charset, value

raise errors.DecodingError('Unknown encoding: %s' % encoding)


def _decode_quoted_printable(qp):
    """
    Decodes the text of a "Q"-encoded word (RFC 2047) into a byte string.

    '_' is decoded as a space and '=XX', where XX are two hexadecimal
    digits, as the byte with that value. A '=' that is not followed by two
    hexadecimal digits is kept as is, and so is every other character.

    On Python 2 the work is delegated to email.quoprimime.header_decode.

    On Python 3 a ValueError is raised if `qp` contains a character whose
    code point is above 255, because it cannot be stored as a single byte.

    Returns decoded byte string.
    """
    # Local import keeps this block self-contained; the check is the
    # standard-library equivalent of `six.PY2`.
    import sys

    if sys.version_info[0] == 2:
        return email.quoprimime.header_decode(str(qp))

    hex_digits = '0123456789abcdefABCDEF'
    buf = bytearray()
    size = len(qp)
    i = 0
    while i < size:
        ch = qp[i]
        i += 1
        if ch == '_':
            buf.append(ord(' '))
            continue

        # An escape needs two more characters; if there are not enough
        # characters left, the '=' is treated as is.
        if ch == '=' and size - i >= 2:
            high, low = qp[i], qp[i + 1]
            # Validate the digits explicitly: int(text, 16) also accepts a
            # sign, surrounding whitespace and non-ASCII digits, which would
            # mis-decode '=+1' or '= 1' and make '=-1' produce a negative
            # value that bytearray.append() rejects.
            if high in hex_digits and low in hex_digits:
                buf.append(int(high + low, 16))
                i += 2
                continue

        buf.append(ord(ch))

    # `bytes` is what six.binary_type resolves to on Python 3.
    return bytes(buf)
60 changes: 32 additions & 28 deletions flanker/mime/message/headers/encoding.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import email.message
import flanker.addresslib.address
import logging

from collections import deque
from email.header import Header

import six

import flanker.addresslib.address
from flanker.mime.message.headers import parametrized
from flanker.mime.message.utils import to_utf8

log = logging.getLogger(__name__)
_log = logging.getLogger(__name__)

# The max length for a header line is 80 chars and the max recursion depth
# is 1000, but 80 * 1000 chars for a single header is too much for the
# system, so we allow just 100 lines (100 * 80 = 8000 chars) per header.
MAX_HEADER_LENGTH = 8000
_MAX_HEADER_LENGTH = 8000

ADDRESS_HEADERS = ('From', 'To', 'Delivered-To', 'Cc', 'Bcc', 'Reply-To')
_ADDRESS_HEADERS = ('From', 'To', 'Delivered-To', 'Cc', 'Bcc', 'Reply-To')


def to_mime(key, value):
Expand All @@ -24,39 +26,39 @@ def to_mime(key, value):

if type(value) == list:
return "; ".join(encode(key, v) for v in value)
else:
return encode(key, value)

return encode(key, value)


def encode(name, value):
try:
if parametrized.is_parametrized(name, value):
value, params = value
return encode_parametrized(name, value, params)
else:
return encode_unstructured(name, value)
return _encode_parametrized(name, value, params)

return _encode_unstructured(name, value)
except Exception:
log.exception("Failed to encode %s %s" % (name, value))
_log.exception("Failed to encode %s %s" % (name, value))
raise


def encode_unstructured(name, value):
if len(value) > MAX_HEADER_LENGTH:
def _encode_unstructured(name, value):
if len(value) > _MAX_HEADER_LENGTH:
return to_utf8(value)
try:
return Header(
value.encode("ascii"), "ascii",
header_name=name).encode(splitchars=' ;,')
except (UnicodeEncodeError, UnicodeDecodeError):
if is_address_header(name, value):
return encode_address_header(name, value)
else:
return Header(
to_utf8(value), "utf-8",
header_name=name).encode(splitchars=' ;,')
if _is_address_header(name, value):
return _encode_address_header(name, value)

return Header(
to_utf8(value), "utf-8",
header_name=name).encode(splitchars=' ;,')

def encode_address_header(name, value):

def _encode_address_header(name, value):
out = deque()
for addr in flanker.addresslib.address.parse_list(value):
if addr.requires_non_ascii():
Expand All @@ -66,17 +68,19 @@ def encode_address_header(name, value):
return '; '.join(out)


def encode_parametrized(key, value, params):
def _encode_parametrized(key, value, params):
if params:
params = [encode_param(key, n, v) for n, v in params.iteritems()]
params = [_encode_param(key, n, v) for n, v in six.iteritems(params)]
return value + "; " + ("; ".join(params))
else:
return value

return value


def encode_param(key, name, value):
def _encode_param(key, name, value):
try:
value = value.encode("ascii")
if six.PY2:
value = value.encode('ascii')

return email.message._formatparam(name, value)
except Exception:
value = Header(value.encode("utf-8"), "utf-8", header_name=key).encode(splitchars=' ;,')
Expand All @@ -93,5 +97,5 @@ def encode_string(name, value, maxlinelen=None):
return header.encode(splitchars=' ;,')


def is_address_header(key, val):
return key in ADDRESS_HEADERS and '@' in val
def _is_address_header(key, val):
    """
    Tells whether `key` is one of the names listed in _ADDRESS_HEADERS and
    the value `val` contains an '@' character.
    """
    if key not in _ADDRESS_HEADERS:
        return False
    return '@' in val
2 changes: 1 addition & 1 deletion flanker/mime/message/headers/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def to_stream(self, stream, prepends_only=False):
break
i += 1
try:
h = h.encode('ascii')
h.encode('ascii')
except UnicodeDecodeError:
raise EncodingError("Non-ascii header name")
stream.write("{0}: {1}\r\n".format(h, to_mime(h, v)))
Expand Down

0 comments on commit d74f584

Please sign in to comment.