-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
base.py
233 lines (191 loc) · 8.61 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from __future__ import unicode_literals
import textwrap
import markdown
import os
import logging
import collections
import unittest
from functools import wraps
try:
# py>=3.2
from tempfile import TemporaryDirectory
except ImportError:
from backports.tempfile import TemporaryDirectory
from mkdocs import config
from mkdocs import utils
def dedent(text):
return textwrap.dedent(text).strip()
def get_markdown_toc(markdown_source):
""" Return TOC generated by Markdown parser from Markdown source text. """
md = markdown.Markdown(extensions=['toc'])
md.convert(markdown_source)
return md.toc
def load_config(**cfg):
""" Helper to build a simple config for testing. """
path_base = os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'integration', 'minimal'
)
cfg = cfg or {}
if 'site_name' not in cfg:
cfg['site_name'] = 'Example'
if 'config_file_path' not in cfg:
cfg['config_file_path'] = os.path.join(path_base, 'mkdocs.yml')
if 'docs_dir' not in cfg:
# Point to an actual dir to avoid a 'does not exist' error on validation.
cfg['docs_dir'] = os.path.join(path_base, 'docs')
conf = config.Config(schema=config.DEFAULT_SCHEMA, config_file_path=cfg['config_file_path'])
conf.load_dict(cfg)
errors_warnings = conf.validate()
assert(errors_warnings == ([], [])), errors_warnings
return conf
def tempdir(files=None, **kw):
"""
A decorator for building a temporary directory with prepopulated files.
The temproary directory and files are created just before the wrapped function is called and are destroyed
imediately after the wrapped function returns.
The `files` keyword should be a dict of file paths as keys and strings of file content as values.
If `files` is a list, then each item is assumed to be a path of an empty file. All other
keywords are passed to `tempfile.TemporaryDirectory` to create the parent directory.
In the following example, two files are created in the temporary directory and then are destroyed when
the function exits:
@tempdir(files={
'foo.txt': 'foo content',
'bar.txt': 'bar content'
})
def example(self, tdir):
assert os.path.isfile(os.path.join(tdir, 'foo.txt'))
pth = os.path.join(tdir, 'bar.txt')
assert os.path.isfile(pth)
with io.open(pth, 'r', encoding='utf-8') as f:
assert f.read() == 'bar content'
"""
files = {f: '' for f in files} if isinstance(files, (list, tuple)) else files or {}
if 'prefix' not in kw:
kw['prefix'] = 'mkdocs_test-'
def decorator(fn):
@wraps(fn)
def wrapper(self, *args):
with TemporaryDirectory(**kw) as td:
for path, content in files.items():
pth = os.path.join(td, path)
utils.write_file(content.encode(encoding='utf-8'), pth)
return fn(self, td, *args)
return wrapper
return decorator
class PathAssertionMixin(object):
"""
Assertion methods for testing paths.
Each method accepts one or more strings, which are first joined using os.path.join.
"""
def assertPathsEqual(self, a, b, msg=None):
self.assertEqual(a.replace('\\', '/'), b.replace('\\', '/'))
def assertPathExists(self, *parts):
path = os.path.join(*parts)
if not os.path.exists(path):
msg = self._formatMessage(None, "The path '{}' does not exist".format(path))
raise self.failureException(msg)
def assertPathNotExists(self, *parts):
path = os.path.join(*parts)
if os.path.exists(path):
msg = self._formatMessage(None, "The path '{}' does exist".format(path))
raise self.failureException(msg)
def assertPathIsFile(self, *parts):
path = os.path.join(*parts)
if not os.path.isfile(path):
msg = self._formatMessage(None, "The path '{}' is not a file that exists".format(path))
raise self.failureException(msg)
def assertPathNotFile(self, *parts):
path = os.path.join(*parts)
if os.path.isfile(path):
msg = self._formatMessage(None, "The path '{}' is a file that exists".format(path))
raise self.failureException(msg)
def assertPathIsDir(self, *parts):
path = os.path.join(*parts)
if not os.path.isdir(path):
msg = self._formatMessage(None, "The path '{}' is not a directory that exists".format(path))
raise self.failureException(msg)
def assertPathNotDir(self, *parts):
path = os.path.join(*parts)
if os.path.isfile(path):
msg = self._formatMessage(None, "The path '{}' is a directory that exists".format(path))
raise self.failureException(msg)
# Backport unittest.TestCase.assertLogs for Python 2.7
# see https://github.com/python/cpython/blob/3.6/Lib/unittest/case.py
if not utils.PY3:
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
["records", "output"])
class _CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""
def __init__(self):
logging.Handler.__init__(self)
self.watcher = _LoggingWatcher([], [])
def flush(self):
pass
def emit(self, record):
self.watcher.records.append(record)
msg = self.format(record)
self.watcher.output.append(msg)
class _AssertLogsContext(object):
"""A context manager used to implement TestCase.assertLogs()."""
LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
def __init__(self, test_case, logger_name, level):
self.test_case = test_case
self.logger_name = logger_name
if level:
self.level = logging._levelNames.get(level, level)
else:
self.level = logging.INFO
self.msg = None
def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
logger = self.logger = self.logger_name
else:
logger = self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = _CapturingHandler()
handler.setFormatter(formatter)
self.watcher = handler.watcher
self.old_handlers = logger.handlers[:]
self.old_level = logger.level
self.old_propagate = logger.propagate
logger.handlers = [handler]
logger.setLevel(self.level)
logger.propagate = False
return handler.watcher
def __exit__(self, exc_type, exc_value, tb):
self.logger.handlers = self.old_handlers
self.logger.propagate = self.old_propagate
self.logger.setLevel(self.old_level)
if exc_type is not None:
# let unexpected exceptions pass through
return False
if len(self.watcher.records) == 0:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))
def _raiseFailure(self, standardMsg):
msg = self.test_case._formatMessage(self.msg, standardMsg)
raise self.test_case.failureException(msg)
class LogTestCase(unittest.TestCase):
def assertLogs(self, logger=None, level=None):
"""Fail unless a log message of level *level* or higher is emitted
on *logger_name* or its children. If omitted, *level* defaults to
INFO and *logger* defaults to the root logger.
This method must be used as a context manager, and will yield
a recording object with two attributes: `output` and `records`.
At the end of the context manager, the `output` attribute will
be a list of the matching formatted log messages and the
`records` attribute will be a list of the corresponding LogRecord
objects.
Example::
with self.assertLogs('foo', level='INFO') as cm:
logging.getLogger('foo').info('first message')
logging.getLogger('foo.bar').error('second message')
self.assertEqual(cm.output, ['INFO:foo:first message',
'ERROR:foo.bar:second message'])
"""
return _AssertLogsContext(self, logger, level)
else:
LogTestCase = unittest.TestCase