/
helpers.py
227 lines (177 loc) · 7.44 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import importlib
import json
import logging
import re
from typing import Dict, Iterable, Mapping, Optional
import jmespath
import jwt
import requests
from box.box import Box
from tavern._core import exceptions
from tavern._core.dict_util import check_keys_match_recursive, recurse_access_key
from tavern._core.jmesutils import actual_validation, validate_comparison
from tavern._core.schema.files import verify_pykwalify
logger: logging.Logger = logging.getLogger(__name__)
def check_exception_raised(
response: requests.Response, exception_location: str
) -> None:
"""Make sure the result from the server is the same as the exception we
expect to raise
Args:
response: response object
exception_location: entry point style location of exception
"""
dumped = json.loads(response.content.decode("utf8"))
module_name, exception_name = exception_location.split(":")
module = importlib.import_module(module_name)
exception = getattr(module, exception_name)
for possible_title in ["title", "error"]:
if possible_title in dumped:
try:
assert dumped[possible_title] == exception.error_title # noqa
except AssertionError as e:
raise exceptions.UnexpectedExceptionError(
"Incorrect title of exception"
) from e
actual_description = dumped.get("description", dumped.get("error_description"))
expected_description = getattr(
exception, "error_description", exception.description
)
try:
assert actual_description == expected_description # noqa
except AssertionError as e:
# If it has a format, ignore this error. Would be annoying to say how to
# format things in the validator, especially if it's a set/dict which is
# unordered
# TODO: improve logic? Use a regex like '{.+?}' instead?
if not any(i in expected_description for i in "{}"):
raise exceptions.UnexpectedExceptionError(
"exception description did not match"
) from e
try:
assert response.status_code == int(exception.status.split()[0]) # noqa
except AssertionError as e:
raise exceptions.UnexpectedExceptionError(
"exception status code did not match"
) from e
def validate_jwt(
response: requests.Response, jwt_key: str, **kwargs
) -> Mapping[str, Box]:
"""Make sure a jwt is valid
This uses the pyjwt library to decode the jwt, so any keyword args needed
should be passed as per that library. You will probably want to use
verify_signature=False unless using a HMAC key because it can be a bit
verbose to pass in a public key.
This also returns the jwt so it can be used both to verify and save jwts -
it wraps this in a Box so it can also be used for future formatting
Args:
response: requests.Response object
jwt_key: key of jwt in body of request
**kwargs: Any extra arguments to pass to jwt.decode
Returns:
mapping of jwt: boxed jwt claims
"""
token = response.json()[jwt_key]
decoded = jwt.decode(token, **kwargs)
logger.debug("Decoded jwt to %s", decoded)
return {"jwt": Box(decoded)}
def validate_pykwalify(response: requests.Response, schema: Dict) -> None:
"""Make sure the response matches a given schema
Args:
response: reqeusts Response object
schema: Schema for response
"""
try:
to_verify = response.json()
except TypeError as e:
raise exceptions.BadSchemaError(
"Tried to match a pykwalify schema against a non-json response"
) from e
else:
verify_pykwalify(to_verify, schema)
def validate_regex(
response: requests.Response,
expression: str,
*,
header: Optional[str] = None,
in_jmespath: Optional[str] = None,
) -> Dict[str, Box]:
"""Make sure the response matches a regex expression
Args:
response: requests.Response object
expression: Regex expression to use
header: Match against a particular header instead of the body
in_jmespath: if present, jmespath to access before trying to match
Returns:
mapping of regex to boxed name capture groups
"""
if header and in_jmespath:
raise exceptions.BadSchemaError("Can only specify one of header or jmespath")
if header:
content = response.headers[header]
else:
content = response.text
if in_jmespath:
if not response.headers.get("content-type", "").startswith("application/json"):
logger.warning(
"Trying to use jmespath match but content type is not application/json"
)
try:
decoded = json.loads(content)
except json.JSONDecodeError as e:
raise exceptions.RegexAccessError(
"unable to decode json for regex match"
) from e
content = recurse_access_key(decoded, in_jmespath)
if not isinstance(content, str):
raise exceptions.RegexAccessError(
"Successfully accessed {} from response, but it was a {} and not a string".format(
in_jmespath, type(content)
)
)
logger.debug("Matching %s with %s", content, expression)
match = re.search(expression, content)
if match is None:
raise exceptions.RegexAccessError("No match for regex")
return {"regex": Box(match.groupdict())}
def validate_content(response: requests.Response, comparisons: Iterable[Dict]) -> None:
"""Asserts expected value with actual value using JMES path expression
Args:
response: reqeusts.Response object.
comparisons:
A list of dict containing the following keys:
1. jmespath : JMES path expression to extract data from.
2. operator : Operator to use to compare data.
3. expected : The expected value to match for
"""
for each_comparison in comparisons:
path, _operator, expected = validate_comparison(each_comparison)
logger.debug("Searching for '%s' in '%s'", path, response.json())
actual = jmespath.search(path, response.json())
expession = " ".join([str(path), str(_operator), str(expected)])
parsed_expession = " ".join([str(actual), str(_operator), str(expected)])
try:
actual_validation(_operator, actual, expected, parsed_expession, expession)
except AssertionError as e:
raise exceptions.JMESError("Error validating JMES") from e
def check_jmespath_match(parsed_response, query: str, expected: Optional[str] = None):
"""
Check that the JMES path given in 'query' is present in the given response
Args:
parsed_response: Response list or dict
query: JMES query
expected: Possible value to match against. If None,
'query' will just check that _something_ is present
"""
actual = jmespath.search(query, parsed_response)
msg = f"JMES path '{query}' not found in response"
if actual is None:
raise exceptions.JMESError(msg)
if expected is not None:
# Reuse dict util helper as it should behave the same
check_keys_match_recursive(expected, actual, [], True)
elif not actual and not (actual == expected):
# This can return an empty list, but it might be what we expect. if not,
# raise an exception
raise exceptions.JMESError(msg)
return actual