From 53bfd71677f7067e373326bdc519f37ff90eba3c Mon Sep 17 00:00:00 2001 From: Matt Molyneaux Date: Fri, 3 Jan 2020 00:28:20 +0000 Subject: [PATCH 1/4] Implement asyncio receivers Fixes #59 --- salmon/server.py | 159 ++++++++++++---- setup.py | 1 + tests/server_tests.py | 433 +++++++++++++++++++++++++++++------------- tests/setup_env.py | 1 + tox.ini | 2 +- 5 files changed, 423 insertions(+), 173 deletions(-) diff --git a/salmon/server.py b/salmon/server.py index 6f752d3..158c868 100644 --- a/salmon/server.py +++ b/salmon/server.py @@ -2,23 +2,30 @@ The majority of the server related things Salmon needs to run, like receivers, relays, and queue processors. """ +from functools import partial from multiprocessing.dummy import Pool +import asyncio import asyncore import logging import smtpd import smtplib import threading import time -import traceback +from aiosmtpd.controller import Controller +from aiosmtpd.lmtp import LMTP +from aiosmtpd.smtp import SMTP from dns import resolver import lmtpd from salmon import __version__, mail, queue, routing from salmon.bounce import COMBINED_STATUS_CODES, PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES -lmtpd.__version__ = "Salmon Mail router LMTPD, version %s" % __version__ -smtpd.__version__ = "Salmon Mail router SMTPD, version %s" % __version__ +ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__ +SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction" + +lmtpd.__version__ = ROUTER_VERSION_STRING +smtpd.__version__ = ROUTER_VERSION_STRING def undeliverable_message(raw_message, failure_type): @@ -154,6 +161,19 @@ def send(self, To, From, Subject, Body): self.deliver(msg) +def _deliver(receiver, Peer, From, To, Data, **kwargs): + try: + logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) + routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) + except SMTPError as err: + # looks like they want to return an error, so send it out + return str(err) + except Exception: + logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", + Peer, From, To) + undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + + class SMTPChannel(smtpd.SMTPChannel): """Replaces the standard SMTPChannel with one that rejects more than one recipient""" @@ -176,7 +196,7 @@ def smtp_RCPT(self, arg): # Of course, if smtpd.SMTPServer or SMTPReceiver implemented a # queue and bounces like you're meant too... logging.warning("Client attempted to deliver mail with multiple RCPT TOs. This is not supported.") - self.push("451 Will not accept multiple recipients in one transaction") + self.push(SMTP_MULTIPLE_RCPTS_ERROR) else: smtpd.SMTPChannel.smtp_RCPT(self, arg) @@ -207,6 +227,10 @@ def start(self): self.poller = threading.Thread(target=asyncore.loop, kwargs={'timeout': 0.1, 'use_poll': True}) self.poller.start() + def stop(self): + self.close() + self.poller.join() + def handle_accept(self): pair = self.accept() if pair is not None: @@ -217,22 +241,7 @@ def process_message(self, Peer, From, To, Data, **kwargs): """ Called by smtpd.SMTPServer when there's a message received. """ - - try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) - routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) - except SMTPError as err: - # looks like they want to return an error, so send it out - return str(err) - except Exception: - logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", - Peer, From, To) - undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) - - def close(self): - """Doesn't do anything except log who called this, since nobody should. Ever.""" - trace = traceback.format_exc(chain=False) - logging.error(trace) + return _deliver(self, Peer, From, To, Data, **kwargs) class LMTPReceiver(lmtpd.LMTPServer): @@ -250,6 +259,8 @@ def __init__(self, host='127.0.0.1', port=8824, socket=None): close the socket. """ if socket is None: + self.host = host + self.port = port self.socket = "%s:%d" % (host, port) lmtpd.LMTPServer.__init__(self, (host, port)) else: @@ -265,27 +276,105 @@ def start(self): self.poller = threading.Thread(target=asyncore.loop, kwargs={'timeout': 0.1, 'use_poll': True}) self.poller.start() + def stop(self): + self.close() + self.poller.join() + def process_message(self, Peer, From, To, Data, **kwargs): """ Called by lmtpd.LMTPServer when there's a message received. """ + return _deliver(self, Peer, From, To, Data, **kwargs) + +class SMTPOnlyOneRcpt(SMTP): + async def smtp_RCPT(self, arg): + if self.envelope.rcpt_tos: + await self.push(SMTP_MULTIPLE_RCPTS_ERROR) + else: + await super().smtp_RCPT(arg) + + +class SMTPHandler: + def __init__(self, executor=None): + self.executor = executor + + async def handle_DATA(self, server, session, envelope): + status = await server.loop.run_in_executor(self.executor, partial( + _deliver, + self, + session.peer, + envelope.mail_from, + envelope.rcpt_tos[0], + envelope.content, + )) + return status or "250 Ok" + + +class AsyncSMTPReceiver(Controller): + """Receives emails and hands it to the Router for further processing.""" + def __init__(self, handler=None, **kwargs): + if handler is None: + handler = SMTPHandler() + super().__init__(handler, **kwargs) + + def factory(self): + # TODO implement a queue + return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + +class LMTPHandler: + def __init__(self, executor=None): + self.executor = executor + + async def handle_DATA(self, server, session, envelope): + statuses = [] + for rcpt in envelope.rcpt_tos: + status = await server.loop.run_in_executor(self.executor, partial( + _deliver, + self, + session.peer, + envelope.mail_from, + rcpt, envelope.content, + )) + statuses.append(status or "250 Ok") + return "\r\n".join(statuses) + + +class AsyncLMTPReceiver(Controller): + """Receives emails and hands it to the Router for further processing.""" + def __init__(self, handler=None, *, socket=None, **kwargs): + if handler is None: + handler = LMTPHandler() + self.socket_path = socket + super().__init__(handler, **kwargs) + + def factory(self): + return LMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + def _run(self, ready_event): + # adapted from aiosmtpd.controller.Controller._run + # from commit 97730f37f4a283b3da3fa3dbf30dd925695fea69 + # Copyright 2015-2017 The aiosmtpd developers + # aiosmtpd is released under the Apache License version 2.0. + asyncio.set_event_loop(self.loop) try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) - routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) - except SMTPError as err: - # looks like they want to return an error, so send it out - # and yes, you should still use SMTPError in your handlers - return str(err) - except Exception: - logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", - Peer, From, To) - undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) - - def close(self): - """Doesn't do anything except log who called this, since nobody should. Ever.""" - trace = traceback.format_exc(chain=False) - logging.error(trace) + if self.socket_path is None: + server = self.loop.create_server(self.factory, host=self.hostname, + port=self.port, ssl=self.ssl_context) + else: + # no ssl on unix sockets, it doesn't really make sense + server = self.loop.create_unix_server(self.factory, path=self.socket_path) + self.server = self.loop.run_until_complete(server) + except Exception as error: + self._thread_exception = error + return + self.loop.call_soon(ready_event.set) + self.loop.run_forever() + self.server.close() + self.loop.run_until_complete(self.server.wait_closed()) + self.loop.close() + self.server = None class QueueReceiver: diff --git a/setup.py b/setup.py index a7b35a8..5cb4084 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ import versioneer install_requires = [ + 'aiosmtpd', 'chardet', 'click', 'dnspython', diff --git a/tests/server_tests.py b/tests/server_tests.py index 26ee65d..49cd19c 100644 --- a/tests/server_tests.py +++ b/tests/server_tests.py @@ -1,8 +1,9 @@ # Copyright (C) 2008 Zed A. Shaw. Licensed under the terms of the GPLv3. +from smtplib import LMTP, SMTP, SMTPDataError from unittest.mock import Mock, call, patch +import os import socket - -import lmtpd +import tempfile from salmon import mail, queue, routing, server @@ -30,126 +31,24 @@ def test_router(self): routing.Router.deliver(msg) - @patch("asynchat.async_chat.push") - def test_SMTPChannel(self, push_mock): - channel = server.SMTPChannel(Mock(), Mock(), Mock()) - expected_version = "220 {} {}\r\n".format(socket.getfqdn(), server.smtpd.__version__).encode() - - self.assertEqual(push_mock.call_args[0][1:], (expected_version,)) - self.assertEqual(type(push_mock.call_args[0][1]), bytes) - - push_mock.reset_mock() - channel.seen_greeting = True - channel.smtp_MAIL("FROM: you@example.com\r\n") - self.assertEqual(push_mock.call_args[0][1:], (SMTP_MESSAGES["ok"],)) - - push_mock.reset_mock() - channel.smtp_RCPT("TO: me@example.com") - self.assertEqual(push_mock.call_args[0][1:], (SMTP_MESSAGES["ok"],)) - - push_mock.reset_mock() - channel.smtp_RCPT("TO: them@example.com") - self.assertEqual(push_mock.call_args[0][1:], - ("451 Will not accept multiple recipients in one transaction\r\n".encode(),)) - - def test_SMTPReceiver_process_message(self): - receiver = server.SMTPReceiver(host="localhost", port=0) - msg = generate_mail() - - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg)) - assert response is None, response - - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - router_mock.deliver.side_effect = Exception() - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg)) - assert response is None, response - - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - router_mock.deliver.side_effect = server.SMTPError(450, "Not found") - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg)) - self.assertEqual(response, "450 Not found") - - # Python 3's smtpd takes some extra kawrgs, but i we don't care about that at the moment - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg), mail_options=[], rcpt_options=[]) - assert response is None, response - - @patch("lmtpd.asynchat.async_chat.push") - def test_LMTPChannel(self, push_mock): - channel = lmtpd.LMTPChannel(Mock(), Mock(), Mock()) - expected_version = "220 {} {}\r\n".format(socket.getfqdn(), server.lmtpd.__version__).encode() - - self.assertEqual(push_mock.call_args[0][1:], (expected_version,)) - self.assertEqual(type(push_mock.call_args[0][1]), bytes) - - push_mock.reset_mock() - channel.seen_greeting = True - channel.lmtp_MAIL(b"FROM: you@example.com\r\n") - self.assertEqual(push_mock.call_args[0][1:], ("250 2.1.0 Ok\r\n".encode(),)) - - push_mock.reset_mock() - channel.lmtp_RCPT(b"TO: me@example.com") - self.assertEqual(push_mock.call_args[0][1:], ("250 2.1.0 Ok\r\n".encode(),)) - - push_mock.reset_mock() - channel.lmtp_RCPT(b"TO: them@example.com") - self.assertEqual(push_mock.call_args[0][1:], ("250 2.1.0 Ok\r\n".encode(),)) - - def test_LMTPReceiver_process_message(self): - receiver = server.LMTPReceiver(host="localhost", port=0) - msg = generate_mail() - - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg)) - assert response is None, response - - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - router_mock.deliver.side_effect = Exception() - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg)) - assert response is None, response - - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - router_mock.deliver.side_effect = server.SMTPError(450, "Not found") - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg)) - self.assertEqual(response, "450 Not found") + def test_SMTPError(self): + err = server.SMTPError(550) + self.assertEqual(str(err), '550 Permanent Failure Mail Delivery Protocol Status') - # lmtpd's server is a subclass of smtpd's server, so we should support the same kwargs here - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - response = receiver.process_message(msg.Peer, msg.From, msg.To, str(msg), mail_options=[], rcpt_options=[]) - assert response is None, response + err = server.SMTPError(400) + self.assertEqual(str(err), '400 Persistent Transient Failure Other or Undefined Status') - @patch("salmon.queue.Queue") - def test_QueueReceiver_process_message(self, queue_mock): - receiver = server.QueueReceiver("run/queue/thingy") - msg = generate_mail() + err = server.SMTPError(425) + self.assertEqual(str(err), '425 Persistent Transient Failure Mailbox Status') - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - response = receiver.process_message(msg) - assert response is None, response + err = server.SMTPError(999) + self.assertEqual(str(err), "999 ") - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - router_mock.deliver.side_effect = Exception() - response = receiver.process_message(msg) - assert response is None, response + err = server.SMTPError(999, "Bogus Error Code") + self.assertEqual(str(err), "999 Bogus Error Code") - with patch("salmon.server.routing.Router") as router_mock, \ - patch("salmon.server.undeliverable_message"): - router_mock.deliver.side_effect = server.SMTPError(450, "Not found") - response = receiver.process_message(msg) - # queue doesn't actually support SMTPErrors - assert response is None, response +class RelayTestCase(SalmonTestCase): def test_Relay_asserts_ssl_options(self): """Relay raises an TypeError if the ssl option is used in combination with starttls or lmtp""" with self.assertRaises(TypeError): @@ -255,6 +154,31 @@ def test_relay_raises_exception(self, create_mock): with self.assertRaises(socket.error): relay.deliver(generate_mail(factory=mail.MailResponse)) + +class QueueTestCase(SalmonTestCase): + @patch("salmon.queue.Queue") + def test_QueueReceiver_process_message(self, queue_mock): + receiver = server.QueueReceiver("run/queue/thingy") + msg = generate_mail() + + with patch("salmon.server.routing.Router") as router_mock, \ + patch("salmon.server.undeliverable_message"): + response = receiver.process_message(msg) + assert response is None, response + + with patch("salmon.server.routing.Router") as router_mock, \ + patch("salmon.server.undeliverable_message"): + router_mock.deliver.side_effect = Exception() + response = receiver.process_message(msg) + assert response is None, response + + with patch("salmon.server.routing.Router") as router_mock, \ + patch("salmon.server.undeliverable_message"): + router_mock.deliver.side_effect = server.SMTPError(450, "Not found") + response = receiver.process_message(msg) + # queue doesn't actually support SMTPErrors + assert response is None, response + @patch('salmon.routing.Router') def test_queue_receiver(self, router_mock): receiver = server.QueueReceiver('run/queue') @@ -325,31 +249,266 @@ def test_queue_receiver_pool(self, pool_mock): self.assertEqual(len(args), 1) self.assertEqual(type(args[0]), mail.MailRequest) - @patch('threading.Thread', new=Mock()) - @patch('salmon.routing.Router', new=Mock()) - def test_SMTPReceiver(self): - receiver = server.SMTPReceiver(port=0) - receiver.start() - receiver.process_message('localhost', 'test@localhost', 'test@localhost', - 'Fake body.') - routing.Router.deliver.side_effect = RuntimeError("Raised on purpose") - receiver.process_message('localhost', 'test@localhost', 'test@localhost', 'Fake body.') +class SmtpSeverTestCase(SalmonTestCase): + def setUp(self): + super().setUp() + self.server = server.SMTPReceiver(host="127.0.0.1", port=9999) + self.server.start() + self.addCleanup(self.server.stop) - receiver.close() + @patch('salmon.routing.Router') + def test_message_routed(self, router_mock): + with SMTP(self.server.host, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") - def test_SMTPError(self): - err = server.SMTPError(550) - self.assertEqual(str(err), '550 Permanent Failure Mail Delivery Protocol Status') + @patch('salmon.routing.Router') + def test_message_routed_error(self, router_mock): + router_mock.deliver.side_effect = RuntimeError("Raised on purpose") + with SMTP(self.server.host, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) - err = server.SMTPError(400) - self.assertEqual(str(err), '400 Persistent Transient Failure Other or Undefined Status') + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") - err = server.SMTPError(425) - self.assertEqual(str(err), '425 Persistent Transient Failure Mailbox Status') + @patch('salmon.routing.Router') + def test_message_routed_smtperror(self, router_mock): + router_mock.deliver.side_effect = server.SMTPError(450, "Raised on purpose") + with SMTP(self.server.host, self.server.port) as client: + with self.assertRaises(SMTPDataError): + client.sendmail("you@localhost", "me@localhost", "hello") + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + def test_multiple_rcpts(self): + with SMTP(self.server.host, self.server.port) as client: + code, _ = client.ehlo("localhost") + self.assertEqual(code, 250) + code, _ = client.mail("me@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("you@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("them@localhost") + self.assertEqual(code, 451) + + +class LmtpSeverTestCase(SalmonTestCase): + def setUp(self): + super().setUp() + self.server = server.LMTPReceiver(host="127.0.0.1", port=9999) + self.server.start() + self.addCleanup(self.server.stop) - err = server.SMTPError(999) - self.assertEqual(str(err), "999 ") + @patch('salmon.routing.Router') + def test_message_routed(self, router_mock): + with LMTP(self.server.host, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") - err = server.SMTPError(999, "Bogus Error Code") - self.assertEqual(str(err), "999 Bogus Error Code") + @patch('salmon.routing.Router') + def test_message_routed_error(self, router_mock): + router_mock.deliver.side_effect = RuntimeError("Raised on purpose") + with LMTP(self.server.host, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + @patch('salmon.routing.Router') + def test_message_routed_smtperror(self, router_mock): + router_mock.deliver.side_effect = server.SMTPError(450, "Raised on purpose") + with LMTP(self.server.host, self.server.port) as client: + with self.assertRaises(SMTPDataError): + client.sendmail("you@localhost", "me@localhost", "hello") + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + def test_multiple_rcpts(self): + with LMTP(self.server.host, self.server.port) as client: + code, _ = client.ehlo("localhost") + self.assertEqual(code, 250) + code, _ = client.mail("me@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("you@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("them@localhost") + self.assertEqual(code, 250) + + +class AsyncSmtpServerTestCase(SalmonTestCase): + def setUp(self): + super().setUp() + self.server = server.AsyncSMTPReceiver(hostname="127.0.0.1", port=9999) + self.server.start() + self.addCleanup(self.server.stop) + + @patch('salmon.routing.Router') + def test_message_routed(self, router_mock): + with SMTP(self.server.hostname, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + @patch('salmon.routing.Router') + def test_message_routed_error(self, router_mock): + router_mock.deliver.side_effect = RuntimeError("Raised on purpose") + with SMTP(self.server.hostname, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + @patch('salmon.routing.Router') + def test_message_routed_smtperror(self, router_mock): + router_mock.deliver.side_effect = server.SMTPError(450, "Raised on purpose") + with SMTP(self.server.hostname, self.server.port) as client: + with self.assertRaises(SMTPDataError): + client.sendmail("you@localhost", "me@localhost", "hello") + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + def test_multiple_rcpts(self): + with SMTP(self.server.hostname, self.server.port) as client: + code, _ = client.ehlo("localhost") + self.assertEqual(code, 250) + code, _ = client.mail("me@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("you@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("them@localhost") + self.assertEqual(code, 451) + + +class AsyncLmtpServerTestCase(SalmonTestCase): + def setUp(self): + super().setUp() + self.server = server.AsyncLMTPReceiver(hostname="127.0.0.1", port=9999) + self.server.start() + self.addCleanup(self.server.stop) + + @patch('salmon.routing.Router') + def test_message_routed(self, router_mock): + with LMTP(self.server.hostname, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + @patch('salmon.routing.Router') + def test_message_routed_error(self, router_mock): + router_mock.deliver.side_effect = RuntimeError("Raised on purpose") + with LMTP(self.server.hostname, self.server.port) as client: + result = client.sendmail("you@localhost", "me@localhost", "hello") + self.assertEqual(result, {}) + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + @patch('salmon.routing.Router') + def test_message_routed_smtperror(self, router_mock): + router_mock.deliver.side_effect = server.SMTPError(450, "Raised on purpose") + with LMTP(self.server.hostname, self.server.port) as client: + with self.assertRaises(SMTPDataError): + client.sendmail("you@localhost", "me@localhost", "hello") + + self.assertEqual(router_mock.deliver.call_count, 1) + msg = router_mock.deliver.call_args[0][0] + self.assertEqual(msg.Peer, client.sock.getsockname()) + self.assertEqual(msg.To, "me@localhost") + self.assertEqual(msg.From, "you@localhost") + self.assertEqual(msg.Data, b"hello") + + def test_multiple_rcpts(self): + with LMTP(self.server.hostname, self.server.port) as client: + code, _ = client.ehlo("localhost") + self.assertEqual(code, 250) + code, _ = client.mail("me@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("you@localhost") + self.assertEqual(code, 250) + code, _ = client.rcpt("them@localhost") + self.assertEqual(code, 250) + + +class LmtpSeverUnixSocketTestCase(SalmonTestCase): + def test_asyncio(self): + tempdir = tempfile.mkdtemp() + socket_name = os.path.join(tempdir, "lmtp") + _server = server.AsyncLMTPReceiver(socket=socket_name) + _server.start() + self.addCleanup(_server.stop) + + with LMTP(socket_name) as client: + code, _ = client.ehlo("localhost") + self.assertEqual(code, 250) + + def test_asyncore(self): + tempdir = tempfile.mkdtemp() + socket_name = os.path.join(tempdir, "lmtp") + _server = server.LMTPReceiver(socket=socket_name) + _server.start() + self.addCleanup(_server.stop) + + with LMTP(socket_name) as client: + code, _ = client.ehlo("localhost") + self.assertEqual(code, 250) diff --git a/tests/setup_env.py b/tests/setup_env.py index 4ea125b..45a5f63 100644 --- a/tests/setup_env.py +++ b/tests/setup_env.py @@ -22,6 +22,7 @@ def clean_dirs(): class SalmonTestCase(TestCase): def setUp(self): + super().setUp() clean_dirs() for path in dirs: os.mkdir(path) diff --git a/tox.ini b/tox.ini index 7910265..c82f492 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,7 @@ commands = flake8 salmon tests setup.py deps = flake8 [testenv:isort] -commands = isort --check-only --diff +commands = isort --check-only --diff salmon tests setup.py deps = isort [flake8] From a5a77de1eeda8d3ef87afc83bcb64b5283ac9652 Mon Sep 17 00:00:00 2001 From: Matt Molyneaux Date: Sun, 5 Jul 2020 04:43:53 +0100 Subject: [PATCH 2/4] Remove Python 3.5 from test matrix Also remove PyPy as I don't think a newer version is available on Travis that isn't also really really slow. --- .travis.yml | 4 ---- README.rst | 2 +- setup.py | 3 +-- tox.ini | 2 +- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index 419e81a..f14f0ae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,8 @@ language: python -dist: xenial - python: - - "3.5" - "3.6" - "3.7" - "3.8" - - pypy3.5-6.0 matrix: include: diff --git a/README.rst b/README.rst index 2fbb9ad..ba1f044 100644 --- a/README.rst +++ b/README.rst @@ -94,7 +94,7 @@ Salmon has just had some major changes to modernise the code-base. The main APIs should be compatible with releases prior to 3.0.0, but there's no guarantee that older applications won't need changes. -Python versions supported are: 3.5, 3.6, 3.7 and 3.8. +Python versions supported are: 3.6, 3.7 and 3.8. See the CHANGELOG for more details on what's changed since Salmon version 2. diff --git a/setup.py b/setup.py index 5cb4084..406ea8e 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,6 @@ 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', 'Development Status :: 4 - Beta', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', @@ -50,7 +49,7 @@ 'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: PyPy', ], - 'python_requires': '>=3.5', + 'python_requires': '>=3.6', 'entry_points': { 'console_scripts': ['salmon = salmon.commands:main'], diff --git a/tox.ini b/tox.ini index c82f492..2279e78 100644 --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,7 @@ envlist = docs lint isort - py{35,36,37,38} + py{36,37,38} [testenv] commands = From 56b36ae03c24c3fb797deba96f93e18ea3630109 Mon Sep 17 00:00:00 2001 From: Matt Molyneaux Date: Sun, 5 Jul 2020 20:18:37 +0100 Subject: [PATCH 3/4] Update docs --- docs/getting_started.rst | 12 ++++---- docs/index.rst | 1 + docs/receivers.rst | 41 ++++++++++++++++++++++++++++ salmon/data/prototype/config/boot.py | 6 ++-- salmon/server.py | 13 ++++++--- salmon/utils.py | 2 +- 6 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 docs/receivers.rst diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 1c22da7..a54b23f 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -102,7 +102,7 @@ By deafult, all configuration happens in ``config/`` ^^^^^^^^^^^ This file is used by Salmon during start-up to configure the daemon with -various things, such as starting the ``LMTPReceiver``. It's a bit like the +various things, such as starting the ``AsyncLMTPReceiver``. It's a bit like the ``wsgi.py`` file that Python web apps have. If you want to use a different boot module, you can specify it with the ``--boot`` argument. E.g. to use ``myapp/othermodule.py``, do: @@ -144,18 +144,18 @@ much in the same way as you host a WSGI application behind Apache or Nginx. As seen above, a new Salmon project will start a LMTP server that listens on ``localhost:8823``. You can go into ``config/settings.py`` and change the host -and port Salmon uses. You can also switch out ``LMTPReceiver`` for -``SMTPReceiver`` if you require Salmon to use SMTP instead. +and port Salmon uses. You can also switch out ``AsyncLMTPReceiver`` for +``AsyncSMTPReceiver`` if you require Salmon to use SMTP instead. .. warning:: Due to the way Salmon has been implemented it is better suited as a LMTP - server than a SMTP server. ``SMTPReceiver`` is unable to handle multiple + server than a SMTP server. ``AsyncSMTPReceiver`` is unable to handle multiple recipients in one transaction as it doesn't implement the nessessary features to properly implement this part of the SMTP protocol. This is a - compromise ``SMTPReceiver`` makes in order to allow users more freedom in + compromise ``AsyncSMTPReceiver`` makes in order to allow users more freedom in what they do in their handlers. - ``LMTPReceiver`` is unaffected by this issue and implements the LMTP + ``AsyncLMTPReceiver`` is unaffected by this issue and implements the LMTP protocol fully. diff --git a/docs/index.rst b/docs/index.rst index 22ed08f..7c660d5 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -20,6 +20,7 @@ Contents: routing relaying mail_objects + receivers commandline Salmon API guide diff --git a/docs/receivers.rst b/docs/receivers.rst new file mode 100644 index 0000000..37e024c --- /dev/null +++ b/docs/receivers.rst @@ -0,0 +1,41 @@ +Receivers +========= + +Salmon comes with a number of receivers + +Asyncio based receivers +----------------------- + +Salmon's default receivers based on `aiosmtpd `__. + +.. note:: + Although these receivers use Asyncio, your handlers are executed in a + separate thread to allow them to use synchronous code that is easier to + write and maintain. + +:class:`~salmon.server.AsyncLMTPReceiver` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +:class:`~salmon.server.AsyncSMTPReceiver` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Asyncore based receivers +------------------------ + +Salmon's original receivers based on Python's ``smtpd`` library. + +:class:`~salmon.server.LMTPReceiver` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +:class:`~salmon.server.SMTPReceiver` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. warning:: + This receiver is not suitable for use on an Internet facing port. Try + `AsyncSMTPReceiver`_ instead. + +Other receivers +--------------- + +:class:`~salmon.server.QueueReceiver` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/salmon/data/prototype/config/boot.py b/salmon/data/prototype/config/boot.py index 8b4ce55..eb9aa3c 100644 --- a/salmon/data/prototype/config/boot.py +++ b/salmon/data/prototype/config/boot.py @@ -2,7 +2,7 @@ from salmon import queue from salmon.routing import Router -from salmon.server import LMTPReceiver, Relay +from salmon.server import AsyncLMTPReceiver, Relay from . import settings @@ -13,8 +13,8 @@ port=settings.relay_config['port'], debug=1) # where to listen for incoming messages -settings.receiver = LMTPReceiver(settings.receiver_config['host'], - settings.receiver_config['port']) +settings.receiver = AsyncLMTPReceiver(hostname=settings.receiver_config['host'], + port=settings.receiver_config['port']) Router.defaults(**settings.router_defaults) Router.load(settings.handlers) diff --git a/salmon/server.py b/salmon/server.py index 158c868..4faf695 100644 --- a/salmon/server.py +++ b/salmon/server.py @@ -175,8 +175,6 @@ def _deliver(receiver, Peer, From, To, Data, **kwargs): class SMTPChannel(smtpd.SMTPChannel): - """Replaces the standard SMTPChannel with one that rejects more than one recipient""" - def smtp_RCPT(self, arg): if self.__rcpttos: # We can't properly handle multiple RCPT TOs in SMTPReceiver @@ -202,7 +200,11 @@ def smtp_RCPT(self, arg): class SMTPReceiver(smtpd.SMTPServer): - """Receives emails and hands it to the Router for further processing.""" + """Receives emails and hands it to the Router for further processing. + + + This Receiver is based on Python's asyncore module. Consider using AsyncSMTPReceiver. + """ def __init__(self, host='127.0.0.1', port=8825): """ @@ -245,7 +247,10 @@ def process_message(self, Peer, From, To, Data, **kwargs): class LMTPReceiver(lmtpd.LMTPServer): - """Receives emails and hands it to the Router for further processing.""" + """Receives emails and hands it to the Router for further processing. + + This Receiver is based on Python's asyncore module. Consider using AsyncLMTPReceiver. + """ def __init__(self, host='127.0.0.1', port=8824, socket=None): """ diff --git a/salmon/utils.py b/salmon/utils.py index 3d1f407..52f809a 100644 --- a/salmon/utils.py +++ b/salmon/utils.py @@ -94,7 +94,7 @@ def make_fake_settings(host, port): logging.basicConfig(filename="logs/logger.log", level=logging.DEBUG) routing.Router.load(['salmon.handlers.log', 'salmon.handlers.queue']) settings = imp.new_module('settings') - settings.receiver = server.SMTPReceiver(host, port) + settings.receiver = server.AsyncSMTPReceiver(hostname=host, port=port) settings.relay = None logging.info("Logging mode enabled, will not send email to anyone, just log.") From 05ded0768fae8b2882d4be1c018bb8ad7f309851 Mon Sep 17 00:00:00 2001 From: Matt Molyneaux Date: Mon, 13 Jul 2020 16:28:26 +0100 Subject: [PATCH 4/4] wip queue --- salmon/queue.py | 140 +++++++++++++++++++++++++++++++---------- salmon/server.py | 117 ++++++++++++++++++++-------------- tests/command_tests.py | 2 +- tests/server_tests.py | 3 +- tests/utils_tests.py | 1 - 5 files changed, 181 insertions(+), 82 deletions(-) diff --git a/salmon/queue.py b/salmon/queue.py index 6d497b3..4bd9054 100644 --- a/salmon/queue.py +++ b/salmon/queue.py @@ -4,13 +4,16 @@ to do some serious surgery go use that. This works as a good API for the 90% case of "put mail in, get mail out" queues. """ +import contextlib import errno +import fcntl import hashlib import logging import mailbox import os import socket import time +import json from salmon import mail @@ -42,14 +45,6 @@ def _create_tmp(self): raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path) -class QueueError(Exception): - - def __init__(self, msg, data): - Exception.__init__(self, msg) - self._message = msg - self.data = data - - class Queue: """ Provides a simplified API for dealing with 'queues' in Salmon. @@ -89,8 +84,10 @@ def __init__(self, queue_dir, safe=False, pop_limit=0, oversize_dir=None): self.oversize_dir = os.path.join(oversize_dir, "new") - if not os.path.exists(self.oversize_dir): + try: os.mkdir(self.oversize_dir) + except FileExistsError: + pass else: self.oversize_dir = None @@ -104,6 +101,16 @@ def push(self, message): message = str(message) return self.mbox.add(message) + def _move_oversize(self, key, name): + if self.oversize_dir: + logging.info("Message key %s over size limit %d, moving to %s.", + key, self.pop_limit, self.oversize_dir) + os.rename(name, os.path.join(self.oversize_dir, key)) + else: + logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).", + key, self.pop_limit) + os.unlink(name) + def pop(self): """ Pops a message off the queue, order is not really maintained @@ -115,21 +122,10 @@ def pop(self): over, over_name = self.oversize(key) if over: - if self.oversize_dir: - logging.info("Message key %s over size limit %d, moving to %s.", - key, self.pop_limit, self.oversize_dir) - os.rename(over_name, os.path.join(self.oversize_dir, key)) - else: - logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).", - key, self.pop_limit) - os.unlink(over_name) + self._move_oversize(key, over_name) else: - try: - msg = self.get(key) - except QueueError as exc: - raise exc - finally: - self.remove(key) + msg = self.get(key) + self.remove(key) return key, msg return None, None @@ -149,11 +145,11 @@ def get(self, key): try: return mail.MailRequest(self.dir, None, None, msg_data) except Exception as exc: - logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data) + logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data) return None def remove(self, key): - """Removes the queue, but not returned.""" + """Removes key the queue.""" self.mbox.remove(key) def __len__(self): @@ -166,15 +162,8 @@ def __len__(self): def clear(self): """ Clears out the contents of the entire queue. - - Warning: This could be horribly inefficient since it pops messages - until the queue is empty. It could also cause an infinite loop if - another process is writing to messages to the Queue faster than we can - pop. """ - # man this is probably a really bad idea - while len(self) > 0: - self.pop() + self.mbox.clear() def keys(self): """ @@ -188,3 +177,88 @@ def oversize(self, key): return os.path.getsize(file_name) > self.pop_limit, file_name else: return False, None + + +class Metadata: + def __init__(self, path): + self.path = os.path.join(path, "metadata") + self.meta_file = None + try: + os.mkdir(self.path) + except FileExistsError: + pass + + def get(self): + return json.load(self.meta_file) + + def set(self, key, data): + json.dump(self.meta_file, data) + + def remove(self): + os.unlink(self.meta_file) + + def clear(self): + raise NotImplementedError + + @contextlib.contextmanager + def lock(self, key, mode="r"): + i = 0 + try: + self.meta_file = open(os.path.join(self.path, key), mode) + except FileNotFoundError: + pass + else: + while True: + # try for a lock using exponential backoff + try: + fcntl.flock(self.meta_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + if i > 5: + # 2**5 is 30 seconds which is far too long + raise + time.sleep(2**i) + i += 1 + else: + break + + try: + yield self + finally: + if self.meta_file is not None: + fcntl.flock(self.meta_file, fcntl.LOCK_UN) + self.meta_file.close() + self.meta_file = None + + +class QueueWithMetadata(Queue): + """Just like Queue, except it stores envelope data""" + def push(self, message, Peer, From, To): + if not isinstance(To, list): + To = [To] + key = super().push(message) + with Metadata(self.dir).lock(key, "w") as metadata: + metadata.set(key, {"Peer": Peer, "From": From, "To": To}) + return key + + def get(self, key): + with Metadata(self.dir).lock(key) as metadata: + msg = super().get(key) + data = metadata.get(key) + # move data from metadata to msg obj + for k, v in data.items(): + setattr(msg, k, v) + data["To"].remove(msg.To) + metadata.set(data) + return msg + + def remove(self, key): + with Metadata(self.dir).lock(key) as metadata: + data = metadata.get(key) + # if there's still a To to be processed, leave the message on disk + if not data.get("To"): + super().remove(key) + metadata.remove() + + def clear(self): + Metadata(self.dir).clear() + super().clear() diff --git a/salmon/server.py b/salmon/server.py index 4faf695..d5776c9 100644 --- a/salmon/server.py +++ b/salmon/server.py @@ -23,6 +23,7 @@ ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__ SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction" +IN_QUEUE = "run/in_queue" lmtpd.__version__ = ROUTER_VERSION_STRING smtpd.__version__ = ROUTER_VERSION_STRING @@ -292,40 +293,47 @@ def process_message(self, Peer, From, To, Data, **kwargs): return _deliver(self, Peer, From, To, Data, **kwargs) -class SMTPOnlyOneRcpt(SMTP): - async def smtp_RCPT(self, arg): - if self.envelope.rcpt_tos: - await self.push(SMTP_MULTIPLE_RCPTS_ERROR) - else: - await super().smtp_RCPT(arg) - - class SMTPHandler: - def __init__(self, executor=None): + def __init__(self, executor=None, *, in_queue): self.executor = executor + self.in_queue = in_queue async def handle_DATA(self, server, session, envelope): - status = await server.loop.run_in_executor(self.executor, partial( - _deliver, - self, - session.peer, - envelope.mail_from, - envelope.rcpt_tos[0], - envelope.content, - )) - return status or "250 Ok" + try: + status = await server.loop.run_in_executor(self.executor, partial( + self.in_queue.queue.push, + envelope.content, + session.peer, + envelope.mail_from, + envelope.rcpt_tos, + )) + status = "250 Ok" + except Exception: + logging.exception("Raised exception while trying to push to Queue: %r, Peer: %r, From: %r, To: %r") + status = "550 Server error" + return status class AsyncSMTPReceiver(Controller): """Receives emails and hands it to the Router for further processing.""" - def __init__(self, handler=None, **kwargs): + def __init__(self, handler=None, in_queue=None, **kwargs): + if in_queue is None: + in_queue = QueueReceiver(queue.QueueWithMetadata(IN_QUEUE)) + self.in_queue = in_queue if handler is None: - handler = SMTPHandler() + handler = SMTPHandler(in_queue=self.in_queue) super().__init__(handler, **kwargs) def factory(self): - # TODO implement a queue - return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + return SMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + def start(self): + super().start() + self.in_queue.start() + + def stop(self): + super().stop() + self.in_queue.stop() class LMTPHandler: @@ -340,7 +348,8 @@ async def handle_DATA(self, server, session, envelope): self, session.peer, envelope.mail_from, - rcpt, envelope.content, + rcpt, + envelope.content, )) statuses.append(status or "250 Ok") return "\r\n".join(statuses) @@ -389,20 +398,25 @@ class QueueReceiver: same way otherwise. """ - def __init__(self, queue_dir, sleep=10, size_limit=0, oversize_dir=None, workers=10): + def __init__(self, in_queue, sleep=10, size_limit=0, oversize_dir=None, workers=10): """ The router should be fully configured and ready to work, the queue_dir can be a fully qualified path or relative. The option workers dictates how many threads are started to process messages. Consider adding ``@nolocking`` to your handlers if you are able to. """ - self.queue = queue.Queue(queue_dir, pop_limit=size_limit, - oversize_dir=oversize_dir) + if isinstance(in_queue, str): + self.queue = queue.Queue(in_queue, pop_limit=size_limit, + oversize_dir=oversize_dir) + else: + self.queue = in_queue self.sleep = sleep # Pool is from multiprocess.dummy which uses threads rather than processes self.workers = Pool(workers) + self._running = True + def start(self, one_shot=False): """ Start simply loops indefinitely sleeping and pulling messages @@ -412,25 +426,35 @@ def start(self, one_shot=False): """ logging.info("Queue receiver started on queue dir %s", self.queue.dir) - logging.debug("Sleeping for %d seconds...", self.sleep) - - # if there are no messages left in the maildir and this a one-shot, the - # while loop terminates - while not (len(self.queue) == 0 and one_shot): - # if there's nothing in the queue, take a break - if len(self.queue) == 0: - time.sleep(self.sleep) - continue - try: - key, msg = self.queue.pop() - except KeyError: - logging.debug("Could not find message in Queue") - continue - - logging.debug("Pulled message with key: %r off", key) - self.workers.apply_async(self.process_message, args=(msg,)) + def _run(): + while self._running: + # if there's nothing in the queue, take a break + if len(self.queue) == 0: + if one_shot: + self._running = False + else: + logging.debug("Sleeping for %d seconds...", self.sleep) + time.sleep(self.sleep) + continue + + try: + key, msg = self.queue.pop() + except KeyError: + logging.debug("Could not find message in Queue") + continue + + logging.debug("Pulled message with key: %r off", key) + self.workers.apply_async(self.process_message, args=(msg,)) + self.main_thread = threading.Thread(target=_run) + self.main_thread.start() + + if one_shot: + self.main_thread.join() + def stop(self): + self._running = False + self.main_thread.join() self.workers.close() self.workers.join() @@ -441,12 +465,13 @@ def process_message(self, msg): """ try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", msg.Peer, msg.From, msg.To) + logging.debug("Message received from Queue: %r, Peer: %r, From: %r, to To %r.", + self.queue, msg.Peer, msg.From, msg.To) routing.Router.deliver(msg) except SMTPError as err: logging.exception("Raising SMTPError when running in a QueueReceiver is unsupported.") undeliverable_message(msg.Data, err.message) except Exception: - logging.exception("Exception while processing message from Peer: " - "%r, From: %r, to To %r.", msg.Peer, msg.From, msg.To) + logging.exception("Exception while processing message from Queue: %r, Peer: " + "%r, From: %r, to To %r.", self.queue, msg.Peer, msg.From, msg.To) undeliverable_message(msg.Data, "Router failed to catch exception.") diff --git a/tests/command_tests.py b/tests/command_tests.py index 8bda525..459d8cd 100644 --- a/tests/command_tests.py +++ b/tests/command_tests.py @@ -79,7 +79,7 @@ def test_queue_command(self, MockQueue): self.assertEqual(mq.__len__.call_count, 1) @patch('salmon.utils.daemonize') - @patch('salmon.server.SMTPReceiver') + @patch('salmon.server.AsyncSMTPReceiver') def test_log_command(self, MockSMTPReceiver, daemon_mock): runner = CliRunner() ms = MockSMTPReceiver() diff --git a/tests/server_tests.py b/tests/server_tests.py index 49cd19c..31f81ae 100644 --- a/tests/server_tests.py +++ b/tests/server_tests.py @@ -222,6 +222,7 @@ def sleepy(*args, **kwargs): receiver = server.QueueReceiver('run/queue', sleep=10, workers=1) with self.assertRaises(SleepCalled): receiver.start() + receiver.main_thread.join() self.assertEqual(receiver.workers.apply_async.call_count, 0) self.assertEqual(sleep_mock.call_count, 2) @@ -427,7 +428,7 @@ def test_multiple_rcpts(self): code, _ = client.rcpt("you@localhost") self.assertEqual(code, 250) code, _ = client.rcpt("them@localhost") - self.assertEqual(code, 451) + self.assertEqual(code, 250) class AsyncLmtpServerTestCase(SalmonTestCase): diff --git a/tests/utils_tests.py b/tests/utils_tests.py index a8eb808..b15d99d 100644 --- a/tests/utils_tests.py +++ b/tests/utils_tests.py @@ -21,7 +21,6 @@ def test_make_fake_settings(self): assert settings assert settings.receiver assert settings.relay is None - settings.receiver.close() def test_import_settings(self): assert utils.settings is None