-
Notifications
You must be signed in to change notification settings - Fork 0
/
clients.py
283 lines (232 loc) · 9.63 KB
/
clients.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from urllib import quote, urlencode
import json
import treq
from twisted.internet import reactor
from twisted.python import log
from twisted.web import client
from twisted.web.http import OK
# Twisted's default HTTP11 client factory is way too verbose, so switch off
# its `noisy` flag for the whole process.
# NOTE(review): `_HTTP11ClientFactory` is a private (underscore-prefixed)
# Twisted name -- confirm it still exists when upgrading Twisted.
client._HTTP11ClientFactory.noisy = False
class JsonClient(object):
    """
    Base HTTP client for JSON APIs.

    Requests are made with `treq.request` through a non-persistent
    connection pool. By default each request carries JSON `Content-Type` and
    `Accept` headers and the class-level `timeout`. A response with a 4xx or
    5xx status code is raised as an `HTTPError`.
    """

    # When true, the status code of every response is logged.
    debug = False
    # Reactor used to create the connection pool.
    clock = reactor
    # Default `timeout` argument passed to `treq.request`.
    timeout = 5

    def __init__(self, endpoint):
        """
        Create a client with the specified default endpoint.

        :param: endpoint:
            The URL endpoint requests are sent to unless a different endpoint
            is given to `request` (example is `http://localhost:8080`).
        """
        self.endpoint = endpoint
        self.pool = client.HTTPConnectionPool(self.clock, persistent=False)

    def requester(self, *args, **kwargs):
        """
        Perform the HTTP request by delegating to `treq.request`. Every
        request made by `request` goes through this method.
        """
        return treq.request(*args, **kwargs)

    def _log_http_response(self, response, method, path, data):
        """
        Log the request method, path, body and the response code, then pass
        the response through unchanged so the callback chain continues.
        """
        log.msg('%s %s with %s returned: %s' % (
            method, path, data, response.code))
        return response

    def _log_http_error(self, failure, url):
        """
        Log a failed request and return the failure so that it keeps
        propagating to later errbacks.
        """
        log.err(failure, 'Error performing request to %s' % (url,))
        return failure

    def request(self, method, path, endpoint=None, json_data=None, **kwargs):
        """
        Perform a request. A number of basic defaults are set on the request
        that make using a JSON API easier. These defaults can be overridden by
        setting the parameters in the keyword args.

        :param: method:
            The HTTP method to use (example is `GET`).
        :param: path:
            The URL path. This is appended to the endpoint and should start
            with a '/' (example is `/v2/apps`).
        :param: endpoint:
            The URL endpoint to use. The default value is the endpoint this
            client was created with (`self.endpoint`) (example is
            `http://localhost:8080`)
        :param: json_data:
            A python data structure that will be converted to a JSON string
            using `json.dumps` and used as the request body. `None` (the
            default) means no body is sent; any other value, including
            falsy ones such as `{}`, `[]`, `0` or `False`, is serialized.
        :param: kwargs:
            Any other parameters that will be passed to `treq.request`, for
            example headers or parameters.
        :return:
            The deferred returned by `requester`, which fires with the
            response or fails with `HTTPError` for a 4xx/5xx status code.
        """
        url = ('%s%s' % (endpoint or self.endpoint, path)).encode('utf-8')

        # Compare against None instead of relying on truthiness: an empty
        # dict/list, 0, False and '' are all valid JSON payloads and must not
        # be silently dropped from the request.
        data = json.dumps(json_data) if json_data is not None else None

        requester_kwargs = {
            'headers': {
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            'data': data,
            'pool': self.pool,
            'timeout': self.timeout
        }
        # Caller-supplied values replace the defaults key by key; a `headers`
        # kwarg therefore replaces the whole default headers dict.
        requester_kwargs.update(kwargs)

        d = self.requester(method, url, **requester_kwargs)

        if self.debug:
            d.addCallback(self._log_http_response, method, url, data)

        # The errback is attached before the status check, so only failures
        # from performing the request itself are logged here.
        d.addErrback(self._log_http_error, url)
        return d.addCallback(self._raise_for_status, url)

    def get_json(self, path, **kwargs):
        """
        Perform a GET request to the given path and return the JSON response.

        :param: path:
            The URL path, passed to `request`.
        :param: kwargs:
            Any other parameters accepted by `request`.
        """
        d = self.request('GET', path, **kwargs)
        return d.addCallback(lambda response: response.json())

    def _raise_for_status(self, response, url):
        """
        Raises an `HTTPError` if the response did not succeed.
        Adapted from the Requests library:
        https://github.com/kennethreitz/requests/blob/v2.8.1/requests/models.py#L825-L837

        :param: response:
            The response object; only its `code` attribute is inspected.
        :param: url:
            The requested URL, used in the error message.
        :return:
            The response, unchanged, when its code is not in the 4xx or 5xx
            range.
        """
        http_error_msg = ''

        if 400 <= response.code < 500:
            http_error_msg = '%s Client Error for url: %s' % (response.code,
                                                              url)

        elif 500 <= response.code < 600:
            http_error_msg = '%s Server Error for url: %s' % (response.code,
                                                              url)

        if http_error_msg:
            raise HTTPError(http_error_msg, response)

        return response
class HTTPError(IOError):
    """
    Raised when a response comes back with a 4xx or 5xx status code.

    The offending response is kept on the `response` attribute so that
    handlers can inspect it.
    """

    def __init__(self, message, response):
        """
        :param: message:
            The human readable error message.
        :param: response:
            The response object that triggered the error.
        """
        super(HTTPError, self).__init__(message)
        self.response = response
class MarathonClient(JsonClient):
    """
    JSON client with helpers for the Marathon endpoints used by this module:
    event subscriptions, apps and app tasks.
    """

    def _basic_get_request(self, path, field):
        """
        GET the given path and extract one field from the JSON response.

        Marathon's JSON responses tend to wrap the interesting data in an
        object with a single key. For example /v2/apps returns something like
        {"apps": [ {"app1"}, {"app2"} ]}, and the caller only cares about the
        value stored under "apps".

        :param: path:
            The URL path to request.
        :param: field:
            The key of the response object whose value should be returned.
        """
        d = self.get_json(path)
        d.addCallback(self._get_json_field, field)
        return d

    def _get_json_field(self, response_json, field_name):
        """
        Look up a single field in the parsed response JSON.

        :param: response_json:
            The parsed JSON content of the response.
        :param: field_name:
            The name of the field in the JSON to get.
        :raises KeyError:
            If the field is not present in the response JSON.
        """
        if field_name in response_json:
            return response_json[field_name]

        raise KeyError('Unable to get value for "%s" from Marathon '
                       'response: "%s"' % (
                           field_name, json.dumps(response_json),))

    def get_event_subscriptions(self):
        """
        Fetch the current Marathon event subscriptions as a list of callback
        URLs.
        """
        path = '/v2/eventSubscriptions'
        return self._basic_get_request(path, 'callbackUrls')

    def post_event_subscription(self, callback_url):
        """
        Register a new Marathon event subscription for the given callback
        URL. The result is true only when the response code is 200 (OK).
        """
        query = urlencode({'callbackUrl': callback_url})
        d = self.request('POST', '/v2/eventSubscriptions?%s' % query)
        d.addCallback(lambda response: response.code == OK)
        return d

    def get_apps(self):
        """
        Fetch the currently running Marathon apps as a list of app
        definitions.
        """
        return self._basic_get_request('/v2/apps', 'apps')

    def get_app(self, app_id):
        """
        Fetch information about the app with the given app ID. The ID is
        appended to the path as-is, without an added separator.
        """
        path = '/v2/apps%s' % (app_id,)
        return self._basic_get_request(path, 'app')

    def get_app_tasks(self, app_id):
        """
        Fetch the currently running tasks for the app with the given app ID
        as a list of task definitions.
        """
        path = '/v2/apps%s/tasks' % (app_id,)
        return self._basic_get_request(path, 'tasks')
class ConsulClient(JsonClient):
    """
    JSON client with helpers for the Consul agent, key/value and catalog
    endpoints used by this module.
    """

    # `timeout` passed to the request made when falling back to the default
    # endpoint.
    fallback_timeout = 2

    def __init__(self, endpoint, enable_fallback=False):
        """
        Create a Consul client.

        :param: endpoint:
            The default Consul endpoint, usually on the same node as Consular
            is running.
        :param: enable_fallback:
            Fall back to the default Consul endpoint when registering services
            on an agent that cannot be reached.
        """
        super(ConsulClient, self).__init__(endpoint)
        # The parent constructor already stores the endpoint; the assignment
        # is repeated here explicitly.
        self.endpoint = endpoint
        self.enable_fallback = enable_fallback

    def register_agent_service(self, agent_endpoint, registration):
        """
        Register a Consul service at the given agent endpoint. When fallback
        is enabled, a failed registration is retried against the default
        endpoint.
        """
        d = self.request(
            'PUT', '/v1/agent/service/register',
            endpoint=agent_endpoint, json_data=registration)

        if not self.enable_fallback:
            return d

        d.addErrback(self.register_agent_service_fallback, registration)
        return d

    def register_agent_service_fallback(self, failure, registration):
        """
        Register a Consul service at the default agent endpoint
        (`self.endpoint`) after registration at another agent failed.

        :param: failure:
            The failure from the original registration attempt (not used).
        :param: registration:
            The service registration to send; its 'Name' is logged.
        """
        service_name = registration['Name']
        log.msg('Falling back for %s at %s.' % (service_name, self.endpoint))

        return self.request(
            'PUT', '/v1/agent/service/register',
            json_data=registration, timeout=self.fallback_timeout)

    def deregister_agent_service(self, agent_endpoint, service_id):
        """
        Deregister the Consul service with the given ID at the given agent
        endpoint.
        """
        path = '/v1/agent/service/deregister/%s' % (service_id,)
        return self.request('PUT', path, endpoint=agent_endpoint)

    def put_kv(self, key, value):
        """
        Store a key/value pair in Consul's k/v store. The key is URL-quoted
        and the value is sent as the JSON request body.
        """
        path = '/v1/kv/%s' % (quote(key),)
        return self.request('PUT', path, json_data=value)

    def get_kv_keys(self, keys_path, separator=None):
        """
        Get the stored keys for the given keys path from the Consul k/v store.

        :param: keys_path:
            The path to some keys (example is `consular/my-app/`).
        :param: separator:
            Get all the keys up to some separator in the key path. Useful for
            getting all the keys non-recursively for a path. For more
            information see the Consul API documentation.
        """
        params = {'keys': ''}
        if separator:
            params['separator'] = separator

        quoted_path = quote(keys_path)
        query = urlencode(params)
        return self.get_json('/v1/kv/%s?%s' % (quoted_path, query))

    def delete_kv_keys(self, key, recurse=False):
        """
        Delete the stored key(s) at the given path from the Consul k/v store.

        :param: key:
            The key or key path to be deleted.
        :param: recurse:
            Whether or not to recursively delete all subpaths of the key.
        """
        if recurse:
            suffix = '?recurse'
        else:
            suffix = ''

        return self.request('DELETE', '/v1/kv/%s%s' % (quote(key), suffix))

    def get_catalog_nodes(self):
        """
        Fetch the list of active Consul nodes from the catalog.
        """
        return self.get_json('/v1/catalog/nodes')

    def get_agent_services(self, agent_endpoint):
        """
        Fetch the list of running services for the given agent endpoint.
        """
        path = '/v1/agent/services'
        return self.get_json(path, endpoint=agent_endpoint)