-
Notifications
You must be signed in to change notification settings - Fork 61
/
util.py
150 lines (121 loc) · 4.39 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from .mock_response import MockHTTPResponse
from datetime import datetime
from requests.models import PreparedRequest, Response
from requests.packages.urllib3 import HTTPResponse
from requests.structures import CaseInsensitiveDict
from requests.status_codes import _codes
from requests.cookies import RequestsCookieJar
import base64
import io
def coerce_content(content, encoding=None):
    """Return ``content`` as text, decoding it when it supports decoding.

    :param content: The value to coerce. Anything without a ``decode``
        method is handed back unchanged.
    :param encoding str: Codec used to decode; ``'utf-8'`` is used when this
        is ``None`` or empty.
    :returns: The decoded text, or ``content`` itself when it cannot be
        decoded.
    """
    if not hasattr(content, 'decode'):
        return content

    codec = encoding or 'utf-8'
    # 'replace' substitutes undecodable bytes instead of raising.
    return content.decode(codec, 'replace')
def body_io(string, encoding=None):
    """Wrap a body in an in-memory binary stream.

    :param string: The body. If it has an ``encode`` method it is encoded
        first; otherwise it is passed to ``io.BytesIO`` as-is.
    :param encoding str: Codec used to encode; ``'utf-8'`` is used when this
        is ``None`` or empty.
    :returns: An ``io.BytesIO`` positioned at the start of the body.
    """
    if hasattr(string, 'encode'):
        codec = encoding or 'utf-8'
        return io.BytesIO(string.encode(codec))
    return io.BytesIO(string)
def from_list(value):
    """Unwrap the first element of a list, leaving other values alone.

    :param value: Either a ``list`` or any other object.
    :returns: ``value[0]`` when ``value`` is a ``list``; otherwise ``value``
        unchanged.
    """
    return value[0] if isinstance(value, list) else value
def add_body(r, preserve_exact_body_bytes, body_dict):
    """Coerce the body of a request or response and store it in ``body_dict``.

    This function adds either ``'string'`` or ``'base64_string'`` to
    ``body_dict``. If ``preserve_exact_body_bytes`` is ``True``, or the
    ``Content-Encoding`` header contains ``gzip``, the body is encoded as a
    base64 string and saved like that. Otherwise, it is saved as a plain
    string decoded with ``body_dict['encoding']``.

    :param r: This is either a PreparedRequest instance or a Response
        instance. Its ``raw`` attribute is used when present, otherwise its
        ``body`` attribute; a file-like body is read in full.
    :param preserve_exact_body_bytes bool: Either True or False.
    :param body_dict dict: A dictionary already containing the encoding to be
        used. It is updated in place.
    """
    # Prefer ``raw``; fall back to ``body`` only when ``raw`` does not exist.
    body = getattr(r, 'raw', getattr(r, 'body', None))
    if hasattr(body, 'read'):
        body = body.read()

    if not body:
        body = ''

    if (preserve_exact_body_bytes or
            'gzip' in r.headers.get('Content-Encoding', '')):
        # base64.b64encode only accepts bytes-like input on Python 3, so a
        # text body (including the '' placeholder above) must be encoded
        # first. Values that are already bytes are left untouched.
        if hasattr(body, 'encode') and not isinstance(body, bytes):
            body = body.encode(body_dict.get('encoding') or 'utf-8')
        body_dict['base64_string'] = base64.b64encode(body).decode()
    else:
        body_dict['string'] = coerce_content(body, body_dict['encoding'])
def serialize_prepared_request(request, preserve_exact_body_bytes):
    """Convert a prepared request into a plain dictionary.

    :param request: The request to serialize; its ``headers``, ``method``
        and ``url`` attributes are read, and its body is stored via
        ``add_body``.
    :param preserve_exact_body_bytes bool: Passed through to ``add_body``.
    :returns: A dict with the keys ``'body'``, ``'headers'``, ``'method'``
        and ``'uri'``. Each header value is wrapped in a one-element list.
    """
    # Request bodies are always recorded with a utf-8 encoding marker.
    body = {'encoding': 'utf-8'}
    add_body(request, preserve_exact_body_bytes, body)

    header_map = {
        coerce_content(name, 'utf-8'): [value]
        for name, value in request.headers.items()
    }

    return {
        'body': body,
        'headers': header_map,
        'method': request.method,
        'uri': request.url,
    }
def deserialize_prepared_request(serialized):
    """Rebuild a ``PreparedRequest`` from its serialized dictionary form.

    :param serialized dict: A mapping with the keys ``'body'``,
        ``'headers'``, ``'method'`` and ``'uri'``. ``'body'`` may be a dict
        holding ``'string'`` and/or ``'base64_string'``, or any other value,
        which is then used as the body directly.
    :returns: A ``PreparedRequest`` with an empty cookie jar and the stored
        body, headers, method and URL.
    """
    request = PreparedRequest()
    request._cookies = RequestsCookieJar()

    stored = serialized['body']
    if not isinstance(stored, dict):
        request.body = stored
    else:
        text = stored.get('string')
        if text:
            request.body = text
        else:
            # No usable plain string: fall back to the base64 payload,
            # treating a missing one as empty.
            encoded = stored.get('base64_string', '')
            request.body = base64.b64decode(encoded.encode())

    # Header values may be stored as lists; keep only the first entry.
    pairs = [
        (name, from_list(value))
        for name, value in serialized['headers'].items()
    ]
    request.headers = CaseInsensitiveDict(pairs)
    request.method = serialized['method']
    request.url = serialized['uri']
    return request
def serialize_response(response, preserve_exact_body_bytes):
    """Convert a response into a plain dictionary.

    :param response: The response to serialize; its ``encoding``,
        ``headers``, ``status_code``, ``reason`` and ``url`` attributes are
        read, and its body is stored via ``add_body``.
    :param preserve_exact_body_bytes bool: Passed through to ``add_body``.
    :returns: A dict with the keys ``'body'``, ``'headers'``, ``'status'``
        and ``'url'``. Each header value is wrapped in a one-element list.
    """
    body = {'encoding': response.encoding}
    add_body(response, preserve_exact_body_bytes, body)

    header_map = {name: [value] for name, value in response.headers.items()}
    status = {'code': response.status_code, 'message': response.reason}

    return {
        'body': body,
        'headers': header_map,
        'status': status,
        'url': response.url,
    }
def deserialize_response(serialized):
    """Rebuild a ``Response`` from its serialized dictionary form.

    :param serialized dict: A mapping with ``'body'`` and ``'headers'``,
        an optional ``'url'``, and either a ``'status'`` dict (``'code'``
        and ``'message'``) or a bare ``'status_code'``.
    :returns: A ``Response`` whose encoding, headers, URL, status and
        ``raw`` body have been restored.
    """
    response = Response()
    response.encoding = serialized['body']['encoding']

    # Header values may be stored as lists; keep only the first entry.
    pairs = [
        (name, from_list(value))
        for name, value in serialized['headers'].items()
    ]
    response.headers = CaseInsensitiveDict(pairs)
    response.url = serialized.get('url', '')

    if 'status' not in serialized:
        # Only a numeric code was stored: derive the reason phrase from
        # the status-code table.
        response.status_code = serialized['status_code']
        response.reason = _codes[response.status_code][0].upper()
    else:
        response.status_code = serialized['status']['code']
        response.reason = serialized['status']['message']

    add_urllib3_response(serialized, response)
    return response
def add_urllib3_response(serialized, response):
    """Attach an ``HTTPResponse`` built from the stored body to ``response``.

    :param serialized dict: The serialized response; ``serialized['body']``
        holds either a ``'base64_string'`` or the keyword arguments accepted
        by ``body_io``.
    :param response: The response being rebuilt. Its ``status_code`` and
        ``headers`` are read and its ``raw`` attribute is set.
    """
    stored = serialized['body']
    if 'base64_string' not in stored:
        stream = body_io(**stored)
    else:
        decoded = base64.b64decode(stored['base64_string'].encode())
        stream = io.BytesIO(decoded)

    mock_original = MockHTTPResponse(response.headers)
    response.raw = HTTPResponse(
        stream,
        status=response.status_code,
        headers=response.headers,
        preload_content=False,
        original_response=mock_original
    )
def timestamp():
    """Return the current UTC time in ISO format without fractional seconds.

    :returns: The ``isoformat()`` string of ``datetime.utcnow()``, cut off
        before its last ``'.'``; the full string when it contains no
        ``'.'``.
    """
    stamp = datetime.utcnow().isoformat()
    head, dot, _ = stamp.rpartition('.')
    # rpartition yields an empty separator when no '.' is present.
    return head if dot else stamp
def _option_from(option, kwargs, defaults):
value = kwargs.get(option)
if value is None:
value = defaults.get(option)
return value