# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import json
import time
from collections import defaultdict
from random import random

import os
import six
from six.moves.urllib.parse import quote

from eventlet import Timeout

from swift.common import internal_client
from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.direct_client import (direct_put_container,
                                        DirectClientException)
from swift.common.exceptions import DeviceUnavailable
from swift.common.ring.utils import is_local_device
from swift.common.swob import str_to_wsgi
from swift.common.utils import get_logger, config_true_value, \
    dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
    config_float_value, config_positive_int_value, \
    quorum_size, parse_override_options, Everything, config_auto_int_value
from swift.container.backend import ContainerBroker, \
    RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
    SHARD_UPDATE_STATES
from swift.container.replicator import ContainerReplicator


CLEAVE_SUCCESS = 0
CLEAVE_FAILED = 1
CLEAVE_EMPTY = 2
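
# NOTE (editorial): these codes are the per-shard-range cleave results; the
# cleaving machinery later in this module (beyond this excerpt) is assumed to
# return CLEAVE_SUCCESS, CLEAVE_FAILED or CLEAVE_EMPTY to distinguish a
# successful cleave, a failed cleave and a cleave that found no rows to move.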


def sharding_enabled(broker):
    # NB all shards will by default have been created with
    # X-Container-Sysmeta-Sharding set and will therefore be candidates for
    # sharding, along with explicitly configured root containers.
    sharding = broker.metadata.get('X-Container-Sysmeta-Sharding')
    if sharding and config_true_value(sharding[0]):
        return True
    # if broker has been marked deleted it will have lost sysmeta, but we
    # still need to process the broker (for example, to shrink any shard
    # ranges) so fall back to checking if it has any shard ranges
    if broker.get_shard_ranges():
        return True
    return False


def make_shard_ranges(broker, shard_data, shards_account_prefix):
    timestamp = Timestamp.now()
    shard_ranges = []
    for data in shard_data:
        # Make a copy so we don't mutate the original
        kwargs = data.copy()
        path = ShardRange.make_path(
            shards_account_prefix + broker.root_account,
            broker.root_container, broker.container,
            timestamp, kwargs.pop('index'))
        shard_ranges.append(ShardRange(path, timestamp, **kwargs))
    return shard_ranges
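
# NOTE (editorial illustration, not upstream commentary): ShardRange.make_path
# composes each shard container's path from the shards account, the root
# container name, a hash of the parent container path, the timestamp and the
# index, so shards of a root container 'a/c' are created under an account
# such as '.shards_a' with names of the form 'c-<parent hash>-<timestamp>-<index>'.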


def find_missing_ranges(shard_ranges):
    """
    Find any ranges in the entire object namespace that are not covered by
    any shard range in the given list.

    :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`
    :return: a list of missing ranges
    """
    gaps = []
    if not shard_ranges:
        return [(ShardRange.MIN, ShardRange.MAX)]
    if shard_ranges[0].lower > ShardRange.MIN:
        gaps.append((ShardRange.MIN, shard_ranges[0].lower))
    for first, second in zip(shard_ranges, shard_ranges[1:]):
        if first.upper < second.lower:
            gaps.append((first.upper, second.lower))
    if shard_ranges[-1].upper < ShardRange.MAX:
        gaps.append((shard_ranges[-1].upper, ShardRange.MAX))
    return gaps
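
# Example (editorial illustration): given sorted shard ranges covering
# ('', 'm') and ('t', ''), find_missing_ranges returns [('m', 't')]; given an
# empty list it returns a single gap spanning the whole namespace from
# ShardRange.MIN to ShardRange.MAX. Note the input is expected to be sorted
# in name order, as returned by broker.get_shard_ranges().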


def find_overlapping_ranges(shard_ranges):
    """
    Find all groups of overlapping ranges in the given list.

    :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`
    :return: a set of tuples, each tuple containing ranges that overlap with
        each other.
    """
    result = set()
    for shard_range in shard_ranges:
        overlapping = [sr for sr in shard_ranges
                       if shard_range != sr and shard_range.overlaps(sr)]
        if overlapping:
            overlapping.append(shard_range)
            overlapping.sort()
            result.add(tuple(overlapping))
    return result
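
# NOTE (editorial): this is a simple O(n^2) pairwise scan; each tuple in the
# result holds a shard range together with every range that overlaps it, and
# sorting each tuple means that the same overlapping pair discovered from
# either member collapses to a single entry in the set.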


def is_sharding_candidate(shard_range, threshold):
    return (shard_range.state == ShardRange.ACTIVE and
            shard_range.object_count >= threshold)


def find_sharding_candidates(broker, threshold, shard_ranges=None):
    # this should only execute on root containers; the goal is to find
    # large shard containers that should be sharded.
    # First cut is simple: assume root container shard usage stats are good
    # enough to make decision.
    # TODO: object counts may well not be the appropriate metric for
    # deciding to shard because a shard with a low object_count may still
    # have a large number of deleted object rows that will all have to be
    # cleaved. We may need to expose row count as well as object count.
    if shard_ranges is None:
        shard_ranges = broker.get_shard_ranges(states=[ShardRange.ACTIVE])
    candidates = []
    for shard_range in shard_ranges:
        if not is_sharding_candidate(shard_range, threshold):
            continue
        shard_range.update_state(ShardRange.SHARDING,
                                 state_timestamp=Timestamp.now())
        shard_range.epoch = shard_range.state_timestamp
        candidates.append(shard_range)
    return candidates
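
# NOTE (editorial): the candidates returned above have already been moved to
# state SHARDING with a fresh epoch; the caller is assumed to merge the
# updated shard ranges back into the root broker so that the state change is
# persisted and replicated.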


def find_shrinking_candidates(broker, shrink_threshold, merge_size):
    # this should only execute on root containers that have sharded; the
    # goal is to find small shard containers that could be retired by
    # merging with a neighbour.
    # First cut is simple: assume root container shard usage stats are good
    # enough to make decision; only merge with upper neighbour so that
    # upper bounds never change (shard names include upper bound).
    # TODO: object counts may well not be the appropriate metric for
    # deciding to shrink because a shard with low object_count may have a
    # large number of deleted object rows that will need to be merged with
    # a neighbour. We may need to expose row count as well as object count.
    shard_ranges = broker.get_shard_ranges()
    own_shard_range = broker.get_own_shard_range()
    if len(shard_ranges) == 1:
        # special case to enable final shard to shrink into root
        shard_ranges.append(own_shard_range)

    merge_pairs = {}
    for donor, acceptor in zip(shard_ranges, shard_ranges[1:]):
        if donor in merge_pairs:
            # this range may already have been made an acceptor; if so then
            # move on. In principle it might be that even after expansion
            # this range and its donor(s) could all be merged with the next
            # range. In practice it is much easier to reason about a single
            # donor merging into a single acceptor. Don't fret - eventually
            # all the small ranges will be retired.
            continue
        if (acceptor.name != own_shard_range.name and
                acceptor.state != ShardRange.ACTIVE):
            # don't shrink into a range that is not yet ACTIVE
            continue
        if donor.state not in (ShardRange.ACTIVE, ShardRange.SHRINKING):
            # found? created? sharded? don't touch it
            continue

        proposed_object_count = donor.object_count + acceptor.object_count
        if (donor.state == ShardRange.SHRINKING or
                (donor.object_count < shrink_threshold and
                 proposed_object_count < merge_size)):
            # include previously identified merge pairs on presumption that
            # following shrink procedure is idempotent
            merge_pairs[acceptor] = donor
            if donor.update_state(ShardRange.SHRINKING):
                # Set donor state to shrinking so that next cycle won't use
                # it as an acceptor; state_timestamp defines new epoch for
                # donor and new timestamp for the expanded acceptor below.
                donor.epoch = donor.state_timestamp = Timestamp.now()
            if acceptor.lower != donor.lower:
                # Update the acceptor container with its expanding state to
                # prevent it treating objects cleaved from the donor
                # as misplaced.
                acceptor.lower = donor.lower
                acceptor.timestamp = donor.state_timestamp
    return merge_pairs
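
# Example (editorial illustration): given adjacent ACTIVE shards a-b with 10
# objects and b-c with 100 objects, a shrink_threshold of 50 and a merge_size
# of 500, a-b becomes a SHRINKING donor and b-c its acceptor; the acceptor's
# lower bound is stretched down to a so that it covers the donor's namespace,
# and merge_pairs maps the expanded acceptor to the donor.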


class CleavingContext(object):
    def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
                 last_cleave_to_row=None, cleaving_done=False,
                 misplaced_done=False, ranges_done=0, ranges_todo=0):
        self.ref = ref
        self._cursor = None
        self.cursor = cursor
        self.max_row = max_row
        self.cleave_to_row = cleave_to_row
        self.last_cleave_to_row = last_cleave_to_row
        self.cleaving_done = cleaving_done
        self.misplaced_done = misplaced_done
        self.ranges_done = ranges_done
        self.ranges_todo = ranges_todo

    def __iter__(self):
        yield 'ref', self.ref
        yield 'cursor', self.cursor
        yield 'max_row', self.max_row
        yield 'cleave_to_row', self.cleave_to_row
        yield 'last_cleave_to_row', self.last_cleave_to_row
        yield 'cleaving_done', self.cleaving_done
        yield 'misplaced_done', self.misplaced_done
        yield 'ranges_done', self.ranges_done
        yield 'ranges_todo', self.ranges_todo

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, ', '.join(
            '%s=%r' % prop for prop in self))

    @classmethod
    def _encode(cls, value):
        if value is not None and six.PY2 and isinstance(value, six.text_type):
            return value.encode('utf-8')
        return value

    @property
    def cursor(self):
        return self._cursor

    @cursor.setter
    def cursor(self, value):
        self._cursor = self._encode(value)

    @property
    def marker(self):
        return self.cursor + '\x00'
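
    # NOTE (editorial): appending the NUL byte makes ``marker`` the smallest
    # string that sorts after ``cursor``, so an object listing started at
    # ``marker`` resumes strictly after the last name recorded in the cursor
    # without revisiting it.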

    @classmethod
    def _make_ref(cls, broker):
        return broker.get_info()['id']

    @classmethod
    def load_all(cls, broker):
        """
        Yields all cleaving contexts stored in the broker's sysmeta.

        :param broker: A :class:`~swift.container.backend.ContainerBroker`.
        :return: a generator of tuples of (CleavingContext, timestamp)
        """
        brokers = broker.get_brokers()
        sysmeta = brokers[-1].get_sharding_sysmeta_with_timestamps()

        for key, (val, timestamp) in sysmeta.items():
            # If the value is of length 0, then the metadata is
            # marked for deletion
            if key.startswith("Context-") and len(val) > 0:
                try:
                    yield cls(**json.loads(val)), timestamp
                except ValueError:
                    continue

    @classmethod
    def load(cls, broker):
        """
        Returns a context instance for tracking the progress of cleaving
        this broker's retiring DB. The context is persisted in sysmeta using
        a key that is based off the retiring db id and max row. This form of
        key ensures that a cleaving context is only loaded for a db that
        matches the id and max row when the context was created; if a db is
        modified such that its max row changes then a different context, or
        no context, will be loaded.

        :return: A :class:`CleavingContext` to which cleave progress
            metadata may be added; its ``ref`` attribute should not be
            modified by any caller.
        """
        brokers = broker.get_brokers()
        ref = cls._make_ref(brokers[0])
        data = brokers[-1].get_sharding_sysmeta('Context-' + ref)
        data = json.loads(data) if data else {}
        data['ref'] = ref
        data['max_row'] = brokers[0].get_max_row()
        return cls(**data)

    def store(self, broker):
        broker.set_sharding_sysmeta('Context-' + self.ref,
                                    json.dumps(dict(self)))

    def reset(self):
        self.cursor = ''
        self.ranges_done = 0
        self.ranges_todo = 0
        self.cleaving_done = False
        self.misplaced_done = False
        self.last_cleave_to_row = self.cleave_to_row

    def start(self):
        self.cursor = ''
        self.ranges_done = 0
        self.ranges_todo = 0
        self.cleaving_done = False
        self.cleave_to_row = self.max_row

    def done(self):
        return all((self.misplaced_done, self.cleaving_done,
                    self.max_row == self.cleave_to_row))

    def delete(self, broker):
        # These contexts will get reclaimed when `_reclaim_metadata` in
        # common/db.py is called.
        broker.set_sharding_sysmeta('Context-' + self.ref, '')
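

# Typical lifecycle of a CleavingContext (editorial sketch, not part of the
# original module):
#
#     context = CleavingContext.load(broker)  # keyed on retiring db id
#     # ... cleave one or more shard ranges, advancing context.cursor ...
#     context.store(broker)                   # persist progress in sysmeta
#     if context.done():
#         context.delete(broker)              # empty value; reclaimed later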

DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
DEFAULT_SHARD_SHRINK_POINT = 25
DEFAULT_SHARD_MERGE_POINT = 75


class ContainerSharder(ContainerReplicator):
    """Shards containers."""

    def __init__(self, conf, logger=None):
        logger = logger or get_logger(conf, log_route='container-sharder')
        super(ContainerSharder, self).__init__(conf, logger=logger)
        if conf.get('auto_create_account_prefix'):
            self.logger.warning('Option auto_create_account_prefix is '
                                'deprecated. Configure '
                                'auto_create_account_prefix under the '
                                'swift-constraints section of '
                                'swift.conf. This option will '
                                'be ignored in a future release.')
            auto_create_account_prefix = \
                self.conf['auto_create_account_prefix']
        else:
            auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
        self.shards_account_prefix = (auto_create_account_prefix + 'shards_')

        def percent_value(key, default):
            try:
                value = conf.get(key, default)
                return config_float_value(value, 0, 100) / 100.0
            except ValueError as err:
                raise ValueError("%s: %s" % (str(err), key))

        self.shard_shrink_point = percent_value('shard_shrink_point',
                                                DEFAULT_SHARD_SHRINK_POINT)
        self.shrink_merge_point = percent_value('shard_shrink_merge_point',
                                                DEFAULT_SHARD_MERGE_POINT)
        self.shard_container_threshold = config_positive_int_value(
            conf.get('shard_container_threshold',
                     DEFAULT_SHARD_CONTAINER_THRESHOLD))
        self.shrink_size = (self.shard_container_threshold *
                            self.shard_shrink_point)
        self.merge_size = (self.shard_container_threshold *
                           self.shrink_merge_point)
        self.split_size = self.shard_container_threshold // 2
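        # Illustrative arithmetic (editorial): with the default
        # shard_container_threshold of 1000000 and the default shrink and
        # merge points of 25% and 75%, shrink_size is 250000, merge_size is
        # 750000 and split_size is 500000.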
        self.scanner_batch_size = config_positive_int_value(
            conf.get('shard_scanner_batch_size', 10))
        self.cleave_batch_size = config_positive_int_value(
            conf.get('cleave_batch_size', 2))
        self.cleave_row_batch_size = config_positive_int_value(
            conf.get('cleave_row_batch_size', 10000))
        self.auto_shard = config_true_value(conf.get('auto_shard', False))
        self.sharding_candidates = []
        self.recon_candidates_limit = int(
            conf.get('recon_candidates_limit', 5))
        self.broker_timeout = config_positive_int_value(
            conf.get('broker_timeout', 60))

        replica_count = self.ring.replica_count
        quorum = quorum_size(replica_count)
        self.shard_replication_quorum = config_auto_int_value(
            conf.get('shard_replication_quorum'), quorum)
        if self.shard_replication_quorum > replica_count:
            self.logger.warning(
                'shard_replication_quorum of %s exceeds replica count %s'
                ', reducing to %s', self.shard_replication_quorum,
                replica_count, replica_count)
            self.shard_replication_quorum = replica_count
        self.existing_shard_replication_quorum = config_auto_int_value(
            conf.get('existing_shard_replication_quorum'),
            self.shard_replication_quorum)
        if self.existing_shard_replication_quorum > replica_count:
            self.logger.warning(
                'existing_shard_replication_quorum of %s exceeds replica '
                'count %s, reducing to %s',
                self.existing_shard_replication_quorum,
                replica_count, replica_count)
            self.existing_shard_replication_quorum = replica_count

        # internal client
        self.conn_timeout = float(conf.get('conn_timeout', 5))
        request_tries = config_positive_int_value(
            conf.get('request_tries', 3))
        internal_client_conf_path = conf.get(
            'internal_client_conf_path', '/etc/swift/internal-client.conf')
        try:
            self.int_client = internal_client.InternalClient(
                internal_client_conf_path,
                'Swift Container Sharder',
                request_tries,
                allow_modify_pipeline=False)
        except (OSError, IOError) as err:
            if err.errno != errno.ENOENT and \
                    not str(err).endswith(' not found'):
                raise
            raise SystemExit(
                'Unable to load internal client from config: %r (%s)' %
                (internal_client_conf_path, err))
        self.reported = 0

    def _zero_stats(self):
        """Zero out the stats."""
        super(ContainerSharder, self)._zero_stats()
        # all sharding stats that are additional to the inherited replicator
        # stats are maintained under the 'sharding' key in self.stats
        self.stats['sharding'] = defaultdict(lambda: defaultdict(int))
        self.sharding_candidates = []

    def _append_stat(self, category, key, value):
        if not self.stats['sharding'][category][key]:
            self.stats['sharding'][category][key] = list()
        self.stats['sharding'][category][key].append(value)

    def _min_stat(self, category, key, value):
        current = self.stats['sharding'][category][key]
        if not current:
            self.stats['sharding'][category][key] = value
        else:
            self.stats['sharding'][category][key] = min(current, value)

    def _max_stat(self, category, key, value):
        current = self.stats['sharding'][category][key]
        if not current:
            self.stats['sharding'][category][key] = value
        else:
            self.stats['sharding'][category][key] = max(current, value)

    def _increment_stat(self, category, key, step=1, statsd=False):
        self.stats['sharding'][category][key] += step
        if statsd:
            statsd_key = '%s_%s' % (category, key)
            self.logger.increment(statsd_key)

    def _make_stats_info(self, broker, node, own_shard_range):
        try:
            file_size = os.stat(broker.db_file).st_size
        except OSError:
            file_size = None

        return {'path': broker.db_file,
                'node_index': node.get('index'),
                'account': broker.account,
                'container': broker.container,
                'root': broker.root_path,
                'object_count': own_shard_range.object_count,
                'meta_timestamp': own_shard_range.meta_timestamp.internal,
                'file_size': file_size}

    def _identify_sharding_candidate(self, broker, node):
        own_shard_range = broker.get_own_shard_range()
        if is_sharding_candidate(
                own_shard_range, self.shard_container_threshold):
            self.sharding_candidates.append(
                self._make_stats_info(broker, node, own_shard_range))

    def _transform_sharding_candidate_stats(self):
        category = self.stats['sharding']['sharding_candidates']
        candidates = self.sharding_candidates
        category['found'] = len(candidates)
        candidates.sort(key=lambda c: c['object_count'], reverse=True)
        if self.recon_candidates_limit >= 0:
            category['top'] = candidates[:self.recon_candidates_limit]
        else:
            category['top'] = candidates

    def _record_sharding_progress(self, broker, node, error):
        own_shard_range = broker.get_own_shard_range()
        if (broker.get_db_state() in (UNSHARDED, SHARDING) and
                own_shard_range.state in (ShardRange.SHARDING,
                                          ShardRange.SHARDED)):
            info = self._make_stats_info(broker, node, own_shard_range)
            info['state'] = own_shard_range.state_text
            info['db_state'] = broker.get_db_state()
            states = [ShardRange.FOUND, ShardRange.CREATED,
                      ShardRange.CLEAVED, ShardRange.ACTIVE]
            shard_ranges = broker.get_shard_ranges(states=states)
            state_count = {}
            for state in states:
                state_count[ShardRange.STATES[state]] = 0
            for shard_range in shard_ranges:
                state_count[shard_range.state_text] += 1
            info.update(state_count)
            info['error'] = error and str(error)
            self._append_stat('sharding_in_progress', 'all', info)

    def _report_stats(self):
        # report accumulated stats since start of one sharder cycle
        default_stats = ('attempted', 'success', 'failure')
        category_keys = (
            ('visited', default_stats + ('skipped', 'completed')),
            ('scanned', default_stats + ('found', 'min_time', 'max_time')),
            ('created', default_stats),
            ('cleaved', default_stats + ('min_time', 'max_time',)),
            ('misplaced', default_stats + ('found', 'placed', 'unplaced')),
            ('audit_root', default_stats),
            ('audit_shard', default_stats),
        )

        now = time.time()
        last_report = time.ctime(self.stats['start'])
        elapsed = now - self.stats['start']
        sharding_stats = self.stats['sharding']
        for category, keys in category_keys:
            stats = sharding_stats[category]
            msg = ' '.join(['%s:%s' % (k, str(stats[k])) for k in keys])
            self.logger.info('Since %s %s - %s', last_report, category, msg)

        self._transform_sharding_candidate_stats()
        dump_recon_cache(
            {'sharding_stats': self.stats,
             'sharding_time': elapsed,
             'sharding_last': now},
            self.rcache, self.logger)
        self.reported = now

    def _periodic_report_stats(self):
        if (time.time() - self.reported) >= 3600:  # once an hour
            self._report_stats()

    def _check_node(self, node):
        """
        :return: The path to the device if the node is local and its device
            is mounted; False otherwise.
        """
        if not node:
            return False
        if not is_local_device(self.ips, self.port,
                               node['replication_ip'],
                               node['replication_port']):
            return False
        try:
            return check_drive(self.root, node['device'], self.mount_check)
        except ValueError:
            self.logger.warning(
                'Skipping %(device)s as it is not mounted' % node)
            return False

    def _fetch_shard_ranges(self, broker, newest=False, params=None,
                            include_deleted=False):
        path = self.int_client.make_path(broker.root_account,
                                         broker.root_container)
        params = params or {}
        params.setdefault('format', 'json')
        headers = {'X-Backend-Record-Type': 'shard',
                   'X-Backend-Override-Deleted': 'true',
                   'X-Backend-Include-Deleted': str(include_deleted)}
        if newest:
            headers['X-Newest'] = 'true'
        try:
            try:
                resp = self.int_client.make_request(
                    'GET', path, headers, acceptable_statuses=(2,),
                    params=params)
            except internal_client.UnexpectedResponse as err:
                self.logger.warning("Failed to get shard ranges from %s: %s",
                                    quote(broker.root_path), err)
                return None
            record_type = resp.headers.get('x-backend-record-type')
            if record_type != 'shard':
                err = 'unexpected record type %r' % record_type
                self.logger.error("Failed to get shard ranges from %s: %s",
                                  quote(broker.root_path), err)
                return None

            try:
                data = json.loads(resp.body)
                if not isinstance(data, list):
                    raise ValueError('not a list')
                return [ShardRange.from_dict(shard_range)
                        for shard_range in data]
            except (ValueError, TypeError, KeyError) as err:
                self.logger.error(
                    "Failed to get shard ranges from %s: invalid data: %r",
                    quote(broker.root_path), err)
            return None
        finally:
            self.logger.txn_id = None

    def _put_container(self, node, part, account, container, headers, body):
        try:
            direct_put_container(node, part, account, container,
                                 conn_timeout=self.conn_timeout,
                                 response_timeout=self.node_timeout,
                                 headers=headers, contents=body)
        except DirectClientException as err:
            self.logger.warning(
                'Failed to put shard ranges to %s:%s/%s: %s',
                node['ip'], node['port'], node['device'], err.http_status)
        except (Exception, Timeout) as err:
            self.logger.exception(
                'Failed to put shard ranges to %s:%s/%s: %s',
                node['ip'], node['port'], node['device'], err)
        else:
            return True
        return False

    def _send_shard_ranges(self, account, container, shard_ranges,
                           headers=None):
        body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
        part, nodes = self.ring.get_nodes(account, container)
        headers = headers or {}
        headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
                        'User-Agent': 'container-sharder %s' % os.getpid(),
                        'X-Timestamp': Timestamp.now().normal,
                        'Content-Length': len(body),
                        'Content-Type': 'application/json'})

        pool = GreenAsyncPile(len(nodes))
        for node in nodes:
            pool.spawn(self._put_container, node, part, account,
                       container, headers, body)

        results = pool.waitall(None)
        return results.count(True) >= quorum_size(self.ring.replica_count)
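
    # NOTE (editorial): _send_shard_ranges reports success only when a
    # majority of the container ring's nodes accepted the update; with the
    # common 3-replica container ring, quorum_size(3) is 2.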

    def _get_shard_broker(self, shard_range, root_path, policy_index):
        """
        Get a broker for a container db for the given shard range. If one of
        the shard container's primary nodes is a local device then that will
        be chosen for the db, otherwise the first of the shard container's
        handoff nodes that is local will be chosen.

        :param shard_range: a :class:`~swift.common.utils.ShardRange`
        :param root_path: the path of the shard's root container
        :param policy_index: the storage policy index
        :returns: a tuple of ``(part, broker, node_id, put_timestamp)``
            where ``part`` is the shard container's partition, ``broker`` is
            an instance of :class:`~swift.container.backend.ContainerBroker`,
            ``node_id`` is the id of the selected node and ``put_timestamp``
            is the timestamp with which the db was created.
        :raises DeviceUnavailable: if no suitable mounted local device is
            found.
        """
        part = self.ring.get_part(shard_range.account, shard_range.container)
        node = self.find_local_handoff_for_part(part)
        put_timestamp = Timestamp.now().internal
        if not node:
            raise DeviceUnavailable(
                'No mounted devices found suitable for creating shard broker '
                'for %s in partition %s' % (quote(shard_range.name), part))

        shard_broker = ContainerBroker.create_broker(
            os.path.join(self.root, node['device']), part,
            shard_range.account, shard_range.container,
            epoch=shard_range.epoch,
            storage_policy_index=policy_index, put_timestamp=put_timestamp)

        # Get the valid info into the broker.container, etc
        shard_broker.get_info()
        shard_broker.merge_shard_ranges(shard_range)
        shard_broker.set_sharding_sysmeta('Quoted-Root', quote(root_path))
        # NB: we *used* to do
        #   shard_broker.set_sharding_sysmeta('Root', root_path)
        # but that isn't safe for container names with nulls or newlines (or
        # possibly some other characters). We consciously *don't* make any
        # attempt to set the old meta; during an upgrade, some shards may
        # think they are in fact roots, but it cleans up well enough once
        # everyone's upgraded.
        shard_broker.update_metadata({
            'X-Container-Sysmeta-Sharding':
                ('True', Timestamp.now().internal)})

        return part, shard_broker, node['id'], put_timestamp

    def _audit_root_container(self, broker):
        # This is the root container, and therefore the tome of knowledge,
        # all we can do is check there is nothing screwy with the ranges
        self._increment_stat('audit_root', 'attempted')
        warnings = []
        own_shard_range = broker.get_own_shard_range()

        if own_shard_range.state in (ShardRange.SHARDING,
                                     ShardRange.SHARDED):
            shard_ranges = broker.get_shard_ranges()
            missing_ranges = find_missing_ranges(shard_ranges)
            if missing_ranges:
                warnings.append(
                    'missing range(s): %s' %
                    ' '.join(['%s-%s' % (lower, upper)
                              for lower, upper in missing_ranges]))

        for state in ShardRange.STATES:
            shard_ranges = broker.get_shard_ranges(states=state)
            overlaps = find_overlapping_ranges(shard_ranges)
            for overlapping_ranges in overlaps:
                warnings.append(
                    'overlapping ranges in state %s: %s' %
                    (ShardRange.STATES[state],
                     ' '.join(['%s-%s' % (sr.lower, sr.upper)
                               for sr in overlapping_ranges])))

        if warnings:
            self.logger.warning(
                'Audit failed for root %s (%s): %s',
                broker.db_file, quote(broker.path), ', '.join(warnings))
            self._increment_stat('audit_root', 'failure', statsd=True)
            return False

        self._increment_stat('audit_root', 'success', statsd=True)
        return True

    def _audit_shard_container(self, broker):
        # Get the root view of the world.
        self._increment_stat('audit_shard', 'attempted')
        warnings = []
        errors = []
        if not broker.account.startswith(self.shards_account_prefix):
            warnings.append('account not in shards namespace %r' %
                            self.shards_account_prefix)

        own_shard_range = broker.get_own_shard_range(no_default=True)

        shard_range = None
        if own_shard_range:
            shard_ranges = self._fetch_shard_ranges(
                broker, newest=True,
                params={'marker': str_to_wsgi(own_shard_range.lower_str),
                        'end_marker': str_to_wsgi(own_shard_range.upper_str)},
                include_deleted=True)
            if shard_ranges:
                for shard_range in shard_ranges:
                    if (shard_range.lower == own_shard_range.lower and
                            shard_range.upper == own_shard_range.upper and
                            shard_range.name == own_shard_range.name):
                        break
                else:
                    # this is not necessarily an error - some replicas of the
                    # root may not yet know about this shard container
                    warnings.append('root has no matching shard range')
                    shard_range = None
            else:
                warnings.append('unable to get shard ranges from root')
        else:
            errors.append('missing own shard range')

        if warnings:
            self.logger.warning(
                'Audit warnings for shard %s (%s): %s',
                broker.db_file, quote(broker.path), ', '.join(warnings))

        if errors:
            self.logger.warning(
                'Audit failed for shard %s (%s) - skipping: %s',
                broker.db_file, quote(broker.path), ', '.join(errors))
            self._increment_stat('audit_shard', 'failure', statsd=True)
            return False

        if shard_range:
            self.logger.debug('Updating shard from root %s',
                              dict(shard_range))
            broker.merge_shard_ranges(shard_range)
            own_shard_range = broker.get_own_shard_range()
            delete_age = time.time() - self.reclaim_age
            if (own_shard_range.state == ShardRange.SHARDED and
                    own_shard_range.deleted and
                    own_shard_range.timestamp < delete_age and
                    broker.empty()):
                broker.delete_db(Timestamp.now().internal)
                self.logger.debug('Deleted shard container %s (%s)',
                                  broker.db_file, quote(broker.path))
        self._increment_stat('audit_shard', 'success', statsd=True)
        return True

    def _audit_cleave_contexts(self, broker):
        now = Timestamp.now()
        for context, last_mod in CleavingContext.load_all(broker):
            if Timestamp(last_mod).timestamp + self.reclaim_age < \
                    now.timestamp:
                context.delete(broker)

    def _audit_container(self, broker):
        if broker.is_deleted():
            # if the container has been marked as deleted, all metadata will
            # have been erased so no point auditing. But we want it to pass,
            # in case any objects exist inside it.
            return True
        self._audit_cleave_contexts(broker)
        if broker.is_root_container():
            return self._audit_root_container(broker)
        return self._audit_shard_container(broker)

    def yield_objects(self, broker, src_shard_range, since_row=None):
        """
        Iterates through all objects in ``src_shard_range`` in name order
        yielding them in lists of up to ``cleave_row_batch_size`` in length.

        :param broker: A :class:`~swift.container.backend.ContainerBroker`.
        :param src_shard_range: A :class:`~swift.common.utils.ShardRange`
            describing the source range.
        :param since_row: include only items whose ROWID is greater than
            the given row id; by default all rows are included.
        :return: a generator of tuples of (list of objects, broker info dict)
        """
        for include_deleted in (False, True):
            marker = src_shard_range.lower_str
            while True:
                info = broker.get_info()
                info['max_row'] = broker.get_max_row()
                start = time.time()
                objects = broker.get_objects(
                    self.cleave_row_batch_size,
                    marker=marker,
                    end_marker=src_shard_range.end_marker,
                    include_deleted=include_deleted,
                    since_row=since_row)
                if objects:
                    self.logger.debug('got %s objects from %s in %ss',
                                      len(objects), broker.db_file,
                                      time.time() - start)
                    yield objects, info

                if len(objects) < self.cleave_row_batch_size:
                    break
                marker = objects[-1]['name']
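
    # NOTE (editorial): yield_objects makes two passes over the source range,
    # first yielding undeleted rows and then deleted rows, fetching at most
    # cleave_row_batch_size rows per query; the broker info yielded with each
    # batch is sampled just before the query so that callers can detect a db
    # change part-way through iteration.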

    def yield_objects_to_shard_range(self, broker, src_shard_range,
                                     dest_shard_ranges):
        """
        Iterates through all objects in ``src_shard_range`` to place them in
        destination shard ranges provided by the ``dest_shard_ranges``
        function. Yields tuples of (object list, destination shard range in
        which those objects belong, broker info dict). Note that the same
        destination shard range may be referenced in more than one yielded
        tuple.

        :param broker: A :class:`~swift.container.backend.ContainerBroker`.
        :param src_shard_range: A :class:`~swift.common.utils.ShardRange`
            describing the source range.
        :param dest_shard_ranges: A function which should return a list of
            destination shard ranges in name order.
        :return: a generator of tuples of
            (object list, shard range, broker info dict)
        """
        dest_shard_range_iter = dest_shard_range = None
        for objs, info in self.yield_objects(broker, src_shard_range):
            if not objs:
                return

            def next_or_none(it):
                try:
                    return next(it)
                except StopIteration:
                    return None

            if dest_shard_range_iter is None:
                dest_shard_range_iter = iter(dest_shard_ranges())
                dest_shard_range = next_or_none(dest_shard_range_iter)

            unplaced = False
            last_index = next_index = 0
            for obj in objs:
                if dest_shard_range is None:
                    # no more destinations: yield remainder of batch and bail
                    # NB there may be more batches of objects but none of them
                    # will be placed so no point fetching them
                    yield objs[last_index:], None, info
                    return
                if obj['name'] <= dest_shard_range.lower:
                    unplaced = True
                elif unplaced:
                    # end of run of unplaced objects, yield them
                    yield objs[last_index:next_index], None, info
                    last_index = next_index
                    unplaced = False
                while (dest_shard_range and
                       obj['name'] > dest_shard_range.upper):
                    if next_index != last_index:
                        # yield the objects in current dest_shard_range
                        yield (objs[last_index:next_index],
                               dest_shard_range,
                               info)
                        last_index = next_index
                    dest_shard_range = next_or_none(dest_shard_range_iter)
                next_index += 1

            if next_index != last_index:
                # yield tail of current batch of objects
                # NB there may be more objects for the current
                # dest_shard_range in the next batch from yield_objects
                yield (objs[last_index:next_index],
                       None if unplaced else dest_shard_range,
                       info)
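
    # Worked example (editorial): given a batch of objects named 'a', 'b',
    # 'c' and destination ranges ''-'b' and 'b'-'z', the walk above yields
    # (['a', 'b'], <''-'b'>, info) once 'c' is seen to lie beyond the first
    # range's upper bound, and then (['c'], <'b'-'z'>, info) as the tail of
    # the batch.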

    def _post_replicate_hook(self, broker, info, responses):
        # override superclass behaviour
        pass

    def _replicate_and_delete(self, broker, dest_shard_range, part,
                              dest_broker, node_id, info):
        success, responses = self._replicate_object(
            part, dest_broker.db_file, node_id)
        quorum = quorum_size(self.ring.replica_count)
        if not success and responses.count(True) < quorum:
            self.logger.warning(
                'Failed to sufficiently replicate misplaced objects: %s in '
                '%s (not removing)', dest_shard_range, quote(broker.path))
            return False

        if broker.get_info()['id'] != info['id']:
            # the db changed - don't remove any objects
            success = False
        else:
            # remove objects up to the max row of the db sampled prior to
            # the first object yielded for this destination; objects added
            # after that point may not have been yielded and replicated so
            # it is not safe to remove them yet
            broker.remove_objects(
                dest_shard_range.lower_str,
                dest_shard_range.upper_str,
                max_row=info['max_row'])
            success = True

        if not success:
            self.logger.warning(
                'Refused to remove misplaced objects: %s in %s',
                dest_shard_range, quote(broker.path))
        return success

    def _move_objects(self, src_broker, src_shard_range, policy_index,
                      shard_range_fetcher):
        # move objects from src_shard_range in src_broker to destination
        # shard ranges provided by shard_range_fetcher
        dest_brokers = {}  # map shard range -> broker
        placed = unplaced = 0
        success = True
        for objs, dest_shard_range, info in self.yield_objects_to_shard_range(
                src_broker, src_shard_range, shard_range_fetcher):
            if not dest_shard_range:
                unplaced += len(objs)
                success = False
                continue

            if dest_shard_range.name == src_broker.path:
                self.logger.debug(
                    'Skipping source as misplaced objects destination')
                # in shrinking context, the misplaced objects might actually
                # be correctly placed if the root has expanded this shard
                # but this broker has not yet been updated
                continue

            if dest_shard_range not in dest_brokers:
                part, dest_broker, node_id, _junk = self._get_shard_broker(
                    dest_shard_range, src_broker.root_path, policy_index)
                # save the broker info that was sampled prior to the *first*
                # yielded objects for this destination
                destination = {'part': part,
                               'dest_broker': dest_broker,
                               'node_id': node_id,
                               'info': info}
                dest_brokers[dest_shard_range] = destination
            else:
                destination = dest_brokers[dest_shard_range]
            destination['dest_broker'].merge_items(objs)
            placed += len(objs)

        if unplaced:
            self.logger.warning(
                'Failed to find destination for at least %s misplaced '
                'objects in %s', unplaced, quote(src_broker.path))

        # TODO: consider executing the replication jobs concurrently
        for dest_shard_range, dest_args in dest_brokers.items():
            self.logger.debug('moving misplaced objects found in range %s',
                              dest_shard_range)
            success &= self._replicate_and_delete(
                src_broker, dest_shard_range, **dest_args)

        self._increment_stat('misplaced', 'placed', step=placed)
        self._increment_stat('misplaced', 'unplaced', step=unplaced)
        return success, placed + unplaced

    def _make_shard_range_fetcher(self, broker, src_shard_range):
        # returns a function that will lazy load shard ranges on demand;
        # this means only one lookup is made for all misplaced ranges.
        outer = {}

        def shard_range_fetcher():
            if not outer:
                if broker.is_root_container():
                    ranges = broker.get_shard_ranges(
                        marker=src_shard_range.lower_str,
                        end_marker=src_shard_range.end_marker,
                        states=SHARD_UPDATE_STATES)
                else:
                    # TODO: the root may not yet know about shard ranges to