forked from giampaolo/confix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
confix.py
287 lines (231 loc) · 8.96 KB
/
confix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python
"""
A language-agnostic configuration parser.
Currently supports YAML, JSON, INI and TOML serialization formats.
"""
import collections
import json
import os
import sys
try:
import configparser # py3
except ImportError:
import ConfigParser as configparser
# Public names exported by ``from confix import *``. ValidationError is the
# only exception class listed here.
__all__ = ['register', 'parse', 'discard', 'schema', 'ValidationError']
# Module metadata.
__version__ = '0.2.0'
__author__ = 'Giampaolo Rodola'
__license__ = 'MIT'
# True when the running interpreter is Python 3 or newer.
_PY3 = sys.version_info[0] >= 3
if _PY3:
    # Python 3 has no ``basestring`` builtin; alias it to ``str`` so the
    # isinstance() checks in this module work on both major versions.
    basestring = str
# --- exceptions (public)
class Error(Exception):
    """Root of this module's exception hierarchy.

    Every other exception class defined here derives from it.
    """
class ValidationError(Error):
    """Raised when a value fails the validator attached to a schema.

    parse() raises it when ``schema(validator=callable)`` is in use and
    the callable returns a false value. A validator may also raise it
    directly, optionally with a message, in order to provide a custom
    error text; parse() then re-raises it with the location filled in.
    It is the only exception class listed in ``__all__``.
    """

    def __init__(self, msg=None):
        # Optional custom text; a generic message is used when missing.
        self.msg = msg
        # Location and offending value: filled in later by parse().
        self.section = None
        self.key = None
        self.value = None

    def __str__(self):
        location = "'%s.%s'" % (self.section, self.key)
        if self.msg:
            return location + " %s" % self.msg
        return location + " value is invalid (got %r)" % self.value
# --- exceptions (internal)
class InvalidKeyError(Error):
    """Raised when the config file defines a key which the matching
    configuration class does not have.
    """

    def __init__(self, section, key, msg=None):
        self.msg = msg
        self.key = key
        self.section = section

    def __str__(self):
        # An explicit message, when given, replaces the generated one.
        if self.msg:
            return self.msg
        template = (
            "%r configuration class has no value %r but this is defined"
            " in the config file")
        return template % (self.section, self.key)
class RequiredKeyError(Error):
    """Raised when a key declared as required is missing from the
    config file.
    """

    def __init__(self, section, key, msg=None):
        self.msg = msg
        self.key = key
        self.section = section

    def __str__(self):
        # An explicit message, when given, replaces the generated one.
        if self.msg:
            return self.msg
        template = (
            "%r configuration class requires %r key to be specified in the "
            "config file")
        return template % (self.section, self.key)
class TypesMismatchError(Error):
    """Raised when the config file overrides a key with a value whose
    type differs from the type of the default defined in the
    configuration class.

    Params:

    - section / key: location of the offending option.
    - default_value: the value defined in the configuration class.
    - new_value: the value found in the config file.
    - (str) msg: optional text which replaces the generated message.
    """

    def __init__(self, section, key, default_value, new_value, msg=None):
        self.section = section
        self.key = key
        self.default_value = default_value
        self.new_value = new_value
        self.msg = msg

    def __str__(self):
        # Honour a custom message the same way the other exception
        # classes of this module do (it used to be silently ignored).
        # TODO: rephrase
        return self.msg or \
            "'%s:%s' type mismatch expected %r, got %r" \
            % (self.section, self.key,
               type(self.default_value), type(self.new_value))
# --- parsers
def parse_yaml(file):
    """Parse the YAML document read from *file* and return the result.

    *file* is an object with a read() method. Requires the third-party
    ``yaml`` (PyYAML) package, imported lazily so that it is only needed
    when a YAML file is actually parsed.
    """
    import yaml
    # NOTE(security): this used to be yaml.load() with no Loader. That
    # form can build arbitrary Python objects from tagged input and is
    # rejected by PyYAML >= 6, where Loader is a required argument.
    # safe_load() only constructs plain YAML types, which is all a
    # configuration file needs.
    return yaml.safe_load(file.read())
def parse_toml(file):
    """Decode the content of *file* as TOML and return the result.

    Relies on the third-party ``toml`` package, which is imported lazily
    so that it is only needed when a TOML file is actually parsed.
    """
    import toml

    text = file.read()
    return toml.loads(text)
def parse_json(file):
    """Decode the content of *file* as JSON and return the result.

    An empty (or whitespace-only) file produces an empty dict instead of
    a decoding error, for consistency with the other formats (for now
    at least...).
    """
    text = file.read()
    return json.loads(text) if text.strip() else {}
# Literals converted to booleans by parse_ini(). "1" and "0" are not
# listed because integer conversion is attempted first, so they always
# end up as ints.
_INI_TRUE = frozenset(("yes", "true", "on"))
_INI_FALSE = frozenset(("no", "false", "off"))


def _ini_value(value):
    """Convert a raw INI string into int, bool or float when it looks
    like one, else return it unchanged.
    """
    import math

    text = value.strip()
    # Integers first; unlike str.isdigit() this also covers signed
    # literals such as "-5" and never hands int() something it rejects.
    try:
        return int(text)
    except ValueError:
        pass
    if text in _INI_TRUE:
        return True
    if text in _INI_FALSE:
        return False
    try:
        number = float(text)
    except ValueError:
        return value
    # Guard against "inf", "nan" and friends: float() accepts them, but
    # in a config file they are far more likely to be plain words.
    if math.isinf(number) or math.isnan(number):
        return value
    return number


def parse_ini(file):
    """Parse the INI document read from *file* into a dict of dicts.

    Returns ``{section: {key: value}}``. Values are converted using the
    first rule that matches:

    - integer literals become int
    - "yes" / "true" / "on" become True, "no" / "false" / "off" False
    - finite float literals become float
    - anything else is left as a string
    """
    config = configparser.ConfigParser()
    # Read from the object we were handed rather than re-opening
    # file.name, so that file-like objects not backed by a real path
    # work as well.
    read_file = getattr(config, 'read_file', None)  # py3
    if read_file is None:
        read_file = config.readfp  # py2
    read_file(file)

    ret = {}
    for section, values in config._sections.items():
        ret[section] = dict(
            (key, _ini_value(value))
            for key, value in values.items()
            # py2 stores the section name under this pseudo key
            if key != '__name__')
    return ret
# --- public API
# Maps each registered section name to its configuration class; filled
# by register() and emptied by discard().
_conf_map = {}
# File object of the configuration processed by parse(); None until
# parse() completes and again after discard().
_conf_file = None
# Sentinel meaning "no default was given" in schema(), so that None
# remains usable as an explicit default value.
_DEFAULT = object()
class schema(collections.namedtuple('field',
                                    ['default', 'required', 'validator'])):
    """Describe a configuration key: its default value, whether the
    config file must provide it, and an optional validator callable.

    Raises TypeError when neither a default nor required=True is given,
    and ValueError when validator is set but is not callable.
    """

    def __new__(cls, default=_DEFAULT, required=False, validator=None):
        # An optional key is meaningless without a default to fall back on.
        has_default = default is not _DEFAULT
        if not (required or has_default):
            raise TypeError("specify a default value or set required=True")
        if not (validator is None or callable(validator)):
            raise ValueError("%r is not callable" % validator)
        return super(schema, cls).__new__(cls, default, required, validator)
def register(name):
    """Return a class decorator which records the decorated class as the
    configuration class for section *name*, to be filled in later by
    parse(). The class itself is returned unchanged.
    """
    def decorator(klass):
        _conf_map[name] = klass
        return klass

    return decorator
def parse(conf_file, parser=None, type_check=True):
    """Parse a configuration file in order to overwrite the previously
    registered configuration classes.

    Params:

    - (str|file) conf_file: a path to a configuration file or an
      existing file-like object. A path is opened and closed here; a
      file object passed by the caller is left open.

    - (callable) parser: the function parsing the configuration file
      and converting it to a dict. If None a default parser will
      be picked up depending on the file extension.

    - (bool) type_check: when True raises exception in case an option
      specified in the configuration file has a different type than
      the one defined in the configuration class.

    Raises:

    - Error: parse() was already called and discard() was not, or the
      parser returned something which is neither a dict nor None, or a
      registered section does not hold a mapping.
    - ValueError: no parser was given and the format cannot be deduced
      from the file name.
    - InvalidKeyError, TypesMismatchError, ValidationError,
      RequiredKeyError: the file content does not match the registered
      configuration classes.

    Registered classes are updated key by key, so an exception raised
    half way through leaves the keys processed so far overwritten.
    """
    global _conf_file
    if _conf_file is not None:
        raise Error('already configured (you may want to use discard() '
                    'then call parse() again)')

    if isinstance(conf_file, basestring):
        # 'r' looks mandatory on Python 3.X
        file = open(conf_file, 'r')
        opened_here = True
    else:
        # An already opened file-like object (this branch used to be
        # missing, leaving 'file' unbound).
        file = conf_file
        opened_here = False

    try:
        if parser is None:
            if not hasattr(file, 'name'):
                raise ValueError("can't determine format from a file object "
                                 "with no 'name' attribute")
            pmap = {'.yaml': parse_yaml,
                    '.yml': parse_yaml,
                    '.toml': parse_toml,
                    '.json': parse_json,
                    '.ini': parse_ini}
            ext = os.path.splitext(file.name)[1]
            try:
                # case-insensitive, so that e.g. "CONF.INI" is recognized
                parser = pmap[ext.lower()]
            except KeyError:
                raise ValueError("don't know how to parse %r" % file.name)
        conf = parser(file)
    finally:
        # only close what we opened ourselves
        if opened_here:
            file.close()

    # An empty file may legitimately parse to None; anything else which
    # is not a dict cannot be mapped onto sections.
    if conf is not None and not isinstance(conf, dict):
        raise Error('invalid configuration file %r'
                    % (getattr(file, 'name', file), ))

    # TODO: use a copy of _conf_map and set it at the end of this
    #       procedure?
    # TODO: should we use threading.[R]Lock (probably safer)?
    for section, values in (conf or {}).items():
        inst = _conf_map.get(section, None)
        if inst is None:
            # sections with no registered class are ignored
            continue
        if not isinstance(values, dict):
            # explicit check instead of an assert (stripped by -O)
            raise Error('invalid configuration file %r: section %r is not '
                        'a mapping' % (getattr(file, 'name', file), section))
        for key, new_value in values.items():
            # the key must already exist on the configuration class
            try:
                default_value = getattr(inst, key)
            except AttributeError:
                raise InvalidKeyError(section, key)

            # exact type comparison against the class default; skipped
            # for schemas and when either side is None
            is_schema = isinstance(default_value, schema)
            # TODO: perhaps "not is_schema" is not necessary
            check_type = (type_check
                          and not is_schema
                          and default_value is not None
                          and new_value is not None)
            if check_type and type(new_value) != type(default_value):
                raise TypesMismatchError(section, key, default_value,
                                         new_value)

            # run the schema validator, if any; whatever the failure
            # mode, raise a ValidationError carrying the location
            if is_schema and default_value.validator is not None:
                exc = None
                try:
                    ok = default_value.validator(new_value)
                except ValidationError as err:
                    exc = ValidationError(err.msg)
                else:
                    if not ok:
                        exc = ValidationError()
                if exc is not None:
                    exc.section = section
                    exc.key = key
                    exc.value = new_value
                    raise exc

            setattr(inst, key, new_value)

    # parse the configuration classes in order to collect all schemas
    # which were not overwritten by the config file
    for section, cflet in _conf_map.items():
        # list(): iterate over a snapshot, since setattr() below writes
        # to the very namespace being walked
        for key, value in list(cflet.__dict__.items()):
            if isinstance(value, schema):
                if value.required:
                    raise RequiredKeyError(section, key)
                setattr(cflet, key, value.default)
    _conf_file = file
def discard():
    """Forget the previous configuration (if any).

    Resets the record of the parsed file, which allows parse() to be
    called again, and empties the registry of configuration classes.
    """
    global _conf_file
    _conf_file = None
    _conf_map.clear()