Permalink
Browse files

Changes on how asyncs are pulled from queue to limit it to one

  • Loading branch information...
Aaron Elmore
Aaron Elmore committed Apr 14, 2014
1 parent c22ab42 commit 3922ecc2a08853c419a8a65e5f6064863d3e079e
Showing with 37 additions and 36 deletions.
  1. +37 −36 src/frontend/edu/brown/hstore/PartitionExecutor.java
@@ -1232,43 +1232,43 @@ private boolean utilityWork() {
if (hstore_conf.global.reconfiguration_enable && reconfig_plan != null){
this.idle_click_count+=1;
//LOG.info("idle click count : " + idle_click_count);
if (reconfiguration_coordinator.getReconfigurationInProgress() && reconfiguration_coordinator.queueAsyncPull()
&& this.asyncRequestPullQueue.isEmpty() == false
&& (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS || System.currentTimeMillis() > this.nextAsyncPullTimeMS )){
if (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS) {
LOG.info(String.format(" ### Adding the next async pull from the asyncRequestPullQueue due to IDLE Clicks. Items : %s IdleCount:%s",
asyncRequestPullQueue.size(),idle_click_count));
} else {
LOG.info(String.format(" ### Adding the next async pull from the asyncRequestPullQueue due to time. Items : %s IdleCount:%s",
asyncRequestPullQueue.size(),idle_click_count));
}
this.idle_click_count = 0;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
this.work_queue.offer(asyncRequestPullQueue.remove());
FileUtil.appendEventToFile("SCHEDULE_ASYNC, T=1");
} else if (reconfiguration_coordinator.getReconfigurationInProgress() && reconfiguration_coordinator.queueAsyncPull()
&& this.scheduleAsyncPullQueue.isEmpty() == false
&& ((idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS && asyncOutstanding.get() == false) || System.currentTimeMillis() > this.nextAsyncPullTimeMS )) {
if (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS) {
LOG.info(String.format(" ### Pulling and scheduling the next async pull from the scheduleAsyncPullQueue due to IDLE Clicks. Items : %s IdleCount:%s", scheduleAsyncPullQueue.size(),idle_click_count));
} else {
LOG.info(String.format(" ### Pulling and scheduling the next async pull from the scheduleAsyncPullQueue due to time. Items : %s IdleCount:%s", scheduleAsyncPullQueue.size(),idle_click_count));
}
if(asyncOutstanding.get()) {
LOG.warn("Offering async request to the work queue when there is already an async request in progress");
}
this.idle_click_count = 0;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
ScheduleAsyncPullRequestMessage pullMsg = scheduleAsyncPullQueue.poll();
if (pullMsg != null){
this.work_queue.offer(pullMsg);
asyncOutstanding.set(true);
if (reconfiguration_coordinator.getReconfigurationInProgress() && reconfiguration_coordinator.queueAsyncPull()) {
if (this.asyncRequestPullQueue.isEmpty() == false
&& (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS || System.currentTimeMillis() > this.nextAsyncPullTimeMS )){
//IF the async queue has work and we have passed cycles
if (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS) {
LOG.info(String.format(" ### Adding the next async pull from the asyncRequestPullQueue due to IDLE Clicks. Items : %s IdleCount:%s",
asyncRequestPullQueue.size(),idle_click_count));
} else {
LOG.info(String.format(" ### Adding the next async pull from the asyncRequestPullQueue due to time. Items : %s IdleCount:%s",
asyncRequestPullQueue.size(),idle_click_count));
}
this.idle_click_count = 0;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
this.work_queue.offer(asyncRequestPullQueue.remove());
FileUtil.appendEventToFile("SCHEDULE_ASYNC, T=1");
} else if (this.scheduleAsyncPullQueue.isEmpty() == false && asyncOutstanding.get() == false
&& ((idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS ) || System.currentTimeMillis() > this.nextAsyncPullTimeMS )) {
//IF the scheduleAsync queue has work and we have passed cycles
if (idle_click_count > MAX_PULL_ASYNC_EVERY_CLICKS) {
LOG.info(String.format(" ### Pulling and scheduling the next async pull from the scheduleAsyncPullQueue due to IDLE Clicks. Items : %s IdleCount:%s", scheduleAsyncPullQueue.size(),idle_click_count));
} else {
LOG.info(String.format(" ### Pulling and scheduling the next async pull from the scheduleAsyncPullQueue due to time. Items : %s IdleCount:%s", scheduleAsyncPullQueue.size(),idle_click_count));
}
if(asyncOutstanding.get()) {
LOG.warn("Offering async request to the work queue when there is already an async request in progress");
}
this.idle_click_count = 0;
nextAsyncPullTimeMS = System.currentTimeMillis() + MIN_MS_BETWEEN_ASYNC_PULLS + rand.nextInt(RAND_MS_BETWEEN_ASYNC_PULLS);
ScheduleAsyncPullRequestMessage pullMsg = scheduleAsyncPullQueue.poll();
if (pullMsg != null){
this.work_queue.offer(pullMsg);
asyncOutstanding.set(true);
}
FileUtil.appendEventToFile("SCHEDULE_ASYNC, T=2");
}
FileUtil.appendEventToFile("SCHEDULE_ASYNC, T=2");
}
if (this.currentDtxn != null) {
@@ -3525,6 +3525,7 @@ public void queueAsyncDataRequestMessageToWorkQueue(){
if(!asyncRequestPullQueue.isEmpty()){
//LOG.info(" ### Adding the next asynch pull from requestPullQueue to the work queue. size : " + asyncRequestPullQueue.size());
//this.work_queue.offer(asyncRequestPullQueue.remove());
FileUtil.appendEventToFile("Skipped asyncRequestQueue");
} else {
LOG.info(" ### RequestPullQueue is empty");
}

0 comments on commit 3922ecc

Please sign in to comment.