-
Notifications
You must be signed in to change notification settings - Fork 73
/
queues.py
347 lines (291 loc) · 13.4 KB
/
queues.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import falcon
from oslo_log import log as logging
import six
from zaqar.common import decorators
from zaqar.i18n import _
from zaqar.storage import errors as storage_errors
from zaqar.transport import acl
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
def _get_reserved_metadata(validate):
_reserved_metadata = ['max_messages_post_size', 'default_message_ttl',
'default_message_delay']
reserved_metadata = {
'_%s' % meta:
validate.get_limit_conf_value(meta)
for meta in _reserved_metadata
}
for metadata in ['_dead_letter_queue', '_dead_letter_queue_messages_ttl',
'_max_claim_count']:
reserved_metadata.update({metadata: None})
reserved_metadata.update({'_enable_encrypt_messages': False})
return reserved_metadata
class ItemResource(object):
__slots__ = ('_validate', '_queue_controller', '_message_controller',
'_reserved_metadata')
def __init__(self, validate, queue_controller, message_controller):
self._validate = validate
self._queue_controller = queue_controller
self._message_controller = message_controller
@decorators.TransportLog("Queues item")
@acl.enforce("queues:get")
def on_get(self, req, resp, project_id, queue_name):
try:
resp_dict = self._queue_controller.get(queue_name,
project=project_id)
for meta, value in _get_reserved_metadata(self._validate).items():
if not resp_dict.get(meta):
resp_dict[meta] = value
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
except Exception:
description = _(u'Queue metadata could not be retrieved.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.body = utils.to_json(resp_dict)
# status defaults to 200
@decorators.TransportLog("Queues item")
@acl.enforce("queues:create")
def on_put(self, req, resp, project_id, queue_name):
try:
# Place JSON size restriction before parsing
self._validate.queue_metadata_length(req.content_length)
# Deserialize queue metadata
metadata = None
if req.content_length:
document = wsgi_utils.deserialize(req.stream,
req.content_length)
metadata = wsgi_utils.sanitize(document)
self._validate.queue_metadata_putting(metadata)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
try:
created = self._queue_controller.create(queue_name,
metadata=metadata,
project=project_id)
except storage_errors.FlavorDoesNotExist as ex:
LOG.exception('Flavor "%s" does not exist', queue_name)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception:
description = _(u'Queue could not be created.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
resp.location = req.path
@decorators.TransportLog("Queues item")
@acl.enforce("queues:delete")
def on_delete(self, req, resp, project_id, queue_name):
LOG.debug(u'Queue item DELETE - queue: %(queue)s, '
u'project: %(project)s',
{'queue': queue_name, 'project': project_id})
try:
self._queue_controller.delete(queue_name, project=project_id)
except Exception:
description = _(u'Queue could not be deleted.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
resp.status = falcon.HTTP_204
@decorators.TransportLog("Queues item")
@acl.enforce("queues:update")
def on_patch(self, req, resp, project_id, queue_name):
"""Allows one to update a queue's metadata.
This method expects the user to submit a JSON object. There is also
strict format checking through the use of
jsonschema. Appropriate errors are returned in each case for
badly formatted input.
:returns: HTTP | 200,400,409,503
"""
LOG.debug(u'PATCH queue - name: %s', queue_name)
try:
# Place JSON size restriction before parsing
self._validate.queue_metadata_length(req.content_length)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
# NOTE(flwang): See below link to get more details about draft 10,
# tools.ietf.org/html/draft-ietf-appsawg-json-patch-10
content_types = {
'application/openstack-messaging-v2.0-json-patch': 10,
}
if req.content_type not in content_types:
headers = {'Accept-Patch':
', '.join(sorted(content_types.keys()))}
msg = _("Accepted media type for PATCH: %s.")
LOG.debug(msg, headers)
raise wsgi_errors.HTTPUnsupportedMediaType(msg % headers)
if req.content_length:
try:
changes = utils.read_json(req.stream, req.content_length)
changes = wsgi_utils.sanitize(changes, doctype=list)
except utils.MalformedJSON as ex:
LOG.debug(ex)
description = _(u'Request body could not be parsed.')
raise wsgi_errors.HTTPBadRequestBody(description)
except utils.OverflowedJSONInteger as ex:
LOG.debug(ex)
description = _(u'JSON contains integer that is too large.')
raise wsgi_errors.HTTPBadRequestBody(description)
except Exception:
# Error while reading from the network/server
description = _(u'Request body could not be read.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
else:
msg = _("PATCH body could not be empty for update.")
LOG.debug(msg)
raise wsgi_errors.HTTPBadRequestBody(msg)
try:
changes = self._validate.queue_patching(req, changes)
# NOTE(Eva-i): using 'get_metadata' instead of 'get', so
# QueueDoesNotExist error will be thrown in case of non-existent
# queue.
metadata = self._queue_controller.get_metadata(queue_name,
project=project_id)
reserved_metadata = _get_reserved_metadata(self._validate)
for change in changes:
change_method_name = '_do_%s' % change['op']
change_method = getattr(self, change_method_name)
change_method(req, metadata, reserved_metadata, change)
self._validate.queue_metadata_putting(metadata)
self._queue_controller.set_metadata(queue_name,
metadata,
project_id)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
except wsgi_errors.HTTPConflict:
raise
except Exception:
description = _(u'Queue could not be updated.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
for meta, value in _get_reserved_metadata(self._validate).items():
if not metadata.get(meta):
metadata[meta] = value
resp.body = utils.to_json(metadata)
def _do_replace(self, req, metadata, reserved_metadata, change):
path = change['path']
path_child = path[1]
value = change['value']
if path_child in metadata or path_child in reserved_metadata:
metadata[path_child] = value
else:
msg = _("Can't replace non-existent object %s.")
raise wsgi_errors.HTTPConflict(msg % path_child)
def _do_add(self, req, metadata, reserved_metadata, change):
path = change['path']
path_child = path[1]
value = change['value']
metadata[path_child] = value
def _do_remove(self, req, metadata, reserved_metadata, change):
path = change['path']
path_child = path[1]
if path_child in metadata:
metadata.pop(path_child)
elif path_child not in reserved_metadata:
msg = _("Can't remove non-existent object %s.")
raise wsgi_errors.HTTPConflict(msg % path_child)
class CollectionResource(object):
__slots__ = ('_queue_controller', '_validate', '_reserved_metadata')
def __init__(self, validate, queue_controller):
self._queue_controller = queue_controller
self._validate = validate
def _queue_list(self, project_id, path, kfilter, **kwargs):
try:
self._validate.queue_listing(**kwargs)
with_count = kwargs.pop('with_count', False)
results = self._queue_controller.list(project=project_id,
kfilter=kfilter, **kwargs)
# Buffer list of queues
queues = list(next(results))
total_number = None
if with_count:
total_number = self._queue_controller.calculate_resource_count(
project=project_id)
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception:
description = _(u'Queues could not be listed.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
# Got some. Prepare the response.
kwargs['marker'] = next(results) or kwargs.get('marker', '')
reserved_metadata = _get_reserved_metadata(self._validate).items()
for each_queue in queues:
each_queue['href'] = path + '/' + each_queue['name']
if kwargs.get('detailed'):
for meta, value in reserved_metadata:
if not each_queue.get('metadata', {}).get(meta):
each_queue['metadata'][meta] = value
return queues, kwargs['marker'], total_number
def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}):
kwargs = {}
# NOTE(kgriffs): This syntax ensures that
# we don't clobber default values with None.
req.get_param('marker', store=kwargs)
req.get_param_as_int('limit', store=kwargs)
req.get_param_as_bool('detailed', store=kwargs)
req.get_param('name', store=kwargs)
req.get_param_as_bool('with_count', store=kwargs)
queues, marker, total_number = self._queue_list(project_id,
req.path, kfilter,
**kwargs)
links = []
kwargs['marker'] = marker
if queues:
links = [
{
'rel': 'next',
'href': req.path + falcon.to_query_str(kwargs)
}
]
response_body = {
'queues': queues,
'links': links
}
if total_number:
response_body['count'] = total_number
resp.body = utils.to_json(response_body)
# status defaults to 200
@decorators.TransportLog("Queues collection")
@acl.enforce("queues:get_all")
def on_get(self, req, resp, project_id):
field = ('marker', 'limit', 'detailed', 'name', 'with_count')
kfilter = copy.deepcopy(req.params)
for key in req.params.keys():
if key in field:
kfilter.pop(key)
kfilter = kfilter if len(kfilter) > 0 else {}
for key in kfilter.keys():
# Since we get the filter value from URL, so need to
# turn the string to integer if using integer filter value.
try:
kfilter[key] = int(kfilter[key])
except ValueError:
continue
self._on_get_with_kfilter(req, resp, project_id, kfilter)
# status defaults to 200