Skip to content

Commit

Permalink
Consolidate HttpRemoteTask error tracking code
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Feb 6, 2015
1 parent b0eaae3 commit 00e8f56
Showing 1 changed file with 92 additions and 56 deletions.
Expand Up @@ -58,6 +58,7 @@
import org.joda.time.DateTime;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.io.EOFException;
import java.net.SocketException;
Expand Down Expand Up @@ -106,8 +107,6 @@ public class HttpRemoteTask
private final Session session;
private final String nodeId;
private final PlanFragment planFragment;
private final int maxConsecutiveErrorCount;
private final Duration minErrorDuration;

private final AtomicLong nextSplitId = new AtomicLong();

Expand All @@ -132,11 +131,7 @@ public class HttpRemoteTask
private final JsonCodec<TaskInfo> taskInfoCodec;
private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;

private final RateLimiter errorRequestRateLimiter = RateLimiter.create(0.1);

private final AtomicLong lastSuccessfulRequest = new AtomicLong(System.nanoTime());
private final AtomicLong errorCount = new AtomicLong();
private final Queue<Throwable> errorsSinceLastSuccess = new ConcurrentLinkedQueue<>();
private final RequestErrorTracker errorTracker;

private final AtomicBoolean needsUpdate = new AtomicBoolean(true);

Expand Down Expand Up @@ -175,8 +170,7 @@ public HttpRemoteTask(Session session,
this.executor = executor;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.maxConsecutiveErrorCount = maxConsecutiveErrorCount;
this.minErrorDuration = minErrorDuration;
this.errorTracker = new RequestErrorTracker(taskId, location, maxConsecutiveErrorCount, minErrorDuration);

for (Entry<PlanNodeId, Split> entry : checkNotNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getValue());
Expand Down Expand Up @@ -367,10 +361,7 @@ private synchronized void scheduleUpdate()
return;
}

// don't update too fast in the face of errors
if (errorCount.get() > 0) {
errorRequestRateLimiter.acquire();
}
errorTracker.acquireRequestPermit();

List<TaskSource> sources = getSources();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(session,
Expand Down Expand Up @@ -518,59 +509,28 @@ public void onFailure(Throwable t)
}
}

private synchronized void requestSucceeded(TaskInfo newValue, List<TaskSource> sources)
private void requestSucceeded(TaskInfo newValue, List<TaskSource> sources)
{
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
updateTaskInfo(newValue, sources);

lastSuccessfulRequest.set(System.nanoTime());
errorCount.set(0);
errorsSinceLastSuccess.clear();
errorTracker.requestSucceeded();
}
}

private synchronized void requestFailed(Throwable reason)
private void requestFailed(Throwable reason)
{
// cancellation is not a failure
if (reason instanceof CancellationException) {
return;
}

// if task is done, ignore the error
TaskInfo taskInfo = getTaskInfo();
if (taskInfo.getState().isDone()) {
return;
}

// log failure message
if (isExpectedError(reason)) {
// don't print a stack for known errors
log.warn("Error updating task %s: %s: %s", taskInfo.getTaskId(), reason.getMessage(), taskInfo.getSelf());
}
else {
log.warn(reason, "Error updating task %s: %s", taskInfo.getTaskId(), taskInfo.getSelf());
}
try {
// if task is done, ignore the error
TaskInfo taskInfo = getTaskInfo();
if (taskInfo.getState().isDone()) {
return;
}

// remember the first 10 errors
if (errorsSinceLastSuccess.size() < 10) {
errorsSinceLastSuccess.add(reason);
errorTracker.requestFailed(reason);
}

// fail the task, if we have more than X failures in a row and more than Y seconds have passed since the last request
long errorCount = this.errorCount.incrementAndGet();
Duration timeSinceLastSuccess = Duration.nanosSince(lastSuccessfulRequest.get());
if (errorCount > maxConsecutiveErrorCount && timeSinceLastSuccess.compareTo(minErrorDuration) > 0) {
// it is weird to mark the task failed locally and then cancel the remote task, but there is no way to tell a remote task that it is failed
PrestoException exception = new PrestoException(TOO_MANY_REQUESTS_FAILED,
format("%s (%s - %s failures, time since last success %s)",
WORKER_NODE_ERROR,
taskInfo.getSelf(),
errorCount,
timeSinceLastSuccess.convertTo(TimeUnit.SECONDS)));
for (Throwable error : errorsSinceLastSuccess) {
exception.addSuppressed(error);
}
failTask(exception);
catch (PrestoException e) {
failTask(e);
abort();
}
}
Expand Down Expand Up @@ -816,6 +776,82 @@ public void onFailure(Throwable t)
}
}

@ThreadSafe
private static class RequestErrorTracker
{
private final TaskId taskId;
private final URI taskUri;
private final int maxConsecutiveErrorCount;
private final Duration minErrorDuration;

private final RateLimiter errorRequestRateLimiter = RateLimiter.create(0.1);

private final AtomicLong lastSuccessfulRequest = new AtomicLong(System.nanoTime());
private final AtomicLong errorCount = new AtomicLong();
private final Queue<Throwable> errorsSinceLastSuccess = new ConcurrentLinkedQueue<>();

public RequestErrorTracker(TaskId taskId, URI taskUri, int maxConsecutiveErrorCount, Duration minErrorDuration)
{
this.taskId = taskId;
this.taskUri = taskUri;
this.maxConsecutiveErrorCount = maxConsecutiveErrorCount;
this.minErrorDuration = minErrorDuration;
}

public void acquireRequestPermit()
{
// don't update too fast in the face of errors
if (errorCount.get() > 0) {
errorRequestRateLimiter.acquire();
}
}

public void requestSucceeded()
{
lastSuccessfulRequest.set(System.nanoTime());
errorCount.set(0);
errorsSinceLastSuccess.clear();
}

public void requestFailed(Throwable reason)
throws PrestoException
{
// cancellation is not a failure
if (reason instanceof CancellationException) {
return;
}

// log failure message
if (isExpectedError(reason)) {
// don't print a stack for a known errors
log.warn("Error updating task %s: %s: %s", taskId, reason.getMessage(), taskUri);
}
else {
log.warn(reason, "Error updating task %s: %s", taskId, taskUri);
}

// remember the first 10 errors
if (errorsSinceLastSuccess.size() < 10) {
errorsSinceLastSuccess.add(reason);
}

// fail the task, if we have more than X failures in a row and more than Y seconds have passed since the last request
long errorCount = this.errorCount.incrementAndGet();
Duration timeSinceLastSuccess = Duration.nanosSince(lastSuccessfulRequest.get());
if (errorCount > maxConsecutiveErrorCount && timeSinceLastSuccess.compareTo(minErrorDuration) > 0) {
// it is weird to mark the task failed locally and then cancel the remote task, but there is no way to tell a remote task that it is failed
PrestoException exception = new PrestoException(TOO_MANY_REQUESTS_FAILED,
format("%s (%s - %s failures, time since last success %s)",
WORKER_NODE_ERROR,
taskUri,
errorCount,
timeSinceLastSuccess.convertTo(TimeUnit.SECONDS)));
errorsSinceLastSuccess.forEach(exception::addSuppressed);
throw exception;
}
}
}

public interface SimpleHttpResponseCallback<T>
{
void success(T value);
Expand Down

0 comments on commit 00e8f56

Please sign in to comment.