#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import shutil
import subprocess
import sys
import textwrap
import unittest
from os.path import join as joinpath
class TestFixture(unittest.TestCase):
    """Shared scaffolding for the maildirproc end-to-end tests.

    Each test gets two fresh scratch directories in the current working
    directory (names are suffixed with the process id): one holding the
    maildirs under test and one receiving the output of the fake sendmail
    command.  Subclasses must set ``self._maildirproc_name`` to the name of
    the program to run before calling :meth:`run_mdp`.
    """

    # Root directory under which the test maildirs are created.
    _maildir_dir = "test.maildir.%d" % os.getpid()
    # Directory where the fake sendmail command writes its "delivered" file.
    _testdata_dir = "test.testdata.%d" % os.getpid()

    # Sample mail "a".  The Subject carries an RFC 2047 encoded word and the
    # body contains non-ASCII data written as escaped UTF-8 byte values.
    _testmail_data_a = [
        "From: Sender <sender@example.com>\n",
        "To: Recipient <recipient@example.com>\n",
        "Cc: Carbon Copy <carbon.copy@example.com>\n",
        "Subject: A subject with =?utf8?b?csOkdnNtw7ZyZ8Olcw==?=\n",
        "Delivered-To: Alpha\n",
        "X-BeenThere: Bravo\n",
        "X-Mailing-List: Charlie\n",
        "Mailing-List: Delta\n",
        "\n",
        "Body. R\xc3\xa4vsm\xc3\xb6rg\xc3\xa5s.\n",
    ]

    # Sample mail "bad_header".
    # NOTE(review): the Subject looks like a deliberately malformed encoded
    # word (odd base64 padding) -- confirm against the program under test.
    _testmail_data_bad_header = [
        "From: Sender <sender@example.com>\n",
        "To: Recipient <recipient@example.com>\n",
        "Subject: =?ISO-2022-JP?B?gYqBiQ=?=\n",
        "\n",
        "Body.\n",
    ]

    def setUp(self):
        """Create empty scratch directories, discarding any leftovers."""
        # ignore_errors=True: the directories normally do not exist yet.
        shutil.rmtree(self._testdata_dir, True)
        os.mkdir(self._testdata_dir)
        shutil.rmtree(self._maildir_dir, True)
        os.mkdir(self._maildir_dir)

    def tearDown(self):
        """Remove the scratch directories, ignoring removal errors."""
        shutil.rmtree(self._testdata_dir, True)
        shutil.rmtree(self._maildir_dir, True)

    def makedirs(self, maildirs):
        """Create cur/new/tmp subdirectories for each name in *maildirs*.

        The names are interpreted relative to the maildir scratch directory.
        """
        for md in maildirs:
            for d in ["cur", "new", "tmp"]:
                os.makedirs(joinpath(self._maildir_dir, md, d))

    def create(self, name, dest):
        """Write the sample mail ``_testmail_data_<name>`` to *dest*.

        *dest* is a path relative to the maildir scratch directory, for
        example ``"incoming/new/a"``.
        """
        # The with-statement closes the file even if write() raises.
        with open(joinpath(self._maildir_dir, dest), "w") as fp:
            fp.write("".join(getattr(self, "_testmail_data_%s" % name)))

    def run_mdp(self, rc, maildirs, maildir_base=""):
        """Run the program under test once, feeding *rc* on its stdin.

        rc           -- rc script text; it is passed through
                        textwrap.dedent() before being sent.
        maildirs     -- maildir names, each passed with ``-m``.
        maildir_base -- value for ``-b``.  The default ``""`` selects the
                        maildir scratch directory; ``None`` omits ``-b``
                        altogether.

        The test fails if the program exits with a non-zero status.
        """
        if maildir_base is None:
            extra_argv = []
        elif maildir_base == "":
            extra_argv = ["-b", self._maildir_dir]
        else:
            extra_argv = ["-b", maildir_base]
        argv = [
            "./%s" % self._maildirproc_name,
            "--once",
            "-l", "/dev/null",
            "-r", "-",
        ] + extra_argv
        for maildir in maildirs:
            argv.extend(["-m", maildir])

        # Fake sendmail: a shell one-liner that writes its $1 and then its
        # whole stdin to <testdata>/delivered so tests can inspect the result.
        sh_cmd = "-c 'a=%s/delivered; echo $1 >$a; cat >>$a'" % (
            self._testdata_dir)
        p = subprocess.Popen(
            argv,
            stdin=subprocess.PIPE,
            env={
                "SENDMAIL": "/bin/sh",
                "SENDMAILFLAGS": sh_cmd,
            },
        )

        script = textwrap.dedent(rc)
        if not isinstance(script, bytes):
            # The stdin pipe is binary; on Python 3 a text string must be
            # encoded first (on Python 2, str already is bytes).
            script = script.encode("utf-8")
        # communicate() waits for the process to exit, so no extra wait().
        p.communicate(script)
        # A real assertion (not a bare ``assert``) so the check survives -O.
        self.assertEqual(
            p.returncode, 0,
            "%s exited with status %r" % (argv[0], p.returncode))

    def verify_result(self, result):
        """Assert that the maildirs hold exactly the mail counts in *result*.

        *result* maps ``"<maildir>/<cur|new>"`` to the expected number of
        files; subdirectories that are empty must not appear in it.
        """
        actual = {}
        for maildir in os.listdir(self._maildir_dir):
            for subdir in ["cur", "new"]:
                path = joinpath(self._maildir_dir, maildir, subdir)
                number = len(os.listdir(path))
                if number > 0:
                    actual[joinpath(maildir, subdir)] = number
        self.assertEqual(result, actual)
class TestSuite(TestFixture):
    """End-to-end tests that run a maildirproc program on scratch maildirs.

    The class is parameterized through its constructor so that the same
    tests can be run against different programs.
    """

    def __init__(self, method_name, use_python2, maildirproc_name):
        """Set up the test case *method_name*.

        use_python2      -- true if rc scripts need Python 2 syntax (see
                            fix_u()).
        maildirproc_name -- file name of the program to run, looked up in
                            the current directory by run_mdp().
        """
        TestFixture.__init__(self, method_name)
        self._use_python2 = use_python2
        self._maildirproc_name = maildirproc_name

    def fix_u(self, script):
        """Return *script* with each ``__u__`` marker replaced.

        The marker becomes ``u`` (yielding ``u"..."`` literals) when
        targeting Python 2 and the empty string otherwise.
        """
        return script.replace("__u__", "u" if self._use_python2 else "")

    ###########################################################################
    # Basics.

    def test_empty(self):
        """An empty rc script leaves the mail where it is."""
        rc = ""
        self.makedirs(["incoming"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"incoming/new": 1})

    def test_default_maildir_base(self):
        """Moving works when no -b option is given and paths are explicit."""
        rc = '''
            for mail in processor:
                mail.move("%s/dest")
            ''' % self._maildir_dir
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/cur/a")
        self.create("a", "incoming/new/b")
        self.run_mdp(rc, [joinpath(self._maildir_dir, "incoming")], None)
        self.verify_result({"dest/cur": 1, "dest/new": 1})
        for x in ["incoming/tmp", "incoming/cur", "incoming/new", "dest/tmp"]:
            self.assertEqual(
                len(os.listdir(joinpath(self._maildir_dir, x))),
                0)

    def test_move(self):
        """move() transfers mail and keeps it in the same cur/new subdir."""
        rc = '''
            for mail in processor:
                mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/cur/a")
        self.create("a", "incoming/new/b")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/cur": 1, "dest/new": 1})
        for x in ["incoming/tmp", "incoming/cur", "incoming/new", "dest/tmp"]:
            self.assertEqual(
                len(os.listdir(joinpath(self._maildir_dir, x))),
                0)

    def test_move_keeps_flags(self):
        """move() renames the file but preserves its ":2,x" flag suffix."""
        rc = '''
            for mail in processor:
                mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/cur/a:2,x")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/cur": 1})
        filename = os.listdir(joinpath(self._maildir_dir, "dest/cur"))[0]
        self.assertNotEqual(filename, "a:2,x")
        # assertTrue rather than the deprecated assert_ alias, which was
        # removed from unittest in Python 3.12.
        self.assertTrue(filename.endswith(":2,x"))

    def test_copy(self):
        """copy() duplicates mail and leaves the originals in place."""
        rc = '''
            for mail in processor:
                mail.copy("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/cur/a")
        self.create("a", "incoming/new/b")
        self.run_mdp(rc, ["incoming"])
        self.verify_result(
            {"incoming/cur": 1,
             "incoming/new": 1,
             "dest/cur": 1,
             "dest/new": 1})

    def test_copy_keeps_flags(self):
        """copy() gives the copy a new name with the same flag suffix."""
        rc = '''
            for mail in processor:
                mail.copy("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/cur/a:2,x")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"incoming/cur": 1, "dest/cur": 1})
        filename = os.listdir(joinpath(self._maildir_dir, "dest/cur"))[0]
        self.assertNotEqual(filename, "a:2,x")
        self.assertTrue(filename.endswith(":2,x"))

    def test_forward(self):
        """forward() hands the mail to sendmail and removes it."""
        rc = '''
            for mail in processor:
                mail.forward("foo@example.com")
            '''
        self.makedirs(["incoming"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({})
        # The fake sendmail writes the recipient first, then the message.
        expected = ["foo@example.com\n"] + self._testmail_data_a
        with open(joinpath(self._testdata_dir, "delivered")) as fp:
            lines = fp.readlines()
        self.assertEqual(lines, expected)

    def test_forward_copy(self):
        """forward_copy() hands the mail to sendmail and keeps it."""
        rc = '''
            for mail in processor:
                mail.forward_copy("foo@example.com")
            '''
        self.makedirs(["incoming"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"incoming/new": 1})
        expected = ["foo@example.com\n"] + self._testmail_data_a
        with open(joinpath(self._testdata_dir, "delivered")) as fp:
            lines = fp.readlines()
        self.assertEqual(lines, expected)

    def test_delete(self):
        """delete() removes the mail."""
        rc = '''
            for mail in processor:
                mail.delete()
            '''
        self.makedirs(["incoming"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({})

    def test_multiple_mail(self):
        """Every mail in the maildir is processed in a single run."""
        rc = '''
            for mail in processor:
                mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        for i in range(10):
            self.create("a", "incoming/new/%d" % i)
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 10})

    ###########################################################################
    # Mail attributes.

    def test_mail_path(self):
        """mail.path ends with the mail's location inside the maildir."""
        rc = '''
            for mail in processor:
                if mail.path.endswith("incoming/new/a"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 1})

    ###########################################################################
    # Headers.

    def test_ascii_header(self):
        """A plain ASCII header compares equal to its text."""
        rc = '''
            for mail in processor:
                if mail["From"] == "Sender <sender@example.com>":
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 1})

    def test_utf8_header(self):
        """An encoded-word Subject is matched in its decoded form."""
        rc = self.fix_u('''
            for mail in processor:
                if mail["Subject"].matches(
                        __u__"A subject with\\s+r\\xe4vsm\\xf6rg\\xe5s"):
                    mail.move("dest")
            ''')
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 1})

    def test_bad_header(self):
        """The "bad_header" Subject compares equal to its raw text."""
        rc = self.fix_u('''
            for mail in processor:
                if mail["subject"] == __u__"=?ISO-2022-JP?B?gYqBiQ=?=":
                    mail.move("dest")
            ''')
        self.makedirs(["incoming", "dest"])
        self.create("bad_header", "incoming/new/bad_header")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 1})

    ###########################################################################
    # Matchers.

    def test_negative_header_matches(self):
        """matches() honours the ^ anchor, so this pattern does not match."""
        rc = '''
            for mail in processor:
                if mail["from"].matches("^sender@example\\.com"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"incoming/new": 1})

    def test_positive_header_matches(self):
        """matches() finds the pattern anywhere in the header."""
        rc = '''
            for mail in processor:
                if mail["from"].matches("sender@example\\.com"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 1})

    def test_negative_header_contains(self):
        """contains() is false for a substring that is absent."""
        rc = '''
            for mail in processor:
                if mail["from"].contains("foo"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"incoming/new": 1})

    def test_positive_header_contains(self):
        """contains() is true for a substring that is present."""
        rc = '''
            for mail in processor:
                if mail["from"].contains("sender@example.com"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 1})

    def test_target_contains(self):
        """mail.target.contains() sees both the To and the Cc address."""
        rc = '''
            for mail in processor:
                if mail.target.contains("recipient@example.com"):
                    mail.copy("dest")
                if mail.target.contains("carbon.copy@example.com"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 2})

    def test_target_matches(self):
        """mail.target.matches() sees both the To and the Cc address."""
        rc = '''
            for mail in processor:
                if mail.target.matches("recipient@example\\.com"):
                    mail.copy("dest")
                if mail.target.matches("carbon\\.copy@example\\.com"):
                    mail.move("dest")
            '''
        self.makedirs(["incoming", "dest"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({"dest/new": 2})

    def test_from_mailing_list(self):
        """from_mailing_list() matches list headers but not To or Cc."""
        rc = '''
            for mail in processor:
                if mail.from_mailing_list("recipient@example.com"):
                    mail.copy("to")
                if mail.from_mailing_list("carbon.copy@example.com"):
                    mail.copy("cc")
                if mail.from_mailing_list("alpha"):
                    mail.copy("delivered-to")
                if mail.from_mailing_list("bravo"):
                    mail.copy("x-beenthere")
                if mail.from_mailing_list("charlie"):
                    mail.copy("x-mailing-list")
                if mail.from_mailing_list("delta"):
                    mail.copy("mailing-list")
            '''
        self.makedirs(
            ["incoming", "to", "cc", "delivered-to",
             "x-beenthere", "x-mailing-list", "mailing-list"])
        self.create("a", "incoming/new/a")
        self.run_mdp(rc, ["incoming"])
        self.verify_result({
            "incoming/new": 1,
            "delivered-to/new": 1,
            "x-beenthere/new": 1,
            "x-mailing-list/new": 1,
            "mailing-list/new": 1,
        })
class Python2TestSuite(TestSuite):
    """The TestSuite tests, run against the "maildirproc-python2" program."""

    def __init__(self, method_name):
        """Bind the suite to maildirproc-python2 with Python 2 rc syntax."""
        super(Python2TestSuite, self).__init__(
            method_name,
            use_python2=True,
            maildirproc_name="maildirproc-python2")
class Python3TestSuite(TestSuite):
    """The TestSuite tests, run against the "maildirproc" program."""

    def __init__(self, method_name):
        """Bind the suite to maildirproc without Python 2 rc syntax."""
        super(Python3TestSuite, self).__init__(
            method_name,
            use_python2=False,
            maildirproc_name="maildirproc")
######################################################################
# Build the suite from both test classes.  With command-line arguments, each
# argument names a single test method to run in every class; without
# arguments, all tests of every class are loaded.
test_runner = unittest.TextTestRunner()
test_loader = unittest.defaultTestLoader
test_suite = unittest.TestSuite()
tests = [Python2TestSuite, Python3TestSuite]
args = sys.argv[1:]

if not args:
    for test in tests:
        test_suite.addTests(test_loader.loadTestsFromTestCase(test))
else:
    # Same order as nested loops: arguments outermost, classes innermost.
    test_suite.addTests(test(arg) for arg in args for test in tests)

result = test_runner.run(test_suite)

# Exit status 0 on success and 1 on any failure or error.
exit_status = 0
if not result.wasSuccessful():
    exit_status = 1
sys.exit(exit_status)
# (Trailing web-page residue from the HTML export was removed here.)