Skip to content

Commit

Permalink
fixed race condition in the server upon job cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
lolocohen committed Aug 14, 2019
1 parent 8fcd838 commit 385db1c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 12 deletions.
Expand Up @@ -274,19 +274,32 @@ private void processOfflineReopen(final NodeBundleResults received, final AsyncN
private void process(final NodeBundleResults received, final AsyncNodeContext context) throws Exception {
final TaskBundle bundle = received.first();
final ServerTaskBundleNode nodeBundle = context.removeJobEntry(bundle.getUuid(), bundle.getBundleId());
context.getServer().getDispatchExpirationHandler().cancelAction(ServerTaskBundleNode.makeKey(nodeBundle), false);
boolean requeue = false;
final ServerJob job = nodeBundle.getServerJob();
boolean mustProcess = true;
job.getLock().lock();
try {
final TaskBundle newBundle = received.bundle();
if (debugEnabled) log.debug("read bundle " + newBundle + " from node " + context);
requeue = processResults(context, received, nodeBundle);
} catch (final Throwable t) {
log.error(t.getMessage(), t);
nodeBundle.setJobReturnReason(JobReturnReason.DRIVER_PROCESSING_ERROR);
nodeBundle.resultsReceived(t);
if (job.isCancelled()) mustProcess = false;
else {
for (final ServerTask task: nodeBundle.getTaskList()) task.setReturnedFromNode(true);
}
} finally {
job.getLock().unlock();
}
context.getServer().getDispatchExpirationHandler().cancelAction(ServerTaskBundleNode.makeKey(nodeBundle), false);

if (mustProcess) {
boolean requeue = false;
try {
final TaskBundle newBundle = received.bundle();
if (debugEnabled) log.debug("read bundle " + newBundle + " from node " + context);
requeue = processResults(context, received, nodeBundle);
} catch (final Throwable t) {
log.error(t.getMessage(), t);
nodeBundle.setJobReturnReason(JobReturnReason.DRIVER_PROCESSING_ERROR);
nodeBundle.resultsReceived(t);
}
if (requeue) nodeBundle.resubmit();
}
if (requeue) nodeBundle.resubmit();
//context.setMessage(null);
if (!context.isOffline()) updateMaxJobs(context, bundle);
if (context.getCurrentNbJobs() < context.getMaxJobs()) {
if (debugEnabled) log.debug("updating execution status to ACTIVE for {}", context);
Expand Down
4 changes: 3 additions & 1 deletion server/src/java/org/jppf/server/protocol/ServerJob.java
Expand Up @@ -163,6 +163,7 @@ public void resultsReceived(final ServerTaskBundleNode bundle, final List<DataLo
if (task.getState() == TaskState.RESUBMIT) {
if (traceEnabled) log.trace("task to resubmit: {}", task);
task.setState(TaskState.PENDING);
task.setReturnedFromNode(false);
nbResubmits++;
if (pos > maxPos) maxPos = pos;
if (pos < minPos) minPos = pos;
Expand Down Expand Up @@ -204,6 +205,7 @@ public void resultsReceived(final ServerTaskBundleNode bundle, final Throwable t
if (task.getState() == TaskState.RESUBMIT) {
if (traceEnabled) log.trace("task to resubmit: {}", task);
task.setState(TaskState.PENDING);
task.setReturnedFromNode(false);
nbResubmits++;
if (pos > maxPos) maxPos = pos;
if (pos < minPos) minPos = pos;
Expand Down Expand Up @@ -343,7 +345,7 @@ private CollectionMap<ServerTaskBundleClient, ServerTask> handleCancelledTasks()
if (debugEnabled) log.debug("cancelling tasks for {}", this);
final CollectionMap<ServerTaskBundleClient, ServerTask> clientMap = new SetIdentityMap<>();
for (final ServerTask task: tasks.values()) {
if (!task.isDone()) {
if (!task.isDone() && !task.isReturnedFromNode()) {
task.cancel();
clientMap.putValue(task.getBundle(), task);
}
Expand Down
19 changes: 19 additions & 0 deletions server/src/java/org/jppf/server/protocol/ServerTask.java
Expand Up @@ -78,6 +78,10 @@ public class ServerTask implements Serializable {
* Number of times a task resubmitted itself.
*/
private int resubmitCount;
/**
* Whether this task has returned from the node.
*/
private boolean returnedFromNode;

/**
*
Expand Down Expand Up @@ -295,6 +299,21 @@ private void writeObject(final ObjectOutputStream out) throws IOException {
}
}

/**
* @return whether this task has returned from the node
*/
public boolean isReturnedFromNode() {
return returnedFromNode;
}

/**
* Set whether this task has returned from the node
* @param returnedFromNode {@code true} if this task has returned from a node, {@code false} otherwise.
*/
public void setReturnedFromNode(final boolean returnedFromNode) {
this.returnedFromNode = returnedFromNode;
}

/**
* Reconstitute the {@code ServerTask} instance from a stream (i.e., deserialize it).
* @param in the input stream from which to read the task.
Expand Down
Expand Up @@ -181,6 +181,7 @@ public void run() {
else job.add(new MyTask2(i));
job.getClientSLA().setJobExpirationSchedule(new JPPFSchedule(1000L));
final Task<?> task = client.submit(job).get(0);
print(false, false, "got results for job %d", i);
final Throwable t = task.getThrowable();
assertNull(String.format("got exception in task %d: %s", i, ExceptionUtils.getStackTrace(t)), t);
if (i == 1) {
Expand Down

0 comments on commit 385db1c

Please sign in to comment.