Skip to content

Commit

Permalink
Ensure that mailhosts sharing a queue do not double-deliver mails
Browse files Browse the repository at this point in the history
Arrange this by sharing the thread which processes emails for
that directory.

See:  https://bugs.launchpad.net/zope2/+bug/574286
  • Loading branch information
tseaver committed May 3, 2010
1 parent 810e5b7 commit 00137aa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
34 changes: 20 additions & 14 deletions MailHost.py
Expand Up @@ -15,6 +15,7 @@
$Id$
"""

from os.path import realpath
import mimetools
import rfc822
import time
Expand Down Expand Up @@ -202,32 +203,37 @@ def _makeMailer(self):
force_tls=self.force_tls
)

security.declarePrivate('_getThreadKey')
def _getThreadKey(self):
""" Return the key used to find our processor thread.
"""
return realpath(self.smtp_queue_directory)

@synchronized(lock)
def _stopQueueProcessorThread(self):
""" Stop thread for processing the mail queue """

path = self.absolute_url(1)
if queue_threads.has_key(path):
thread = queue_threads[path]
key = self._getThreadKey()
if queue_threads.has_key(key):
thread = queue_threads[key]
thread.stop()
while thread.isAlive():
# wait until thread is really dead
time.sleep(0.3)
del queue_threads[path]
LOG.info('Thread for %s stopped' % path)
LOG.info('Thread for %s stopped' % key)

@synchronized(lock)
def _startQueueProcessorThread(self):
""" Start thread for processing the mail queue """

path = self.absolute_url(1)
if not queue_threads.has_key(path):
""" Start thread for processing the mail queue
"""
key = self._getThreadKey()
if not queue_threads.has_key(key):
thread = QueueProcessorThread()
thread.setMailer(self._makeMailer())
thread.setQueuePath(self.smtp_queue_directory)
thread.start()
queue_threads[path] = thread
LOG.info('Thread for %s started' % path)
queue_threads[key] = thread
LOG.info('Thread for %s started' % key)

security.declareProtected(view, 'queueLength')
def queueLength(self):
Expand All @@ -243,9 +249,9 @@ def queueLength(self):

security.declareProtected(view, 'queueThreadAlive')
def queueThreadAlive(self):
""" return True/False is queue thread is working """

th = queue_threads.get(self.absolute_url(1))
""" return True/False is queue thread is working
"""
th = queue_threads.get(self._getThreadKey())
if th:
return th.isAlive()
return False
Expand Down
8 changes: 8 additions & 0 deletions tests/testMailHost.py
Expand Up @@ -208,6 +208,14 @@ def testSendImmediate(self):
self.assertEqual(mailhost.sent, outmsg)
self.assertEqual(mailhost.immediate, True)

def test__getThreadKey_uses_fspath(self):
mh1 = self._makeOne('mh1')
mh1.smtp_queue_directory = '/abc'
mh1.absolute_url = lambda self: 'http://example.com/mh1'
mh2 = self._makeOne('mh2')
mh2.smtp_queue_directory = '/abc'
mh2.absolute_url = lambda self: 'http://example.com/mh2'
self.assertEqual(mh1._getThreadKey(), mh2._getThreadKey())

def test_suite():
suite = unittest.TestSuite()
Expand Down

0 comments on commit 00137aa

Please sign in to comment.