Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement asyncio receivers #134

Draft
wants to merge 4 commits into
base: 🐟
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
language: python
dist: xenial

python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"
- pypy3.5-6.0

matrix:
include:
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Salmon has just had some major changes to modernise the code-base. The main
APIs should be compatible with releases prior to 3.0.0, but there's no
guarantee that older applications won't need changes.

Python versions supported are: 3.5, 3.6, 3.7 and 3.8.
Python versions supported are: 3.6, 3.7 and 3.8.

See the CHANGELOG for more details on what's changed since Salmon version 2.

Expand Down
12 changes: 6 additions & 6 deletions docs/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ By deafult, all configuration happens in ``config/``
^^^^^^^^^^^

This file is used by Salmon during start-up to configure the daemon with
various things, such as starting the ``LMTPReceiver``. It's a bit like the
various things, such as starting the ``AsyncLMTPReceiver``. It's a bit like the
``wsgi.py`` file that Python web apps have. If you want to use a different boot
module, you can specify it with the ``--boot`` argument. E.g. to use
``myapp/othermodule.py``, do:
Expand Down Expand Up @@ -144,18 +144,18 @@ much in the same way as you host a WSGI application behind Apache or Nginx.

As seen above, a new Salmon project will start a LMTP server that listens on
``localhost:8823``. You can go into ``config/settings.py`` and change the host
and port Salmon uses. You can also switch out ``LMTPReceiver`` for
``SMTPReceiver`` if you require Salmon to use SMTP instead.
and port Salmon uses. You can also switch out ``AsyncLMTPReceiver`` for
``AsyncSMTPReceiver`` if you require Salmon to use SMTP instead.

.. warning::

Due to the way Salmon has been implemented it is better suited as a LMTP
server than a SMTP server. ``SMTPReceiver`` is unable to handle multiple
server than a SMTP server. ``AsyncSMTPReceiver`` is unable to handle multiple
recipients in one transaction as it doesn't implement the nessessary
features to properly implement this part of the SMTP protocol. This is a
compromise ``SMTPReceiver`` makes in order to allow users more freedom in
compromise ``AsyncSMTPReceiver`` makes in order to allow users more freedom in
what they do in their handlers.


``LMTPReceiver`` is unaffected by this issue and implements the LMTP
``AsyncLMTPReceiver`` is unaffected by this issue and implements the LMTP
protocol fully.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Contents:
routing
relaying
mail_objects
receivers
commandline
Salmon API guide <salmon>

Expand Down
41 changes: 41 additions & 0 deletions docs/receivers.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Receivers
=========

Salmon comes with a number of receivers

Asyncio based receivers
-----------------------

Salmon's default receivers based on `aiosmtpd <https://github.com/aio-libs/aiosmtpd>`__.

.. note::
Although these receivers use Asyncio, your handlers are executed in a
separate thread to allow them to use synchronous code that is easier to
write and maintain.

:class:`~salmon.server.AsyncLMTPReceiver`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

:class:`~salmon.server.AsyncSMTPReceiver`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Asyncore based receivers
------------------------

Salmon's original receivers based on Python's ``smtpd`` library.

:class:`~salmon.server.LMTPReceiver`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

:class:`~salmon.server.SMTPReceiver`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. warning::
This receiver is not suitable for use on an Internet facing port. Try
`AsyncSMTPReceiver`_ instead.

Other receivers
---------------

:class:`~salmon.server.QueueReceiver`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
6 changes: 3 additions & 3 deletions salmon/data/prototype/config/boot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from salmon import queue
from salmon.routing import Router
from salmon.server import LMTPReceiver, Relay
from salmon.server import AsyncLMTPReceiver, Relay

from . import settings

Expand All @@ -13,8 +13,8 @@
port=settings.relay_config['port'], debug=1)

# where to listen for incoming messages
settings.receiver = LMTPReceiver(settings.receiver_config['host'],
settings.receiver_config['port'])
settings.receiver = AsyncLMTPReceiver(hostname=settings.receiver_config['host'],
port=settings.receiver_config['port'])

Router.defaults(**settings.router_defaults)
Router.load(settings.handlers)
Expand Down
140 changes: 107 additions & 33 deletions salmon/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
to do some serious surgery go use that. This works as a good
API for the 90% case of "put mail in, get mail out" queues.
"""
import contextlib
import errno
import fcntl
import hashlib
import logging
import mailbox
import os
import socket
import time
import json

from salmon import mail

Expand Down Expand Up @@ -42,14 +45,6 @@ def _create_tmp(self):
raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path)


class QueueError(Exception):

def __init__(self, msg, data):
Exception.__init__(self, msg)
self._message = msg
self.data = data


class Queue:
"""
Provides a simplified API for dealing with 'queues' in Salmon.
Expand Down Expand Up @@ -89,8 +84,10 @@ def __init__(self, queue_dir, safe=False, pop_limit=0, oversize_dir=None):

self.oversize_dir = os.path.join(oversize_dir, "new")

if not os.path.exists(self.oversize_dir):
try:
os.mkdir(self.oversize_dir)
except FileExistsError:
pass
else:
self.oversize_dir = None

Expand All @@ -104,6 +101,16 @@ def push(self, message):
message = str(message)
return self.mbox.add(message)

def _move_oversize(self, key, name):
if self.oversize_dir:
logging.info("Message key %s over size limit %d, moving to %s.",
key, self.pop_limit, self.oversize_dir)
os.rename(name, os.path.join(self.oversize_dir, key))
else:
logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
key, self.pop_limit)
os.unlink(name)

def pop(self):
"""
Pops a message off the queue, order is not really maintained
Expand All @@ -115,21 +122,10 @@ def pop(self):
over, over_name = self.oversize(key)

if over:
if self.oversize_dir:
logging.info("Message key %s over size limit %d, moving to %s.",
key, self.pop_limit, self.oversize_dir)
os.rename(over_name, os.path.join(self.oversize_dir, key))
else:
logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
key, self.pop_limit)
os.unlink(over_name)
self._move_oversize(key, over_name)
else:
try:
msg = self.get(key)
except QueueError as exc:
raise exc
finally:
self.remove(key)
msg = self.get(key)
self.remove(key)
return key, msg

return None, None
Expand All @@ -149,11 +145,11 @@ def get(self, key):
try:
return mail.MailRequest(self.dir, None, None, msg_data)
except Exception as exc:
logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data)
logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data)
return None

def remove(self, key):
"""Removes the queue, but not returned."""
"""Removes key the queue."""
self.mbox.remove(key)

def __len__(self):
Expand All @@ -166,15 +162,8 @@ def __len__(self):
def clear(self):
"""
Clears out the contents of the entire queue.

Warning: This could be horribly inefficient since it pops messages
until the queue is empty. It could also cause an infinite loop if
another process is writing to messages to the Queue faster than we can
pop.
"""
# man this is probably a really bad idea
while len(self) > 0:
self.pop()
self.mbox.clear()

def keys(self):
"""
Expand All @@ -188,3 +177,88 @@ def oversize(self, key):
return os.path.getsize(file_name) > self.pop_limit, file_name
else:
return False, None


class Metadata:
def __init__(self, path):
self.path = os.path.join(path, "metadata")
self.meta_file = None
try:
os.mkdir(self.path)
except FileExistsError:
pass

def get(self):
return json.load(self.meta_file)

def set(self, key, data):
json.dump(self.meta_file, data)

def remove(self):
os.unlink(self.meta_file)

def clear(self):
raise NotImplementedError

@contextlib.contextmanager
def lock(self, key, mode="r"):
i = 0
try:
self.meta_file = open(os.path.join(self.path, key), mode)
except FileNotFoundError:
pass
else:
while True:
# try for a lock using exponential backoff
try:
fcntl.flock(self.meta_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
if i > 5:
# 2**5 is 30 seconds which is far too long
raise
time.sleep(2**i)
i += 1
else:
break

try:
yield self
finally:
if self.meta_file is not None:
fcntl.flock(self.meta_file, fcntl.LOCK_UN)
self.meta_file.close()
self.meta_file = None


class QueueWithMetadata(Queue):
"""Just like Queue, except it stores envelope data"""
def push(self, message, Peer, From, To):
if not isinstance(To, list):
To = [To]
key = super().push(message)
with Metadata(self.dir).lock(key, "w") as metadata:
metadata.set(key, {"Peer": Peer, "From": From, "To": To})
return key

def get(self, key):
with Metadata(self.dir).lock(key) as metadata:
msg = super().get(key)
data = metadata.get(key)
# move data from metadata to msg obj
for k, v in data.items():
setattr(msg, k, v)
data["To"].remove(msg.To)
metadata.set(data)
return msg

def remove(self, key):
with Metadata(self.dir).lock(key) as metadata:
data = metadata.get(key)
# if there's still a To to be processed, leave the message on disk
if not data.get("To"):
super().remove(key)
metadata.remove()

def clear(self):
Metadata(self.dir).clear()
super().clear()