Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop locking on every handler call #153

Open
wants to merge 2 commits into
base: 🐟
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions docs/routing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ handlers should process the message:
1. Match all handlers whose ``@route`` decorator matches the ``to`` header
2. Iterate over these and call the following

1. any handlers that have been marked as ``@stateless``
2. the first (and only) stateful handler. If it returns a handler
- any handlers that have been marked as ``@stateless``
- the first (and only) stateful handler. If it returns a handler
reference, the state for that sender will be updated.

Keep in mind these could be called in any order and you should not rely on
them being called in a particular order.

3. If no valid handlers were found, the message is sent to the undeliverable
queue

Expand Down Expand Up @@ -135,6 +138,77 @@ our application might look like this:
"['myapp', 'user2@example.com']": <function START at 0x7f64194fa398>
}

Handlers and thread safety
^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note::

This only applies to Salmon v4.0.0 or later. Previous version of Salmon
used a lock on every handler call, which reduced performance.

Salmon uses threads to process multiple incoming messages more efficiently.
This means it is important to write handlers in a thread-safe manner. If you're
already familiar with frameworks such as Django, this shouldn't be a surprise
to you.

Some APIs are already thread-safe, such as saving mail to a Maildir. Others can
be made thread-safe by being mindful of which parts are designed to be accessed
concurrently and which can't (such as SQLAlchemy's ``Session`` objects, as
noted in `their documentation
<https://docs.sqlalchemy.org/en/14/orm/session_basics.html#is-the-session-thread-safe>`_.
However, there are occasions where there is no way make the API work with
concurrent execution. For these situations you can either:

#. Use ``threading.RLock`` from Python's threading library on a block of unsafe code:

.. code-block:: python

from threading import RLock

from salmon.routing import route

LOCK = RLock()
HEADER_LOG_FILENAME = "somefile.txt"


@route(".*")
def START(message):
# do something that's thread-safe
header_names = "\n".join(message.keys())

# now append to a file
with LOCK:
with open(HEADER_LOG_FILENAME, "a") as log_file:
log_file.write(header_names)
log_file.write("\n")

This approach has the advantage that it can have a lesser impact on
performance, although that does come at the cost of code complexity.

#. Use the :func:`~salmon.routing.locking` decorator to lock on every call to that handler:

.. code-block:: python

from salmon.routing import locking, route

HEADER_LOG_FILENAME = "somefile.txt"


@route(".*")
@locking
def START(message):
# do something that's thread-safe
header_names = "\n".join(message.keys())

# now append to a file
with open(HEADER_LOG_FILENAME, "a") as log_file:
log_file.write(header_names)
log_file.write("\n")

Here, the whole function is called from within a lock rather than just the
thread-unsafe parts.


Stateless Processing
^^^^^^^^^^^^^^^^^^^^

Expand Down Expand Up @@ -211,4 +285,12 @@ capable. For example, Django's ORM could be used:

.. note::

This example is incomplete, it's only there to give an idea of how to implement a state storage class.
This example is incomplete, it's only there to give an idea of how to
implement a state storage class.

.. note::

State storage must be thread-safe. In this example, all the calls to
Django's ORM are either atomic (e.g. ``SalmonState.objects.get()``) or Django
automatically wraps them in a transaction (e.g.
``SalmonState.objects.all().delete()``)
4 changes: 1 addition & 3 deletions salmon/handlers/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
import logging

from salmon import handlers, queue
from salmon.routing import nolocking, route_like, stateless
from salmon.routing import route_like, stateless


@route_like(handlers.log.START)
@stateless
@nolocking
def START(message, to=None, host=None):
"""
@stateless and routes however handlers.log.START routes (everything).
Has @nolocking, but that's alright since it's just writing to a Maildir.
"""
logging.debug("MESSAGE to %s@%s added to queue.", to, host)
q = queue.Queue('run/queue')
Expand Down
123 changes: 54 additions & 69 deletions salmon/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
* @route_like -- Says that this function routes like another one.
* @stateless -- Indicates this function always runs on each route encountered, and
no state is maintained.
* @nolocking -- Use this if you want this handler to run parallel without any
locking around Salmon internals. SUPER DANGEROUS, add @stateless as well.
* @locking -- Use this if you want this handler to be run one call at a time.
* @state_key_generator -- Used on a function that knows how to make your state
keys for the module, for example if module_name + message.To is needed to maintain
state.
Expand All @@ -55,9 +54,12 @@
import shelve
import sys
import threading
import warnings

ROUTE_FIRST_STATE = 'START'
LOG = logging.getLogger("routing")
SALMON_SETTINGS_VARIABLE_NAME = "_salmon_settings"
_DEFAULT_VALUE = object()


def DEFAULT_STATE_KEY(mod, msg):
Expand Down Expand Up @@ -97,10 +99,11 @@ class MemoryStorage(StateStorage):
"""
The default simplified storage for the Router to hold the states. This
should only be used in testing, as you'll lose all your contacts and their
states if your server shuts down. It is also horribly NOT thread safe.
states if your server shuts down.
"""
def __init__(self):
self.states = {}
self.lock = threading.RLock()

def get(self, key, sender):
key = self.key(key, sender)
Expand All @@ -110,14 +113,15 @@ def get(self, key, sender):
return ROUTE_FIRST_STATE

def set(self, key, sender, state):
key = self.key(key, sender)
if state == ROUTE_FIRST_STATE:
try:
del self.states[key]
except KeyError:
pass
else:
self.states[key] = state
with self.lock:
key = self.key(key, sender)
if state == ROUTE_FIRST_STATE:
try:
del self.states[key]
except KeyError:
pass
else:
self.states[key] = state

def key(self, key, sender):
return repr([key, sender])
Expand All @@ -138,8 +142,8 @@ class ShelveStorage(MemoryStorage):
"""
def __init__(self, database_path):
"""Database path depends on the backing library use by Python's shelve."""
super().__init__()
self.database_path = database_path
self.lock = threading.RLock()

def get(self, key, sender):
"""
Expand Down Expand Up @@ -190,22 +194,11 @@ class RoutingBase:

in your settings module.

RoutingBase does locking on every write to its internal data (which usually
only happens during booting and reloading while debugging), and when each
handler's state function is called. ALL threads will go through this lock,
but only as each state is run, so you won't have a situation where the chain
of state functions will block all the others. This means that while your
handler runs nothing will be running, but you have not guarantees about
the order of each state function.

However, this can kill the performance of some kinds of state functions,
so if you find the need to not have locking, then use the @nolocking
decorator and the Router will NOT lock when that function is called. That
means while your @nolocking state function is running at least one other
thread (more if the next ones happen to be @nolocking) could also be
running.

It's your job to keep things straight if you do that.
RoutingBase assumes that both your STATE_STORE and handlers are
thread-safe. For handlers that cannot be made thread-safe, use @locking and
RoutingBase will use locks to make sure that handler is only called one
call at a time. Please note that this will have a negative impact on
performance.

NOTE: See @state_key_generator for a way to change what the key is to
STATE_STORE for different state control options.
Expand Down Expand Up @@ -351,11 +344,11 @@ def deliver(self, message):
for func, matchkw in self._collect_matches(message):
LOG.debug("Matched %r against %s.", message.To, func.__name__)

if salmon_setting(func, 'nolocking'):
self.call_safely(func, message, matchkw)
else:
if salmon_setting(func, 'locking'):
with self.call_lock:
self.call_safely(func, message, matchkw)
else:
self.call_safely(func, message, matchkw)

called_count += 1

Expand Down Expand Up @@ -516,29 +509,25 @@ def parse_format(self, format, captures):

def setup_accounting(self, func):
"""Sets up an accounting map attached to the func for routing decorators."""
attach_salmon_settings(func)
func._salmon_settings['format'] = self.format
func._salmon_settings['captures'] = self.captures
salmon_setting(func, 'format', self.format)
salmon_setting(func, 'captures', self.captures)


def salmon_setting(func, key):
"""Simple way to get the salmon setting off the function, or None."""
return func._salmon_settings.get(key)
def salmon_setting(func, key, value=_DEFAULT_VALUE):
"""Get or set a salmon setting on a handler function"""
try:
salmon_settings = getattr(func, SALMON_SETTINGS_VARIABLE_NAME)
except AttributeError:
salmon_settings = {}
setattr(func, SALMON_SETTINGS_VARIABLE_NAME, salmon_settings)
if value is not _DEFAULT_VALUE:
salmon_settings[key] = value
else:
return salmon_settings.get(key)


def has_salmon_settings(func):
return "_salmon_settings" in func.__dict__


def assert_salmon_settings(func):
"""Used to make sure that the func has been setup by a routing decorator."""
assert has_salmon_settings(func), "Function %s has not be setup with a @route first." % func.__name__


def attach_salmon_settings(func):
"""Use this to setup the _salmon_settings if they aren't already there."""
if '_salmon_settings' not in func.__dict__:
func._salmon_settings = {}
return hasattr(func, SALMON_SETTINGS_VARIABLE_NAME)


class route_like(route):
Expand All @@ -548,10 +537,10 @@ class route_like(route):
modules.
"""
def __init__(self, func):
if not has_salmon_settings(func):
self.format = salmon_setting(func, 'format')
self.captures = salmon_setting(func, 'captures')
if self.format is None or self.captures is None:
raise TypeError("{} is missing a @route".format(func))
self.format = func._salmon_settings['format']
self.captures = func._salmon_settings['captures']


def stateless(func):
Expand All @@ -566,30 +555,26 @@ def stateless(func):

Stateless handlers are NOT guaranteed to run before the handler with state.
"""
if has_salmon_settings(func) and salmon_setting(func, 'format'):
raise TypeError("You must use @stateless after @route or @route_like")
salmon_setting(func, 'stateless', True)
return func

attach_salmon_settings(func)
func._salmon_settings['stateless'] = True

def nolocking(func):
"""
Does nothing, as no locking is the default now
"""
warnings.warn("@nolocking is redundant and can be safely removed from your handler %s" % func,
category=DeprecationWarning, stacklevel=2)
return func


def nolocking(func):
def locking(func):
"""
Normally salmon.routing.Router has a lock around each call to all handlers
to prevent them from stepping on each other. It's assumed that 95% of the
time this is what you want, so it's the default. You probably want
everything to go in order and not step on other things going off from other
threads in the system.

However, sometimes you know better what you are doing and this is where
@nolocking comes in. Put this decorator on your state functions that you
don't care about threading issues or that you have found a need to
manually tune, and it will run it without any locks.
Salmon assumes your handlers are thread-safe, but is not always the case.
Put this decorator on any state functions that are not thread-safe for
whatever reason.
"""
attach_salmon_settings(func)
func._salmon_settings['nolocking'] = True
salmon_setting(func, 'locking', True)
return func


Expand Down
3 changes: 1 addition & 2 deletions salmon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ def __init__(self, queue_dir, sleep=10, size_limit=0, oversize_dir=None, workers
"""
The router should be fully configured and ready to work, the queue_dir
can be a fully qualified path or relative. The option workers dictates
how many threads are started to process messages. Consider adding
``@nolocking`` to your handlers if you are able to.
how many threads are started to process messages.
"""
self.queue = queue.Queue(queue_dir, pop_limit=size_limit,
oversize_dir=oversize_dir)
Expand Down
5 changes: 5 additions & 0 deletions salmon/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,8 @@ def assert_in_state(module, To, From, state):
state_key = routing.Router.state_key(module, fake)
assert routing.Router.STATE_STORE.get(state_key, From) == state, \
"%r != %r" % (routing.Router.STATE_STORE.get(state_key, From), state)


def assert_salmon_settings(func):
"""Used to make sure that the func has been setup by a routing decorator."""
assert routing.has_salmon_settings(func), "Function %s has not be setup with a @route first." % func.__name__
3 changes: 1 addition & 2 deletions tests/data/test_app/test_handlers/dump.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from salmon import queue
from salmon.routing import nolocking, route, stateless
from salmon.routing import route, stateless
from salmon.utils import settings


@route("(to)@(host)", to=".+", host="example.com")
@stateless
@nolocking
def START(message, to=None, host=None):
inbox = queue.Queue(settings.QUEUE_PATH)
inbox.push(message)
Loading