From 851c5d6e514bf0633331ab2a36fa56cbf249307c Mon Sep 17 00:00:00 2001 From: Matt Molyneaux Date: Fri, 3 Jan 2020 00:28:20 +0000 Subject: [PATCH] Implement asyncio receivers Fixes #59 --- salmon/server.py | 136 ++++++++++++++++++++++++++++++++++++++--------- setup.py | 1 + 2 files changed, 111 insertions(+), 26 deletions(-) diff --git a/salmon/server.py b/salmon/server.py index 0ff3e5e..3d046ef 100644 --- a/salmon/server.py +++ b/salmon/server.py @@ -11,14 +11,20 @@ import time import traceback +from aiosmtpd.controller import Controller +from aiosmtpd.lmtp import LMTP +from aiosmtpd.smtp import SMTP from dns import resolver import lmtpd from salmon import __version__, mail, queue, routing from salmon.bounce import COMBINED_STATUS_CODES, PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES -lmtpd.__version__ = "Salmon Mail router LMTPD, version %s" % __version__ -smtpd.__version__ = "Salmon Mail router SMTPD, version %s" % __version__ +ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__ +SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction" + +lmtpd.__version__ = ROUTER_VERSION_STRING +smtpd.__version__ = ROUTER_VERSION_STRING def undeliverable_message(raw_message, failure_type): @@ -153,6 +159,19 @@ def send(self, To, From, Subject, Body): self.deliver(msg) +def _deliver(receiver, Peer, From, To, Data, **kwargs): + try: + logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) + routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) + except SMTPError as err: + # looks like they want to return an error, so send it out + return str(err) + except Exception: + logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", + Peer, From, To) + undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + + class SMTPChannel(smtpd.SMTPChannel): """Replaces the standard SMTPChannel with one that rejects more than one recipient""" @@ -175,7 +194,7 @@ def smtp_RCPT(self, arg): # Of course, if smtpd.SMTPServer or SMTPReceiver implemented a # queue and bounces like you're meant too... logging.warning("Client attempted to deliver mail with multiple RCPT TOs. This is not supported.") - self.push("451 Will not accept multiple recipients in one transaction") + self.push(SMTP_MULTIPLE_RCPTS_ERROR) else: smtpd.SMTPChannel.smtp_RCPT(self, arg) @@ -216,17 +235,7 @@ def process_message(self, Peer, From, To, Data, **kwargs): """ Called by smtpd.SMTPServer when there's a message received. """ - - try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) - routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) - except SMTPError as err: - # looks like they want to return an error, so send it out - return str(err) - except Exception: - logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", - Peer, From, To) - undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + return _deliver(self, Peer, From, To, Data, **kwargs) def close(self): """Doesn't do anything except log who called this, since nobody should. Ever.""" @@ -268,18 +277,7 @@ def process_message(self, Peer, From, To, Data, **kwargs): """ Called by lmtpd.LMTPServer when there's a message received. """ - - try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) - routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) - except SMTPError as err: - # looks like they want to return an error, so send it out - # and yes, you should still use SMTPError in your handlers - return str(err) - except Exception: - logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", - Peer, From, To) - undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + return _deliver(self, Peer, From, To, Data, **kwargs) def close(self): """Doesn't do anything except log who called this, since nobody should. Ever.""" @@ -287,6 +285,92 @@ def close(self): logging.error(trace) +class SMTPOnlyOneRcpt(SMTP): + async def smtp_RCPT(self, arg): + if self.envelope.rcpt_tos: + await self.push(SMTP_MULTIPLE_RCPTS_ERROR) + else: + await super().smtp_RCPT(arg) + + +class SMTPHandler: + def __init__(self, executor=None): + self.executor = executor + + async def handle_DATA(self, server, session, envelope): + assert len(envelope.rcpt_tos) == 1, "There should only be one RCPT TO" + return await server.loop.run_in_executor(self.executor, partial(_deliver, self, session.peer, envelope.mail_from, envelope.rcpt_tos[0], envelope.content)) + + +class AsyncSMTPReceiver(Controller): + """Receives emails and hands it to the Router for further processing.""" + def __init__(self, handler=None, **kwargs): + if handler is None: + handler = SMTPHandler() + super().__init__(handler, **kwargs) + + def factory(self): + return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + def stop(self): + """Doesn't do anything except log who called this, since nobody should. Ever.""" + trace = traceback.format_exc(chain=False) + logging.error(trace) + + +class LMTPHandler: + def __init__(self, executor=None): + self.executor = executor + + async def handle_DATA(self, server, session, envelope): + statuses = [] + for rcpt in envelope.rcpt_tos: + status = await server.loop.run_in_executor(self.executor, partial(_deliver, self, session.peer, envelope.mail_from, rcpt, envelope.content)) + statuses.append(status or "250 Ok") + return "\r\n".join(statuses) + + +class AsyncLMTPReceiver(Controller): + """Receives emails and hands it to the Router for further processing.""" + def __init__(self, handler=None, *, socket=None, **kwargs): + if handler is None: + handler = LMTPHandler() + self.socket_path = socket + super().__init__(handler, **kwargs) + + def factory(self): + return LMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + def _run(self, ready_event): + # adapted from aiosmtpd.controller.Controller._run + # from commit 97730f37f4a283b3da3fa3dbf30dd925695fea69 + # Copyright 2015-2017 The aiosmtpd developers + # aiosmtpd is released under the Apache License version 2.0. + asyncio.set_event_loop(self.loop) + try: + if self.socket_path is None: + server = self.loop.create_server(self.factory, host=self.hostname, + port=self.port, ssl=self.ssl_context) + else: + # no ssl on unix sockets, it doesn't really make sense + server = self.loop.create_unix_connection(self.factory, path=self.socket_path) + self.server = self.loop.run_until_complete(server) + except Exception as error: + self._thread_exception = error + return + self.loop.call_soon(ready_event.set) + self.loop.run_forever() + self.server.close() + self.loop.run_until_complete(self.server.wait_closed()) + self.loop.close() + self.server = None + + def stop(self): + """Doesn't do anything except log who called this, since nobody should. Ever.""" + trace = traceback.format_exc(chain=False) + logging.error(trace) + + class QueueReceiver: """ Rather than listen on a socket this will watch a queue directory and diff --git a/setup.py b/setup.py index 2c0af7e..d274498 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ import versioneer install_requires = [ + 'aiosmtpd', 'chardet', 'click', 'dnspython',