-
Notifications
You must be signed in to change notification settings - Fork 74
/
helpers.py
104 lines (85 loc) · 3.49 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import traceback
import numpy
from openfisca_core import parameters, periods
from openfisca_core.errors import ParameterParsingError
from openfisca_core.parameters import config
def contains_nan(vector):
    """
    Tell whether a NumPy array holds at least one NaN.

    Arrays whose dtype is a record/void type are inspected field by field,
    recursively; any other array is checked directly with ``numpy.isnan``.

    :param vector: NumPy array to inspect.
    :returns: A truthy value if a NaN is found, a falsy one otherwise.
    """
    dtype = vector.dtype
    is_structured = numpy.issubdtype(dtype, numpy.record) or numpy.issubdtype(
        dtype, numpy.void
    )
    if not is_structured:
        return numpy.isnan(vector).any()

    # Recurse into every named field of the structured array.
    return any(contains_nan(vector[field]) for field in dtype.names)
def load_parameter_file(file_path, name=""):
    """
    Build a parameter object from a YAML file or from a directory.

    A directory is handed to :class:`.ParameterNode` through its
    ``directory_path`` argument; a file is read with ``_load_yaml_file`` and
    its content dispatched by ``_parse_child``.

    :param file_path: Path of the YAML file or directory to load.
    :param name: Name given to the resulting parameter object.
    :returns: An instance of :class:`.ParameterNode` or :class:`.ParameterScale` or :class:`.Parameter`.
    :raises ValueError: If ``file_path`` does not exist.
    """
    path_exists = os.path.exists(file_path)
    if not path_exists:
        message = "{} does not exist".format(file_path)
        raise ValueError(message)

    if not os.path.isdir(file_path):
        yaml_content = _load_yaml_file(file_path)
        return _parse_child(name, yaml_content, file_path)

    return parameters.ParameterNode(name, directory_path=file_path)
def _compose_name(path, child_name=None, item_name=None):
    """
    Build the name of an element located under ``path``.

    :param path: Name of the parent; when falsy, ``child_name`` is returned as is.
    :param child_name: Name of a child, rendered as ``path.child_name``.
    :param item_name: Key of an item, rendered as ``path[item_name]``; only
        considered when ``child_name`` is ``None``.
    :returns: The composed name, or ``None`` when ``path`` is set but neither
        ``child_name`` nor ``item_name`` is provided.
    """
    if path:
        if child_name is not None:
            return "{}.{}".format(path, child_name)
        if item_name is not None:
            return "{}[{}]".format(path, item_name)
        return None
    # No parent: the child name stands on its own (item_name is ignored here).
    return child_name
def _load_yaml_file(file_path):
    """
    Read and parse a single YAML file.

    :param file_path: Path of the YAML file to read.
    :returns: The object produced by ``config.yaml.load`` for the file content.
    :raises ParameterParsingError: If the content is not valid YAML, or if any
        other error occurs while loading it. The formatted stack trace of the
        underlying error is passed along to the exception.
    """
    # Decode explicitly as UTF-8 rather than with the platform's locale
    # encoding, so non-ASCII parameter files load the same way everywhere.
    with open(file_path, "r", encoding="utf-8") as f:
        try:
            # NOTE(review): config.Loader is defined outside this file; if it
            # is not a safe loader, only trusted files must be passed here.
            return config.yaml.load(f, Loader=config.Loader)
        except (
            config.yaml.scanner.ScannerError,
            config.yaml.parser.ParserError,
        ) as error:
            stack_trace = traceback.format_exc()
            raise ParameterParsingError(
                "Invalid YAML. Check the traceback above for more details.",
                file_path,
                stack_trace,
            ) from error
        except Exception as error:
            # Anything else (e.g. a decoding or constructor error) is reported
            # as a content problem tied to this file.
            stack_trace = traceback.format_exc()
            raise ParameterParsingError(
                "Invalid parameter file content. Check the traceback above for more details.",
                file_path,
                stack_trace,
            ) from error
def _parse_child(child_name, child, child_path):
    """
    Build the parameter object matching the shape of ``child``.

    Dispatch rules, in order, for a ``dict``:

    - it has a ``values`` key: :class:`.Parameter`;
    - it has a ``brackets`` key: :class:`.ParameterScale`;
    - all of its keys match ``periods.INSTANT_PATTERN``: :class:`.Parameter`.

    Anything else is passed to :class:`.ParameterNode`.

    :param child_name: Name given to the created object.
    :param child: Data describing the child.
    :param child_path: Path of the file the data comes from.
    :returns: An instance of :class:`.Parameter`, :class:`.ParameterScale` or :class:`.ParameterNode`.
    """
    # Check the type before any membership test: `in` raises TypeError on None
    # (e.g. an empty YAML entry) and does substring matching on a plain string.
    if isinstance(child, dict):
        if "values" in child:
            return parameters.Parameter(child_name, child, child_path)
        if "brackets" in child:
            return parameters.ParameterScale(child_name, child, child_path)
        if all(periods.INSTANT_PATTERN.match(str(key)) for key in child):
            return parameters.Parameter(child_name, child, child_path)

    # NOTE(review): non-dict data now reaches ParameterNode, which presumably
    # validates it and reports a parsing error — confirm.
    return parameters.ParameterNode(child_name, data=child, file_path=child_path)
def _set_backward_compatibility_metadata(parameter, data):
    """
    Copy the ``unit`` and ``reference`` entries of ``data`` into ``parameter.metadata``.

    An entry that is missing from ``data`` or set to ``None`` is skipped, so
    any value already stored in ``parameter.metadata`` is left untouched.

    :param parameter: Object exposing a mutable ``metadata`` mapping.
    :param data: Mapping the entries are read from.
    """
    for legacy_key in ("unit", "reference"):
        if data.get(legacy_key) is None:
            continue
        parameter.metadata[legacy_key] = data[legacy_key]
def _validate_parameter(parameter, data, data_type=None, allowed_keys=None):
    """
    Check the type of ``data`` and, optionally, the keys it contains.

    :param parameter: Object whose ``name`` and ``file_path`` are used in error reports.
    :param data: Data to validate.
    :param data_type: Expected type of ``data`` (``dict`` or ``list``); no type check when ``None``.
    :param allowed_keys: Collection of accepted keys; no key check when ``None``.
    :raises ParameterParsingError: If ``data`` is not of type ``data_type``, or
        if it contains a key that is not in ``allowed_keys``.
    """
    # Names used in error messages for the supported Python types.
    type_map = {
        dict: "object",
        list: "array",
    }

    wrong_type = data_type is not None and not isinstance(data, data_type)
    if wrong_type:
        type_message = "'{}' must be of type {}.".format(
            parameter.name, type_map[data_type]
        )
        raise ParameterParsingError(type_message, parameter.file_path)

    if allowed_keys is None:
        return

    # Report the first key that is not part of the allowed set.
    for property_name in data.keys():
        if property_name in allowed_keys:
            continue
        raise ParameterParsingError(
            "Unexpected property '{}' in '{}'. Allowed properties are {}.".format(
                property_name, parameter.name, list(allowed_keys)
            ),
            parameter.file_path,
        )