Skip to content

Commit

Permalink
Backported c103980 - fixing LP 413335
Browse files Browse the repository at this point in the history
  • Loading branch information
hannosch committed Apr 5, 2011
1 parent 6d89bf1 commit 202cdb6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Change history
3.5.3 (unreleased)
------------------

- Work around problem when used with Python >=2.5.1. See
https://bugs.edge.launchpad.net/zope.sendmail/+bug/413335.

3.5.2 (2010-10-01)
------------------
Expand Down
74 changes: 47 additions & 27 deletions src/zope/sendmail/delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,14 @@ class QueueProcessorThread(threading.Thread):
"""

log = logging.getLogger("QueueProcessorThread")
__stopped = False
_stopped = False
interval = 3.0 # process queue every X second

def __init__(self, interval=3.0):
threading.Thread.__init__(self)
self.interval = interval
self._lock = threading.Lock()
self.setDaemon(True)

def setMaildir(self, maildir):
"""Set the maildir.
Expand Down Expand Up @@ -260,10 +262,10 @@ def _parseMessage(self, message):

def run(self, forever=True):
atexit.register(self.stop)
while not self.__stopped:
while not self._stopped:
for filename in self.maildir:
# if we are asked to stop while sending messages, do so
if self.__stopped:
if self._stopped:
break

fromaddr = ''
Expand Down Expand Up @@ -349,31 +351,47 @@ def run(self, forever=True):
message = file.read()
file.close()
fromaddr, toaddrs, message = self._parseMessage(message)
# The next block is the only one that is sensitive to
# interruptions. Everywhere else, if this daemon thread
# stops, we should be able to correctly handle a restart.
# In this block, if we send the message, but we are
# stopped before we unlink the file, we will resend the
# message when we are restarted. We limit the likelihood
# of this somewhat by using a lock to link the two
# operations. When the process gets an interrupt, it
# will call the atexit that we registered (``stop``
# below). This will try to get the same lock before it
# lets go. Because this can cause the daemon thread to
# continue (that is, to not act like a daemon thread), we
# still use the _stopped flag to communicate.
self._lock.acquire()
try:
self.mailer.send(fromaddr, toaddrs, message)
except smtplib.SMTPResponseException, e:
if 500 <= e.smtp_code <= 599:
# permanent error, ditch the message
self.log.error(
"Discarding email from %s to %s due to"
" a permanent error: %s",
fromaddr, ", ".join(toaddrs), str(e))
#os.link(filename, rejected_filename)
_os_link(filename, rejected_filename)
else:
# Log an error and retry later
raise

try:
os.unlink(filename)
except OSError, e:
if e.errno == 2: # file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happend, log it
raise
try:
self.mailer.send(fromaddr, toaddrs, message)
except smtplib.SMTPResponseException, e:
if 500 <= e.smtp_code <= 599:
# permanent error, ditch the message
self.log.error(
"Discarding email from %s to %s due to"
" a permanent error: %s",
fromaddr, ", ".join(toaddrs), str(e))
#os.link(filename, rejected_filename)
_os_link(filename, rejected_filename)
else:
# Log an error and retry later
raise

try:
os.unlink(filename)
except OSError, e:
if e.errno == 2: # file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happend, log it
raise
finally:
self._lock.release()
try:
os.unlink(tmp_filename)
except OSError, e:
Expand Down Expand Up @@ -407,4 +425,6 @@ def run(self, forever=True):
break

def stop(self):
self.__stopped = True
self._stopped = True
self._lock.acquire()
self._lock.release()

0 comments on commit 202cdb6

Please sign in to comment.