Permalink
Browse files

Fixes to hanging outstanding flag

  • Loading branch information...
Aaron Elmore
Aaron Elmore committed Apr 28, 2014
1 parent 112a4ae commit 03adf7a72222137c1d3fbf6f9de45535034fc81b
@@ -6212,6 +6212,12 @@ public void initReconfiguration(ReconfigurationPlan reconfig_plan, Reconfigurati
this.outgoing_ranges = reconfig_plan.getOutgoing_ranges().get(this.partitionId);
this.incoming_ranges = reconfig_plan.getIncoming_ranges().get(this.partitionId);
this.reconfiguration_tracker = new ReconfigurationTracking(planned_partitions, reconfig_plan, this.partitionId);
if (asyncOutstanding.getAndSet(false)){
LOG.warn("Async Outstanding was set to true!!!");
}
if (!scheduleAsyncPullQueue.isEmpty()){
LOG.warn("Schedule async pull queue was not empty... " + StringUtil.join(",",scheduleAsyncPullQueue));
}
if (this.catalogContext.jarPath.getName().contains("ycsb")){
LOG.info("Pulling ranges instead of keys ************");
ReconfigurationTracking.PULL_SINGLE_KEY = false;
@@ -6340,7 +6346,6 @@ public void receiveTuples(Long txnId, int oldPartitionId, int newPartitionId, St
// We have received a single key
if(moreDataComing == false) {
LOG.debug(String.format("(%s) marking key as received %s %s ", this.partitionId, table_name, minInclusive));
this.reconfiguration_tracker.markKeyAsReceived(table_name, minInclusive);
if(isAsyncRequest){
LOG.trace("Last chunk received for async request, unsetting async in progress");
asyncOutstanding.set(false);
@@ -6351,6 +6356,7 @@ public void receiveTuples(Long txnId, int oldPartitionId, int newPartitionId, St
long timeTaken = System.currentTimeMillis() - startTime;
FileUtil.appendEventToFile(String.format("ASYNC_PULL_COMPLETED, MS=%s, PULL_ID=%s",timeTaken, pullId));
}
this.reconfiguration_tracker.markKeyAsReceived(table_name, minInclusive);
} else {
if (debug.val) LOG.debug(String.format("PE (%s) keyreceived, but more data is coming. %s %s ", this.partitionId, table_name, minInclusive));
}
@@ -6364,8 +6370,6 @@ public void receiveTuples(Long txnId, int oldPartitionId, int newPartitionId, St
new ReconfigurationRange<Long>(table_name, VoltType.BIGINT, minInclusive, maxExclusive, oldPartitionId, newPartitionId));
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
} else {
this.reconfiguration_tracker.markRangeAsReceived(
new ReconfigurationRange<Long>(table_name, VoltType.BIGINT, minInclusive, maxExclusive, oldPartitionId, newPartitionId));
if(isAsyncRequest){
LOG.trace("Last chunk received for async request, unsetting async in progress");
asyncOutstanding.set(false);
@@ -6376,6 +6380,8 @@ public void receiveTuples(Long txnId, int oldPartitionId, int newPartitionId, St
long timeTaken = System.currentTimeMillis() - startTime;
FileUtil.appendEventToFile(String.format("ASYNC_PULL_COMPLETED, MS=%s, PULL_ID=%s",timeTaken, pullId));
}
this.reconfiguration_tracker.markRangeAsReceived(
new ReconfigurationRange<Long>(table_name, VoltType.BIGINT, minInclusive, maxExclusive, oldPartitionId, newPartitionId));
}
if(this.reconfiguration_tracker.checkIfAllRangesAreMigratedIn()){
@@ -327,7 +327,11 @@ public ReconfigurationPlan initReconfiguration(Integer leaderId, Reconfiguration
} else {
throw new Exception("Unsupported hasher : " + absHasher.getClass());
}
FileUtil.appendEventToFile(reconfig_plan.planDebug);
if (reconfig_plan!=null) {
FileUtil.appendEventToFile(reconfig_plan.planDebug);
} else {
FileUtil.appendEventToFile("Null Reconfig plan");
}
this.planned_partitions = hasher.getPartitions();
if (reconfigurationProtocol == ReconfigurationProtocols.STOPCOPY) {
if (reconfig_plan != null){

0 comments on commit 03adf7a

Please sign in to comment.