Skip to content

Commit

Permalink
Improved handling for tasks that are removed from a queue. Tasks cann…
Browse files Browse the repository at this point in the history
…ot be resubmitted after removal
  • Loading branch information
Raphfrk committed Aug 2, 2012
1 parent d033ba2 commit 575a30f
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 28 deletions.
94 changes: 72 additions & 22 deletions src/main/java/org/spout/engine/scheduler/SpoutTask.java
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.spout.api.geo.cuboid.Region;
import org.spout.api.scheduler.ParallelRunnable;
Expand Down Expand Up @@ -81,11 +82,7 @@ public class SpoutTask implements Task, LongPrioritized {
* Indicates the next scheduled time for the task to be called
*/
private final AtomicLong nextCallTime;
private final AtomicBoolean nextCallTimeLock = new AtomicBoolean(false);
/**
* A flag which indicates if this task is alive.
*/
private final AtomicBoolean alive;
private final AtomicReference<QueueState> queueState = new AtomicReference<QueueState>(QueueState.UNQUEUED);
/**
* A flag indicating if the task is actually executing
*/
Expand Down Expand Up @@ -119,7 +116,6 @@ public class SpoutTask implements Task, LongPrioritized {
public SpoutTask(TaskManager manager, Scheduler scheduler, Object owner, Runnable task, boolean sync, long delay, long period, TaskPriority priority) {
this.taskId = nextTaskId.getAndIncrement();
this.nextCallTime = new AtomicLong(manager.getUpTime() + delay);
this.alive = new AtomicBoolean(true);
this.executing = new AtomicBoolean(false);
this.owner = owner;
this.task = task;
Expand Down Expand Up @@ -171,7 +167,7 @@ public Object getOwner() {

@Override
public boolean isAlive() {
return alive.get();
return !queueState.get().isDead();
}

public long getNextCallTime() {
Expand All @@ -186,7 +182,7 @@ protected long getPeriod() {
* Stops this task.
*/
public void stop() {
alive.set(false);
remove();
}

/**
Expand All @@ -195,7 +191,7 @@ public void stop() {
* @return The task successfully executed.
*/
boolean pulse() {
if (!alive.get()) {
if (queueState.get().isDead()) {
return false;
}

Expand All @@ -216,7 +212,7 @@ boolean pulse() {
updateCallTime();

if (period <= 0) {
alive.set(false);
queueState.set(QueueState.DEAD);
}
} finally {
executing.set(false);
Expand All @@ -225,16 +221,50 @@ boolean pulse() {
return true;
}

public void lockNextCallTime() {
if (!nextCallTimeLock.compareAndSet(false, true)) {
throw new IllegalStateException("Task added in the queue twice without being removed");
public void remove() {
queueState.set(QueueState.DEAD);
}

public boolean setQueued() {
if (!queueState.compareAndSet(QueueState.UNQUEUED, QueueState.QUEUED)) {
boolean success = false;
while (!success) {
QueueState oldState = queueState.get();
switch (oldState) {
case DEAD:
return false;
case QUEUED:
throw new IllegalStateException("Task added in the queue twice without being removed");
case UNQUEUED:
success = queueState.compareAndSet(QueueState.UNQUEUED, QueueState.QUEUED);
break;
default:
throw new IllegalStateException("Unknown queue state " + oldState);
}
}
}
return true;
}

public void unlockNextCallTime() {
if (!nextCallTimeLock.compareAndSet(true, false)) {
throw new IllegalStateException("Task removed from the queue before being added");
public boolean setUnqueued() {
if (!queueState.compareAndSet(QueueState.QUEUED, QueueState.UNQUEUED)) {
boolean success = false;
while (!success) {
QueueState oldState = queueState.get();
switch (oldState) {
case DEAD:
return false;
case UNQUEUED:
throw new IllegalStateException("Task set as unqueued before being set as queued");
case QUEUED:
success = queueState.compareAndSet(QueueState.QUEUED, QueueState.UNQUEUED);
break;
default:
throw new IllegalStateException("Unknown queue state " + oldState);
}
}
}
return true;
}

@Override
Expand Down Expand Up @@ -283,16 +313,20 @@ private void updateCallTime() {
updateCallTime(period);
}

private void updateCallTime(long offset) {
if (nextCallTimeLock.compareAndSet(false, true)) {
private boolean updateCallTime(long offset) {
boolean success = setQueued();
if (!success) {
return false;
}
try {
long now = manager.getUpTime();
if (nextCallTime.addAndGet(offset) <= now) {
nextCallTime.set(now + 1);
}
nextCallTimeLock.set(false);
} else {
throw new IllegalStateException("Attempt made to modify next call time when the task was in the queue.");
}
} finally {
setUnqueued();
}
return true;
}

@Override
Expand All @@ -312,5 +346,21 @@ public void setParallelInfo(ParallelTaskInfo info) {
public long getPriority() {
return nextCallTime.get();
}

private static enum QueueState {
QUEUED, UNQUEUED, DEAD;

public boolean isDead() {
return this == DEAD;
}

public boolean isQueued() {
return this == QUEUED;
}

public boolean isUnQueued() {
return this == UNQUEUED;
}
}

}
Expand Up @@ -140,7 +140,7 @@ public void heartbeat(long delta) {
}

itr.remove();
currentTask.unlockNextCallTime();
currentTask.setUnqueued();

if (!currentTask.isAlive()) {
continue;
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/spout/engine/scheduler/TaskPriorityQueue.java
Expand Up @@ -68,27 +68,27 @@ public Queue<SpoutTask> getPendingTask(long currentTime) {
@Override
public boolean add(SpoutTask task) {
if (task != null) {
task.lockNextCallTime();
if (!task.setQueued()) {
throw new UnsupportedOperationException("Task was dead when adding to the queue");
}
}
return super.add(task);
}

@Override
public boolean remove(SpoutTask task) {
task.remove();
if (!super.remove(task)) {
return false;
}

if (task instanceof SpoutTask) {
((SpoutTask)task).unlockNextCallTime();
}
task.setUnqueued();

return true;
}

@Override
public String toString() {
Iterator<RedirectableConcurrentLinkedQueue<SpoutTask>> iq = queueMap.values().iterator();
StringBuilder sb = new StringBuilder("{");
boolean first = true;
for (SpoutTask t : getTasks()) {
Expand Down

0 comments on commit 575a30f

Please sign in to comment.