// Repartitioner.java (forked from voldemort/voldemort)
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.tools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.log4j.Logger;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.utils.ClusterUtils;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* Repartitioner provides functions that balance the distribution of
* partitions across a cluster.
*
*/
public class Repartitioner {
static Logger logger = Logger.getLogger(Repartitioner.class);
/**
* Recommended (default) number of times to attempt repartitioning.
*/
public final static int DEFAULT_REPARTITION_ATTEMPTS = 5;
/**
* Default number of random partition ID swaps to attempt, if random swaps
* are enabled.
*/
public final static int DEFAULT_RANDOM_SWAP_ATTEMPTS = 100;
/**
* Default number of successful random swaps (i.e., the random swap improves
* balance) after which repartitioning stops, if random swaps are enabled.
*/
public final static int DEFAULT_RANDOM_SWAP_SUCCESSES = 100;
/**
* Default number of greedy partition ID swaps to perform, if greedy swaps
* are enabled. Each greedy partition ID swap considers (some number of
* partitions per node) X (some number of partitions from the rest of the cluster)
* and selects the best such swap.
*/
public final static int DEFAULT_GREEDY_SWAP_ATTEMPTS = 5;
/**
* Default setting for which zone IDs to run the greedy swap algorithm on.
* null implies greedily swapping across all zones.
*/
public final static List<Integer> DEFAULT_GREEDY_ZONE_IDS = null;
/**
* Default (max) number of partition IDs per node to consider, if greedy
* swaps are enabled.
*/
public final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE = 5;
/**
* Default (max) number of partition IDs from all the other nodes in the
* cluster to consider, if greedy swaps are enabled.
*/
public final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE = 25;
/**
* Default limit on length of contiguous partition ID run within a zone. 0
* implies no limit on such runs.
*/
public final static int DEFAULT_MAX_CONTIGUOUS_PARTITIONS = 0;
/**
* Runs a number of distinct algorithms over the specified clusters/store
* defs to better balance partition IDs over nodes such that all nodes have
* similar iops and capacity usage.
*
* The algorithms (in order):
* <ul>
* <li>Get rid of contiguous runs of partition IDs within a zone. Such runs
* make balancing load overall more difficult.
* <li>Balance partition IDs among zones and/or among nodes within zones.
* <li>Randomly swap partition IDs among nodes to improve overall balance.
* (Any swap that improves balance is accepted.)
* <li>Greedily swap partition IDs among nodes to improve overall balance.
* (Some number of swaps are considered and the best of which is accepted.)
* </ul>
*
* This method is used for three key use cases:
* <ul>
* <li>Shuffling : Distribute partition IDs better for an existing cluster.
* <li>Cluster expansion : Distribute partition IDs to take advantage of new
* nodes (added to some of the zones).
* <li>Zone expansion : Distribute partition IDs into a new zone.
* </ul>
*
* @param currentCluster current cluster
* @param currentStoreDefs current store defs
* @param targetCluster target cluster; needed for cluster or zone
* expansion, otherwise pass in the same as currentCluster.
* @param targetStoreDefs target store defs; needed for zone expansion,
* otherwise pass in the same as currentStoreDefs.
* @param outputDir Directory in which to dump cluster xml and analysis
* files.
* @param attempts Number of distinct repartitionings to attempt, the best
* of which is returned.
* @param disableNodeBalancing Disables the core algorithm that balances
* primaries among nodes within each zone.
* @param disableZoneBalancing For the core algorithm that balances
* primaries among nodes in each zone, disable balancing primaries
* among zones.
* @param enableRandomSwaps Enables random swap optimization.
* @param randomSwapAttempts Number of random partition ID swaps to attempt.
* @param randomSwapSuccesses Number of successful random swaps after which
* random swapping stops.
* @param enableGreedySwaps Enables greedy swap optimization.
* @param greedyZoneIds Zone IDs within which to run the greedy swap
* algorithm; null means greedily swap across all zones.
* @param greedySwapAttempts Number of greedy swap passes to perform.
* @param greedySwapMaxPartitionsPerNode Max number of partition IDs per node
* to consider for greedy swaps.
* @param greedySwapMaxPartitionsPerZone Max number of partition IDs from the
* rest of the cluster to consider for greedy swaps.
* @param maxContiguousPartitionsPerZone Limit on the length of a contiguous
* partition ID run within a zone; 0 means no limit.
* @return Cluster that has had all specified balancing algorithms run
* against it. The number of zones and number of nodes will match
* that of the specified targetCluster.
*/
public static Cluster repartition(final Cluster currentCluster,
final List<StoreDefinition> currentStoreDefs,
final Cluster targetCluster,
final List<StoreDefinition> targetStoreDefs,
final String outputDir,
final int attempts,
final boolean disableNodeBalancing,
final boolean disableZoneBalancing,
final boolean enableRandomSwaps,
final int randomSwapAttempts,
final int randomSwapSuccesses,
final boolean enableGreedySwaps,
final List<Integer> greedyZoneIds,
final int greedySwapAttempts,
final int greedySwapMaxPartitionsPerNode,
final int greedySwapMaxPartitionsPerZone,
final int maxContiguousPartitionsPerZone) {
PartitionBalance partitionBalance = new PartitionBalance(currentCluster, currentStoreDefs);
RebalanceUtils.dumpAnalysisToFile(outputDir,
RebalanceUtils.currentClusterFileName,
partitionBalance);
Cluster minCluster = targetCluster;
double minUtility = Double.MAX_VALUE;
for(int attempt = 0; attempt < attempts; attempt++) {
Cluster nextCluster = targetCluster;
if(maxContiguousPartitionsPerZone > 0) {
nextCluster = repeatedlyBalanceContiguousPartitionsPerZone(nextCluster,
maxContiguousPartitionsPerZone);
}
if(!disableNodeBalancing) {
nextCluster = balancePrimaryPartitions(nextCluster, !disableZoneBalancing);
}
if(enableRandomSwaps) {
nextCluster = randomShufflePartitions(nextCluster,
randomSwapAttempts,
randomSwapSuccesses,
targetStoreDefs);
}
if(enableGreedySwaps) {
nextCluster = greedyShufflePartitions(nextCluster,
greedySwapAttempts,
greedySwapMaxPartitionsPerNode,
greedySwapMaxPartitionsPerZone,
new ArrayList<Integer>(targetCluster.getZoneIds()),
targetStoreDefs);
}
RebalanceUtils.validateCurrentFinalCluster(currentCluster, nextCluster);
System.out.println("-------------------------\n");
partitionBalance = new PartitionBalance(nextCluster, targetStoreDefs);
double currentUtility = partitionBalance.getUtility();
System.out.println("Optimization number " + attempt + ": " + currentUtility
+ " max/min ratio");
System.out.println("-------------------------\n");
System.out.println(RebalanceUtils.analyzeInvalidMetadataRate(targetCluster,
currentStoreDefs,
nextCluster,
currentStoreDefs));
if(currentUtility <= minUtility) {
minUtility = currentUtility;
minCluster = nextCluster;
RebalanceUtils.dumpClusterToFile(outputDir, RebalanceUtils.finalClusterFileName
+ attempt, minCluster);
RebalanceUtils.dumpAnalysisToFile(outputDir, RebalanceUtils.finalClusterFileName
+ attempt, partitionBalance);
}
System.out.println("-------------------------\n");
}
System.out.println("\n==========================");
System.out.println("Final distribution");
partitionBalance = new PartitionBalance(minCluster, targetStoreDefs);
System.out.println(partitionBalance);
RebalanceUtils.dumpClusterToFile(outputDir, RebalanceUtils.finalClusterFileName, minCluster);
RebalanceUtils.dumpAnalysisToFile(outputDir,
RebalanceUtils.finalClusterFileName,
partitionBalance);
return minCluster;
}
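/*
* Illustrative usage sketch (not part of the original source; the variables
* currentCluster and storeDefs are assumed to exist, and the output path is
* made up). A plain "shuffle" of an existing cluster passes the same
* cluster/store defs as both current and target:
*
* Cluster shuffled = Repartitioner.repartition(currentCluster,
*                                              storeDefs,
*                                              currentCluster, // same cluster: no expansion
*                                              storeDefs,
*                                              "/tmp/repartition-output", // assumed path
*                                              Repartitioner.DEFAULT_REPARTITION_ATTEMPTS,
*                                              false, // disableNodeBalancing
*                                              false, // disableZoneBalancing
*                                              true,  // enableRandomSwaps
*                                              Repartitioner.DEFAULT_RANDOM_SWAP_ATTEMPTS,
*                                              Repartitioner.DEFAULT_RANDOM_SWAP_SUCCESSES,
*                                              false, // enableGreedySwaps
*                                              Repartitioner.DEFAULT_GREEDY_ZONE_IDS,
*                                              Repartitioner.DEFAULT_GREEDY_SWAP_ATTEMPTS,
*                                              Repartitioner.DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE,
*                                              Repartitioner.DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE,
*                                              Repartitioner.DEFAULT_MAX_CONTIGUOUS_PARTITIONS);
*/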
/**
* Determines how many primary partitions each node within each zone should
* have. The list of integers returned per zone is the same length as the
* number of nodes in that zone.
*
* @param targetCluster
* @param targetPartitionsPerZone
* @return A map of zoneId to list of target number of partitions per node
* within zone.
*/
public static HashMap<Integer, List<Integer>> getBalancedNumberOfPrimaryPartitionsPerNode(final Cluster targetCluster,
Map<Integer, Integer> targetPartitionsPerZone) {
HashMap<Integer, List<Integer>> numPartitionsPerNode = Maps.newHashMap();
for(Integer zoneId: targetCluster.getZoneIds()) {
List<Integer> partitionsOnNode = Utils.distributeEvenlyIntoList(targetCluster.getNumberOfNodesInZone(zoneId),
targetPartitionsPerZone.get(zoneId));
numPartitionsPerNode.put(zoneId, partitionsOnNode);
}
return numPartitionsPerNode;
}
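/*
* Worked example (a sketch; the exact split depends on
* Utils.distributeEvenlyIntoList, which is assumed to spread any remainder
* one extra partition at a time): 10 target partitions over a 3-node zone
* would presumably come back as a list like [4, 3, 3], so a call such as
*
* getBalancedNumberOfPrimaryPartitionsPerNode(cluster, {zone 0 -> 10, zone 1 -> 9})
*
* might return {0 -> [4, 3, 3], 1 -> [3, 3, 3]} for two 3-node zones.
*/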
/**
* Assigns the target number of partitions per node to specific node IDs, then
* separates nodes into donorNodes and stealerNodes based on whether the
* node needs to donate or steal primary partitions.
*
* @param targetCluster
* @param numPartitionsPerNodePerZone
* @return a Pair. First element is donorNodes, second element is
* stealerNodes. Each element in the pair is a HashMap of Node to
* Integer where the integer value is the number of partitions to
* store.
*/
public static Pair<HashMap<Node, Integer>, HashMap<Node, Integer>> getDonorsAndStealersForBalance(final Cluster targetCluster,
Map<Integer, List<Integer>> numPartitionsPerNodePerZone) {
HashMap<Node, Integer> donorNodes = Maps.newHashMap();
HashMap<Node, Integer> stealerNodes = Maps.newHashMap();
HashMap<Integer, Integer> numNodesAssignedInZone = Maps.newHashMap();
for(Integer zoneId: targetCluster.getZoneIds()) {
numNodesAssignedInZone.put(zoneId, 0);
}
for(Node node: targetCluster.getNodes()) {
int zoneId = node.getZoneId();
int offset = numNodesAssignedInZone.get(zoneId);
numNodesAssignedInZone.put(zoneId, offset + 1);
int numPartitions = numPartitionsPerNodePerZone.get(zoneId).get(offset);
if(numPartitions < node.getNumberOfPartitions()) {
donorNodes.put(node, numPartitions);
} else if(numPartitions > node.getNumberOfPartitions()) {
stealerNodes.put(node, numPartitions);
}
}
// Print out donor/stealer information
for(Node node: donorNodes.keySet()) {
System.out.println("Donor Node: " + node.getId() + ", zoneId " + node.getZoneId()
+ ", numPartitions " + node.getNumberOfPartitions()
+ ", target number of partitions " + donorNodes.get(node));
}
for(Node node: stealerNodes.keySet()) {
System.out.println("Stealer Node: " + node.getId() + ", zoneId " + node.getZoneId()
+ ", numPartitions " + node.getNumberOfPartitions()
+ ", target number of partitions " + stealerNodes.get(node));
}
return new Pair<HashMap<Node, Integer>, HashMap<Node, Integer>>(donorNodes, stealerNodes);
}
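/*
* Worked example (hypothetical counts): if zone 0's target per-node counts
* are [4, 3, 3] and its three nodes currently hold 6, 3 and 1 partitions,
* then the node holding 6 becomes a donor (target 4), the node holding 3 is
* left out of both maps (already at target), and the node holding 1 becomes
* a stealer (target 3).
*/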
// TODO: (refactor) rename targetCluster -> interimCluster
/**
* This method balances primary partitions among nodes within a zone, and
* optionally primary partitions among zones. The balancing is done at the
* level of partitionIds. Such partition Id movement may, or may not, result
* in data movement during a rebalancing. See RebalancePlan for the object
* responsible for determining which partition-stores move where for a
* specific repartitioning.
*
* @param targetCluster
* @param balanceZones indicates whether or not number of primary partitions
* per zone should be balanced.
* @return
*/
public static Cluster balancePrimaryPartitions(final Cluster targetCluster, boolean balanceZones) {
System.out.println("Balance number of partitions across all nodes and zones.");
Map<Integer, Integer> targetPartitionsPerZone;
if(balanceZones) {
targetPartitionsPerZone = Utils.distributeEvenlyIntoMap(targetCluster.getZoneIds(),
targetCluster.getNumberOfPartitions());
System.out.println("numPartitionsPerZone");
for(int zoneId: targetCluster.getZoneIds()) {
System.out.println(zoneId + " : "
+ targetCluster.getNumberOfPartitionsInZone(zoneId) + " -> "
+ targetPartitionsPerZone.get(zoneId));
}
System.out.println("numNodesPerZone");
for(int zoneId: targetCluster.getZoneIds()) {
System.out.println(zoneId + " : " + targetCluster.getNumberOfNodesInZone(zoneId));
}
} else {
// Keep number of partitions per zone the same.
targetPartitionsPerZone = new HashMap<Integer, Integer>();
for(int zoneId: targetCluster.getZoneIds()) {
targetPartitionsPerZone.put(zoneId,
targetCluster.getNumberOfPartitionsInZone(zoneId));
}
}
HashMap<Integer, List<Integer>> numPartitionsPerNodeByZone = getBalancedNumberOfPrimaryPartitionsPerNode(targetCluster,
targetPartitionsPerZone);
Pair<HashMap<Node, Integer>, HashMap<Node, Integer>> donorsAndStealers = getDonorsAndStealersForBalance(targetCluster,
numPartitionsPerNodeByZone);
HashMap<Node, Integer> donorNodes = donorsAndStealers.getFirst();
List<Node> donorNodeKeys = new ArrayList<Node>(donorNodes.keySet());
HashMap<Node, Integer> stealerNodes = donorsAndStealers.getSecond();
List<Node> stealerNodeKeys = new ArrayList<Node>(stealerNodes.keySet());
/*
* There is no "intelligence" here about which partition IDs are moved
* where. The RebalancePlan object owns determining how to move data
* around to meet a specific repartitioning. That said, a little bit of
* intelligence here may go a long way. For example, for zone expansion
* data movement could be minimized by:
*
* (1) Selecting a minimal # of partition IDs for the new zone to
* minimize how much the ring in existing zones is perturbed;
*
* (2) Selecting partitions for the new zone from contiguous runs of
* partition IDs in other zones that are not currently n-ary partitions
* for other primary partitions;
*
* (3) Some combination of (1) and (2)...
*/
// Go over every stealerNode and steal partition Ids from donor nodes
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
Collections.shuffle(stealerNodeKeys, new Random(System.currentTimeMillis()));
for(Node stealerNode: stealerNodeKeys) {
int partitionsToSteal = stealerNodes.get(stealerNode)
- stealerNode.getNumberOfPartitions();
System.out.println("Node (" + stealerNode.getId() + ") in zone ("
+ stealerNode.getZoneId() + ") has partitionsToSteal of "
+ partitionsToSteal);
while(partitionsToSteal > 0) {
Collections.shuffle(donorNodeKeys, new Random(System.currentTimeMillis()));
// Repeatedly loop over donor nodes to distribute stealing
for(Node donorNode: donorNodeKeys) {
Node currentDonorNode = returnCluster.getNodeById(donorNode.getId());
// Only steal from donor nodes with extra partitions
int partitionsToDonate = currentDonorNode.getNumberOfPartitions()
- donorNodes.get(donorNode);
if(partitionsToDonate <= 0) {
continue;
}
List<Integer> donorPartitions = Lists.newArrayList(currentDonorNode.getPartitionIds());
Collections.shuffle(donorPartitions, new Random(System.currentTimeMillis()));
for(int donorPartition: donorPartitions) {
Cluster intermediateCluster = RebalanceUtils.createUpdatedCluster(returnCluster,
stealerNode.getId(),
Lists.newArrayList(donorPartition));
returnCluster = intermediateCluster;
partitionsToSteal--;
partitionsToDonate--;
System.out.println("Stealer node " + stealerNode.getId() + ", donor node "
+ currentDonorNode.getId() + ", partition stolen "
+ donorPartition);
if(partitionsToSteal == 0 || partitionsToDonate == 0)
break;
}
if(partitionsToSteal == 0)
break;
}
}
}
return returnCluster;
}
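/*
* Illustrative call (a sketch; interimCluster is a hypothetical variable).
* Passing true also equalizes the number of primary partitions per zone,
* false keeps each zone's current partition count:
*
* Cluster balanced = Repartitioner.balancePrimaryPartitions(interimCluster, true);
*/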
/**
* Loops over cluster and repeatedly tries to break up contiguous runs of
* partitions. After each phase of breaking up contiguous partitions, random
* partitions are selected to move between zones to balance the number of
* partitions in each zone. The second phase may re-introduce contiguous
* partition runs in another zone. Therefore, this overall process is
* repeated multiple times.
*
* @param targetCluster
* @param maxContiguousPartitionsPerZone See RebalanceCLI.
* @return
*/
public static Cluster repeatedlyBalanceContiguousPartitionsPerZone(final Cluster targetCluster,
final int maxContiguousPartitionsPerZone) {
System.out.println("Looping to evenly balance partitions across zones while limiting contiguous partitions");
// This loop is hard to make definitive. I.e., there are corner cases
// for small clusters and/or clusters with few partitions for which it
// may be impossible to achieve tight limits on contiguous run lengths.
// Therefore, a constant number of loops are run. Note that once the
// goal is reached, the loop becomes a no-op.
int repeatContigBalance = 10;
Cluster nextCluster = targetCluster;
for(int i = 0; i < repeatContigBalance; i++) {
nextCluster = balanceContiguousPartitionsPerZone(nextCluster,
maxContiguousPartitionsPerZone);
nextCluster = balancePrimaryPartitions(nextCluster, false);
System.out.println("Completed round of balancing contiguous partitions: round "
+ (i + 1) + " of " + repeatContigBalance);
}
return nextCluster;
}
/**
* Ensures that no more than maxContiguousPartitionsPerZone partitions are
* contiguous within a single zone.
*
* Moves the necessary partitions to break up contiguous runs from each zone
* to some other random zone/node. There is some chance that such random
* moves could result in contiguous partitions in other zones.
*
* @param targetCluster Target cluster metadata
* @param maxContiguousPartitionsPerZone See RebalanceCLI.
* @return Updated cluster metadata with the contiguous runs broken up.
*/
public static Cluster balanceContiguousPartitionsPerZone(final Cluster targetCluster,
final int maxContiguousPartitionsPerZone) {
System.out.println("Balance number of contiguous partitions within a zone.");
System.out.println("numPartitionsPerZone");
for(int zoneId: targetCluster.getZoneIds()) {
System.out.println(zoneId + " : " + targetCluster.getNumberOfPartitionsInZone(zoneId));
}
System.out.println("numNodesPerZone");
for(int zoneId: targetCluster.getZoneIds()) {
System.out.println(zoneId + " : " + targetCluster.getNumberOfNodesInZone(zoneId));
}
// Break up contiguous partitions within each zone
HashMap<Integer, List<Integer>> partitionsToRemoveFromZone = Maps.newHashMap();
System.out.println("Contiguous partitions");
for(Integer zoneId: targetCluster.getZoneIds()) {
System.out.println("\tZone: " + zoneId);
Map<Integer, Integer> partitionToRunLength = ClusterUtils.getMapOfContiguousPartitions(targetCluster,
zoneId);
List<Integer> partitionsToRemoveFromThisZone = new ArrayList<Integer>();
for(Map.Entry<Integer, Integer> entry: partitionToRunLength.entrySet()) {
if(entry.getValue() > maxContiguousPartitionsPerZone) {
List<Integer> contiguousPartitions = new ArrayList<Integer>(entry.getValue());
for(int partitionId = entry.getKey(); partitionId < entry.getKey()
+ entry.getValue(); partitionId++) {
contiguousPartitions.add(partitionId
% targetCluster.getNumberOfPartitions());
}
System.out.println("Contiguous partitions: " + contiguousPartitions);
partitionsToRemoveFromThisZone.addAll(Utils.removeItemsToSplitListEvenly(contiguousPartitions,
maxContiguousPartitionsPerZone));
}
}
partitionsToRemoveFromZone.put(zoneId, partitionsToRemoveFromThisZone);
System.out.println("\t\tPartitions to remove: " + partitionsToRemoveFromThisZone);
}
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
Random r = new Random();
for(int zoneId: returnCluster.getZoneIds()) {
for(int partitionId: partitionsToRemoveFromZone.get(zoneId)) {
// Pick a random other zone Id
List<Integer> otherZoneIds = new ArrayList<Integer>();
for(int otherZoneId: returnCluster.getZoneIds()) {
if(otherZoneId != zoneId) {
otherZoneIds.add(otherZoneId);
}
}
int whichOtherZoneId = otherZoneIds.get(r.nextInt(otherZoneIds.size()));
// Pick a random node from other zone ID
int whichNodeOffset = r.nextInt(returnCluster.getNumberOfNodesInZone(whichOtherZoneId));
int whichNodeId = new ArrayList<Integer>(returnCluster.getNodeIdsInZone(whichOtherZoneId)).get(whichNodeOffset);
// Steal partition from one zone to another!
returnCluster = RebalanceUtils.createUpdatedCluster(returnCluster,
whichNodeId,
Lists.newArrayList(partitionId));
}
}
return returnCluster;
}
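/*
* Worked example (hypothetical; the exact picks depend on
* Utils.removeItemsToSplitListEvenly): with maxContiguousPartitionsPerZone = 2
* and a contiguous run [5, 6, 7, 8, 9, 10] in one zone, partitions such as 7
* and 10 would be removed so the remaining runs ([5, 6] and [8, 9]) are at
* most 2 long; the removed partitions are then handed to random nodes in
* other zones, which is why repeatedlyBalanceContiguousPartitionsPerZone
* re-balances afterwards.
*/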
/**
* Swaps two specified partitions
*
* @return modified cluster metadata.
*/
public static Cluster swapPartitions(final Cluster targetCluster,
final int nodeIdA,
final int partitionIdA,
final int nodeIdB,
final int partitionIdB) {
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
// Swap partitions between nodes!
returnCluster = RebalanceUtils.createUpdatedCluster(returnCluster,
nodeIdA,
Lists.newArrayList(partitionIdB));
returnCluster = RebalanceUtils.createUpdatedCluster(returnCluster,
nodeIdB,
Lists.newArrayList(partitionIdA));
return returnCluster;
}
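/*
* Illustrative call (node and partition IDs are made up): exchange partition
* 12 on node 0 with partition 47 on node 3 in a single step:
*
* Cluster swapped = Repartitioner.swapPartitions(cluster, 0, 12, 3, 47);
*/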
/**
* Within a single zone, swaps one random partition on one random node with
* another random partition on a different random node.
*
* @param targetCluster
* @param zoneId Zone ID within which to shuffle partitions
* @return
*/
public static Cluster swapRandomPartitionsWithinZone(final Cluster targetCluster,
final int zoneId) {
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
Random r = new Random();
List<Integer> nodeIdsInZone = new ArrayList<Integer>(targetCluster.getNodeIdsInZone(zoneId));
if(nodeIdsInZone.size() == 0) {
return returnCluster;
}
// Select random stealer node
int stealerNodeOffset = r.nextInt(nodeIdsInZone.size());
Integer stealerNodeId = nodeIdsInZone.get(stealerNodeOffset);
// Select random stealer partition
List<Integer> stealerPartitions = returnCluster.getNodeById(stealerNodeId)
.getPartitionIds();
if(stealerPartitions.size() == 0) {
return targetCluster;
}
int stealerPartitionOffset = r.nextInt(stealerPartitions.size());
int stealerPartitionId = stealerPartitions.get(stealerPartitionOffset);
// Select random donor node
List<Integer> donorNodeIds = new ArrayList<Integer>();
donorNodeIds.addAll(nodeIdsInZone);
donorNodeIds.remove(stealerNodeId);
if(donorNodeIds.isEmpty()) { // No donor nodes!
return returnCluster;
}
int donorIdOffset = r.nextInt(donorNodeIds.size());
Integer donorNodeId = donorNodeIds.get(donorIdOffset);
// Select random donor partition
List<Integer> donorPartitions = returnCluster.getNodeById(donorNodeId).getPartitionIds();
if(donorPartitions.size() == 0) { // Nothing to swap with; avoid r.nextInt(0).
return returnCluster;
}
int donorPartitionOffset = r.nextInt(donorPartitions.size());
int donorPartitionId = donorPartitions.get(donorPartitionOffset);
return swapPartitions(returnCluster,
stealerNodeId,
stealerPartitionId,
donorNodeId,
donorPartitionId);
}
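/*
* Illustrative call (zone ID is hypothetical): swap one random partition
* between two distinct random nodes of zone 1, leaving other zones untouched:
*
* Cluster swapped = Repartitioner.swapRandomPartitionsWithinZone(cluster, 1);
*/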
/**
* Randomly shuffle partitions between nodes within every zone.
*
* @param targetCluster Target cluster object.
* @param randomSwapAttempts See RebalanceCLI.
* @param randomSwapSuccesses See RebalanceCLI.
* @param storeDefs List of store definitions
* @return
*/
public static Cluster randomShufflePartitions(final Cluster targetCluster,
final int randomSwapAttempts,
final int randomSwapSuccesses,
List<StoreDefinition> storeDefs) {
List<Integer> zoneIds = new ArrayList<Integer>(targetCluster.getZoneIds());
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility();
int successes = 0;
for(int i = 0; i < randomSwapAttempts; i++) {
Collections.shuffle(zoneIds, new Random(System.currentTimeMillis()));
for(Integer zoneId: zoneIds) {
Cluster shuffleResults = swapRandomPartitionsWithinZone(returnCluster, zoneId);
double nextUtility = new PartitionBalance(shuffleResults, storeDefs).getUtility();
if(nextUtility < currentUtility) {
System.out.println("Swap improved max-min ratio: " + currentUtility + " -> "
+ nextUtility + " (improvement " + successes
+ " on swap attempt " + i + " in zone " + zoneId + ")");
successes++;
returnCluster = shuffleResults;
currentUtility = nextUtility;
}
}
if(successes >= randomSwapSuccesses) {
// Enough successes, move on.
break;
}
}
return returnCluster;
}
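/*
* This is a simple hill climb: a swap is kept only if it lowers the utility
* (max/min ratio) computed by PartitionBalance, otherwise it is discarded.
* Illustrative call (variable names are assumptions):
*
* Cluster shuffled = Repartitioner.randomShufflePartitions(cluster,
*                                                          Repartitioner.DEFAULT_RANDOM_SWAP_ATTEMPTS,
*                                                          Repartitioner.DEFAULT_RANDOM_SWAP_SUCCESSES,
*                                                          storeDefs);
*/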
/**
* For each node in specified zones, tries swapping some minimum number of
* random partitions per node with some minimum number of random partitions
* from other specified nodes. Chooses the best swap in each iteration.
* Large values of the greedySwapMaxPartitions... arguments make this method
* equivalent to comparing every possible swap. This may get very expensive.
*
* @param targetCluster
* @param nodeIds Node IDs among which to greedily swap partitions
* @param greedySwapMaxPartitionsPerNode See RebalanceCLI.
* @param greedySwapMaxPartitionsPerZone See RebalanceCLI.
* @param storeDefs
* @return
*/
public static Cluster swapGreedyRandomPartitions(final Cluster targetCluster,
final List<Integer> nodeIds,
final int greedySwapMaxPartitionsPerNode,
final int greedySwapMaxPartitionsPerZone,
List<StoreDefinition> storeDefs) {
System.out.println("GreedyRandom : nodeIds:" + nodeIds);
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility();
int nodeIdA = -1;
int nodeIdB = -1;
int partitionIdA = -1;
int partitionIdB = -1;
for(int nodeIdEh: nodeIds) {
System.out.println("GreedyRandom : processing nodeId:" + nodeIdEh);
List<Integer> partitionIdsEh = new ArrayList<Integer>();
partitionIdsEh.addAll(returnCluster.getNodeById(nodeIdEh).getPartitionIds());
Collections.shuffle(partitionIdsEh);
int maxPartitionsInEh = Math.min(greedySwapMaxPartitionsPerNode, partitionIdsEh.size());
for(int offsetEh = 0; offsetEh < maxPartitionsInEh; ++offsetEh) {
Integer partitionIdEh = partitionIdsEh.get(offsetEh);
List<Pair<Integer, Integer>> partitionIdsZone = new ArrayList<Pair<Integer, Integer>>();
for(int nodeIdBee: nodeIds) {
if(nodeIdBee == nodeIdEh)
continue;
for(Integer partitionIdBee: returnCluster.getNodeById(nodeIdBee)
.getPartitionIds()) {
partitionIdsZone.add(new Pair<Integer, Integer>(nodeIdBee, partitionIdBee));
}
}
Collections.shuffle(partitionIdsZone);
int maxPartitionsInZone = Math.min(greedySwapMaxPartitionsPerZone,
partitionIdsZone.size());
for(int offsetZone = 0; offsetZone < maxPartitionsInZone; offsetZone++) {
Integer nodeIdBee = partitionIdsZone.get(offsetZone).getFirst();
Integer partitionIdBee = partitionIdsZone.get(offsetZone).getSecond();
Cluster swapResult = swapPartitions(returnCluster,
nodeIdEh,
partitionIdEh,
nodeIdBee,
partitionIdBee);
double swapUtility = new PartitionBalance(swapResult, storeDefs).getUtility();
if(swapUtility < currentUtility) {
currentUtility = swapUtility;
System.out.println(" -> " + currentUtility);
nodeIdA = nodeIdEh;
partitionIdA = partitionIdEh;
nodeIdB = nodeIdBee;
partitionIdB = partitionIdBee;
}
}
}
}
if(nodeIdA == -1) {
return returnCluster;
}
return swapPartitions(returnCluster, nodeIdA, partitionIdA, nodeIdB, partitionIdB);
}
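/*
* Cost sketch (derived from the loops above): one call evaluates up to
* nodeIds.size() * greedySwapMaxPartitionsPerNode * greedySwapMaxPartitionsPerZone
* candidate swaps, and each candidate constructs a PartitionBalance. With the
* defaults (5 and 25) that is at most 125 evaluations per node in nodeIds.
*/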
/**
* Within a single zone, tries swapping some minimum number of random
* partitions per node with some minimum number of random partitions from
* other nodes within the zone. Chooses the best swap in each iteration.
* Large values of the greedySwapMaxPartitions... arguments make this method
* equivalent to comparing every possible swap. This is very expensive.
*
* The number of swaps evaluated per greedy attempt is roughly:
*
* #zones X #nodes/zone X max partitions/node X max partitions/zone
*
* @param targetCluster Target cluster object.
* @param greedyAttempts See RebalanceCLI.
* @param greedySwapMaxPartitionsPerNode See RebalanceCLI.
* @param greedySwapMaxPartitionsPerZone See RebalanceCLI.
* @param zoneIds The set of zone IDs to process. Each zone is done independently.
* null will consider all nodes in entire cluster at once.
* @param storeDefs
* @return
*/
public static Cluster greedyShufflePartitions(final Cluster targetCluster,
final int greedyAttempts,
final int greedySwapMaxPartitionsPerNode,
final int greedySwapMaxPartitionsPerZone,
List<Integer> zoneIds,
List<StoreDefinition> storeDefs) {
final int specialZoneId = -1;
if(zoneIds == null) {
zoneIds = new ArrayList<Integer>();
zoneIds.add(specialZoneId); // Special value.
}
Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
if(zoneIds.isEmpty()) {
logger.warn("greedyShufflePartitions invoked with empty list of zone IDs.");
return returnCluster;
}
double currentUtility = new PartitionBalance(returnCluster, storeDefs).getUtility();
for(int i = 0; i < greedyAttempts; i++) {
Collections.shuffle(zoneIds, new Random(System.currentTimeMillis()));
for(Integer zoneId: zoneIds) {
System.out.println("Greedy swap attempt: zone " + zoneId + " , attempt " + i
+ " of " + greedyAttempts);
List<Integer> nodeIds;
if(zoneId == specialZoneId) {
nodeIds = new ArrayList<Integer>(targetCluster.getNodeIds());
} else {
nodeIds = new ArrayList<Integer>(targetCluster.getNodeIdsInZone(zoneId));
}
Cluster shuffleResults = swapGreedyRandomPartitions(returnCluster,
nodeIds,
greedySwapMaxPartitionsPerNode,
greedySwapMaxPartitionsPerZone,
storeDefs);
double nextUtility = new PartitionBalance(shuffleResults, storeDefs).getUtility();
if(nextUtility == currentUtility) {
System.out.println("Not improving for zone: " + zoneId);
} else {
System.out.println("Swap improved max-min ratio: " + currentUtility + " -> "
+ nextUtility + " (swap attempt " + i + " in zone " + zoneId
+ ")");
returnCluster = shuffleResults;
currentUtility = nextUtility;
}
}
}
return returnCluster;
}
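/*
* Illustrative call (variable names are assumptions). Passing the default
* null zone ID list makes the greedy pass consider every node in the cluster
* at once rather than zone by zone:
*
* Cluster greedy = Repartitioner.greedyShufflePartitions(cluster,
*                                                        Repartitioner.DEFAULT_GREEDY_SWAP_ATTEMPTS,
*                                                        Repartitioner.DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE,
*                                                        Repartitioner.DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE,
*                                                        Repartitioner.DEFAULT_GREEDY_ZONE_IDS, // null -> whole cluster
*                                                        storeDefs);
*/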
}