/
report.py
2411 lines (2169 loc) · 113 KB
/
report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import functools
import random
import re
import time
from keystoneauth1 import exceptions as ks_exc
import os_traits
from oslo_log import log as logging
from oslo_middleware import request_id
from oslo_utils import versionutils
import retrying
from nova.compute import provider_tree
from nova.compute import utils as compute_utils
import nova.conf
from nova import exception
from nova.i18n import _
from nova import objects
from nova import rc_fields as fields
from nova.scheduler import utils as scheduler_utils
from nova import utils
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)

# Resource classes reported for a compute node's inventory.
VCPU = fields.ResourceClass.VCPU
MEMORY_MB = fields.ResourceClass.MEMORY_MB
DISK_GB = fields.ResourceClass.DISK_GB

# Matches an "Inventory for X on resource provider Y in use" message in an
# HTTP response body; group 1 captures X (the resource classes) and group 2
# captures Y (the resource provider).
_RE_INV_IN_USE = re.compile(
    "Inventory for (.+) on resource provider (.+) in use")

# warn_limit() logs once and then stays quiet for this many further calls.
WARN_EVERY = 10

# Name handed to utils.synchronized() around _create_client().
PLACEMENT_CLIENT_SEMAPHORE = 'placement_client'

# Placement API microversions referenced by this client, in ascending order.
POST_ALLOCATIONS_API_VERSION = '1.13'
NESTED_PROVIDER_API_VERSION = '1.14'
AGGREGATE_GENERATION_VERSION = '1.19'
POST_RPS_RETURNS_PAYLOAD_API_VERSION = '1.20'
ALLOW_RESERVED_EQUAL_TOTAL_INVENTORY_VERSION = '1.26'
CONSUMER_GENERATION_VERSION = '1.28'
NESTED_AC_VERSION = '1.29'
RESHAPER_VERSION = '1.30'

# Small immutable result records (AggInfo is built by
# _get_provider_aggregates(), TraitInfo by _get_provider_traits()).
AggInfo = collections.namedtuple('AggInfo', ['aggregates', 'generation'])
TraitInfo = collections.namedtuple('TraitInfo', ['traits', 'generation'])
ProviderAllocInfo = collections.namedtuple('ProviderAllocInfo',
                                           ['allocations'])
def warn_limit(self, msg):
    """Log ``msg`` as a warning, throttled by a countdown kept on ``self``.

    ``self._warn_count`` holds how many more calls should stay silent. When
    it is zero the message is logged and the countdown is re-armed to
    WARN_EVERY; otherwise the countdown is simply decremented. The net effect
    is that one call in every WARN_EVERY + 1 produces a log line.

    :param self: Object carrying the integer ``_warn_count`` attribute.
    :param msg: Warning text to log.
    """
    remaining = self._warn_count
    if not remaining:
        # Quiet period is over: re-arm the countdown and emit the warning.
        self._warn_count = WARN_EVERY
        LOG.warning(msg)
        return
    self._warn_count = remaining - 1
def safe_connect(f):
    """Decorate a placement client method so connectivity errors are logged.

    Selected keystoneauth1 exceptions raised by the wrapped method are caught
    and logged, and the wrapper then returns None. Any other exception
    propagates unchanged. On a successful call the wrapped method's return
    value is passed through.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        try:
            return f(self, *args, **kwargs)
        except ks_exc.EndpointNotFound:
            warn_limit(self, 'The placement API endpoint was not found.')
            # Rebuild the client so a fresh service catalog is obtained; the
            # catalog is cached once keystone has been contacted successfully.
            self._client = self._create_client()
        except ks_exc.MissingAuthPlugin:
            warn_limit(
                self, 'No authentication information found for placement API.')
        except ks_exc.Unauthorized:
            warn_limit(self, 'Placement service credentials do not work.')
        except ks_exc.DiscoveryFailure:
            # TODO(_gryf): DiscoveryFailure is probably not the only missing
            # exception here; keystoneauth1 failure handling should be dealt
            # with globally.
            warn_limit(
                self, 'Discovering suitable URL for placement API failed.')
        except ks_exc.ConnectFailure:
            # Unlike the cases above, this is logged on every occurrence
            # rather than through warn_limit().
            LOG.warning('Placement API service is not responding.')
        # Only reached after one of the handled exceptions above.
        return None
    return wrapper
class Retry(Exception):
    """Signal that a @retries-decorated operation should be attempted again.

    :param operation: Short description of what was being attempted; it is
                      included in the retry debug log.
    :param reason: Why the attempt failed; it is included in the retry debug
                   log.
    """
    def __init__(self, operation, reason):
        # Pass the arguments up to Exception so ``args`` reflects them. If
        # this is skipped, ``args`` is empty, and copying or unpickling the
        # exception fails because it is re-created from ``args`` while
        # __init__ requires two positional arguments.
        super(Retry, self).__init__(operation, reason)
        self.operation = operation
        self.reason = reason
def retries(f):
    """Decorator that re-runs a method for as long as it raises Retry.

    The call is attempted up to four times (the first try plus three
    retries). Before attempt ``n`` (counting from 0) the wrapper sleeps for a
    random time of up to ``2 * n`` seconds, so the first attempt starts
    immediately. On success the wrapped call's return value is passed
    through. If every attempt raises Retry, an error is logged and False is
    returned.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        max_attempts = 4
        for attempt in range(max_attempts):
            # Randomized back-off that grows with each attempt.
            time.sleep(random.uniform(0, attempt * 2))
            try:
                return f(self, *args, **kwargs)
            except Retry as exc:
                LOG.debug(
                    'Unable to %(op)s because %(reason)s; retrying...',
                    {'op': exc.operation, 'reason': exc.reason})
        LOG.error('Failed scheduler client operation %s: out of retries',
                  f.__name__)
        return False
    return wrapper
def _compute_node_to_inventory_dict(compute_node):
    """Translate a compute node into a placement inventory dict.

    Only resource classes with a positive total are included: VCPU from
    ``vcpus``, MEMORY_MB from ``memory_mb`` and DISK_GB from ``local_gb``.
    Every record uses ``min_unit`` and ``step_size`` of 1, a ``max_unit``
    equal to the total, the node's allocation ratio for that class, and a
    reserved amount read from configuration.

    :param compute_node: `objects.ComputeNode` object to translate
    :returns: dict keyed by resource class (empty if no class has a positive
              total).
    """
    def _record(total, reserved, allocation_ratio):
        # One inventory record; key order matches what placement is sent.
        return {
            'total': total,
            'reserved': reserved,
            'min_unit': 1,
            'max_unit': total,
            'step_size': 1,
            'allocation_ratio': allocation_ratio,
        }

    inventory = {}
    # NOTE(jaypipes): Ironic virt driver will return 0 values for vcpus,
    # memory_mb and disk_gb if the Ironic node is not available/operable
    if compute_node.vcpus > 0:
        inventory[VCPU] = _record(compute_node.vcpus,
                                  CONF.reserved_host_cpus,
                                  compute_node.cpu_allocation_ratio)
    if compute_node.memory_mb > 0:
        inventory[MEMORY_MB] = _record(compute_node.memory_mb,
                                       CONF.reserved_host_memory_mb,
                                       compute_node.ram_allocation_ratio)
    if compute_node.local_gb > 0:
        # TODO(johngarbutt) We should either move to reserved_host_disk_gb
        # or start tracking DISK_MB.
        inventory[DISK_GB] = _record(
            compute_node.local_gb,
            compute_utils.convert_mb_to_ceil_gb(CONF.reserved_host_disk_mb),
            compute_node.disk_allocation_ratio)
    return inventory
def _instance_to_allocations_dict(instance):
    """Return the resources an instance consumes, keyed by resource class.

    Amounts come from scheduler_utils.resources_from_flavor() for the
    instance's flavor. Resource classes whose amount is falsy (e.g. 0) are
    left out.

    :param instance: `objects.Instance` object to translate
    """
    flavor_resources = scheduler_utils.resources_from_flavor(
        instance, instance.flavor)
    return {rc: amount
            for rc, amount in flavor_resources.items()
            if amount}
def _move_operation_alloc_request(source_allocs, dest_alloc_req):
    """Given existing allocations for a source host and a new allocation
    request for a destination host, return a new allocation_request that
    contains resources claimed against both source and destination, accounting
    for shared providers.

    Also accounts for a resize to the same host where the source and dest
    compute node resource providers are going to be the same. In that case
    we sum the resource allocations for the single provider.

    Neither argument is modified: each source ``resources`` dict is copied
    before any same-host summing is done.

    :param source_allocs: Dict, keyed by resource provider UUID, of resources
                          allocated on the source host
    :param dest_alloc_req: The allocation_request for resources against the
                           destination host
    :returns: A new allocation_request dict with an 'allocations' key.
    """
    LOG.debug("Doubling-up allocation_request for move operation. Current "
              "allocations: %s", source_allocs)
    # Remove any allocations against resource providers that are
    # already allocated against on the source host (like shared storage
    # providers)
    cur_rp_uuids = set(source_allocs.keys())
    new_rp_uuids = set(dest_alloc_req['allocations']) - cur_rp_uuids

    # Copy each source resources dict. merge_resources() below is called for
    # its effect on its first argument (its return value is unused), so
    # without the copy the same-host branch would change the caller's
    # source_allocs.
    current_allocs = {
        cur_rp_uuid: {'resources': dict(alloc['resources'])}
        for cur_rp_uuid, alloc in source_allocs.items()
    }
    new_alloc_req = {'allocations': current_allocs}
    for rp_uuid in dest_alloc_req['allocations']:
        if rp_uuid in new_rp_uuids:
            new_alloc_req['allocations'][rp_uuid] = dest_alloc_req[
                'allocations'][rp_uuid]
        elif not new_rp_uuids:
            # If there are no new_rp_uuids that means we're resizing to
            # the same host so we need to sum the allocations for
            # the compute node (and possibly shared providers) using both
            # the current and new allocations.
            # Note that we sum the allocations rather than take the max per
            # resource class between the current and new allocations because
            # the compute node/resource tracker is going to adjust for
            # decrementing any old allocations as necessary, the scheduler
            # shouldn't make assumptions about that.
            scheduler_utils.merge_resources(
                new_alloc_req['allocations'][rp_uuid]['resources'],
                dest_alloc_req['allocations'][rp_uuid]['resources'])

    LOG.debug("New allocation_request containing both source and "
              "destination hosts in move operation: %s", new_alloc_req)
    return new_alloc_req
def _extract_inventory_in_use(body):
    """Pull the in-use resource classes out of an inventory-delete response.

    :param body: Text of the HTTP response body.
    :returns: The first group captured by _RE_INV_IN_USE (the resource class
              string) if the body contains an "Inventory ... in use" message,
              otherwise None.
    """
    in_use = _RE_INV_IN_USE.search(body)
    return in_use.group(1) if in_use else None
def get_placement_request_id(response):
    """Return the request ID header value from a placement response.

    :param response: HTTP response object, or None.
    :returns: The response's request-ID header (looked up with
              ``headers.get``), or None when ``response`` is None.
    """
    if response is None:
        return None
    return response.headers.get(request_id.HTTP_RESP_HEADER_REQUEST_ID)
class SchedulerReportClient(object):
"""Client class for updating the scheduler."""
def __init__(self, adapter=None):
    """Initialize the report client.

    :param adapter: A prepared keystoneauth1 Adapter for API communication.
                    If unspecified, one is created based on config options
                    in the [placement] section.
    """
    # NOTE(danms): Keep track of how naggy we've been (used by warn_limit).
    self._warn_count = 0
    self._adapter = adapter
    # nova-compute-side cache of resource provider and inventory information.
    self._provider_tree = provider_tree.ProviderTree()
    # Per-provider timestamp of the last aggregate/trait refresh.
    self._association_refresh_time = {}
    self._client = self._create_client()
@utils.synchronized(PLACEMENT_CLIENT_SEMAPHORE)
def _create_client(self):
    """Create the HTTP session accessing the placement service.

    The local provider tree cache and the association refresh times are
    reset as well, so the new client starts from a clean slate. The adapter
    given to the constructor is reused if there was one; otherwise a new
    one is built with utils.get_ksa_adapter('placement').
    """
    self._provider_tree = provider_tree.ProviderTree()
    self._association_refresh_time = {}
    if self._adapter:
        client = self._adapter
    else:
        client = utils.get_ksa_adapter('placement')
    # Send an accept header on every request so placement knows which
    # response body media type we want.
    client.additional_headers = {'accept': 'application/json'}
    return client
def get(self, url, version=None, global_request_id=None):
    """Send a GET request to placement through the client.

    :param url: Placement path (may include a query string).
    :param version: Optional placement microversion.
    :param global_request_id: Optional global request ID; when truthy it is
                              sent in the inbound request-id header.
    :returns: Whatever the underlying client's ``get`` returns.
    """
    headers = {}
    if global_request_id:
        headers[request_id.INBOUND_HEADER] = global_request_id
    return self._client.get(url, microversion=version, headers=headers)
def post(self, url, data, version=None, global_request_id=None):
    """Send a JSON POST request to placement through the client.

    :param url: Placement path.
    :param data: JSON-serializable request body.
    :param version: Optional placement microversion.
    :param global_request_id: Optional global request ID; when truthy it is
                              sent in the inbound request-id header.
    :returns: Whatever the underlying client's ``post`` returns.
    """
    headers = {}
    if global_request_id:
        headers[request_id.INBOUND_HEADER] = global_request_id
    # NOTE(sdague): json= (rather than data=) also sets the application/json
    # media type, which placement is particularly strict about.
    return self._client.post(url, json=data, microversion=version,
                             headers=headers)
def put(self, url, data, version=None, global_request_id=None):
    """Send a PUT request to placement through the client.

    :param url: Placement path.
    :param data: JSON-serializable request body, or None to send no body.
    :param version: Optional placement microversion.
    :param global_request_id: Optional global request ID; when truthy it is
                              sent in the inbound request-id header.
    :returns: Whatever the underlying client's ``put`` returns.
    """
    headers = {}
    if global_request_id:
        headers[request_id.INBOUND_HEADER] = global_request_id
    kwargs = {'microversion': version, 'headers': headers}
    # NOTE(sdague): json= (rather than data=) also sets the application/json
    # media type, which placement is particularly strict about.
    if data is not None:
        kwargs['json'] = data
    return self._client.put(url, **kwargs)
def delete(self, url, version=None, global_request_id=None):
    """Send a DELETE request to placement through the client.

    :param url: Placement path.
    :param version: Optional placement microversion.
    :param global_request_id: Optional global request ID; when truthy it is
                              sent in the inbound request-id header.
    :returns: Whatever the underlying client's ``delete`` returns.
    """
    headers = {}
    if global_request_id:
        headers[request_id.INBOUND_HEADER] = global_request_id
    return self._client.delete(url, microversion=version, headers=headers)
@safe_connect
def get_allocation_candidates(self, context, resources):
    """Returns a tuple of (allocation_requests, provider_summaries,
    allocation_request_version).

    The allocation_requests are a collection of potential JSON objects that
    can be passed to the PUT /allocations/{consumer_uuid} Placement REST
    API to claim resources against one or more resource providers that meet
    the requested resource constraints.

    The provider summaries is a dict, keyed by resource provider UUID, of
    inventory and capacity information and traits for any resource
    provider involved in the allocation_requests.

    :param context: The security context
    :param nova.scheduler.utils.ResourceRequest resources:
        A ResourceRequest object representing the requested resources,
        traits, and aggregates from the request spec.

        Example member_of (aggregates) value in resources:

            [('foo', 'bar'), ('baz',)]

        translates to:

            "Candidates are in either 'foo' or 'bar', but definitely in 'baz'"
    :returns: A tuple with a list of allocation_request dicts, a dict of
              provider information, and the microversion used to request
              this data from placement, or (None, None, None) if placement
              answered with anything other than 200. If @safe_connect
              swallows a connection error, None is returned instead.
    """
    # claim_resources() reuses this microversion when it makes allocations
    # with PUT /allocations/{consumer_uuid}.
    version = NESTED_AC_VERSION
    url = "/allocation_candidates?%s" % resources.to_querystring()
    resp = self.get(url, version=version,
                    global_request_id=context.global_id)
    if resp.status_code != 200:
        LOG.error("Failed to retrieve allocation candidates from placement "
                  "API for filters: %(resource_request)s\n"
                  "Got %(status_code)d: %(err_text)s.",
                  {'resource_request': str(resources),
                   'status_code': resp.status_code,
                   'err_text': resp.text})
        return None, None, None
    body = resp.json()
    return (body['allocation_requests'], body['provider_summaries'],
            version)
@safe_connect
def _get_provider_aggregates(self, context, rp_uuid):
    """Queries the placement API for a resource provider's aggregates.

    :param context: The security context
    :param rp_uuid: UUID of the resource provider to grab aggregates for.
    :return: An AggInfo namedtuple comprising:
             * .aggregates: A set() of string aggregate UUIDs, which may
               be empty if the specified provider is associated with no
               aggregates.
             * .generation: The resource provider generation.
             None is returned if @safe_connect swallows a connection error.
    :raise: ResourceProviderAggregateRetrievalFailed on any non-200
            response. In particular, this is raised (rather than returning
            None or an empty set()) when the provider does not exist.
    """
    resp = self.get("/resource_providers/%s/aggregates" % rp_uuid,
                    version=AGGREGATE_GENERATION_VERSION,
                    global_request_id=context.global_id)
    if resp.status_code != 200:
        LOG.error("[%(placement_req_id)s] Failed to retrieve aggregates from "
                  "placement API for resource provider with UUID %(uuid)s. "
                  "Got %(status_code)d: %(err_text)s.",
                  {'placement_req_id': get_placement_request_id(resp),
                   'uuid': rp_uuid,
                   'status_code': resp.status_code,
                   'err_text': resp.text})
        raise exception.ResourceProviderAggregateRetrievalFailed(
            uuid=rp_uuid)
    body = resp.json()
    return AggInfo(aggregates=set(body['aggregates']),
                   generation=body['resource_provider_generation'])
@safe_connect
def _get_provider_traits(self, context, rp_uuid):
    """Queries the placement API for a resource provider's traits.

    :param context: The security context
    :param rp_uuid: UUID of the resource provider to grab traits for.
    :return: A TraitInfo namedtuple comprising:
             * .traits: A set() of string trait names, which may be
               empty if the specified provider has no traits.
             * .generation: The resource provider generation.
             None is returned if @safe_connect swallows a connection error.
    :raise: ResourceProviderTraitRetrievalFailed on any non-200 response.
            In particular, this is raised (rather than returning None or an
            empty set()) when the provider does not exist.
    """
    resp = self.get("/resource_providers/%s/traits" % rp_uuid,
                    version='1.6', global_request_id=context.global_id)
    if resp.status_code != 200:
        LOG.error(
            "[%(placement_req_id)s] Failed to retrieve traits from "
            "placement API for resource provider with UUID %(uuid)s. Got "
            "%(status_code)d: %(err_text)s.",
            {'placement_req_id': get_placement_request_id(resp),
             'uuid': rp_uuid,
             'status_code': resp.status_code,
             'err_text': resp.text})
        raise exception.ResourceProviderTraitRetrievalFailed(uuid=rp_uuid)
    payload = resp.json()
    return TraitInfo(traits=set(payload['traits']),
                     generation=payload['resource_provider_generation'])
@safe_connect
def _get_resource_provider(self, context, uuid):
    """Queries the placement API for a resource provider record with the
    supplied UUID.

    :param context: The security context
    :param uuid: UUID identifier for the resource provider to look up
    :return: A dict of resource provider information if found, or None if
             placement answered 404 (or @safe_connect swallowed a
             connection error).
    :raise: ResourceProviderRetrievalFailed on any other response status.
    """
    resp = self.get("/resource_providers/%s" % uuid,
                    version=NESTED_PROVIDER_API_VERSION,
                    global_request_id=context.global_id)
    status = resp.status_code
    if status == 200:
        return resp.json()
    if status == 404:
        return None
    LOG.error("[%(placement_req_id)s] Failed to retrieve resource "
              "provider record from placement API for UUID %(uuid)s. Got "
              "%(status_code)d: %(err_text)s.",
              {'uuid': uuid,
               'status_code': status,
               'err_text': resp.text,
               'placement_req_id': get_placement_request_id(resp)})
    raise exception.ResourceProviderRetrievalFailed(uuid=uuid)
@safe_connect
def _get_sharing_providers(self, context, agg_uuids):
    """Queries the placement API for a list of the resource providers
    associated with any of the specified aggregates and possessing the
    MISC_SHARES_VIA_AGGREGATE trait.

    :param context: The security context
    :param agg_uuids: Iterable of string UUIDs of aggregates to filter on.
    :return: A list of dicts of resource provider information, which may be
             empty if no provider in any of those aggregates has the
             MISC_SHARES_VIA_AGGREGATE trait. An empty ``agg_uuids`` yields
             an empty list without contacting placement. None is returned
             if @safe_connect swallows a connection error.
    :raise: ResourceProviderRetrievalFailed on any non-200 response.
    """
    if not agg_uuids:
        return []

    # member_of=in:<a>,<b>,... selects providers in any of the aggregates.
    aggs = ','.join(agg_uuids)
    url = "/resource_providers?member_of=in:%s&required=%s" % (
        aggs, os_traits.MISC_SHARES_VIA_AGGREGATE)
    resp = self.get(url, version='1.18',
                    global_request_id=context.global_id)
    if resp.status_code == 200:
        return resp.json()['resource_providers']

    # The same (translated) message is logged and used as the exception
    # message below.
    msg = _("[%(placement_req_id)s] Failed to retrieve sharing resource "
            "providers associated with the following aggregates from "
            "placement API: %(aggs)s. Got %(status_code)d: %(err_text)s.")
    args = {
        'aggs': aggs,
        'status_code': resp.status_code,
        'err_text': resp.text,
        'placement_req_id': get_placement_request_id(resp),
    }
    LOG.error(msg, args)
    raise exception.ResourceProviderRetrievalFailed(message=msg % args)
@safe_connect
def _get_providers_in_tree(self, context, uuid):
    """Queries the placement API for a list of the resource providers in
    the tree associated with the specified UUID.

    :param context: The security context
    :param uuid: UUID identifier for the resource provider to look up
    :return: A list of dicts of resource provider information, which may be
             empty if no provider exists with the specified UUID. None is
             returned if @safe_connect swallows a connection error.
    :raise: ResourceProviderRetrievalFailed on any non-200 response.
    """
    resp = self.get("/resource_providers?in_tree=%s" % uuid,
                    version=NESTED_PROVIDER_API_VERSION,
                    global_request_id=context.global_id)
    if resp.status_code != 200:
        # Some unexpected error
        LOG.error("[%(placement_req_id)s] Failed to retrieve resource "
                  "provider tree from placement API for UUID %(uuid)s. Got "
                  "%(status_code)d: %(err_text)s.",
                  {'uuid': uuid,
                   'status_code': resp.status_code,
                   'err_text': resp.text,
                   'placement_req_id': get_placement_request_id(resp)})
        raise exception.ResourceProviderRetrievalFailed(uuid=uuid)
    return resp.json()['resource_providers']
@safe_connect
def _create_resource_provider(self, context, uuid, name,
                              parent_provider_uuid=None):
    """Calls the placement API to create a new resource provider record.

    :param context: The security context
    :param uuid: UUID of the new resource provider
    :param name: Name of the resource provider
    :param parent_provider_uuid: Optional UUID of the immediate parent
    :return: A dict of resource provider information for the newly created
             provider. If placement reports a 409 that is not a name
             conflict, the existing record is fetched with
             _get_resource_provider() and returned instead (which may be
             None). None is also returned if @safe_connect swallows a
             connection error.
    :raise: ResourceProviderCreationFailed or
            ResourceProviderRetrievalFailed on error.
    """
    payload = {'uuid': uuid, 'name': name}
    if parent_provider_uuid is not None:
        payload['parent_provider_uuid'] = parent_provider_uuid

    # Bug #1746075: First try the microversion that returns the new
    # provider's payload.
    resp = self.post("/resource_providers", payload,
                     version=POST_RPS_RETURNS_PAYLOAD_API_VERSION,
                     global_request_id=context.global_id)
    placement_req_id = get_placement_request_id(resp)

    if resp:
        LOG.info("[%(placement_req_id)s] Created resource provider record "
                 "via placement API for resource provider with UUID "
                 "%(uuid)s and name %(name)s.",
                 {'uuid': uuid,
                  'name': name,
                  'placement_req_id': placement_req_id})
        return resp.json()

    # TODO(efried): Push error codes from placement, and use 'em.
    name_conflict = 'Conflicting resource provider name:'
    if resp.status_code == 409 and name_conflict not in resp.text:
        # A 409 that is not a name conflict: a provider with this UUID was
        # created concurrently elsewhere, so fetch and return that record.
        LOG.info("[%(placement_req_id)s] Another thread already created a "
                 "resource provider with the UUID %(uuid)s. Grabbing that "
                 "record from the placement API.",
                 {'uuid': uuid, 'placement_req_id': placement_req_id})
        return self._get_resource_provider(context, uuid)

    # A provider with the same *name* already exists, or some other error.
    LOG.error("[%(placement_req_id)s] Failed to create resource provider "
              "record in placement API for UUID %(uuid)s. Got "
              "%(status_code)d: %(err_text)s.",
              {'uuid': uuid,
               'status_code': resp.status_code,
               'err_text': resp.text,
               'placement_req_id': placement_req_id})
    raise exception.ResourceProviderCreationFailed(name=name)
def _ensure_resource_provider(self, context, uuid, name=None,
                              parent_provider_uuid=None):
    """Ensures that the placement API has a record of a resource provider
    with the supplied UUID. If not, creates the resource provider record in
    the placement API for the supplied UUID, passing in a name for the
    resource provider.

    If found or created, the provider's UUID is returned from this method.
    If the resource provider for the supplied uuid was not found and the
    resource provider record could not be created in the placement API, an
    exception is raised.

    If this method returns successfully, callers are assured that the
    placement API contains a record of the provider; and that the local
    cache of resource provider information contains a record of:
    - The specified provider
    - All providers in its tree
    - All providers associated via aggregate with all providers in said
      tree
    and for each of those providers:
    - The UUIDs of its aggregates
    - The trait strings associated with the provider

    Note that if the provider did not exist prior to this call, the above
    reduces to just the specified provider as a root, with no aggregates or
    traits.

    :param context: The security context
    :param uuid: UUID identifier for the resource provider to ensure exists
    :param name: Optional name for the resource provider if the record
                 does not exist. If empty, the name is set to the UUID
                 value
    :param parent_provider_uuid: Optional UUID of the immediate parent,
                                 which must have been previously _ensured.
    :return: ``uuid``
    :raise: ResourceProviderCreationFailed if the provider had to be created
            and creation failed (including when placement could not be
            reached). Exceptions raised by the lookup/refresh helpers are
            not caught here.
    """
    # NOTE(efried): We currently have no code path where we need to set the
    # parent_provider_uuid on a previously-parent-less provider - so we do
    # NOT handle that scenario here.
    # TODO(efried): Reinstate this optimization if possible.
    # For now, this is removed due to the following:
    # - update_provider_tree adds a child with some bogus inventory (bad
    #   resource class) or trait (invalid trait name).
    # - update_from_provider_tree creates the child in placement and adds
    #   it to the cache, then attempts to add the bogus inventory/trait.
    #   The latter fails, so update_from_provider_tree invalidates the
    #   cache entry by removing the child from the cache.
    # - Ordinarily, we would rely on the code below (_get_providers_in_tree
    #   and _provider_tree.populate_from_iterable) to restore the child to
    #   the cache on the next iteration. BUT since the root is still
    #   present in the cache, the commented-out block will cause that part
    #   of this method to be skipped.
    # if self._provider_tree.exists(uuid):
    #     # If we had the requested provider locally, refresh it and its
    #     # descendants, but only if stale.
    #     for u in self._provider_tree.get_provider_uuids(uuid):
    #         self._refresh_associations(context, u, force=False)
    #     return uuid

    # We don't have it locally; check placement or create it.
    created_rp = None
    # _get_providers_in_tree() is wrapped by @safe_connect and returns None
    # when placement could not be reached. Normalize that to an empty list:
    # if the create call below then succeeds (e.g. after safe_connect
    # rebuilt the client on EndpointNotFound), the refresh loop must not
    # iterate over None, which would raise TypeError.
    rps_to_refresh = self._get_providers_in_tree(context, uuid) or []
    if not rps_to_refresh:
        created_rp = self._create_resource_provider(
            context, uuid, name or uuid,
            parent_provider_uuid=parent_provider_uuid)
        # If @safe_connect can't establish a connection to the placement
        # service, like if placement isn't running or nova-compute is
        # mis-configured for authentication, we'll get None back and need
        # to treat it like we couldn't create the provider (because we
        # couldn't).
        if created_rp is None:
            raise exception.ResourceProviderCreationFailed(
                name=name or uuid)
        # Don't add the created_rp to rps_to_refresh. Since we just
        # created it, it has no aggregates or traits.

    self._provider_tree.populate_from_iterable(
        rps_to_refresh or [created_rp])

    # At this point, the whole tree exists in the local cache.
    for rp_to_refresh in rps_to_refresh:
        # NOTE(efried): _refresh_associations doesn't refresh inventory
        # (yet) - see that method's docstring for the why.
        self._refresh_and_get_inventory(context, rp_to_refresh['uuid'])
        self._refresh_associations(context, rp_to_refresh['uuid'],
                                   force=True)

    return uuid
@safe_connect
def _delete_provider(self, rp_uuid, global_request_id=None):
    """Delete a resource provider in placement and drop it from local caches.

    A 404 from placement counts as success (there was nothing to delete);
    in that case the caches are still cleaned but no info line is logged.

    :param rp_uuid: UUID of the resource provider to delete.
    :param global_request_id: Optional global request ID to forward.
    :raise: ResourceProviderInUse on a 409 response, so the caller may
            delete allocations and redrive; ResourceProviderDeletionFailed
            on any other failure.
    """
    resp = self.delete('/resource_providers/%s' % rp_uuid,
                       global_request_id=global_request_id)
    if not resp and resp.status_code != 404:
        LOG.error("[%(placement_req_id)s] Failed to delete resource provider "
                  "with UUID %(uuid)s from the placement API. Got "
                  "%(status_code)d: %(err_text)s.",
                  {'placement_req_id': get_placement_request_id(resp),
                   'uuid': rp_uuid,
                   'status_code': resp.status_code,
                   'err_text': resp.text})
        # On conflict, the caller may wish to delete allocations and
        # redrive. (Note that this is not the same as a
        # PlacementAPIConflict case.)
        if resp.status_code == 409:
            raise exception.ResourceProviderInUse()
        raise exception.ResourceProviderDeletionFailed(uuid=rp_uuid)

    if resp:
        LOG.info("Deleted resource provider %s", rp_uuid)
    # Clean the local caches; a provider missing from the tree is fine.
    try:
        self._provider_tree.remove(rp_uuid)
    except ValueError:
        pass
    self._association_refresh_time.pop(rp_uuid, None)
def _get_inventory(self, context, rp_uuid):
    """Fetch a resource provider's inventories from placement.

    :param context: The security context
    :param rp_uuid: UUID of the resource provider
    :returns: The decoded JSON body of
              GET /resource_providers/{rp_uuid}/inventories, or None if the
              request did not succeed. A failed response is logged.
    """
    url = '/resource_providers/%s/inventories' % rp_uuid
    result = self.get(url, global_request_id=context.global_id)
    if not result:
        # Previously this failure was silent (TODO(efried): Log.). The
        # None return is kept, because _refresh_and_get_inventory() passes
        # it on to its caller; the failure is logged here so it is visible.
        if result is not None:
            LOG.warning("[%(placement_req_id)s] Failed to retrieve "
                        "inventory from placement API for resource "
                        "provider with UUID %(uuid)s. Got "
                        "%(status_code)d: %(err_text)s.",
                        {'placement_req_id':
                            get_placement_request_id(result),
                         'uuid': rp_uuid,
                         'status_code': result.status_code,
                         'err_text': result.text})
        return None
    return result.json()
def _refresh_and_get_inventory(self, context, rp_uuid):
    """Fetch a provider's inventory from placement and store it locally.

    The inventories and provider generation returned by placement are passed
    to the local ProviderTree's update_inventory(). This method does not
    itself compare against a cached generation; whatever placement returned
    is handed to the cache.

    :param context: The security context
    :param rp_uuid: UUID of the resource provider
    :returns: The dict returned by _get_inventory() (read here for its
              'inventories' and 'resource_provider_generation' keys), or
              None if the inventory could not be retrieved. In that case the
              local cache is not touched.
    """
    curr = self._get_inventory(context, rp_uuid)
    if curr is None:
        return None

    LOG.debug('Updating ProviderTree inventory for provider %s from '
              '_refresh_and_get_inventory using data: %s', rp_uuid,
              curr['inventories'])
    self._provider_tree.update_inventory(
        rp_uuid, curr['inventories'],
        generation=curr['resource_provider_generation'])

    return curr
def _refresh_associations(self, context, rp_uuid, force=False,
                          refresh_sharing=True):
    """Refresh the cached aggregates and traits of a resource provider.

    Nothing happens unless ``force`` is True or ``_associations_stale``
    reports the provider's associations as stale (never refreshed by this
    process, or refreshed more than
    CONF.compute.resource_provider_association_refresh seconds ago).

    Inventories are deliberately *not* refreshed here; for historical
    reasons every code path reaching this method refreshes inventory on
    its own.

    :param context: The security context
    :param rp_uuid: UUID of the resource provider whose aggregates and
                    traits should be refreshed
    :param force: If True, refresh regardless of staleness
    :param refresh_sharing: If True, also fetch the providers associated by
                            aggregate with ``rp_uuid``, add unknown ones to
                            the ProviderTree as roots, and refresh *their*
                            traits and aggregates (but not *their* sharing
                            providers).
    :raise: On various placement API errors, one of:
            - ResourceProviderAggregateRetrievalFailed
            - ResourceProviderTraitRetrievalFailed
            - ResourceProviderRetrievalFailed
    """
    if not force and not self._associations_stale(rp_uuid):
        # Cached associations are still fresh enough; nothing to do.
        return

    # -- Aggregates --
    # If @safe_connect made the lookup return None, the attribute access
    # below fails with AttributeError. Failing loudly is intentional.
    agg_info = self._get_provider_aggregates(context, rp_uuid)
    aggregates = agg_info.aggregates
    agg_generation = agg_info.generation
    LOG.debug("Refreshing aggregate associations for resource provider "
              "%s, aggregates: %s",
              rp_uuid, ','.join(aggregates or ['None']))
    # NOTE(efried): This blows up if rp_uuid is not already in our
    # _provider_tree.
    self._provider_tree.update_aggregates(
        rp_uuid, aggregates, generation=agg_generation)

    # -- Traits --
    # Same AttributeError-on-None reasoning as for aggregates.
    trait_info = self._get_provider_traits(context, rp_uuid)
    traits = trait_info.traits
    trait_generation = trait_info.generation
    LOG.debug("Refreshing trait associations for resource provider %s, "
              "traits: %s", rp_uuid, ','.join(traits or ['None']))
    # NOTE(efried): This blows up if rp_uuid is not already in our
    # _provider_tree.
    self._provider_tree.update_traits(
        rp_uuid, traits, generation=trait_generation)

    if refresh_sharing:
        # -- Providers associated with rp_uuid by aggregate --
        for sharing_rp in self._get_sharing_providers(context, aggregates):
            sharing_uuid = sharing_rp['uuid']
            if not self._provider_tree.exists(sharing_uuid):
                # NOTE(efried): Sharing providers are deliberately always
                # tracked as roots: from the point of view of this
                # compute's RP it does not matter whether a sharing RP is
                # part of a tree.
                self._provider_tree.new_root(
                    sharing_rp['name'], sharing_uuid,
                    generation=sharing_rp['generation'])
            # Populate/refresh the sharing provider's own traits and
            # aggregates, but not *its* aggregate-associated providers.
            # Forwarding the caller's ``force`` is enough for newly added
            # providers: with no refresh timestamp they count as stale.
            self._refresh_associations(context, sharing_uuid, force=force,
                                       refresh_sharing=False)

    self._association_refresh_time[rp_uuid] = time.time()
def _associations_stale(self, uuid):
    """Return True if the aggregates/traits of ``uuid`` should be refreshed.

    The last refresh time is taken from ``_association_refresh_time``
    (treated as 0 when this process has never refreshed the provider) and
    checked against CONF.compute.resource_provider_association_refresh:

    * If that option is 0 (periodic refresh disabled), the result is False
      once the provider has been refreshed at least once. A provider that
      has never been refreshed still reports True, so its associations get
      loaded the first time.
    * Otherwise the result is True when more than that many seconds have
      elapsed since the last refresh.
    """
    refresh_interval = CONF.compute.resource_provider_association_refresh
    last_refreshed = self._association_refresh_time.get(uuid, 0)
    already_loaded = last_refreshed != 0
    if refresh_interval == 0 and already_loaded:
        # TODO(efried): If refresh is disabled, we could avoid touching the
        # _association_refresh_time dict anywhere, but that would take some
        # nontrivial refactoring.
        return False
    seconds_since_refresh = time.time() - last_refreshed
    return seconds_since_refresh > refresh_interval
def _update_inventory_attempt(self, context, rp_uuid, inv_data):
    """Update the inventory for this resource provider if needed.

    The provider's current inventory and generation are first re-read from
    placement, which also refreshes the ProviderTree cache. If the cached
    inventory already matches ``inv_data`` nothing is sent. Otherwise the
    new inventory is PUT along with the freshly read generation.

    :param context: The security context
    :param rp_uuid: The resource provider UUID for the operation
    :param inv_data: The new inventory for the resource provider
    :returns: True if the inventory was updated (or did not need to be),
              False otherwise. On a 409 conflict the cached provider is
              dropped and reloaded before returning False, so that a later
              attempt picks up the new generation.
    :raises: exception.InventoryInUse if placement answers 409 and the
             error text names resource classes whose inventory cannot be
             removed because allocations still exist against it.
    """
    # TODO(jaypipes): Should we really be calling the placement API to get
    # the current inventory for every resource provider each and every time
    # update_resource_stats() is called? :(
    curr = self._refresh_and_get_inventory(context, rp_uuid)
    if curr is None:
        # The GET itself failed (an empty inventory would still be a
        # record), so there is no generation to send with a PUT.
        LOG.debug('Unable to retrieve inventory for provider %s; not '
                  'updating it with inventory data: %s', rp_uuid, inv_data)
        return False

    cur_gen = curr['resource_provider_generation']

    # Check to see if we need to update placement's view
    if not self._provider_tree.has_inventory_changed(rp_uuid, inv_data):
        LOG.debug('Inventory has not changed for provider %s based '
                  'on inventory data: %s', rp_uuid, inv_data)
        return True

    payload = {
        'resource_provider_generation': cur_gen,
        'inventories': inv_data,
    }
    url = '/resource_providers/%s/inventories' % rp_uuid
    # NOTE(vdrok): in microversion 1.26 it is allowed to have inventory
    # records with reserved value equal to total
    version = ALLOW_RESERVED_EQUAL_TOTAL_INVENTORY_VERSION
    result = self.put(url, payload, version=version,
                      global_request_id=context.global_id)
    if result.status_code == 409:
        LOG.info('[%(placement_req_id)s] Inventory update conflict for '
                 '%(resource_provider_uuid)s with generation ID '
                 '%(generation)s',
                 {'placement_req_id': get_placement_request_id(result),
                  'resource_provider_uuid': rp_uuid,
                  'generation': cur_gen})
        # NOTE(jaypipes): There may be cases when we try to set a
        # provider's inventory that results in attempting to delete an
        # inventory record for a resource class that has an active
        # allocation. We need to catch this particular case and raise an
        # exception here instead of returning False, since we should not
        # re-try the operation in this case.
        #
        # A use case for where this can occur is the following:
        #
        # 1) Provider created for each Ironic baremetal node in Newton
        # 2) Inventory records for baremetal node created for VCPU,
        #    MEMORY_MB and DISK_GB
        # 3) A Nova instance consumes the baremetal node and allocation
        #    records are created for VCPU, MEMORY_MB and DISK_GB matching
        #    the total amount of those resource on the baremetal node.
        # 4) Upgrade to Ocata and now resource tracker wants to set the
        #    provider's inventory to a single record of resource class
        #    CUSTOM_IRON_SILVER (or whatever the Ironic node's
        #    "resource_class" attribute is)
        # 5) Scheduler report client sends the inventory list containing a
        #    single CUSTOM_IRON_SILVER record and placement service
        #    attempts to delete the inventory records for VCPU, MEMORY_MB
        #    and DISK_GB. An exception is raised from the placement service
        #    because allocation records exist for those resource classes,
        #    and a 409 Conflict is returned to the compute node. We need to
        #    trigger a delete of the old allocation records and then set
        #    the new inventory, and then set the allocation record to the
        #    new CUSTOM_IRON_SILVER record.
        rc = _extract_inventory_in_use(result.text)
        if rc is not None:
            raise exception.InventoryInUse(
                resource_classes=rc,
                resource_provider=rp_uuid,
            )

        # Invalidate our cache and re-fetch the resource provider
        # to be sure to get the latest generation.
        self._provider_tree.remove(rp_uuid)
        # NOTE(jaypipes): We don't need to pass a name parameter to
        # _ensure_resource_provider() because we know the resource provider
        # record already exists. We're just reloading the record here.
        self._ensure_resource_provider(context, rp_uuid)
        return False
    elif not result:
        placement_req_id = get_placement_request_id(result)
        LOG.warning('[%(placement_req_id)s] Failed to update inventory '
                    'for resource provider %(uuid)s: %(status)i %(text)s',
                    {'placement_req_id': placement_req_id,
                     'uuid': rp_uuid,
                     'status': result.status_code,
                     'text': result.text})
        # log the body at debug level
        LOG.debug('[%(placement_req_id)s] Failed inventory update request '
                  'for resource provider %(uuid)s with body: %(payload)s',
                  {'placement_req_id': placement_req_id,
                   'uuid': rp_uuid,
                   'payload': payload})
        return False

    if result.status_code != 200:
        placement_req_id = get_placement_request_id(result)
        LOG.info('[%(placement_req_id)s] Received unexpected response '
                 'code %(code)i while trying to update inventory for '
                 'resource provider %(uuid)s: %(text)s',
                 {'placement_req_id': placement_req_id,
                  'uuid': rp_uuid,
                  'code': result.status_code,
                  'text': result.text})
        return False

    # Update our view of the generation for next time
    updated_inventories_result = result.json()
    new_gen = updated_inventories_result['resource_provider_generation']

    LOG.debug('Updating ProviderTree inventory for provider %s with '
              'generation %s from _update_inventory_attempt with data: '
              '%s', rp_uuid, new_gen, inv_data)
    self._provider_tree.update_inventory(rp_uuid, inv_data,
                                         generation=new_gen)
    return True
@safe_connect
def _update_inventory(self, context, rp_uuid, inv_data):
for attempt in (1, 2, 3):
if not self._provider_tree.exists(rp_uuid):
# NOTE(danms): Either we failed to fetch/create the RP
# on our first attempt, or a previous attempt had to
# invalidate the cache, and we were unable to refresh
# it. Bail and try again next time.
LOG.warning('Unable to refresh my resource provider record')
return False
if self._update_inventory_attempt(context, rp_uuid, inv_data):
return True
time.sleep(1)