Skip to content

Commit

Permalink
[tests] Create testing framework
Browse files Browse the repository at this point in the history
  • Loading branch information
stbuehler committed Oct 2, 2010
1 parent b432e35 commit d32de32
Show file tree
Hide file tree
Showing 10 changed files with 800 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -10,3 +10,5 @@ SET(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
ENABLE_TESTING()

ADD_SUBDIRECTORY(src build)

add_subdirectory(tests)
1 change: 1 addition & 0 deletions configure.ac
Expand Up @@ -347,5 +347,6 @@ AC_CONFIG_FILES([Makefile \
src/modules/Makefile \
src/unittests/Makefile \
src/lighttpd2.pc \
tests/Makefile \
])
AC_OUTPUT
2 changes: 2 additions & 0 deletions tests/.gitignore
@@ -0,0 +1,2 @@
*.pyc
tmp*
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
@@ -0,0 +1,4 @@

cmake_policy(VERSION 2.6.4)

add_test(NAME http COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/runtests.py --angel $<TARGET_FILE:lighttpd2> --worker $<TARGET_FILE:lighttpd2-worker> --plugindir $<TARGET_FILE_DIR:lighttpd2>)
347 changes: 347 additions & 0 deletions tests/base.py
@@ -0,0 +1,347 @@
# -*- coding: utf-8 -*-

import os
import imp
import sys
import traceback

from service import *

__all__ = [ "Env", "Tests", "TestBase" ]

class Dict(object):
pass

Env = Dict()


def fix_test_name(name):
if None == name: return '/'
if (name[:1] != '/'): name = '/' + name
if (name[-1:] != '/'): name = name + '/'
return name

def load_test_file(name):
path = os.path.join(Env.sourcedir, name)
file = open(path)
(modname, ext) = os.path.splitext(name)
module = imp.load_module(modname, file, path, (ext, 'r', imp.PY_SOURCE))
file.close()
return module

def vhostname(testname):
return testname[1:-1].replace('/', '.')

# basic interface
class TestBase(object):
config = "defaultaction;"
name = None
vhost = None
runnable = True

def __init__(self):
self._test_cleanup_files = []
self._test_cleanup_dirs = []
self._test_failed = False # "not run" is "successful"

# internal methods, do not override
def _register(self, tests):
self.tests = tests
if not self.vhost: self.vhost = vhostname(self.name)
self.vhostdir = os.path.join(Env.dir, 'www', 'vhosts', self.vhost)
tests.add_test(self)

def _prepare(self):
self.Prepare()
if None != self.config:
errorlog = self.PrepareFile("log/error.log-%s" % self.vhost, "")
accesslog = self.PrepareFile("log/access.log-%s" % self.vhost, "")
config = """
var.vhosts = var.vhosts + [ "%s" : ${
log = [ "*": "file:%s" ];
accesslog = "%s";
%s
}
];
""" % (self.vhost, errorlog, accesslog, self.config)
self.tests.append_config(config)

def _run(self):
failed = False
print >> Env.log, "[Start] Running test %s" % (self.name)
try:
if not self.Run():
failed = True
print >> sys.stderr, "Test %s failed" % (self.name)
except Exception as e:
failed = True
print >> sys.stderr, "Test %s failed:" % (self.name)
print >> sys.stderr, traceback.format_exc(10)
print >> Env.log, "[Done] Running test %s [result=%s]" % (self.name, failed and "Failed" or "Succeeded")
self._test_failed = failed
return not failed

def _cleanup(self):
if Env.no_cleanup or (not Env.force_cleanup and self._test_failed):
return
self.Cleanup()
for f in self._test_cleanup_files:
self._cleanupFile(f)
for d in self._test_cleanup_dirs:
self._cleanupDir(d)

def _cleanupFile(self, fname):
self.tests.CleanupFile(fname)

def _cleanupDir(self, dirname):
self.tests.CleanupDir(dirname)

# public
def PrepareVHostFile(self, fname, content):
"""remembers which files have been prepared and while remove them on cleanup; returns absolute pathname"""
fname = 'www/vhosts/' + self.vhost + '/' + fname
return self.tests.PrepareFile(fname, content)

def PrepareFile(self, fname, content):
"""remembers which files have been prepared and while remove them on cleanup; returns absolute pathname"""
self._test_cleanup_files.append(fname)
return self.tests.PrepareFile(fname, content)

def PrepareDir(self, dirname):
"""remembers which directories have been prepared and while remove them on cleanup; returns absolute pathname"""
self._test_cleanup_dirs.append(fname)
return self.tests.PrepareDir(dirname)


# implement these yourself
def Prepare(self):
pass

def Run(self):
raise BaseException("Test '%s' not implemented yet" % self.name)

def Cleanup(self):
pass

class Tests(object):
def __init__(self):
self.tests_filter = []
if 0 == len(Env.tests):
self.tests_filter.append("")
else:
for t in Env.tests:
self.tests_filter.append(fix_test_name(t))

self.services = []
self.run = [] # tests we want to run
self.tests = [] # all tests (we always prepare/cleanup all tests)
self.tests_dict = { }
self.config = None

self.prepared_dirs = { }
self.prepared_files = { }

self.failed = False

self.add_service(Lighttpd())

def add_test(self, test):
name = test.name
if self.tests_dict.has_key(name):
raise BaseException("Test '%s' already defined" % name)
self.tests_dict[name] = test
for f in self.tests_filter:
if name.startswith(f):
if test.runnable:
self.run.append(test)
break
self.tests.append(test)

def add_service(self, service):
service.tests = self
self.services.append(service)

def append_config(self, config):
if None == self.config:
raise BaseException("Not prepared for adding config")
self.config += config

def LoadTests(self):
files = os.listdir(Env.sourcedir)
files = filter(lambda x: x[-3:] == '.py', files)
files = filter(lambda x: x[:2] == 't-', files)
files.sort()

mods = []
for f in files:
mods.append(load_test_file(f))

for m in mods:
t = m.Test()
t.name = fix_test_name(t.name)
if '/' == t.name:
(n, _) = os.path.splitext(os.path.basename(m.__file__))
t.name = fix_test_name(n[2:])
t._register(self)

def Prepare(self):
print >> Env.log, "[Start] Preparing tests"
errorlog = self.PrepareFile("log/error.log", "")
accesslog = self.PrepareFile("log/access.log", "")
self.config = """
setup {{
workers 2;
module_load (
"mod_accesslog",
"mod_dirlist",
"mod_lua",
"mod_vhost"
);
listen "127.0.0.1:{Env.port}";
log = [ "*": "stderr" ];
lua.plugin "{Env.luadir}/core.lua";
lua.plugin "{Env.luadir}/secdownload.lua";
accesslog.format = "%h %V %u %t \\"%r\\" %>s %b \\"%{{Referer}}i\\" \\"%{{User-Agent}}i\\"";
accesslog = "{accesslog}";
}}
log = [ "*": "file:{errorlog}" ];
defaultaction {{
docroot "{Env.defaultwww}";
}}
var.vhosts = [ "default": ${{
defaultaction;
}} ];
""".format(Env = Env, errorlog = errorlog, accesslog = accesslog)

for t in self.tests:
print >> Env.log, "[Start] Preparing test '%s'" % (t.name)
t._prepare()

self.config += """
vhost.map var.vhosts;
"""
Env.lighttpdconf = self.PrepareFile("conf/lighttpd.conf", self.config)
Env.angelconf = self.PrepareFile("conf/angel.conf", """
instance {{
binary "{Env.worker}";
config "{Env.lighttpdconf}";
modules "{Env.plugindir}";
}}
allow-listen {{ ip "127.0.0.1:{Env.port}"; }}
""".format(Env = Env))

print >> Env.log, "[Done] Preparing tests"

print >> Env.log, "[Start] Preparing services"
for s in self.services:
try:
s._prepare()
except:
self.failed = True
raise
print >> Env.log, "[Done] Preparing services"


def Run(self):
print >> Env.log, "[Start] Running tests"
failed = False
for t in self.run:
if not t._run(): failed = True
self.failed = failed
print >> Env.log, "[Done] Running tests [result=%s]" % (failed and "Failed" or "Succeeded")
return not failed

def Cleanup(self):
# print >> sys.stderr, "cleanup_files: %s, cleanup_dirs: %s" % (self.prepared_files, self.prepared_dirs)

if not Env.no_cleanup and not self.failed:
print >> Env.log, "[Start] Cleanup services"
for s in self.services:
s._cleanup()
print >> Env.log, "[Done] Cleanup services"
else:
print >> Env.log, "[Start] Stopping services"
for s in self.services:
s._stop()
print >> Env.log, "[Done] Stopping services"

print >> Env.log, "[Start] Cleanup tests"
for t in self.tests:
t._cleanup()
if not Env.no_cleanup and not self.failed:
self.CleanupFile("log/access.log")
self.CleanupFile("log/error.log")
self.CleanupFile("conf/lighttpd.conf")
self.CleanupFile("conf/angel.conf")
print >> Env.log, "[Done] Cleanup tests"

## helpers for prepare/cleanup
def _preparefile(self, fname, content):
if self.prepared_files.has_key(fname):
raise BaseException("File '%s' already exists!" % fname)
else:
f = open(os.path.join(Env.dir, fname), "w")
f.write(content)
f.close()
self.prepared_files[fname] = 1

def _cleanupfile(self, fname):
if self.prepared_files.has_key(fname):
os.remove(os.path.join(Env.dir, fname))
return True
else:
return False

def _preparedir(self, dirname):
if self.prepared_dirs.has_key(dirname):
self.prepared_dirs[dirname] += 1
else:
os.mkdir(os.path.join(Env.dir, dirname))
self.prepared_dirs[dirname] = 1

def _cleanupdir(self, dirname):
self.prepared_dirs[dirname] -= 1
if 0 == self.prepared_dirs[dirname]:
os.rmdir(os.path.join(Env.dir, dirname))

def PrepareFile(self, fname, content):
path = filter(lambda x: x != '', fname.split('/'))
for i in range(1, len(path)):
self._preparedir(os.path.join(*path[0:i]))
self._preparefile(os.path.join(*path), content)
return os.path.join(Env.dir, *path)

def PrepareDir(self, dirname):
path = filter(lambda x: x != '', fname.split('/'))
for i in range(1, len(path)+1):
self._preparedir(os.path.join(*path[0:i]))
return os.path.join(Env.dir, *path)

def CleanupDir(self, dirname):
path = filter(lambda x: x != '', fname.split('/'))
for i in reversed(range(1, len(path)+1)):
self._cleanupdir(os.path.join(*path[0:i]))

def CleanupFile(self, fname):
path = filter(lambda x: x != '', fname.split('/'))
if not self._cleanupfile(os.path.join(*path)):
return False
for i in reversed(range(1, len(path))):
self._cleanupdir(os.path.join(*path[0:i]))


class Lighttpd(Service):
name = "lighttpd"

def Prepare(self):
self.portfree(Env.port)
self.fork(Env.angel, '-m', Env.plugindir, '-c', Env.angelconf)
self.waitconnect(Env.port)

0 comments on commit d32de32

Please sign in to comment.