forked from voldemort/voldemort
/
RebalanceController.java
689 lines (616 loc) · 32.6 KB
/
RebalanceController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
/*
* Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.client.rebalance;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.rebalance.task.DonorBasedRebalanceTask;
import voldemort.client.rebalance.task.RebalanceTask;
import voldemort.client.rebalance.task.StealerBasedRebalanceTask;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.StoreDefinition;
import voldemort.utils.NodeUtils;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.StoreDefinitionUtils;
import voldemort.utils.Utils;
import voldemort.versioning.Versioned;
import com.google.common.collect.Lists;
/**
* Executes a RebalancePlan.
*
*/
public class RebalanceController {
// TODO: Remove server side "optimization" that does not bother to steal
// partition-stores it already hosts. That code will be unnecessary. This
// also affects AdminClient that has an override of this option. Do not
// complete this work until the atomic metadata update is merged with this
// branch. Otherwise, there will be conflicts on .proto changes.
private static final Logger logger = Logger.getLogger(RebalanceController.class);
private static final DecimalFormat decimalFormatter = new DecimalFormat("#.##");
private final AdminClient adminClient;
private final Cluster currentCluster;
private final List<StoreDefinition> currentStoreDefs;
public final static int MAX_PARALLEL_REBALANCING = 1;
public final static int MAX_TRIES_REBALANCING = 2;
public final static long REBALANCING_CLIENT_TIMEOUT_SEC = TimeUnit.DAYS.toSeconds(30);
public final static boolean STEALER_BASED_REBALANCING = true;
private final int maxParallelRebalancing;
private final int maxTriesRebalancing;
private final long rebalancingClientTimeoutSeconds;
private final boolean stealerBasedRebalancing;
/**
 * Constructs a controller and immediately probes the cluster reachable at
 * {@code bootstrapUrl} for its current cluster.xml and store definitions,
 * which become the starting point for all plans built by this controller.
 *
 * @param bootstrapUrl URL of any live node, used to bootstrap the admin client
 * @param maxParallelRebalancing maximum number of rebalance tasks run concurrently
 * @param maxTriesRebalancing number of attempts per stealer-based task
 * @param rebalancingClientTimeoutSeconds how long to wait for submitted tasks
 *        before shutting down the executor
 * @param stealerBased true to use stealer-based tasks, false for donor-based
 */
public RebalanceController(String bootstrapUrl,
                           int maxParallelRebalancing,
                           int maxTriesRebalancing,
                           long rebalancingClientTimeoutSeconds,
                           boolean stealerBased) {
    this.adminClient = new AdminClient(bootstrapUrl,
                                       new AdminClientConfig(),
                                       new ClientConfig());
    // Snapshot the deployed cluster/stores now; plans are built against this.
    Pair<Cluster, List<StoreDefinition>> pair = getCurrentClusterState();
    this.currentCluster = pair.getFirst();
    this.currentStoreDefs = pair.getSecond();
    this.maxParallelRebalancing = maxParallelRebalancing;
    this.maxTriesRebalancing = maxTriesRebalancing;
    this.rebalancingClientTimeoutSeconds = rebalancingClientTimeoutSeconds;
    this.stealerBasedRebalancing = stealerBased;
}
/**
 * Probes the existing cluster to retrieve the current cluster xml and
 * stores xml.
 *
 * @return Pair of the latest Cluster and the List&lt;StoreDefinition&gt;
 *         deployed on it.
 */
private Pair<Cluster, List<StoreDefinition>> getCurrentClusterState() {
    // Ask every node the admin client knows about and keep the
    // highest-versioned cluster metadata among the responses.
    List<Node> knownNodes = Lists.newArrayList(adminClient.getAdminClientCluster().getNodes());
    Versioned<Cluster> latestVersionedCluster =
            RebalanceUtils.getLatestCluster(NodeUtils.getNodeIds(knownNodes), adminClient);
    Cluster latestCluster = latestVersionedCluster.getValue();
    // Read the store definitions as currently deployed on that cluster.
    List<StoreDefinition> deployedStoreDefs =
            RebalanceUtils.getCurrentStoreDefinitions(latestCluster, adminClient);
    return new Pair<Cluster, List<StoreDefinition>>(latestCluster, deployedStoreDefs);
}
/**
 * Constructs a plan for the specified final cluster/stores &amp; batchSize
 * given the current cluster/stores &amp; configuration of the
 * RebalanceController.
 *
 * @param finalCluster target cluster topology
 * @param finalStoreDefs target store definitions; needed for zone
 *        expansion/shrinking
 * @param batchSize maximum number of primary partitions moved per batch
 * @return the plan describing the transition from the current state
 */
public RebalancePlan getPlan(Cluster finalCluster,
                             List<StoreDefinition> finalStoreDefs,
                             int batchSize) {
    // Fail fast on inconsistent target metadata before planning anything.
    RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs);
    RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster);
    // null output directory: the plan is only kept in memory, never dumped.
    return new RebalancePlan(currentCluster,
                             currentStoreDefs,
                             finalCluster,
                             finalStoreDefs,
                             batchSize,
                             null);
}
/**
 * Construct a plan for the specified final cluster &amp; batchSize given the
 * current cluster/stores &amp; configuration of the RebalanceController. The
 * store definitions are taken to be unchanged (i.e., no zone
 * expansion/shrinking).
 *
 * @param finalCluster target cluster topology
 * @param batchSize maximum number of primary partitions moved per batch
 * @return the plan describing the transition from the current state
 */
public RebalancePlan getPlan(Cluster finalCluster, int batchSize) {
    // Delegate with the store definitions read from the live cluster.
    return getPlan(finalCluster, currentStoreDefs, batchSize);
}
/**
 * Entry point for executing a plan: validates the plan and the deployed
 * cluster, pushes the final cluster metadata to all nodes, then executes
 * the plan batch-by-batch.
 *
 * @param rebalancePlan the plan to execute
 */
public void rebalance(final RebalancePlan rebalancePlan) {
    Cluster finalCluster = rebalancePlan.getFinalCluster();
    List<StoreDefinition> finalStores = rebalancePlan.getFinalStores();
    // Fail fast on inconsistent plan metadata before touching any server.
    validatePlan(rebalancePlan);
    // Re-point the admin client at the final cluster and check server state.
    prepareForRebalance(finalCluster, finalStores);
    logger.info("Propagating cluster " + finalCluster + " to all nodes");
    // TODO: (atomic cluster/stores update) Add finalStores here so that
    // cluster & stores can be updated atomically. Need to rebase first.
    RebalanceUtils.propagateCluster(adminClient, finalCluster);
    executePlan(rebalancePlan);
}
/**
 * Validates all aspects of the plan (i.e., all config files) without
 * touching any server state.
 *
 * @param rebalancePlan plan whose current/final cluster and store metadata
 *        is cross-checked
 */
private void validatePlan(RebalancePlan rebalancePlan) {
    logger.info("Validating plan state.");
    // Both endpoints of the transition come from the plan itself, not from
    // this controller's construction-time snapshot.
    Cluster planCurrentCluster = rebalancePlan.getCurrentCluster();
    List<StoreDefinition> planCurrentStores = rebalancePlan.getCurrentStores();
    Cluster planFinalCluster = rebalancePlan.getFinalCluster();
    List<StoreDefinition> planFinalStores = rebalancePlan.getFinalStores();
    // Each cluster must be consistent with its own store definitions ...
    RebalanceUtils.validateClusterStores(planCurrentCluster, planCurrentStores);
    RebalanceUtils.validateClusterStores(planFinalCluster, planFinalStores);
    // ... the transition between them must be legal ...
    RebalanceUtils.validateCurrentFinalCluster(planCurrentCluster, planFinalCluster);
    // ... and every store must be of a rebalanceable type.
    RebalanceUtils.validateRebalanceStore(planCurrentStores);
    RebalanceUtils.validateRebalanceStore(planFinalStores);
}
/**
 * Validates deployed cluster:
 * <ul>
 * <li>sets local admin client to finalCluster
 * <li>checks that all servers are currently in normal state
 * <li>confirms read-only stores can be rebalanced.
 * </ul>
 *
 * @param finalCluster cluster the admin client will be pointed at
 * @param finalStores store definitions used to verify read-only stores
 */
private void prepareForRebalance(Cluster finalCluster, List<StoreDefinition> finalStores) {
    logger.info("Validating state of deployed cluster.");
    // Reset the cluster that the admin client points at
    adminClient.setAdminClientCluster(finalCluster);
    // Validate that all the nodes ( new + old ) are in normal state
    RebalanceUtils.checkEachServerInNormalState(finalCluster, adminClient);
    // Verify all old RO stores exist at version 2
    RebalanceUtils.validateReadOnlyStores(finalCluster, finalStores, adminClient);
}
/**
 * Executes the rebalance plan. Does so batch-by-batch. Between each batch,
 * status is dumped to logger.info.
 *
 * @param rebalancePlan the plan to execute
 */
private void executePlan(RebalancePlan rebalancePlan) {
    logger.info("Starting rebalancing!");
    final List<RebalanceBatchPlan> batches = rebalancePlan.getPlan();
    final int totalBatches = batches.size();
    final int totalPartitionStores = rebalancePlan.getPartitionStoresMoved();
    // Running statistics across batches, used for the progress log.
    int completedBatches = 0;
    int movedPartitionStores = 0;
    long elapsedMs = 0;
    for (RebalanceBatchPlan batch : batches) {
        logger.info("======== REBALANCING BATCH " + (completedBatches + 1) + " ========");
        RebalanceUtils.printLog(completedBatches, logger, batch.toString());
        long batchStartMs = System.currentTimeMillis();
        // ACTUALLY DO A BATCH OF REBALANCING!
        executeBatch(completedBatches, batch);
        elapsedMs += System.currentTimeMillis() - batchStartMs;
        // Bump up the statistics and report progress.
        completedBatches++;
        movedPartitionStores += batch.getPartitionStoreMoves();
        batchStatusLog(completedBatches,
                       totalBatches,
                       movedPartitionStores,
                       totalPartitionStores,
                       elapsedMs);
    }
}
/**
 * Pretty prints a progress update after each completed batch, including an
 * estimate of the remaining time based on throughput so far.
 *
 * @param batchCount number of batches completed so far
 * @param numBatches total number of batches in the plan
 * @param partitionStoreCount number of partition-stores moved so far
 * @param numPartitionStores total number of partition-stores the plan moves
 * @param totalTimeMs cumulative wall-clock time spent executing batches, in ms
 */
private void batchStatusLog(int batchCount,
                            int numBatches,
                            int partitionStoreCount,
                            int numPartitionStores,
                            long totalTimeMs) {
    // Calculate the estimated end time and pretty print stats
    double rate = 1;
    long estimatedTimeMs = 0;
    if (numPartitionStores > 0) {
        // BUGFIX: this was previously int/int division, which truncated the
        // fraction to 0 until the plan was 100% complete — so "Percent done"
        // always printed 0 and the subsequent totalTimeMs / 0.0 produced an
        // estimated time of Long.MAX_VALUE ms.
        rate = (double) partitionStoreCount / numPartitionStores;
        // Guard against rate == 0 (e.g. only empty batches completed so far).
        if (rate > 0) {
            estimatedTimeMs = (long) (totalTimeMs / rate) - totalTimeMs;
        }
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Batch Complete!")
      .append(Utils.NEWLINE)
      .append("\tbatches moved: ")
      .append(batchCount)
      .append(" out of ")
      .append(numBatches)
      .append(Utils.NEWLINE)
      .append("\tPartition stores moved: ")
      .append(partitionStoreCount)
      .append(" out of ")
      .append(numPartitionStores)
      .append(Utils.NEWLINE)
      .append("\tPercent done: ")
      .append(decimalFormatter.format(rate * 100.0))
      .append(Utils.NEWLINE)
      .append("\tEstimated time left: ")
      .append(estimatedTimeMs)
      .append(" ms (")
      .append(TimeUnit.MILLISECONDS.toHours(estimatedTimeMs))
      .append(" hours)");
    RebalanceUtils.printLog(batchCount, logger, sb.toString());
}
/**
 * Executes a batch plan: changes rebalancing state on the servers, migrates
 * read-only data, changes state again, then migrates read-write data. Any
 * failure aborts the whole rebalance with a VoldemortException.
 *
 * @param batchCount Used as the ID of the batch plan. This allows related
 *        tasks on client- &amp; server-side to pretty print messages in a
 *        manner that debugging can track specific batch plans across the
 *        cluster.
 * @param batchPlan The batch plan to execute.
 * @throws VoldemortException wrapping any exception raised inside the batch
 */
private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
    final Cluster batchCurrentCluster = batchPlan.getCurrentCluster();
    final Cluster batchFinalCluster = batchPlan.getFinalCluster();
    final List<StoreDefinition> batchStoreDefs = batchPlan.getStoreDefs();
    try {
        final List<RebalancePartitionsInfo> rebalancePartitionsInfoList = batchPlan.getBatchPlan();
        if(rebalancePartitionsInfoList.isEmpty()) {
            RebalanceUtils.printLog(batchCount, logger, "Skipping batch " + batchCount
                                                        + " since it is empty.");
            // Even though there is no rebalancing work to do, cluster
            // metadata must be updated so that the server is aware of the
            // new cluster xml.
            // NOTE(review): the boolean flag ordering must match
            // AdminClient.rebalanceOps.rebalanceStateChange — verify against
            // that signature before changing any of these literals.
            adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
                                                          batchFinalCluster,
                                                          rebalancePartitionsInfoList,
                                                          false,
                                                          true,
                                                          false,
                                                          false,
                                                          true);
            return;
        }
        RebalanceUtils.printLog(batchCount, logger, "Starting batch " + batchCount + ".");
        // Split the store definitions into read-only and read-write sets;
        // each set is migrated in its own sub-batch below.
        List<StoreDefinition> readOnlyStoreDefs = StoreDefinitionUtils.filterStores(batchStoreDefs,
                                                                                    true);
        List<StoreDefinition> readWriteStoreDefs = StoreDefinitionUtils.filterStores(batchStoreDefs,
                                                                                     false);
        boolean hasReadOnlyStores = readOnlyStoreDefs != null && readOnlyStoreDefs.size() > 0;
        boolean hasReadWriteStores = readWriteStoreDefs != null
                                     && readWriteStoreDefs.size() > 0;
        // STEP 1 - Cluster state change (before RO migration)
        boolean finishedReadOnlyPhase = false;
        List<RebalancePartitionsInfo> filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionsInfoList,
                                                                                                                       readOnlyStoreDefs);
        rebalanceStateChange(batchCount,
                             batchCurrentCluster,
                             batchFinalCluster,
                             filteredRebalancePartitionPlanList,
                             hasReadOnlyStores,
                             hasReadWriteStores,
                             finishedReadOnlyPhase);
        // STEP 2 - Move RO data
        if(hasReadOnlyStores) {
            executeSubBatch(batchCount,
                            batchCurrentCluster,
                            filteredRebalancePartitionPlanList,
                            hasReadOnlyStores,
                            hasReadWriteStores,
                            finishedReadOnlyPhase);
        }
        // STEP 3 - Cluster change state (after RO migration, before RW)
        finishedReadOnlyPhase = true;
        filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionsInfoList,
                                                                                          readWriteStoreDefs);
        rebalanceStateChange(batchCount,
                             batchCurrentCluster,
                             batchFinalCluster,
                             filteredRebalancePartitionPlanList,
                             hasReadOnlyStores,
                             hasReadWriteStores,
                             finishedReadOnlyPhase);
        // STEP 4 - Move RW data
        if(hasReadWriteStores) {
            executeSubBatch(batchCount,
                            batchCurrentCluster,
                            filteredRebalancePartitionPlanList,
                            hasReadOnlyStores,
                            hasReadWriteStores,
                            finishedReadOnlyPhase);
        }
        RebalanceUtils.printLog(batchCount, logger, "Successfully terminated batch "
                                                    + batchCount + ".");
    } catch(Exception e) {
        // Wrap with the batch id so the operator can tell which batch died.
        RebalanceUtils.printErrorLog(batchCount, logger, "Error in batch " + batchCount + " - "
                                                         + e.getMessage(), e);
        throw new VoldemortException("Rebalance failed on batch " + batchCount, e);
    }
}
/**
 *
 * Perform a group of state change actions. Also any errors + rollback
 * procedures are performed at this level itself.
 *
 * <pre>
 * | Case | hasRO | hasRW | finishedRO | Action |
 * | 0 | t | t | t | 2nd one ( cluster change + swap + rebalance state change ) |
 * | 1 | t | t | f | 1st one ( rebalance state change ) |
 * | 2 | t | f | t | 2nd one ( cluster change + swap ) |
 * | 3 | t | f | f | 1st one ( rebalance state change ) |
 * | 4 | f | t | t | 2nd one ( cluster change + rebalance state change ) |
 * | 5 | f | t | f | ignore |
 * | 6 | f | f | t | no stores, exception |
 * | 7 | f | f | f | no stores, exception |
 * </pre>
 *
 * Truth table, FTW!
 *
 * @param taskId Rebalancing task id
 * @param batchCurrentCluster Current cluster
 * @param batchFinalCluster Transition cluster to propagate
 * @param rebalancePartitionPlanList List of partition plan list
 * @param hasReadOnlyStores Boolean indicating if read-only stores exist
 * @param hasReadWriteStores Boolean indicating if read-write stores exist
 * @param finishedReadOnlyStores Boolean indicating if we have finished RO
 *        store migration
 * @throws VoldemortRebalancingException re-thrown after logging if the
 *         server-side state change fails
 */
private void rebalanceStateChange(final int taskId,
                                  Cluster batchCurrentCluster,
                                  Cluster batchFinalCluster,
                                  List<RebalancePartitionsInfo> rebalancePartitionPlanList,
                                  boolean hasReadOnlyStores,
                                  boolean hasReadWriteStores,
                                  boolean finishedReadOnlyStores) {
    // The branches below enumerate the truth table above; the boolean
    // literals passed to rebalanceStateChange differ per case and their
    // order must match the AdminClient signature — do not reorder.
    try {
        if(!hasReadOnlyStores && !hasReadWriteStores) {
            // Case 6 / 7 - no stores, exception
            throw new VoldemortException("Cannot get this state since it means there are no stores");
        } else if(!hasReadOnlyStores && hasReadWriteStores && !finishedReadOnlyStores) {
            // Case 5 - ignore
            RebalanceUtils.printLog(taskId,
                                    logger,
                                    "Ignoring state change since there are no read-only stores");
        } else if(!hasReadOnlyStores && hasReadWriteStores && finishedReadOnlyStores) {
            // Case 4 - cluster change + rebalance state change
            RebalanceUtils.printLog(taskId,
                                    logger,
                                    "Cluster metadata change + rebalance state change");
            adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
                                                          batchFinalCluster,
                                                          rebalancePartitionPlanList,
                                                          false,
                                                          true,
                                                          true,
                                                          true,
                                                          true);
        } else if(hasReadOnlyStores && !finishedReadOnlyStores) {
            // Case 1 / 3 - rebalance state change
            RebalanceUtils.printLog(taskId, logger, "Rebalance state change");
            adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
                                                          batchFinalCluster,
                                                          rebalancePartitionPlanList,
                                                          false,
                                                          false,
                                                          true,
                                                          true,
                                                          true);
        } else if(hasReadOnlyStores && !hasReadWriteStores && finishedReadOnlyStores) {
            // Case 2 - swap + cluster change
            RebalanceUtils.printLog(taskId, logger, "Swap + Cluster metadata change");
            adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
                                                          batchFinalCluster,
                                                          rebalancePartitionPlanList,
                                                          true,
                                                          true,
                                                          false,
                                                          true,
                                                          true);
        } else {
            // Case 0 - swap + cluster change + rebalance state change
            RebalanceUtils.printLog(taskId,
                                    logger,
                                    "Swap + Cluster metadata change + rebalance state change");
            adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
                                                          batchFinalCluster,
                                                          rebalancePartitionPlanList,
                                                          true,
                                                          true,
                                                          true,
                                                          true,
                                                          true);
        }
    } catch(VoldemortRebalancingException e) {
        // Log with the task id for traceability, then let the caller abort.
        RebalanceUtils.printErrorLog(taskId,
                                    logger,
                                    "Failure while changing rebalancing state",
                                    e);
        throw e;
    }
}
// TODO: (refactor) Break this state-machine like method into multiple "sub"
// methods. AFAIK, this method either does the RO stores or the RW stores in
// a batch. I.e., there are at most 2 sub-batches for any given batch. And,
// in practice, there is one sub-batch that is either RO or RW.
// TODO: Fix the javadoc comment to be more easily understood.
/**
 * The smallest granularity of rebalancing where-in we move partitions for a
 * sub-set of stores. Finally at the end of the movement, the node is
 * removed out of rebalance state
 *
 * <br>
 *
 * Also any errors + rollback procedures are performed at this level itself.
 *
 * <pre>
 * | Case | hasRO | hasRW | finishedRO | Action |
 * | 0 | t | t | t | rollback cluster change + swap |
 * | 1 | t | t | f | nothing to do since "rebalance state change" should have removed everything |
 * | 2 | t | f | t | won't be triggered since hasRW is false |
 * | 3 | t | f | f | nothing to do since "rebalance state change" should have removed everything |
 * | 4 | f | t | t | rollback cluster change |
 * | 5 | f | t | f | won't be triggered |
 * | 6 | f | f | t | won't be triggered |
 * | 7 | f | f | f | won't be triggered |
 * </pre>
 *
 * @param taskId Rebalance task id
 * @param batchCurrentCluster Cluster to rollback to if we have a problem
 * @param rebalancePartitionPlanList The list of rebalance partition plans
 * @param hasReadOnlyStores Are we rebalancing any read-only stores?
 * @param hasReadWriteStores Are we rebalancing any read-write stores?
 * @param finishedReadOnlyStores Have we finished rebalancing of read-only
 *        stores?
 * @throws VoldemortRebalancingException if any task failed (metadata is
 *         rolled back first for cases 0 and 4 above)
 * @throws VoldemortException if tasks timed out without failing — no
 *         rollback, so the operator can resume manually
 */
private void executeSubBatch(final int taskId,
                             final Cluster batchCurrentCluster,
                             final List<RebalancePartitionsInfo> rebalancePartitionPlanList,
                             boolean hasReadOnlyStores,
                             boolean hasReadWriteStores,
                             boolean finishedReadOnlyStores) {
    RebalanceUtils.printLog(taskId, logger, "Submitting rebalance tasks ");
    // Get an ExecutorService in place used for submitting our tasks
    ExecutorService service = RebalanceUtils.createExecutors(maxParallelRebalancing);
    // Sub-lists of allTasks (below), populated after the executor drains.
    final List<RebalanceTask> failedTasks = Lists.newArrayList();
    final List<RebalanceTask> incompleteTasks = Lists.newArrayList();
    // Semaphores for donor nodes - To avoid multiple disk sweeps.
    // NOTE(review): the array is sized by node count but indexed by node id,
    // which assumes node ids are contiguous in [0, numNodes) — confirm.
    Semaphore[] donorPermits = new Semaphore[batchCurrentCluster.getNumberOfNodes()];
    for(Node node: batchCurrentCluster.getNodes()) {
        donorPermits[node.getId()] = new Semaphore(1);
    }
    try {
        // List of tasks which will run asynchronously
        List<RebalanceTask> allTasks = executeTasks(taskId,
                                                    service,
                                                    rebalancePartitionPlanList,
                                                    donorPermits);
        // All tasks submitted.
        RebalanceUtils.printLog(taskId,
                                logger,
                                "All rebalance tasks were submitted ( shutting down in "
                                        + this.rebalancingClientTimeoutSeconds + " sec )");
        // Wait and shutdown after timeout
        RebalanceUtils.executorShutDown(service, this.rebalancingClientTimeoutSeconds);
        RebalanceUtils.printLog(taskId, logger, "Finished waiting for executors");
        // Collects all failures + incomplete tasks from the rebalance
        // tasks.
        List<Exception> failures = Lists.newArrayList();
        for(RebalanceTask task: allTasks) {
            if(task.hasException()) {
                failedTasks.add(task);
                failures.add(task.getError());
            } else if(!task.isComplete()) {
                incompleteTasks.add(task);
            }
        }
        if(failedTasks.size() > 0) {
            // Triggers the rollback logic in the catch block below.
            throw new VoldemortRebalancingException("Rebalance task terminated unsuccessfully on tasks "
                                                            + failedTasks,
                                                    failures);
        }
        // If there were no failures, then we could have had a genuine
        // timeout ( Rebalancing took longer than the operator expected ).
        // We should throw a VoldemortException and not a
        // VoldemortRebalancingException ( which will start reverting
        // metadata ). The operator may want to manually then resume the
        // process.
        if(incompleteTasks.size() > 0) {
            throw new VoldemortException("Rebalance tasks are still incomplete / running "
                                         + incompleteTasks);
        }
    } catch(VoldemortRebalancingException e) {
        logger.error("Failure while migrating partitions for rebalance task " + taskId);
        // Roll the servers back to batchCurrentCluster; which flags are sent
        // depends on the sub-batch type (see truth table above).
        if(hasReadOnlyStores && hasReadWriteStores && finishedReadOnlyStores) {
            // Case 0
            adminClient.rebalanceOps.rebalanceStateChange(null,
                                                          batchCurrentCluster,
                                                          null,
                                                          true,
                                                          true,
                                                          false,
                                                          false,
                                                          false);
        } else if(hasReadWriteStores && finishedReadOnlyStores) {
            // Case 4
            adminClient.rebalanceOps.rebalanceStateChange(null,
                                                          batchCurrentCluster,
                                                          null,
                                                          false,
                                                          true,
                                                          false,
                                                          false,
                                                          false);
        }
        throw e;
    } finally {
        // Force-stop the executor if the normal shutdown path did not.
        if(!service.isShutdown()) {
            RebalanceUtils.printErrorLog(taskId,
                                         logger,
                                         "Could not shutdown service cleanly for rebalance task "
                                                 + taskId,
                                         null);
            service.shutdownNow();
        }
    }
}
/**
 * Builds one rebalance task per unit of work and submits each to the
 * executor. Stealer-based mode creates a task per partition plan;
 * donor-based mode groups the plans by donor node first.
 *
 * @param taskId rebalance task id used for log correlation
 * @param service executor the tasks are submitted to
 * @param rebalancePartitionPlanList partition plans for this sub-batch
 * @param donorPermits per-donor semaphores serializing disk sweeps
 * @return all submitted tasks, so the caller can inspect their outcome
 */
private List<RebalanceTask> executeTasks(final int taskId,
                                         final ExecutorService service,
                                         List<RebalancePartitionsInfo> rebalancePartitionPlanList,
                                         Semaphore[] donorPermits) {
    List<RebalanceTask> submittedTasks = Lists.newArrayList();
    if(stealerBasedRebalancing) {
        // One stealer-side task per partition plan.
        for(RebalancePartitionsInfo partitionsInfo: rebalancePartitionPlanList) {
            RebalanceTask task = new StealerBasedRebalanceTask(taskId,
                                                               partitionsInfo,
                                                               rebalancingClientTimeoutSeconds,
                                                               maxTriesRebalancing,
                                                               donorPermits[partitionsInfo.getDonorId()],
                                                               adminClient);
            submittedTasks.add(task);
            service.execute(task);
        }
    } else {
        // Group by donor nodes: one donor-side task per donor.
        HashMap<Integer, List<RebalancePartitionsInfo>> plansByDonor =
                RebalanceUtils.groupPartitionsInfoByNode(rebalancePartitionPlanList, false);
        for(Entry<Integer, List<RebalancePartitionsInfo>> donorEntry: plansByDonor.entrySet()) {
            List<RebalancePartitionsInfo> donorPlans = donorEntry.getValue();
            RebalanceTask task = new DonorBasedRebalanceTask(taskId,
                                                             donorPlans,
                                                             rebalancingClientTimeoutSeconds,
                                                             donorPermits[donorPlans.get(0).getDonorId()],
                                                             adminClient);
            submittedTasks.add(task);
            service.execute(task);
        }
    }
    return submittedTasks;
}
/** @return the admin client this controller uses to talk to the cluster */
public AdminClient getAdminClient() {
    return adminClient;
}
/** @return the cluster metadata snapshot taken at construction time */
public Cluster getCurrentCluster() {
    return currentCluster;
}
/** @return the store definitions snapshot taken at construction time */
public List<StoreDefinition> getCurrentStoreDefs() {
    return currentStoreDefs;
}
/** Releases the underlying admin client's resources; call when done. */
public void stop() {
    adminClient.close();
}
}