Skip to content

Commit

Permalink
Make SqlTask heartbeat update explicit
Browse files Browse the repository at this point in the history
SqlTask implicitly updates the heartbeat when getting TaskInfo.  Since the
TaskInfo is fetched while checking for abandoned tasks, the tasks can never
be abandoned.
  • Loading branch information
dain committed Feb 24, 2015
1 parent 7e84475 commit 5dbe67f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
Expand Up @@ -133,10 +133,13 @@ public TaskId getTaskId()
return taskStateMachine.getTaskId();
}

public TaskInfo getTaskInfo()
public void recordHeartbeat()
{
lastHeartbeat.set(DateTime.now());
}

public TaskInfo getTaskInfo()
{
try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
return createTaskInfo(taskHolderReference.get());
}
Expand Down Expand Up @@ -192,7 +195,6 @@ private TaskInfo createTaskInfo(TaskHolder taskHolder)
public ListenableFuture<TaskInfo> getTaskInfo(TaskState callersCurrentState)
{
checkNotNull(callersCurrentState, "callersCurrentState is null");
lastHeartbeat.set(DateTime.now());

// If the caller's current state is already done, just return the current
// state of this task as it will either be done or possibly still running
Expand Down Expand Up @@ -224,8 +226,6 @@ public TaskInfo updateTask(Session session, PlanFragment fragment, List<TaskSour
}
}

lastHeartbeat.set(DateTime.now());

if (taskExecution != null) {
// addSources checks for task completion, so update the buffers first and the task might complete earlier
sharedBuffer.setOutputBuffers(outputBuffers);
Expand All @@ -248,17 +248,13 @@ public ListenableFuture<BufferResult> getTaskResults(TaskId outputName, long sta
checkNotNull(outputName, "outputName is null");
checkArgument(maxSize.toBytes() > 0, "maxSize must be at least 1 byte");

lastHeartbeat.set(DateTime.now());

return sharedBuffer.get(outputName, startingSequenceId, maxSize);
}

public TaskInfo abortTaskResults(TaskId outputId)
{
checkNotNull(outputId, "outputId is null");

lastHeartbeat.set(DateTime.now());

log.debug("Aborting task %s output %s", taskId, outputId);
sharedBuffer.abort(outputId);

Expand All @@ -274,16 +270,12 @@ public void failed(Throwable cause)

public TaskInfo cancel()
{
lastHeartbeat.set(DateTime.now());

taskStateMachine.cancel();
return getTaskInfo();
}

public TaskInfo abort()
{
lastHeartbeat.set(DateTime.now());

taskStateMachine.abort();
return getTaskInfo();
}
Expand Down
Expand Up @@ -187,7 +187,9 @@ public TaskInfo getTaskInfo(TaskId taskId)
{
checkNotNull(taskId, "taskId is null");

return tasks.getUnchecked(taskId).getTaskInfo();
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.getTaskInfo();
}

@Override
Expand All @@ -196,7 +198,9 @@ public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, TaskState currentSt
checkNotNull(taskId, "taskId is null");
checkNotNull(currentState, "currentState is null");

return tasks.getUnchecked(taskId).getTaskInfo(currentState);
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.getTaskInfo(currentState);
}

@Override
Expand All @@ -208,7 +212,9 @@ public TaskInfo updateTask(Session session, TaskId taskId, PlanFragment fragment
checkNotNull(sources, "sources is null");
checkNotNull(outputBuffers, "outputBuffers is null");

return tasks.getUnchecked(taskId).updateTask(session, fragment, sources, outputBuffers);
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, fragment, sources, outputBuffers);
}

@Override
Expand Down

0 comments on commit 5dbe67f

Please sign in to comment.