Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Message encoding support #3

Merged
merged 8 commits into from about 2 years ago

2 participants

Ross Patterson Chris McDonough
Ross Patterson
Collaborator

Needed for the latest push to my fork of pyramid_mailer as well.

Chris McDonough mcdonc merged commit 60ae523 into from March 15, 2012
Chris McDonough mcdonc closed this March 15, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 8 unique commits by 1 author.

Mar 14, 2012
Ross Patterson Always require an `email.message.Message` object for sending. Also fix
interface docstrings.
e3d6a9d
Ross Patterson Oops, missed some stuff for requiring `email.message.Message` objects b9300e5
Ross Patterson Add a helper function for predictable and sane encoding of message he…
…aders.

This is the result of long conversations about the RFCs, the `email`
package and various bugs and quirks in both.
71c9c8b
Ross Patterson Add payload support to the message encoding. 83fa605
Ross Patterson Integrate message encoding support c64843a
Ross Patterson Add coverage for binary payloads and fix handling ea4a5f6
Ross Patterson Tolerate some Python 2.7 and PyPy output differences.
For some reason, under Python 2.6 and 3.2, the encoding is included in
the RFC 2047 prefix as 'utf_8' but under 2.7 and PyPy it is 'utf-8'
with a dash instead of an underscore.
3f9c27d
Ross Patterson assert_() is dead, long live assertTrue() 60ae523
This page is out of date. Refresh to see the latest.
6  CHANGES.txt
@@ -4,6 +4,12 @@ Change history
4 4
 2.4 (Unreleased)
5 5
 ----------------
6 6
 
  7
+- Provide improved support for encoding messages to bytes.  It should
  8
+  now be possible to represent your messages in
  9
+  `email.message.Message` objects just with unicode (excepting bytes
  10
+  for binary attachments) and the mailer will handle it as
  11
+  appropriate.
  12
+
7 13
 - cPython 2.6, 2.7, 3.2 and pypy 1.8 compatibility.
8 14
 
9 15
 2.3 (2011-05-17)
95  repoze/sendmail/encoding.py
... ...
@@ -0,0 +1,95 @@
  1
+BBB_PY_2 = True
  2
+try:
  3
+    str = unicode
  4
+except NameError:
  5
+    BBB_PY_2 = False
  6
+
  7
+from email import utils
  8
+from email import charset
  9
+
  10
+# From http://tools.ietf.org/html/rfc5322#section-3.6
  11
+ADDR_HEADERS = ('resent-from',
  12
+                'resent-sender',
  13
+                'resent-to',
  14
+                'resent-cc',
  15
+                'resent-bcc',
  16
+                'from',
  17
+                'sender',
  18
+                'reply-to',
  19
+                'to',
  20
+                'cc',
  21
+                'bcc')
  22
+
  23
+PARAM_HEADERS = ('content-type',
  24
+                 'content-disposition')
  25
+                
  26
+
  27
+def encode_message(message,
  28
+                   addr_headers=ADDR_HEADERS, param_headers=PARAM_HEADERS):
  29
+    """
  30
+    Encode a `Message` handling headers and payloads.
  31
+
  32
+    Headers are handled in the most sane way possible.  Address names
  33
+    are left in `ascii` if possible or encoded to `latin_1` or `utf-8`
  34
+    and finally encoded according to RFC 2047 without encoding the
  35
+    address, something the `email` stdlib package doesn't do.
  36
+    Parameterized headers such as `filename` in the
  37
+    `Content-Disposition` header, have their values encoded properly
  38
+    while leaving the rest of the header to be handled without
  39
+    encoding.  Finally, all other headers are left in `ascii` if
  40
+    possible or encoded to `latin_1` or `utf-8` as a whole.
  41
+
  42
+    The return value is a byte string of the whole message.
  43
+    """
  44
+    for key, value in message.items():
  45
+        if key.lower() in addr_headers:
  46
+            addrs = []
  47
+            for name, addr in utils.getaddresses([value]):
  48
+                best, encoded = best_charset(name)
  49
+                if BBB_PY_2:
  50
+                    name = encoded
  51
+                name = charset.Charset(best).header_encode(name)
  52
+                addrs.append(utils.formataddr((name, addr)))
  53
+            value = ', '.join(addrs)
  54
+            message.replace_header(key, value)
  55
+        if key.lower() in param_headers:
  56
+            for param_key, param_value in message.get_params(header=key):
  57
+                if param_value:
  58
+                    best, encoded = best_charset(param_value)
  59
+                    if BBB_PY_2:
  60
+                        param_value = encoded
  61
+                    if best == 'ascii':
  62
+                        best = None
  63
+                    message.set_param(param_key, param_value,
  64
+                                      header=key, charset=best)
  65
+        else:
  66
+            best, encoded = best_charset(value)
  67
+            if BBB_PY_2:
  68
+                value = encoded
  69
+            value = charset.Charset(best).header_encode(value)
  70
+            message.replace_header(key, value)
  71
+
  72
+    payload = message.get_payload()
  73
+    if payload and isinstance(payload, str):
  74
+        best, encoded = best_charset(payload)
  75
+        if BBB_PY_2:
  76
+            payload = encoded
  77
+        message.set_payload(payload, charset=best)
  78
+
  79
+    return message.as_string().encode('ascii')
  80
+
  81
+
  82
+def best_charset(text):
  83
+    """
  84
+    Find the most human-readable and/or conventional encoding for unicode text.
  85
+
  86
+    Prefers `ascii` or `latin_1` and falls back to `utf_8`.
  87
+    """
  88
+    encoded = text
  89
+    for charset in 'ascii', 'latin_1', 'utf_8':
  90
+        try:
  91
+            encoded = text.encode(charset)
  92
+        except UnicodeError:
  93
+            pass
  94
+        else:
  95
+            return charset, encoded
10  repoze/sendmail/interfaces.py
@@ -64,8 +64,8 @@ def send(fromaddr, toaddrs, message):
64 64
 
65 65
         `toaddrs` is a sequence of recipient addresses (byte strings).
66 66
 
67  
-        `message` is a byte string that contains both headers and body
68  
-        formatted according to RFC 2822.  If it does not contain a Message-Id
  67
+        `message` is a `Message` object from the stdlib
  68
+        `email.message` module.  If it does not contain a Message-Id
69 69
         header, it will be generated and added automatically.
70 70
 
71 71
         Returns the message ID.
@@ -87,9 +87,9 @@ def send(fromaddr, toaddrs, message):
87 87
 
88 88
         `toaddrs` is a sequence of recipient addresses (unicode strings).
89 89
 
90  
-        `message` contains both headers and body formatted according to RFC
91  
-        2822.  It should contain at least Date, From, To, and Message-Id
92  
-        headers.
  90
+        `message` is a `Message` object from the stdlib
  91
+        `email.message` module.  If it does not contain a Message-Id
  92
+        header, it will be generated and added automatically.
93 93
 
94 94
         Messages are sent immediately.
95 95
 
6  repoze/sendmail/mailer.py
@@ -20,6 +20,7 @@
20 20
 
21 21
 from zope.interface import implementer
22 22
 from repoze.sendmail.interfaces import IMailer
  23
+from repoze.sendmail import encoding
23 24
 
24 25
 have_ssl = hasattr(socket, 'ssl')
25 26
 
@@ -44,8 +45,9 @@ def smtp_factory(self):
44 45
         return connection
45 46
 
46 47
     def send(self, fromaddr, toaddrs, message):
47  
-        if isinstance(message, Message):
48  
-            message = message.as_string()
  48
+        assert isinstance(message, Message), \
  49
+               'Message must be instance of email.message.Message'
  50
+        message = encoding.encode_message(message)
49 51
 
50 52
         connection = self.smtp_factory()
51 53
 
4  repoze/sendmail/tests/test_delivery.py
@@ -83,7 +83,7 @@ def testSend(self):
83 83
 
84 84
         mailer.sent_messages = []
85 85
         msgid = delivery.send(fromaddr, toaddrs, message)
86  
-        self.assert_('@' in msgid)
  86
+        self.assertTrue('@' in msgid)
87 87
         self.assertEquals(mailer.sent_messages, [])
88 88
         transaction.commit()
89 89
         self.assertEquals(len(mailer.sent_messages), 1)
@@ -184,7 +184,7 @@ def testSend(self):
184 184
 
185 185
         MaildirMessageStub.commited_messages = []
186 186
         msgid = delivery.send(fromaddr, toaddrs, message)
187  
-        self.assert_('@' in msgid)
  187
+        self.assertTrue('@' in msgid)
188 188
         self.assertEquals(MaildirMessageStub.commited_messages, [])
189 189
         self.assertEquals(MaildirMessageStub.aborted_messages, [])
190 190
         transaction.commit()
182  repoze/sendmail/tests/test_encoding.py
... ...
@@ -0,0 +1,182 @@
  1
+##############################################################################
  2
+#
  3
+# Copyright (c) 2003 Zope Corporation and Contributors.
  4
+# All Rights Reserved.
  5
+#
  6
+# This software is subject to the provisions of the Zope Public License,
  7
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
  8
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
  9
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  10
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
  11
+# FOR A PARTICULAR PURPOSE.
  12
+#
  13
+##############################################################################
  14
+
  15
+import unittest
  16
+import base64
  17
+import quopri
  18
+from email import message
  19
+from email.mime import multipart
  20
+from email.mime import application
  21
+
  22
+try:
  23
+    from urllib.parse import quote
  24
+except ImportError:
  25
+    # BBB Python 2 and 3 compat
  26
+    from urllib import quote
  27
+
  28
+
  29
+class TestEncoding(unittest.TestCase):
  30
+
  31
+    def setUp(self):
  32
+        self.message = message.Message()
  33
+        self.latin_1_encoded = b'LaPe\xf1a'
  34
+        self.latin_1 = self.latin_1_encoded.decode('latin_1')
  35
+        self.utf_8_encoded = b'mo \xe2\x82\xac'
  36
+        self.utf_8 = self.utf_8_encoded.decode('utf_8')
  37
+
  38
+    def encode(self, message=None):
  39
+        if message is None:
  40
+            message = self.message
  41
+        from repoze.sendmail import encoding
  42
+        return encoding.encode_message(message)
  43
+
  44
+    def test_best_charset_ascii(self):
  45
+        from repoze.sendmail import encoding
  46
+        value = 'foo'
  47
+        best, encoded = encoding.best_charset(value)
  48
+        self.assertEqual(encoded, b'foo')
  49
+        self.assertEqual(best, 'ascii')
  50
+
  51
+    def test_best_charset_latin_1(self):
  52
+        from repoze.sendmail import encoding
  53
+        value = self.latin_1
  54
+        best, encoded = encoding.best_charset(value)
  55
+        self.assertEqual(encoded, self.latin_1_encoded)
  56
+        self.assertEqual(best, 'latin_1')
  57
+
  58
+    def test_best_charset_utf_8(self):
  59
+        from repoze.sendmail import encoding
  60
+        value = self.utf_8
  61
+        best, encoded = encoding.best_charset(value)
  62
+        self.assertEqual(encoded, self.utf_8_encoded)
  63
+        self.assertEqual(best, 'utf_8')
  64
+    
  65
+    def test_encoding_ascii_headers(self):
  66
+        to = ', '.join(['Chris McDonough <chrism@example.com>',
  67
+                        '"Chris Rossi, M.D." <chrisr@example.com>'])
  68
+        self.message['To'] = to
  69
+        from_ = 'Ross Patterson <rpatterson@example.com>'
  70
+        self.message['From'] = from_
  71
+        subject = 'I know what you did last PyCon'
  72
+        self.message['Subject'] = subject
  73
+
  74
+        encoded = self.encode()
  75
+
  76
+        self.assertTrue(
  77
+            b'To: Chris McDonough <chrism@example.com>, "Chris Rossi,'
  78
+            in encoded)
  79
+        self.assertTrue(b'From: '+from_.encode('ascii') in encoded)
  80
+        self.assertTrue(b'Subject: '+subject.encode('ascii') in encoded)
  81
+
  82
+    def test_encoding_latin_1_headers(self):
  83
+        to = ', '.join([
  84
+            '"'+self.latin_1+' McDonough, M.D." <chrism@example.com>',
  85
+            'Chris Rossi <chrisr@example.com>'])
  86
+        self.message['To'] = to
  87
+        from_ = self.latin_1+' Patterson <rpatterson@example.com>'
  88
+        self.message['From'] = from_
  89
+        subject = 'I know what you did last '+self.latin_1
  90
+        self.message['Subject'] = subject
  91
+
  92
+        encoded = self.encode()
  93
+
  94
+        self.assertTrue(b'To: =?iso-8859-1?' in encoded)
  95
+        self.assertTrue(b'From: =?iso-8859-1?' in encoded)
  96
+        self.assertTrue(b'Subject: =?iso-8859-1?' in encoded)
  97
+        self.assertTrue(b'<chrism@example.com>' in encoded)
  98
+        self.assertTrue(b'<chrisr@example.com>' in encoded)
  99
+        self.assertTrue(b'<rpatterson@example.com>' in encoded)
  100
+
  101
+    def test_encoding_utf_8_headers(self):
  102
+        to = ', '.join([
  103
+            '"'+self.utf_8+' McDonough, M.D." <chrism@example.com>',
  104
+            'Chris Rossi <chrisr@example.com>'])
  105
+        self.message['To'] = to
  106
+        from_ = self.utf_8+' Patterson <rpatterson@example.com>'
  107
+        self.message['From'] = from_
  108
+        subject = 'I know what you did last '+self.utf_8
  109
+        self.message['Subject'] = subject
  110
+
  111
+        encoded = self.encode()
  112
+
  113
+        self.assertTrue(b'To: =?utf' in encoded)
  114
+        self.assertTrue(b'From: =?utf' in encoded)
  115
+        self.assertTrue(b'Subject: =?utf' in encoded)
  116
+        self.assertTrue(b'<chrism@example.com>' in encoded)
  117
+        self.assertTrue(b'<chrisr@example.com>' in encoded)
  118
+        self.assertTrue(b'<rpatterson@example.com>' in encoded)
  119
+    
  120
+    def test_encoding_ascii_header_parameters(self):
  121
+        self.message['Content-Disposition'] = (
  122
+            'attachment; filename=foo.ppt')
  123
+
  124
+        encoded = self.encode()
  125
+        
  126
+        self.assertTrue(
  127
+            b'Content-Disposition: attachment; filename="foo.ppt"' in encoded)
  128
+    
  129
+    def test_encoding_latin_1_header_parameters(self):
  130
+        self.message['Content-Disposition'] = (
  131
+            'attachment; filename='+self.latin_1+'.ppt')
  132
+
  133
+        encoded = self.encode()
  134
+        
  135
+        self.assertTrue(
  136
+            b"Content-Disposition: attachment; filename*=" in encoded)
  137
+        self.assertTrue(b"latin_1''"+quote(
  138
+            self.latin_1_encoded).encode('ascii') in encoded)
  139
+    
  140
+    def test_encoding_utf_8_header_parameters(self):
  141
+        self.message['Content-Disposition'] = (
  142
+            'attachment; filename='+self.utf_8+'.ppt')
  143
+
  144
+        encoded = self.encode()
  145
+        
  146
+        self.assertTrue(
  147
+            b"Content-Disposition: attachment; filename*=" in encoded)
  148
+        self.assertTrue(b"utf_8''"+quote(self.utf_8_encoded).encode('ascii')
  149
+                        in encoded)
  150
+
  151
+    def test_encoding_ascii_body(self):
  152
+        body = 'I know what you did last PyCon'
  153
+        self.message.set_payload(body)
  154
+
  155
+        encoded = self.encode()
  156
+
  157
+        self.assertTrue(body.encode('ascii') in encoded)
  158
+
  159
+    def test_encoding_latin_1_body(self):
  160
+        body = 'I know what you did last '+self.latin_1
  161
+        self.message.set_payload(body)
  162
+
  163
+        encoded = self.encode()
  164
+
  165
+        self.assertTrue(quopri.encodestring(body.encode('latin_1')) in encoded)
  166
+
  167
+    def test_encoding_utf_8_body(self):
  168
+        body = 'I know what you did last '+self.utf_8
  169
+        self.message.set_payload(body)
  170
+
  171
+        encoded = self.encode()
  172
+
  173
+        self.assertTrue(base64.encodestring(body.encode('utf_8')) in encoded)
  174
+
  175
+    def test_binary_body(self):
  176
+        body = b'I know what you did last PyCon'
  177
+        self.message = multipart.MIMEMultipart()
  178
+        self.message.attach(application.MIMEApplication(body))
  179
+
  180
+        encoded = self.encode()
  181
+
  182
+        self.assertTrue(base64.encodestring(body) in encoded)
6  repoze/sendmail/tests/test_maildir.py
@@ -227,7 +227,7 @@ def test_add(self):
227 227
         from repoze.sendmail.maildir import Maildir
228 228
         m = Maildir('/path/to/maildir')
229 229
         tx_message = m.add(Message())
230  
-        self.assert_(tx_message._pending_path,
  230
+        self.assertTrue(tx_message._pending_path,
231 231
                      '/path/to/maildir/tmp/1234500002.4242.myhostname.')
232 232
 
233 233
     def test_add_no_good_filenames(self):
@@ -254,7 +254,7 @@ def test_tx_msg_abort(self):
254 254
         tx_msg.abort()
255 255
         self.assertEquals(tx_msg._aborted, True)
256 256
         self.assertEquals(tx_msg._committed, False)
257  
-        self.assert_(filename1 in self.fake_os_module._removed_files)
  257
+        self.assertTrue(filename1 in self.fake_os_module._removed_files)
258 258
 
259 259
         tx_msg.abort()
260 260
         self.assertRaises(RuntimeError, tx_msg.commit)
@@ -269,7 +269,7 @@ def test_tx_msg_commit(self):
269 269
         tx_msg.commit()
270 270
         self.assertEquals(tx_msg._aborted, False)
271 271
         self.assertEquals(tx_msg._committed, True)
272  
-        self.assert_((filename1, filename2)
  272
+        self.assertTrue((filename1, filename2)
273 273
                        in self.fake_os_module._renamed_files)
274 274
 
275 275
         self.assertRaises(RuntimeError, tx_msg.abort)
56  repoze/sendmail/tests/test_mailer.py
@@ -14,6 +14,7 @@
14 14
 
15 15
 from zope.interface.verify import verifyObject
16 16
 from repoze.sendmail.mailer import SMTPMailer
  17
+import email
17 18
 import ssl
18 19
 import unittest
19 20
 
@@ -86,11 +87,12 @@ def test_send(self):
86 87
             msg['Headers'] = 'headers'
87 88
             msg.set_payload('bodybodybody\n-- \nsig\n')
88 89
             self.mailer.send(fromaddr, toaddrs, msg)
89  
-            self.assertEquals(self.smtp.fromaddr, fromaddr)
90  
-            self.assertEquals(self.smtp.toaddrs, toaddrs)
91  
-            self.assertEquals(self.smtp.msgtext, msg.as_string())
92  
-            self.assert_(self.smtp.quitted)
93  
-            self.assert_(self.smtp.closed)
  90
+            self.assertEqual(self.smtp.fromaddr, fromaddr)
  91
+            self.assertEqual(self.smtp.toaddrs, toaddrs)
  92
+            self.assertEqual(
  93
+                self.smtp.msgtext, msg.as_string().encode('ascii'))
  94
+            self.assertTrue(self.smtp.quitted)
  95
+            self.assertTrue(self.smtp.closed)
94 96
 
95 97
     def test_fail_ehlo(self):
96 98
         from email.message import Message
@@ -114,34 +116,42 @@ def test_tls_required_not_available(self):
114 116
     def test_send_auth(self):
115 117
         fromaddr = 'me@example.com'
116 118
         toaddrs = ('you@example.com', 'him@example.com')
117  
-        msgtext = 'Headers: headers\n\nbodybodybody\n-- \nsig\n'
  119
+        headers = 'Headers: headers'
  120
+        body='bodybodybody\n-- \nsig\n'
  121
+        msgtext = headers+'\n\n'+body
  122
+        msg = email.message_from_string(msgtext)
118 123
         self.mailer.username = 'foo'
119 124
         self.mailer.password = 'evil'
120 125
         self.mailer.hostname = 'spamrelay'
121 126
         self.mailer.port = 31337
122  
-        self.mailer.send(fromaddr, toaddrs, msgtext)
123  
-        self.assertEquals(self.smtp.username, 'foo')
124  
-        self.assertEquals(self.smtp.password, 'evil')
125  
-        self.assertEquals(self.smtp.hostname, 'spamrelay')
126  
-        self.assertEquals(self.smtp.port, '31337')
127  
-        self.assertEquals(self.smtp.fromaddr, fromaddr)
128  
-        self.assertEquals(self.smtp.toaddrs, toaddrs)
129  
-        self.assertEquals(self.smtp.msgtext, msgtext)
130  
-        self.assert_(self.smtp.quitted)
131  
-        self.assert_(self.smtp.closed)
  127
+        self.mailer.send(fromaddr, toaddrs, msg)
  128
+        self.assertEqual(self.smtp.username, 'foo')
  129
+        self.assertEqual(self.smtp.password, 'evil')
  130
+        self.assertEqual(self.smtp.hostname, 'spamrelay')
  131
+        self.assertEqual(self.smtp.port, '31337')
  132
+        self.assertEqual(self.smtp.fromaddr, fromaddr)
  133
+        self.assertEqual(self.smtp.toaddrs, toaddrs)
  134
+        self.assertTrue(body.encode('ascii') in self.smtp.msgtext)
  135
+        self.assertTrue(headers.encode('ascii') in self.smtp.msgtext)
  136
+        self.assertTrue(self.smtp.quitted)
  137
+        self.assertTrue(self.smtp.closed)
132 138
 
133 139
     def test_send_failQuit(self):
134 140
         self.mailer.smtp.fail_on_quit = True
135 141
         try:
136 142
             fromaddr = 'me@example.com'
137 143
             toaddrs = ('you@example.com', 'him@example.com')
138  
-            msgtext = 'Headers: headers\n\nbodybodybody\n-- \nsig\n'
139  
-            self.mailer.send(fromaddr, toaddrs, msgtext)
140  
-            self.assertEquals(self.smtp.fromaddr, fromaddr)
141  
-            self.assertEquals(self.smtp.toaddrs, toaddrs)
142  
-            self.assertEquals(self.smtp.msgtext, msgtext)
143  
-            self.assert_(not self.smtp.quitted)
144  
-            self.assert_(self.smtp.closed)
  144
+            headers = 'Headers: headers'
  145
+            body='bodybodybody\n-- \nsig\n'
  146
+            msgtext = headers+'\n\n'+body
  147
+            msg = email.message_from_string(msgtext)
  148
+            self.mailer.send(fromaddr, toaddrs, msg)
  149
+            self.assertEqual(self.smtp.fromaddr, fromaddr)
  150
+            self.assertEqual(self.smtp.toaddrs, toaddrs)
  151
+            self.assertTrue(body.encode('ascii') in self.smtp.msgtext)
  152
+            self.assertTrue(headers.encode('ascii') in self.smtp.msgtext)
  153
+            self.assertTrue(not self.smtp.quitted)
  154
+            self.assertTrue(self.smtp.closed)
145 155
         finally:
146 156
             self.mailer.smtp.fail_on_quit = False
147 157
 
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.