Skip to content

Commit

Permalink
Implement asyncio receivers
Browse files Browse the repository at this point in the history
Fixes #59
  • Loading branch information
moggers87 committed Jul 5, 2020
1 parent be343bd commit 3d740c0
Show file tree
Hide file tree
Showing 5 changed files with 397 additions and 173 deletions.
159 changes: 124 additions & 35 deletions salmon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@
The majority of the server related things Salmon needs to run, like receivers,
relays, and queue processors.
"""
from functools import partial
from multiprocessing.dummy import Pool
import asyncio
import asyncore
import logging
import smtpd
import smtplib
import threading
import time
import traceback

from aiosmtpd.controller import Controller
from aiosmtpd.lmtp import LMTP
from aiosmtpd.smtp import SMTP
from dns import resolver
import lmtpd

from salmon import __version__, mail, queue, routing
from salmon.bounce import COMBINED_STATUS_CODES, PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES

lmtpd.__version__ = "Salmon Mail router LMTPD, version %s" % __version__
smtpd.__version__ = "Salmon Mail router SMTPD, version %s" % __version__
ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__
SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction"

lmtpd.__version__ = ROUTER_VERSION_STRING
smtpd.__version__ = ROUTER_VERSION_STRING


def undeliverable_message(raw_message, failure_type):
Expand Down Expand Up @@ -154,6 +161,19 @@ def send(self, To, From, Subject, Body):
self.deliver(msg)


def _deliver(receiver, Peer, From, To, Data, **kwargs):
try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))


class SMTPChannel(smtpd.SMTPChannel):
"""Replaces the standard SMTPChannel with one that rejects more than one recipient"""

Expand All @@ -176,7 +196,7 @@ def smtp_RCPT(self, arg):
# Of course, if smtpd.SMTPServer or SMTPReceiver implemented a
# queue and bounces like you're meant too...
logging.warning("Client attempted to deliver mail with multiple RCPT TOs. This is not supported.")
self.push("451 Will not accept multiple recipients in one transaction")
self.push(SMTP_MULTIPLE_RCPTS_ERROR)
else:
smtpd.SMTPChannel.smtp_RCPT(self, arg)

Expand Down Expand Up @@ -207,6 +227,10 @@ def start(self):
self.poller = threading.Thread(target=asyncore.loop, kwargs={'timeout': 0.1, 'use_poll': True})
self.poller.start()

def stop(self):
self.close()
self.poller.join()

def handle_accept(self):
pair = self.accept()
if pair is not None:
Expand All @@ -217,22 +241,7 @@ def process_message(self, Peer, From, To, Data, **kwargs):
"""
Called by smtpd.SMTPServer when there's a message received.
"""

try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))

def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)
return _deliver(self, Peer, From, To, Data, **kwargs)


class LMTPReceiver(lmtpd.LMTPServer):
Expand All @@ -250,6 +259,8 @@ def __init__(self, host='127.0.0.1', port=8824, socket=None):
close the socket.
"""
if socket is None:
self.host = host
self.port = port
self.socket = "%s:%d" % (host, port)
lmtpd.LMTPServer.__init__(self, (host, port))
else:
Expand All @@ -265,27 +276,105 @@ def start(self):
self.poller = threading.Thread(target=asyncore.loop, kwargs={'timeout': 0.1, 'use_poll': True})
self.poller.start()

def stop(self):
self.close()
self.poller.join()

def process_message(self, Peer, From, To, Data, **kwargs):
"""
Called by lmtpd.LMTPServer when there's a message received.
"""
return _deliver(self, Peer, From, To, Data, **kwargs)


class SMTPOnlyOneRcpt(SMTP):
async def smtp_RCPT(self, arg):
if self.envelope.rcpt_tos:
await self.push(SMTP_MULTIPLE_RCPTS_ERROR)
else:
await super().smtp_RCPT(arg)


class SMTPHandler:
def __init__(self, executor=None):
self.executor = executor

async def handle_DATA(self, server, session, envelope):
status = await server.loop.run_in_executor(self.executor, partial(
_deliver,
self,
session.peer,
envelope.mail_from,
envelope.rcpt_tos[0],
envelope.content,
))
return status or "250 Ok"


class AsyncSMTPReceiver(Controller):
"""Receives emails and hands it to the Router for further processing."""
def __init__(self, handler=None, **kwargs):
if handler is None:
handler = SMTPHandler()
super().__init__(handler, **kwargs)

def factory(self):
# TODO implement a queue
return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)


class LMTPHandler:
def __init__(self, executor=None):
self.executor = executor

async def handle_DATA(self, server, session, envelope):
statuses = []
for rcpt in envelope.rcpt_tos:
status = await server.loop.run_in_executor(self.executor, partial(
_deliver,
self,
session.peer,
envelope.mail_from,
rcpt, envelope.content,
))
statuses.append(status or "250 Ok")
return "\r\n".join(statuses)


class AsyncLMTPReceiver(Controller):
"""Receives emails and hands it to the Router for further processing."""
def __init__(self, handler=None, *, socket=None, **kwargs):
if handler is None:
handler = LMTPHandler()
self.socket_path = socket
super().__init__(handler, **kwargs)

def factory(self):
return LMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)

def _run(self, ready_event):
# adapted from aiosmtpd.controller.Controller._run
# from commit 97730f37f4a283b3da3fa3dbf30dd925695fea69
# Copyright 2015-2017 The aiosmtpd developers
# aiosmtpd is released under the Apache License version 2.0.
asyncio.set_event_loop(self.loop)
try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
# and yes, you should still use SMTPError in your handlers
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))

def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)
if self.socket_path is None:
server = self.loop.create_server(self.factory, host=self.hostname,
port=self.port, ssl=self.ssl_context)
else:
# no ssl on unix sockets, it doesn't really make sense
server = self.loop.create_unix_connection(self.factory, path=self.socket_path)
self.server = self.loop.run_until_complete(server)
except Exception as error:
self._thread_exception = error
return
self.loop.call_soon(ready_event.set)
self.loop.run_forever()
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.loop.close()
self.server = None


class QueueReceiver:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import versioneer

install_requires = [
'aiosmtpd',
'chardet',
'click',
'dnspython',
Expand Down
Loading

0 comments on commit 3d740c0

Please sign in to comment.