-
Notifications
You must be signed in to change notification settings - Fork 42
/
api_requestor.py
104 lines (85 loc) · 3.8 KB
/
api_requestor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from __future__ import unicode_literals
import requests
import json
import lob
from lob import error
from lob.version import VERSION
from lob.constants import TIMEOUT_DEFAULT
def _is_file_like(obj):
"""
Checks if an object is file-like enough to be sent to requests.
In particular, file, StringIO and cStringIO objects are file-like.
Refs http://stackoverflow.com/questions/3450857/python-determining-if-an-object-is-file-like
"""
return hasattr(obj, 'read') and hasattr(obj, 'seek')
class APIRequestor(object):
    """Issue HTTP requests against the Lob API and decode the responses.

    Every request is sent to ``lob.api_base + url`` using HTTP basic auth,
    with the API key as the username and an empty password.
    """

    def __init__(self, key=None):
        """Remember the API key used to authenticate requests.

        :param key: API key for this requestor; when falsy, the module-level
            ``lob.api_key`` is used instead.
        """
        self.api_key = key or lob.api_key

    def parse_response(self, resp):
        """Turn an HTTP response into its JSON payload or raise a Lob error.

        :param resp: response object returned by ``requests``.
        :returns: the decoded JSON body when the status code is 200.
        :raises error.APIConnectionError: on status 504; the body is not
            parsed as JSON in that case.
        :raises error.AuthenticationError: on status 401.
        :raises error.InvalidRequestError: on status 404 or 422.
        :raises error.APIError: on any other status code.

        Each error is built from the ``payload['error']['message']`` text
        (for 504: the raw body, or the reason phrase when the body is empty),
        followed by the raw body, the status code and the response itself.
        """
        # A 504 is handled before any JSON decoding is attempted.
        if resp.status_code == 504:
            raise error.APIConnectionError(resp.content or resp.reason,  # pragma: no cover
                                           resp.content, resp.status_code, resp)

        # NOTE(review): a body that is not valid JSON makes resp.json() raise,
        # and a payload without error.message raises KeyError below; both
        # propagate to the caller unchanged.
        payload = resp.json()
        if resp.status_code == 200:
            return payload
        elif resp.status_code == 401:
            raise error.AuthenticationError(payload['error']['message'],
                                            resp.content, resp.status_code, resp)
        elif resp.status_code in [404, 422]:
            raise error.InvalidRequestError(payload['error']['message'],
                                            resp.content, resp.status_code, resp)
        else:  # pragma: no cover
            raise error.APIError(payload['error']['message'], resp.content, resp.status_code, resp)

    def request(self, method, url, params=None, timeout=TIMEOUT_DEFAULT):
        """Send one request to the API and return the parsed response.

        :param method: ``'get'``, ``'delete'`` or ``'post'`` (lower case).
        :param url: path appended to ``lob.api_base``.
        :param params: optional dict of request parameters. Some keys are
            consumed here instead of being sent as ordinary parameters:

            * ``'lob_version'`` -- value for the ``Lob-Version`` header; takes
              precedence over ``lob.api_version``.
            * ``'headers'`` -- extra headers merged over the defaults.
            * ``'files'`` (POST only) -- mapping passed to requests as
              ``files``.
            * ``'query'`` (POST only) -- mapping sent as the query string.

            For GET the remaining entries become the query string; for POST
            they become form fields (see ``_split_post_params``); for DELETE
            they are not sent. The caller's dict is never modified.
        :param timeout: timeout handed to ``requests``.
        :returns: whatever ``parse_response`` returns for the response, or
            ``None`` when ``method`` is not one of the three known verbs (no
            request is made in that case).
        :raises: see ``parse_response``.
        """
        # Work on a shallow copy: the special keys are popped below, and that
        # must not leak into the dict the caller handed in. This also makes
        # params=None safe for every verb, including POST.
        params = dict(params) if params else {}

        headers = {
            'User-Agent': 'Lob/v1 PythonBindings/%s' % VERSION
        }
        if hasattr(lob, 'api_version'):
            headers['Lob-Version'] = lob.api_version
        # A per-request version wins over the module-wide one.
        if 'lob_version' in params:
            headers['Lob-Version'] = params.pop('lob_version')
        if 'headers' in params:
            headers.update(params.pop('headers'))

        target = lob.api_base + url
        auth = (self.api_key, '')

        if method == 'get':
            resp = requests.get(
                target,
                auth=auth,
                params=params,
                headers=headers,
                timeout=timeout
            )
            return self.parse_response(resp)

        if method == 'delete':
            resp = requests.delete(target, auth=auth, headers=headers, timeout=timeout)
            return self.parse_response(resp)

        if method == 'post':
            files = params.pop('files', None) or {}
            query = dict(params.pop('query', None) or {})

            data, uploads = self._split_post_params(params)
            if uploads:
                # Copy before merging so the caller's files mapping stays intact.
                files = dict(files)
                files.update(uploads)

            resp = requests.post(target, auth=auth, params=query, data=data,
                                 files=files, headers=headers, timeout=timeout)
            return self.parse_response(resp)

        # Unrecognised verb: nothing is sent and None is returned.
        return None

    @staticmethod
    def _split_post_params(params):
        """Flatten POST parameters into form fields and file uploads.

        :param params: dict of parameters with the special keys already
            removed; it is read but not modified.
        :returns: ``(data, uploads)`` -- ``data`` holds the form fields and
            ``uploads`` the file-like values, both keyed by field name.

        Rules, applied in this order:

        * ``merge_variables`` and ``addresses`` are sent as one JSON string.
        * Other plain dict values are flattened to ``key[sub_key]`` fields,
          at most two levels deep.
        * File-like values (see ``_is_file_like``) go to ``uploads``.
        * ``lob.resource.LobObject`` values are replaced by their ``id``.
        * Everything else is passed through as is.
        """
        # First pass: JSON-encode the two special keys and expand one level of
        # plain dicts. LobObject is excluded from the expansion -- presumably
        # because it is itself a dict subclass (TODO confirm).
        flattened = {}
        for key, value in params.items():
            if key in ('merge_variables', 'addresses'):
                flattened[key] = json.dumps(value)
            elif isinstance(value, dict) and not isinstance(value, lob.resource.LobObject):
                for sub_key, sub_value in value.items():
                    # %-formatting rather than "+" so non-string sub-keys
                    # (e.g. integers) do not raise TypeError.
                    flattened['%s[%s]' % (key, sub_key)] = sub_value
            else:
                flattened[key] = value

        # Second pass: route each value to uploads or form data, expanding one
        # further level of dicts left over from the first pass.
        data = {}
        uploads = {}
        for key, value in flattened.items():
            if _is_file_like(value):
                uploads[key] = value
            elif isinstance(value, lob.resource.LobObject):
                data[key] = value.id
            elif isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    data['%s[%s]' % (key, sub_key)] = sub_value
            else:
                data[key] = value
        return data, uploads