diff --git a/repoze/sendmail/delivery.py b/repoze/sendmail/delivery.py index 22401c6..40b8fc4 100644 --- a/repoze/sendmail/delivery.py +++ b/repoze/sendmail/delivery.py @@ -28,54 +28,237 @@ from repoze.sendmail.maildir import Maildir from repoze.sendmail import encoding from transaction.interfaces import IDataManager +from transaction.interfaces import ISavepointDataManager +from transaction.interfaces import IDataManagerSavepoint import transaction -@implementer(IDataManager) +import logging +log = logging.getLogger(__name__) + + +DEBUG_FLOW = False +if DEBUG_FLOW : + import sys + log.setLevel(logging.DEBUG) + h1 = logging.StreamHandler(sys.stdout) + log.addHandler(h1) + + + +class MaiDataManagerState(object): + INIT = 0 + NO_WORK = 1 + COMMITTED = 2 + ABORTED = 3 + + TPC_NONE = 11 + TPC_BEGIN = 12 + TPC_VOTED = 13 + TPC_COMMITED = 14 + TPC_FINISHED = 15 + TPC_ABORTED = 16 + + +def is_resource_in_transaction(resource,trans): + if resource in trans._resources : + return True + return False + + + +@implementer(ISavepointDataManager) class MailDataManager(object): def __init__(self, callable, args=(), onAbort=None): + """We expect to : + 1. NOT be in a transaction on creation + 2. DO be joined into a transaction + """ + if DEBUG_FLOW : log.debug("MailDataManager.__init__") self.callable = callable self.args = args - self.onAbort = onAbort + # Use the default thread transaction manager. self.transaction_manager = transaction.manager - def commit(self, transaction): + # Our transaction state: + self.transaction = None + + # What state are we in + self.state = MaiDataManagerState.INIT + + # What phase, if any, of two-phase commit we are in: + self.tpc_phase = 0 + + # store the onAbort + self.onAbort = onAbort + + + def join_transaction(self,trans=None): + if DEBUG_FLOW : log.debug("MailDataManager.join_transaction") + + # are we specifying a transaction to join ? + if trans is not None: + self.transaction = trans + + # If this is the first change in the transaction, join the transaction + if self.transaction is None : + self.transaction = self.transaction_manager.get() + + # only join a transaction ONCE + if not is_resource_in_transaction( self , self.transaction ): + self.transaction.join(self) + + + def _finish(self,final_state): + if DEBUG_FLOW : log.debug("MailDataManager._finish") + assert self.transaction is not None + self.state = final_state + self._resetTransaction() + + + def _resetTransaction(self): + if DEBUG_FLOW : log.debug("MailDataManager._resetTransaction") + self.last_note = getattr(self.transaction, 'description', None) + self.transaction = None + + self.state = MaiDataManagerState.INIT + self.tpc_phase = 0 + + + def commit( self, trans ): + if DEBUG_FLOW : log.debug("MailDataManager.commit") pass + + + + + def abort(self, trans): + """Throw away changes made before the commit process has started + """ + if DEBUG_FLOW : log.debug("MailDataManager.abort") + assert (self.transaction is not None), "Must have transaction" + assert (trans is self.transaction), "Must not change transactions" + assert is_resource_in_transaction( self , self.transaction ) , "Must be in the transaction" + assert (self.tpc_phase == 0), "Must be called outside of tpc" + + self._resetTransaction() - def abort(self, transaction): if self.onAbort: self.onAbort() + def sortKey(self): + if DEBUG_FLOW : log.debug("MailDataManager.sortKey") return str(id(self)) - # No subtransaction support. - def abort_sub(self, transaction): + + # No subtransaction support ? + def abort_sub(self, trans): + raise ValueError("abort_sub") pass #pragma NO COVERAGE - commit_sub = abort_sub - def beforeCompletion(self, transaction): + # No subtransaction support ? + def commit_sub(self, trans): + raise ValueError("commit_sub") pass #pragma NO COVERAGE - afterCompletion = beforeCompletion - def tpc_begin(self, transaction, subtransaction=False): + + ### + ### Savepoint Support + ### + + def savepoint(self): + if DEBUG_FLOW : log.debug("MailDataManager.savepoint") + if DEBUG_FLOW : log.debug(self.transaction._resources) + # + # we create a custom MailDataSavepoint object , which just has a rollback + # the custom instance doesn't actually do anything. transaction does it all. + # + return MailDataSavepoint(self) + + def _savepoint_rollback(self, savepoint): + if DEBUG_FLOW : log.debug("MailDataManager._savepoint_rollback") + if DEBUG_FLOW : log.debug(self.transaction._resources) + # + # called by the custom savepoint MailDataSavepoint + # this doesn't actually do anything. transaction does it all. + # + + + + ### + ### Two Phase Support + ### + + + + def tpc_begin(self, trans, subtransaction=False): + if DEBUG_FLOW : log.debug("MailDataManager.tpc_begin | %s , %s" , self.state , self.tpc_phase) + assert trans is self.transaction, "Must not change transactions" + assert self.tpc_phase == 0 , "Must be called outside of tpc" assert not subtransaction - def tpc_vote(self, transaction): - pass + # begin + self.tpc_phase = 1 + - def tpc_finish(self, transaction): + def tpc_vote(self, trans): + if DEBUG_FLOW : log.debug("MailDataManager.tpc_vote | %s , %s" , self.state , self.tpc_phase) + assert trans is self.transaction, "Must not change transactions" + assert self.tpc_phase == 1, "Must be called in first phase of tpc" + + # vote + self.tpc_phase = 2 + + + def tpc_finish(self, trans): + if DEBUG_FLOW : log.debug("MailDataManager.tpc_finish | %s , %s" , self.state , self.tpc_phase) + assert trans is self.transaction, "Must not change transactions" + assert self.tpc_phase == 2, "Must be called in second phase of tpc" + self.callable(*self.args) - tpc_abort = abort + self._finish(MaiDataManagerState.TPC_FINISHED) + + + def tpc_abort(self, trans): + if DEBUG_FLOW : log.debug("MailDataManager.tpc_abort | %s , %s" , self.state , self.tpc_phase) + assert trans is self.transaction, "Must not change transactions" + assert self.tpc_phase != 0, "Must be called inside of tpc" + assert self.state is not MaiDataManagerState.COMMITTED, "not in a commit!" + + self._finish(MaiDataManagerState.TPC_ABORTED) + + + +@implementer( transaction.interfaces.IDataManagerSavepoint ) +class MailDataSavepoint: + + def __init__(self, mail_data_manager ): + # + # we don't actually do anything here. transaction does it all. + # + if DEBUG_FLOW : log.debug("MailDataSavepoint.__init__") + self.mail_data_manager = mail_data_manager + + + def rollback(self): + # + # we don't actually do anything here. transaction does it all. + # + if DEBUG_FLOW : log.debug("MailDataSavepoint.rollback") + self.mail_data_manager._savepoint_rollback(self) + + class AbstractMailDelivery(object): def send(self, fromaddr, toaddrs, message): + if DEBUG_FLOW : log.debug("AbstractMailDelivery.send") assert isinstance(message, Message), \ 'Message must be instance of email.message.Message' encoding.cleanup_message(message) @@ -84,18 +267,21 @@ def send(self, fromaddr, toaddrs, message): messageid = message['Message-Id'] = make_msgid('repoze.sendmail') if message['Date'] is None: message['Date'] = formatdate() - transaction.get().join( - self.createDataManager(fromaddr, toaddrs, message)) + managedMessage = self.createDataManager(fromaddr, toaddrs, message) + managedMessage.join_transaction() return messageid + @implementer(IMailDelivery) class DirectMailDelivery(AbstractMailDelivery): def __init__(self, mailer): + if DEBUG_FLOW : log.debug("DirectMailDelivery.__init__") self.mailer = mailer def createDataManager(self, fromaddr, toaddrs, message): + if DEBUG_FLOW : log.debug("DirectMailDelivery.createDataManager") return MailDataManager(self.mailer.send, args=(fromaddr, toaddrs, message)) @@ -103,13 +289,15 @@ def createDataManager(self, fromaddr, toaddrs, message): @implementer(IMailDelivery) class QueuedMailDelivery(AbstractMailDelivery): - def __init__(self, queuePath): - self._queuePath = queuePath - queuePath = property(lambda self: self._queuePath) processor_thread = None + def __init__(self, queuePath): + if DEBUG_FLOW : log.debug("QueuedMailDelivery.__init__") + self._queuePath = queuePath + def createDataManager(self, fromaddr, toaddrs, message): + if DEBUG_FLOW : log.debug("QueuedMailDelivery.createDataManager") message = copy_message(message) message['X-Actually-From'] = Header(fromaddr, 'utf-8') message['X-Actually-To'] = Header(','.join(toaddrs), 'utf-8') @@ -121,3 +309,5 @@ def createDataManager(self, fromaddr, toaddrs, message): def copy_message(message): parser = Parser() return parser.parsestr(message.as_string()) + + diff --git a/repoze/sendmail/tests/test_transaction.py b/repoze/sendmail/tests/test_transaction.py new file mode 100644 index 0000000..15db220 --- /dev/null +++ b/repoze/sendmail/tests/test_transaction.py @@ -0,0 +1,120 @@ +import unittest + +from repoze.sendmail.delivery import DirectMailDelivery +from email.message import Message + + + +class TestTransactionMails(unittest.TestCase): + + def test_abort(self): + import transaction + mailer = _makeMailerStub() + delivery = DirectMailDelivery(mailer) + + ( fromaddr , toaddrs ) = fromaddr_toaddrs() + message = sample_message() + msgid = delivery.send(fromaddr, toaddrs, message) + self.assertEqual(msgid, '<20030519.1234@example.org>') + self.assertEqual(mailer.sent_messages, []) + transaction.abort() + transaction.commit() + self.assertEqual(mailer.sent_messages,[]) + + + def test_doom(self): + import transaction + mailer = _makeMailerStub() + delivery = DirectMailDelivery(mailer) + + ( fromaddr , toaddrs ) = fromaddr_toaddrs() + message = sample_message() + msgid = delivery.send(fromaddr, toaddrs, message) + self.assertEqual(msgid, '<20030519.1234@example.org>') + self.assertEqual(mailer.sent_messages, []) + transaction.doom() + transaction.abort() + transaction.commit() + self.assertEqual(mailer.sent_messages, []) + + + + def test_savepoint(self): + import transaction + + mailer = _makeMailerStub() + delivery = DirectMailDelivery(mailer) + ( fromaddr , toaddrs ) = fromaddr_toaddrs() + + bodies_good = {} + bodies_bad = {} + for i in ( 1,3,5, ): + bodies_good[i] = 'Sample Body - %s | Good' % i + for i in ( 2,4,6, ): + bodies_bad[i] = 'Sample Body - %s | Bad' % i + + bodies_all = dict( bodies_good.items() + bodies_bad.items() ) + + + transaction.begin() + for i in range(1,7) : + sp = transaction.savepoint() + body = bodies_all[i] + message = sample_message(body=body) + msgid = delivery.send(fromaddr, toaddrs, message) + self.assertEqual(msgid, '<20030519.1234@example.org>') + self.assertEqual(mailer.sent_messages, []) + if i in bodies_bad : + sp.rollback() + + # we shouldn't have sent anything + self.assertEqual(mailer.sent_messages, []) + + # so now let's commit + transaction.commit() + + # make sure we have the right number of messages + self.assertEqual(len(mailer.sent_messages), len(bodies_good.values())) + + # generate our expected body + bodies_expected = bodies_good.values() + + # make sure our bodies are only good + for i in mailer.sent_messages : + ( f,t,m ) = i + self.assertIn(m._payload, bodies_expected) + + + +def sample_message( body="This is just an example"): + ( fromaddr , toaddrs ) = fromaddr_toaddrs() + message = Message() + message['From'] = fromaddr + message['To'] = 'some-zope-coders:;' + message['Date'] = 'Date: Mon, 19 May 2003 10:17:36 -0400' + message['Message-Id'] = ext_msgid = '<20030519.1234@example.org>' + message['Subject'] = 'example' + message.set_payload(body) + return message + +def fromaddr_toaddrs(): + fromaddr = 'Jim ' + toaddrs = ('Guido ', + 'Steve ') + return ( fromaddr , toaddrs ) + + + +def _makeMailerStub(*args, **kw): + from zope.interface import implementer + from repoze.sendmail.interfaces import IMailer + implementer(IMailer) + + class MailerStub(object): + def __init__(self, *args, **kw): + self.sent_messages = [] + + def send(self, fromaddr, toaddrs, message): + self.sent_messages.append((fromaddr, toaddrs, message)) + return MailerStub(*args, **kw) +