Skip to content
This repository has been archived by the owner on Oct 31, 2020. It is now read-only.

Commit

Permalink
Add verp policy
Browse files Browse the repository at this point in the history
improve and test LookupPolicy
  • Loading branch information
icgood committed Aug 20, 2017
1 parent a9c8336 commit 1fbb763
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 11 deletions.
9 changes: 9 additions & 0 deletions slimta/lookup/drivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class LookupBase(object):
"""

def _format_key(self, key_template, kwargs):
kwargs = kwargs.copy()
while True:
try:
return key_template.format(**kwargs)
except KeyError as exc:
key = exc.args[0]
kwargs[key] = '{'+key+'}'

def lookup(self, **kwargs):
"""The keyword arguments will be used by the lookup driver to return a
dictionary-like object that will be used to affect actions or policy.
Expand Down
5 changes: 2 additions & 3 deletions slimta/lookup/drivers/dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,11 @@ def __init__(self, backend, key_template):
self.key_template = key_template

def lookup(self, **kwargs):
ret = None
key = self._format_key(self.key_template, kwargs)
try:
key = self.key_template.format(**kwargs)
ret = self.backend[key]
except KeyError:
pass
ret = None
self.log(__name__, kwargs, ret)
return ret

Expand Down
5 changes: 1 addition & 4 deletions slimta/lookup/drivers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ def _hash_lookup(self, key):
return self.redis.hgetall(key)

def lookup(self, **kwargs):
try:
key = self.key_template.format(**kwargs)
except KeyError:
return
key = self._format_key(self.key_template, kwargs)
ret = self._key_lookup(key)
self.log(__name__, kwargs, ret)
return ret
Expand Down
31 changes: 27 additions & 4 deletions slimta/lookup/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,41 @@ def _add_headers(self, envelope, headers_raw):
for key, val in headers.items():
envelope.prepend_header(key, val)

def _verp_enc(self, address, on_domain):
localpart, _, domain = address.rpartition('@')
if localpart:
return '{0!s}={1!s}@{2!s}'.format(localpart, domain, on_domain)
else:
return '{0!s}@{1!s}'.format(domain, on_domain)

def _get_alias(self, address, alias):
if '@' in alias:
return alias
else:
localpart, _, domain = address.rpartition('@')
if localpart:
return '{0!s}@{1!s}'.format(localpart, alias)
else:
return alias

def apply(self, envelope):
if self.on_sender:
ret = self.lookup.lookup_address(envelope.sender)
ret = self.lookup.lookup_address(envelope.sender) or {}
if 'verp' in ret:
envelope.sender = self._verp_enc(envelope.sender, ret['verp'])
if 'alias' in ret:
envelope.sender = ret['alias']
envelope.sender = self._get_alias(
envelope.sender, ret['alias'])
if 'add_headers' in ret:
self._add_headers(envelope, ret['add_headers'])
if self.on_rcpts:
for i, rcpt in enumerate(envelope.recipients):
ret = self.lookup.lookup_address(rcpt)
ret = self.lookup.lookup_address(rcpt) or {}
if 'verp' in ret:
envelope.recipients[i] = self._verp_enc(rcpt, ret['verp'])
if 'alias' in ret:
envelope.recipients[i] = ret['alias']
envelope.recipients[i] = self._get_alias(
rcpt, ret['alias'])
if 'add_headers' in ret:
self._add_headers(envelope, ret['add_headers'])

Expand Down
42 changes: 42 additions & 0 deletions test/test_slimta_lookup_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

from mox3.mox import MoxTestBase, IsA

from slimta.envelope import Envelope
from slimta.lookup.policy import LookupPolicy
from slimta.lookup.drivers.dict import DictLookup


class TestDictLookup(MoxTestBase):

def setUp(self):
super(TestDictLookup, self).setUp()
self.data = {}
self.drv = DictLookup(self.data, '{address}')
self.policy = LookupPolicy(self.drv, True, True)

def test_verp(self):
self.data['sender@example.com'] = {'verp': 'verp.com'}
self.data['rcpt2@example.com'] = {'verp': 'verp.com'}
env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com'])
self.policy.apply(env)
self.assertEquals('sender=example.com@verp.com', env.sender)
self.assertEquals(['rcpt1@example.com', 'rcpt2=example.com@verp.com'], env.recipients)

def test_alias(self):
self.data['sender@example.com'] = {'alias': 'sender@other.com'}
self.data['rcpt2@example.com'] = {'alias': 'other.com'}
env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com'])
self.policy.apply(env)
self.assertEquals('sender@other.com', env.sender)
self.assertEquals(['rcpt1@example.com', 'rcpt2@other.com'], env.recipients)

def test_add_headers(self):
self.data['sender@example.com'] = {'add_headers': '{"X-Test-A": "one"}'}
self.data['rcpt2@example.com'] = {'add_headers': '{"X-Test-B": "two"}'}
env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com'])
env.parse('\n\n')
self.policy.apply(env)
self.assertEquals('one', env.headers['x-test-a'])
self.assertEquals('two', env.headers['x-test-b'])

# vim:et:fdm=marker:sts=4:sw=4:ts=4:tw=0

0 comments on commit 1fbb763

Please sign in to comment.