Permalink
Browse files

Fixed reconfiguration bugs!

  • Loading branch information...
Rebecca Taft
Rebecca Taft committed Mar 15, 2014
1 parent 6d5ae04 commit f2a6cc1ab7537fd07a3e0dd5fa14ae43823fada8
@@ -46,6 +46,8 @@
public List<ReconfigurationRange<? extends Comparable<?>>> dataMigratedIn;
private int rangesMigratedInCount = 0;
private int rangesMigratedOutCount = 0;
private int incomingRangesCount = 0;
private int outgoingRangesCount = 0;
//set of individual keys migrated out/in status, stored in a map by table name as key
public Map<String,Set<Comparable>> migratedKeyIn;
@@ -65,6 +67,22 @@ public ReconfigurationTracking(TwoTieredRangePartitions partitionPlan,List<Recon
this.incoming_ranges = new ArrayList<ReconfigurationRange<? extends Comparable<?>>>();
if (incoming_ranges != null)
this.incoming_ranges.addAll(incoming_ranges);
for(ReconfigurationRange<? extends Comparable<?>> range : this.incoming_ranges) {
if(range.isSingleRange()) {
incomingRangesCount++;
}
else {
incomingRangesCount += range.getMinList().size();
}
}
for(ReconfigurationRange<? extends Comparable<?>> range: this.outgoing_ranges){
if(range.isSingleRange()) {
outgoingRangesCount++;
}
else {
outgoingRangesCount += range.getMinList().size();
}
}
this.partition_id = partition_id;
this.migratedKeyIn = new HashMap<String, Set<Comparable>>();
this.migratedKeyOut = new HashMap<String, Set<Comparable>>();
@@ -91,7 +109,7 @@ public boolean markRangeAsMigratedOut(ReconfigurationRange<? extends Comparable<
boolean added = this.dataMigratedOut.add(range);
if(added){
rangesMigratedOutCount++;
if(rangesMigratedOutCount==this.outgoing_ranges.size()){
if(rangesMigratedOutCount==outgoingRangesCount){
throw new ReconfigurationException(ExceptionTypes.ALL_RANGES_MIGRATED_OUT);
}
}
@@ -131,7 +149,8 @@ public boolean markRangeAsReceived(ReconfigurationRange<? extends Comparable<?>>
boolean added = this.dataMigratedIn.add(range);
if(added){
rangesMigratedInCount++;
if(rangesMigratedInCount==this.incoming_ranges.size()){
LOG.info(String.format("Migrated in range %s-%s, number %s out of %s", range.getMin_inclusive(), range.getMax_exclusive(), rangesMigratedInCount, incomingRangesCount));
if(rangesMigratedInCount==incomingRangesCount){
throw new ReconfigurationException(ExceptionTypes.ALL_RANGES_MIGRATED_IN);
}
}
@@ -391,7 +410,7 @@ public boolean checkKeyOwned(String table_name, Comparable<?> key) throws Reconf
@Override
public boolean checkIfAllRangesAreMigratedIn() {
if(this.incoming_ranges.size() == rangesMigratedInCount){
if(incomingRangesCount == rangesMigratedInCount){
return true;
}
return false;
@@ -88,10 +88,6 @@ public boolean equals(Object o) {
}
public static List<ReconfigurationPlan> naiveSplitReconfigurationPlan(ReconfigurationPlan plan, int numberOfSplits){
List<ReconfigurationPlan> splitPlans = new ArrayList<>();
for(int i = 0; i < numberOfSplits; i++){
splitPlans.add(new ReconfigurationPlan());
}
//The list of partition pairs that exchange data
Set<ReconfigurationPair> migrationPairs = new HashSet<>();
@@ -126,6 +122,12 @@ public boolean equals(Object o) {
pairToSplitMapping.put(new Pair(mPair.from, mPair.to), pairCounter % numberOfSplits);
pairCounter++;
}
List<ReconfigurationPlan> splitPlans = new ArrayList<>();
for(int i = 0; i < numberOfSplits; i++){
splitPlans.add(new ReconfigurationPlan());
}
//put ranges into split rangePLans
for(Entry<Integer, List<ReconfigurationRange<? extends Comparable<?>>>> entry : plan.getIncoming_ranges().entrySet()){
@@ -136,7 +138,7 @@ public boolean equals(Object o) {
splitPlans.get(splitIndex).addRange(range);
}
}
//DEBUG
for (int j = 0; j < numberOfSplits; j++){

0 comments on commit f2a6cc1

Please sign in to comment.