Skip to content

Commit

Permalink
Add separate error trackers for HttpRemoteTask update and get
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Feb 6, 2015
1 parent edf492f commit 6278c87
Showing 1 changed file with 9 additions and 7 deletions.
Expand Up @@ -131,7 +131,8 @@ public class HttpRemoteTask
private final JsonCodec<TaskInfo> taskInfoCodec;
private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;

private final RequestErrorTracker errorTracker;
private final RequestErrorTracker updateErrorTracker;
private final RequestErrorTracker getErrorTracker;

private final AtomicBoolean needsUpdate = new AtomicBoolean(true);

Expand Down Expand Up @@ -170,7 +171,8 @@ public HttpRemoteTask(Session session,
this.executor = executor;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.errorTracker = new RequestErrorTracker(taskId, location, maxConsecutiveErrorCount, minErrorDuration);
this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxConsecutiveErrorCount, minErrorDuration);
this.getErrorTracker = new RequestErrorTracker(taskId, location, maxConsecutiveErrorCount, minErrorDuration);

for (Entry<PlanNodeId, Split> entry : checkNotNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getValue());
Expand Down Expand Up @@ -361,7 +363,7 @@ private synchronized void scheduleUpdate()
return;
}

errorTracker.acquireRequestPermit();
updateErrorTracker.acquireRequestPermit();

List<TaskSource> sources = getSources();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(session,
Expand Down Expand Up @@ -553,7 +555,7 @@ public void success(TaskInfo value)
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
updateTaskInfo(value, sources);
errorTracker.requestSucceeded();
updateErrorTracker.requestSucceeded();
}
finally {
scheduleUpdate();
Expand All @@ -572,7 +574,7 @@ public void failed(Throwable cause)
// if task not already done, record error
TaskInfo taskInfo = getTaskInfo();
if (!taskInfo.getState().isDone()) {
errorTracker.requestFailed(cause);
updateErrorTracker.requestFailed(cause);
}
}
catch (Error e) {
Expand Down Expand Up @@ -670,7 +672,7 @@ public void success(TaskInfo value)

try {
updateTaskInfo(value, ImmutableList.<TaskSource>of());
errorTracker.requestSucceeded();
getErrorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
Expand All @@ -690,7 +692,7 @@ public void failed(Throwable cause)
// if task not already done, record error
TaskInfo taskInfo = getTaskInfo();
if (!taskInfo.getState().isDone()) {
errorTracker.requestFailed(cause);
getErrorTracker.requestFailed(cause);
}
}
catch (Error e) {
Expand Down

0 comments on commit 6278c87

Please sign in to comment.