Permalink
Browse files

Add unittest2 test discovery runner for Django.

  • Loading branch information...
prestontimmons committed Mar 30, 2013
1 parent b5e1e2e commit 68142a4ef14a5bcf7938eaa7279373ebd2de1ca1
View
@@ -1,8 +1,9 @@
*.egg-info
*.pot
*.py[co]
+*.swp
MANIFEST
dist/
docs/_build/
tests/coverage_html/
-tests/.coverage
+tests/.coverage
@@ -10,6 +10,15 @@
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
+ make_option('--installed', action='store_true', default=False,
+ help='Discover tests for INSTALLED_APPS.'),
+ make_option('--root', action='store', dest='root', default='.',
+ help='Root directory for unittest discovery.'),
+ make_option('--top-level', action='store', dest='top_level', default=None,
+ help='Top level of project for unittest discovery.'),
+ make_option('--pattern', action='store', dest='pattern',
+ default="test*.py",
+ help='The test matching pattern. Defaults to test*.py.'),
make_option('--noinput',
action='store_false', dest='interactive', default=True,
help='Tells Django to NOT prompt the user for input of any kind.'),
View
@@ -0,0 +1,273 @@
+from os.path import join, realpath
+from unittest import TestSuite
+
+from django.conf import settings
+from django.core.exceptions import ImproperlyConfigured
+from django.test import TestCase
+from django.test.utils import setup_test_environment, teardown_test_environment
+from django.utils.importlib import import_module
+from django.utils import unittest
+from django.utils.unittest.loader import defaultTestLoader
+
+
+class DiscoverRunner(object):
+ """
+ A Django test runner that uses unittest2 test discovery.
+ """
+
+ test_loader = defaultTestLoader
+ reorder_by = (TestCase, )
+
+ def __init__(self, root=None, pattern="test*.py", top_level=None,
+ verbosity=1, interactive=True, failfast=False,
+ installed=False, **kwargs):
+
+ self.root = root
+ self.pattern = pattern
+ self.top_level = top_level or root
+
+ self.verbosity = verbosity
+ self.interactive = interactive
+ self.failfast = failfast
+
+ self.installed = installed
+
+ def setup_test_environment(self, **kwargs):
+ setup_test_environment()
+ settings.DEBUG = False
+ unittest.installHandler()
+
+ def build_suite(self, test_labels=None, extra_tests=None, **kwargs):
+ suite = TestSuite()
+ root = self.root
+ top_level = self.top_level
+ test_labels = test_labels or []
+
+ if self.installed:
+ test_labels = settings.INSTALLED_APPS
+
+ apps = [x for x in test_labels if "." not in x]
+ labels = [x for x in test_labels if "." in x]
+
+ for app in apps:
+ root = import_module(app).__path__[0]
+ top_level = realpath(join(root, ".."))
+
+ suite.addTests(discover(
+ loader=self.test_loader,
+ root=import_module(app).__path__[0],
+ pattern=self.pattern,
+ top_level=top_level,
+ ))
+
+ if labels:
+ suite.addTests(self.test_loader.loadTestsFromNames(labels))
+
+ if not (apps or labels):
+ suite.addTests(discover(
+ loader=self.test_loader,
+ root=self.root,
+ pattern=self.pattern,
+ top_level=self.top_level,
+ ))
+
+ if extra_tests:
+ for test in extra_tests:
+ suite.addTest(test)
+
+ return reorder_suite(suite, self.reorder_by)
+
+ def setup_databases(self, **kwargs):
+ return setup_databases(self.verbosity, self.interactive, **kwargs)
+
+ def run_suite(self, suite, **kwargs):
+ return unittest.TextTestRunner(
+ verbosity=self.verbosity,
+ failfast=self.failfast,
+ ).run(suite)
+
+ def teardown_databases(self, old_config, **kwargs):
+ """
+ Destroys all the non-mirror databases.
+ """
+ old_names, mirrors = old_config
+ for connection, old_name, destroy in old_names:
+ if destroy:
+ connection.creation.destroy_test_db(old_name, self.verbosity)
+
+ def teardown_test_environment(self, **kwargs):
+ unittest.removeHandler()
+ teardown_test_environment()
+
+ def suite_result(self, suite, result, **kwargs):
+ return len(result.failures) + len(result.errors)
+
+ def run_tests(self, test_labels, extra_tests=None, **kwargs):
+ """
+ Run the unit tests for all the test labels in the provided list.
+
+ Test labels should be dotted Python paths to test modules, test
+ classes, or test methods.
+
+ A list of 'extra' tests may also be provided; these tests
+ will be added to the test suite.
+
+ Returns the number of tests that failed.
+ """
+ self.setup_test_environment()
+ suite = self.build_suite(test_labels, extra_tests)
+ old_config = self.setup_databases()
+ result = self.run_suite(suite)
+ self.teardown_databases(old_config)
+ self.teardown_test_environment()
+ return self.suite_result(suite, result)
+
+
+def discover(loader, root, pattern=None, top_level=None):
+ args = {}
+ args["start_dir"] = root
+ args["top_level_dir"] = top_level or root
+ if pattern:
+ args["pattern"] = pattern
+
+ return loader.discover(**args)
+
+
+def dependency_ordered(test_databases, dependencies):
+ """
+ Reorder test_databases into an order that honors the dependencies
+ described in TEST_DEPENDENCIES.
+ """
+ ordered_test_databases = []
+ resolved_databases = set()
+
+ # Maps db signature to dependencies of all it's aliases
+ dependencies_map = {}
+
+ # sanity check - no DB can depend on it's own alias
+ for sig, (_, aliases) in test_databases:
+ all_deps = set()
+ for alias in aliases:
+ all_deps.update(dependencies.get(alias, []))
+ if not all_deps.isdisjoint(aliases):
+ raise ImproperlyConfigured(
+ "Circular dependency: databases %r depend on each other, "
+ "but are aliases." % aliases)
+ dependencies_map[sig] = all_deps
+
+ while test_databases:
+ changed = False
+ deferred = []
+
+ # Try to find a DB that has all it's dependencies met
+ for signature, (db_name, aliases) in test_databases:
+ if dependencies_map[signature].issubset(resolved_databases):
+ resolved_databases.update(aliases)
+ ordered_test_databases.append((signature, (db_name, aliases)))
+ changed = True
+ else:
+ deferred.append((signature, (db_name, aliases)))
+
+ if not changed:
+ raise ImproperlyConfigured(
+ "Circular dependency in TEST_DEPENDENCIES")
+ test_databases = deferred
+ return ordered_test_databases
+
+
+def reorder_suite(suite, classes):
+ """
+ Reorders a test suite by test type.
+
+ `classes` is a sequence of types
+
+ All tests of type classes[0] are placed first, then tests of type
+ classes[1], etc. Tests with no match in classes are placed last.
+ """
+ class_count = len(classes)
+ bins = [unittest.TestSuite() for i in range(class_count+1)]
+ partition_suite(suite, classes, bins)
+ for i in range(class_count):
+ bins[0].addTests(bins[i+1])
+ return bins[0]
+
+
+def partition_suite(suite, classes, bins):
+ """
+ Partitions a test suite by test type.
+
+ classes is a sequence of types
+ bins is a sequence of TestSuites, one more than classes
+
+ Tests of type classes[i] are added to bins[i],
+ tests with no match found in classes are place in bins[-1]
+ """
+ for test in suite:
+ if isinstance(test, unittest.TestSuite):
+ partition_suite(test, classes, bins)
+ else:
+ for i in range(len(classes)):
+ if isinstance(test, classes[i]):
+ bins[i].addTest(test)
+ break
+ else:
+ bins[-1].addTest(test)
+
+
+def setup_databases(verbosity, interactive, **kwargs):
+ from django.db import connections, DEFAULT_DB_ALIAS
+
+ # First pass -- work out which databases actually need to be created,
+ # and which ones are test mirrors or duplicate entries in DATABASES
+ mirrored_aliases = {}
+ test_databases = {}
+ dependencies = {}
+ for alias in connections:
+ connection = connections[alias]
+ if connection.settings_dict['TEST_MIRROR']:
+ # If the database is marked as a test mirror, save
+ # the alias.
+ mirrored_aliases[alias] = (
+ connection.settings_dict['TEST_MIRROR'])
+ else:
+ # Store a tuple with DB parameters that uniquely identify it.
+ # If we have two aliases with the same values for that tuple,
+ # we only need to create the test database once.
+ item = test_databases.setdefault(
+ connection.creation.test_db_signature(),
+ (connection.settings_dict['NAME'], set())
+ )
+ item[1].add(alias)
+
+ if 'TEST_DEPENDENCIES' in connection.settings_dict:
+ dependencies[alias] = (
+ connection.settings_dict['TEST_DEPENDENCIES'])
+ else:
+ if alias != DEFAULT_DB_ALIAS:
+ dependencies[alias] = connection.settings_dict.get(
+ 'TEST_DEPENDENCIES', [DEFAULT_DB_ALIAS])
+
+ # Second pass -- actually create the databases.
+ old_names = []
+ mirrors = []
+
+ for signature, (db_name, aliases) in dependency_ordered(
+ test_databases.items(), dependencies):
+ test_db_name = None
+ # Actually create the database for the first connection
+
+ for alias in aliases:
+ connection = connections[alias]
+ old_names.append((connection, db_name, True))
+ if test_db_name is None:
+ test_db_name = connection.creation.create_test_db(
+ verbosity, autoclobber=not interactive)
+ else:
+ connection.settings_dict['NAME'] = test_db_name
+
+ for alias, mirror_alias in mirrored_aliases.items():
+ mirrors.append((alias, connections[alias].settings_dict['NAME']))
+ connections[alias].settings_dict['NAME'] = (
+ connections[mirror_alias].settings_dict['NAME'])
+
+ return old_names, mirrors
Oops, something went wrong.

0 comments on commit 68142a4

Please sign in to comment.