Skip to content

Commit

Permalink
Implement asyncio receivers
Browse files Browse the repository at this point in the history
Fixes #59
  • Loading branch information
moggers87 committed Jan 8, 2020
1 parent ba9b37f commit 851c5d6
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 26 deletions.
136 changes: 110 additions & 26 deletions salmon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
import time
import traceback

from aiosmtpd.controller import Controller
from aiosmtpd.lmtp import LMTP
from aiosmtpd.smtp import SMTP
from dns import resolver
import lmtpd

from salmon import __version__, mail, queue, routing
from salmon.bounce import COMBINED_STATUS_CODES, PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES

lmtpd.__version__ = "Salmon Mail router LMTPD, version %s" % __version__
smtpd.__version__ = "Salmon Mail router SMTPD, version %s" % __version__
ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__
SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction"

lmtpd.__version__ = ROUTER_VERSION_STRING
smtpd.__version__ = ROUTER_VERSION_STRING


def undeliverable_message(raw_message, failure_type):
Expand Down Expand Up @@ -153,6 +159,19 @@ def send(self, To, From, Subject, Body):
self.deliver(msg)


def _deliver(receiver, Peer, From, To, Data, **kwargs):
try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))


class SMTPChannel(smtpd.SMTPChannel):
"""Replaces the standard SMTPChannel with one that rejects more than one recipient"""

Expand All @@ -175,7 +194,7 @@ def smtp_RCPT(self, arg):
# Of course, if smtpd.SMTPServer or SMTPReceiver implemented a
# queue and bounces like you're meant too...
logging.warning("Client attempted to deliver mail with multiple RCPT TOs. This is not supported.")
self.push("451 Will not accept multiple recipients in one transaction")
self.push(SMTP_MULTIPLE_RCPTS_ERROR)
else:
smtpd.SMTPChannel.smtp_RCPT(self, arg)

Expand Down Expand Up @@ -216,17 +235,7 @@ def process_message(self, Peer, From, To, Data, **kwargs):
"""
Called by smtpd.SMTPServer when there's a message received.
"""

try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
return _deliver(self, Peer, From, To, Data, **kwargs)

def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
Expand Down Expand Up @@ -268,25 +277,100 @@ def process_message(self, Peer, From, To, Data, **kwargs):
"""
Called by lmtpd.LMTPServer when there's a message received.
"""

try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
# and yes, you should still use SMTPError in your handlers
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
return _deliver(self, Peer, From, To, Data, **kwargs)

def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)


class SMTPOnlyOneRcpt(SMTP):
async def smtp_RCPT(self, arg):
if self.envelope.rcpt_tos:
await self.push(SMTP_MULTIPLE_RCPTS_ERROR)
else:
await super().smtp_RCPT(arg)


class SMTPHandler:
def __init__(self, executor=None):
self.executor = executor

async def handle_DATA(self, server, session, envelope):
assert len(envelope.rcpt_tos) == 1, "There should only be one RCPT TO"
return await server.loop.run_in_executor(self.executor, partial(_deliver, self, session.peer, envelope.mail_from, envelope.rcpt_tos[0], envelope.content))


class AsyncSMTPReceiver(Controller):
"""Receives emails and hands it to the Router for further processing."""
def __init__(self, handler=None, **kwargs):
if handler is None:
handler = SMTPHandler()
super().__init__(handler, **kwargs)

def factory(self):
return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)

def stop(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)


class LMTPHandler:
def __init__(self, executor=None):
self.executor = executor

async def handle_DATA(self, server, session, envelope):
statuses = []
for rcpt in envelope.rcpt_tos:
status = await server.loop.run_in_executor(self.executor, partial(_deliver, self, session.peer, envelope.mail_from, rcpt, envelope.content))
statuses.append(status or "250 Ok")
return "\r\n".join(statuses)


class AsyncLMTPReceiver(Controller):
"""Receives emails and hands it to the Router for further processing."""
def __init__(self, handler=None, *, socket=None, **kwargs):
if handler is None:
handler = LMTPHandler()
self.socket_path = socket
super().__init__(handler, **kwargs)

def factory(self):
return LMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)

def _run(self, ready_event):
# adapted from aiosmtpd.controller.Controller._run
# from commit 97730f37f4a283b3da3fa3dbf30dd925695fea69
# Copyright 2015-2017 The aiosmtpd developers
# aiosmtpd is released under the Apache License version 2.0.
asyncio.set_event_loop(self.loop)
try:
if self.socket_path is None:
server = self.loop.create_server(self.factory, host=self.hostname,
port=self.port, ssl=self.ssl_context)
else:
# no ssl on unix sockets, it doesn't really make sense
server = self.loop.create_unix_connection(self.factory, path=self.socket_path)
self.server = self.loop.run_until_complete(server)
except Exception as error:
self._thread_exception = error
return
self.loop.call_soon(ready_event.set)
self.loop.run_forever()
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.loop.close()
self.server = None

def stop(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)


class QueueReceiver:
"""
Rather than listen on a socket this will watch a queue directory and
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import versioneer

install_requires = [
'aiosmtpd',
'chardet',
'click',
'dnspython',
Expand Down

0 comments on commit 851c5d6

Please sign in to comment.