Skip to content

Commit

Permalink
- Restructured SMTP mailer and QueueProcessorThread so that all
Browse files Browse the repository at this point in the history
  SMTP error logic is in the mailer.  Clears the way for another
  mailer for /usr/sbin/sendmail command line can be used with
  QueueProcessorThread.
- Added ability for QueueProcessorThread so that it can handle temporary
  failures in delivery to its smart host - ie administrator reconfiguring
  mailserver, mail server reboot/restart
- Formatted log messages in a consistent fashion so that they can be grepped
  out of z3.log
- Added maildir message filename to log messages as message id - allows
  easy analysis/triage of mail message sending problems
- Added cleaning of lock links to QueueProcessorThread so that messages can be
  sent immediately on Zope3 restart.
- Added pollingInterval (ms), cleanLockLinks (boolean), and retryInterval 
  (seconds) configure options to configure.zcml.
  • Loading branch information
Matthew Grant committed Mar 9, 2008
1 parent f5d43a4 commit 2643d89
Show file tree
Hide file tree
Showing 11 changed files with 758 additions and 126 deletions.
15 changes: 15 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
Change history
~~~~~~~~~~~~~~

- Restructured SMTP mailer and QueueProcessorThread so that all
SMTP error logic is in the mailer. Clears the way for another
mailer for /usr/sbin/sendmail command line can be used with
QueueProcessorThread.
- Added ability for QueueProcessorThread so that it can handle temporary
failures in delivery to its smart host - ie administrator reconfiguring
mailserver, mail server reboot/restart
- Formatted log messages in a consistent fashion so that they can be grepped
out of z3.log
- Added maildir message filename to log messages as message id - allows
easy analysis/triage of mail message sending problems
- Added cleaning of lock links to QueueProcessorThread so that messages can be
sent immediately on Zope3 restart.
- Added pollingInterval (ms), cleanLockLinks (boolean), and retryInterval
(seconds) configure options to configure.zcml.

3.5.0b1 (unreleased)
--------------------
Expand Down
11 changes: 9 additions & 2 deletions src/zope/sendmail/configure.zcml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@

<!--
To send mail, uncomment the following directive and be sure to
create the queue directory.
create the queue directory.
Other parameters sepcify the polling interval, and the retry
interval used when a temporary failure is detected. Lock links
can also be cleared on server start.
<mail:queuedDelivery permission="zope.SendMail"
queuePath="./queue"
mailer="smtp" />
retryInterval="300"
pollingInterval="3000"
cleanLockLinks="False"
mailer="smtp" />
-->

<interface interface="zope.sendmail.interfaces.IMailDelivery" />
Expand Down
139 changes: 95 additions & 44 deletions src/zope/sendmail/delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import os
import os.path
import rfc822
import smtplib
import stat
import threading
import time
Expand All @@ -33,8 +32,12 @@
from time import strftime
from socket import gethostname

from zope.interface import implements
from zope.interface import implements, providedBy
from zope.interface.exceptions import DoesNotImplement
from zope.sendmail.interfaces import IDirectMailDelivery, IQueuedMailDelivery
from zope.sendmail.interfaces import ISMTPMailer, IMailer
from zope.sendmail.interfaces import MailerTemporaryFailureException
from zope.sendmail.interfaces import MailerPermanentFailureException
from zope.sendmail.maildir import Maildir
from transaction.interfaces import IDataManager
import transaction
Expand All @@ -46,6 +49,15 @@
# messages sent.
MAX_SEND_TIME = 60*60*3

# Prefixes for messages being processed in the queue
# This one is the lock link prefix for when a message
# is being sent - also edit this in maildir.py if changed...
SENDING_MSG_LOCK_PREFIX = '.sending-'
# This is the rejected message prefix
REJECTED_MSG_PREFIX = '.rejected-'



class MailDataManager(object):
implements(IDataManager)

Expand Down Expand Up @@ -124,7 +136,7 @@ def __init__(self, mailer):

def createDataManager(self, fromaddr, toaddrs, message):
return MailDataManager(self.mailer.send,
args=(fromaddr, toaddrs, message))
args=(fromaddr, toaddrs, message, 'direct_delivery'))


class QueuedMailDelivery(AbstractMailDelivery):
Expand Down Expand Up @@ -209,9 +221,15 @@ class QueueProcessorThread(threading.Thread):
__stopped = False
interval = 3.0 # process queue every X second

def __init__(self, interval=3.0):
def __init__(self,
interval=3.0,
retry_interval=300.0,
clean_lock_links=False):
threading.Thread.__init__(self)
self.interval = interval
self.retry_interval = retry_interval
self.clean_lock_links = clean_lock_links
self.test_results = {}

def setMaildir(self, maildir):
"""Set the maildir.
Expand All @@ -223,6 +241,8 @@ def setQueuePath(self, path):
self.maildir = Maildir(path, True)

def setMailer(self, mailer):
if not(IMailer.providedBy(mailer)):
raise (DoesNotImplement)
self.mailer = mailer

def _parseMessage(self, message):
Expand Down Expand Up @@ -252,8 +272,45 @@ def _parseMessage(self, message):

return fromaddr, toaddrs, rest

def _unlinkFile(self, filename):
"""Unlink the message file """
try:
os.unlink(filename)
except OSError, e:
if e.errno == 2: # file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happend, log it
raise

def _queueRetryWait(self, tmp_filename, forever):
"""Implements Retry Wait if there is an SMTP Connection
Failure or error 4xx due to machine load etc
"""
# Clean up by unlinking lock link
self._unlinkFile(tmp_filename)
# Wait specified retry interval in stages of self.interval
count = self.retry_interval
while(count > 0 and not self.__stopped):
if forever:
time.sleep(self.interval)
count -= self.interval
# Plug for test routines so that we know we got here
if not forever:
self.test_results['_queueRetryWait'] \
= "Retry timeout: %s count: %s" \
% (str(self.retry_interval), str(count))


def run(self, forever=True):
atexit.register(self.stop)
# Clean .sending- lock files from queue
if self.clean_lock_links:
self.maildir._cleanLockLinks()
# Set up logger for mailer
if hasattr(self.mailer, 'set_logger'):
self.mailer.set_logger(self.log)
while not self.__stopped:
for filename in self.maildir:
# if we are asked to stop while sending messages, do so
Expand All @@ -263,8 +320,9 @@ def run(self, forever=True):
fromaddr = ''
toaddrs = ()
head, tail = os.path.split(filename)
tmp_filename = os.path.join(head, '.sending-' + tail)
rejected_filename = os.path.join(head, '.rejected-' + tail)
tmp_filename = os.path.join(head, SENDING_MSG_LOCK_PREFIX + tail)
rejected_filename = os.path.join(head, REJECTED_MSG_PREFIX + tail)
message_id = os.path.basename(filename)
try:
# perform a series of operations in an attempt to ensure
# that no two threads/processes send this message
Expand Down Expand Up @@ -339,53 +397,46 @@ def run(self, forever=True):
file.close()
fromaddr, toaddrs, message = self._parseMessage(message)
try:
self.mailer.send(fromaddr, toaddrs, message)
except smtplib.SMTPResponseException, e:
if 500 <= e.smtp_code <= 599:
# permanent error, ditch the message
self.log.error(
"Discarding email from %s to %s due to"
" a permanent error: %s",
fromaddr, ", ".join(toaddrs), str(e))
os.link(filename, rejected_filename)
else:
# Log an error and retry later
raise

try:
os.unlink(filename)
except OSError, e:
if e.errno == 2: # file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happend, log it
raise

try:
os.unlink(tmp_filename)
except OSError, e:
if e.errno == 2: # file does not exist
# someone else unlinked the file; oh well
pass
else:
# something bad happend, log it
raise
sentaddrs = self.mailer.send(fromaddr,
toaddrs,
message,
message_id)
except MailerTemporaryFailureException, e:
self._queueRetryWait(tmp_filename, forever)
# We break as we don't want to send message later
break;
except MailerPermanentFailureException, e:
os.link(filename, rejected_filename)
sentaddrs = []

# Unlink message file
self._unlinkFile(filename)

# Unlink the lock file
self._unlinkFile(tmp_filename)

# TODO: maybe log the Message-Id of the message sent
self.log.info("Mail from %s to %s sent.",
fromaddr, ", ".join(toaddrs))
if len(sentaddrs) > 0:
self.log.info("%s - mail sent, Sender: %s, Rcpt: %s,",
message_id,
fromaddr,
", ".join(sentaddrs))
# Blanket except because we don't want
# this thread to ever die
except:
if fromaddr != '' or toaddrs != ():
self.log.error(
"Error while sending mail from %s to %s.",
fromaddr, ", ".join(toaddrs), exc_info=True)
"%s - Error while sending mail, Sender: %s,"
" Rcpt: %s,",
message_id,
fromaddr,
", ".join(toaddrs),
exc_info=True)
else:
self.log.error(
"Error while sending mail : %s ",
filename, exc_info=True)
"%s - Error while sending mail.",
message_id,
exc_info=True)
else:
if forever:
time.sleep(self.interval)
Expand Down
81 changes: 76 additions & 5 deletions src/zope/sendmail/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
"""
__docformat__ = 'restructuredtext'

from zope.interface import Interface, Attribute
from zope.interface import Interface, Attribute, implements
from zope.interface.common.interfaces import IException
from zope.schema import TextLine, Int, Password, Bool

from zope.i18nmessageid import MessageFactory
Expand Down Expand Up @@ -108,7 +109,7 @@ class IQueuedMailDelivery(IMailDelivery):


class IMailQueueProcessor(Interface):
"""A mail queue processor that delivers queueud messages asynchronously.
"""A mail queue processor that delivers queued messages asynchronously.
"""

queuePath = TextLine(
Expand All @@ -119,15 +120,79 @@ class IMailQueueProcessor(Interface):
title=_(u"Polling Interval"),
description=_(u"How often the queue is checked for new messages"
" (in milliseconds)"),
default=5000)
default=3000)

mailer = Attribute("IMailer that is used for message delivery")

cleanLockFiles = Bool(
title=_(u"Clean Lock Files"),
description=_(u"Clean stale lock files from queue before processing"
" start."),
default=False)

retryInterval = Int(
title=_(u"Retry Interval"),
description=_(u"Retry time after connection failure or SMTP error 4xx."
" (in seconds)"),
default=300)

#
# Exception classes for use within Zope Sendmail, for use of Mailers
#
class IMailerFailureException(IException):
"""Failure in sending mail"""
pass

class MailerFailureException(Exception):
"""Failure in sending mail"""

implements(IMailerFailureException)

def __init__(self, message="Failure in sending mail"):
self.message = message
self.args = (message,)


class IMailerTemporaryFailureException(IMailerFailureException):
"""Temporary failure in sending mail - retry later"""
pass

class MailerTemporaryFailureException(MailerFailureException):
"""Temporary failure in sending mail - retry later"""

implements(IMailerTemporaryFailureException)

def __init__(self, message="Temporary failure in sending mail - retry later"):
self.message = message
self.args = (message,)


class IMailerPermanentFailureException(IMailerFailureException):
"""Permanent failure in sending mail - take reject action"""
pass

class MailerPermanentFailureException(MailerFailureException):
"""Permanent failure in sending mail - take reject action"""

implements(IMailerPermanentFailureException)

def __init__(self, message="Permanent failure in sending mail - take reject action"):
self.message = message
self.args = (message,)


class IMailer(Interface):
"""Mailer handles synchronous mail delivery."""
"""Mailer handles synchronous mail delivery.
def send(fromaddr, toaddrs, message):
Mailer can raise the exceptions
MailerPermanentFailure
MailerTemporaryFailure
to indicate to sending process what action to take.
"""

def send(fromaddr, toaddrs, message, message_id):
"""Send an email message.
`fromaddr` is the sender address (unicode string),
Expand All @@ -138,12 +203,18 @@ def send(fromaddr, toaddrs, message):
2822. It should contain at least Date, From, To, and Message-Id
headers.
`message_id` is an id for the message, typically a filename.
Messages are sent immediatelly.
Dispatches an `IMailSentEvent` on successful delivery, otherwise an
`IMailErrorEvent`.
"""

def set_logger(logger):
"""Set the log object for the Mailer - this is for use by
QueueProcessorThread to hand a logging object to the mailer
"""

class ISMTPMailer(IMailer):
"""A mailer that delivers mail to a relay host via SMTP."""
Expand Down
Loading

0 comments on commit 2643d89

Please sign in to comment.