
Ensure final task info set on worker node failure #18175

Merged
1 commit merged on Jul 11, 2023

Conversation

losipiuk (Member) commented Jul 7, 2023

With the current logic in HttpRemoteTask.fatalUnacknowledgedFailure() it was possible that we set the FAILED state in TaskStatus in ContinuousTaskStatusFetcher, while the TaskInfo in TaskInfoFetcher was never updated.

This happened when we entered the else branch of the (cause instanceof TrinoTransportException) condition; TaskInfoFetcher.taskInfo was not updated there. Subsequent calls to fatalUnacknowledgedFailure were then no-ops due to the !taskStatus.getState().isDone() condition at the top, while at this point TaskInfoFetcher may no longer have been able to update the TaskInfo (the worker node may already be gone), so it remained stale forever.

This resulted in a split-brain scenario where parts of the system observed the task as failed, while others (those depending on TaskInfo) reported the task in some other state (I observed either ABORTING or FAILING, but other people reported RUNNING too).

It was mostly visible in the UI, which reports task information based on TaskInfo.

The bigger problem it caused was that queries hung forever, because EventDrivenFaultTolerantQueryScheduler uses finalTaskInfo delivery as the trigger for its remote-task completion logic.
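
To make the race concrete, here is a self-contained toy model of the flow described above (names are hypothetical and heavily simplified; the real logic lives in HttpRemoteTask):

```java
import java.util.concurrent.atomic.AtomicReference;

class SplitBrainSketch
{
    enum TaskState
    {
        RUNNING, FAILED;

        boolean isDone()
        {
            return this == FAILED;
        }
    }

    static final AtomicReference<TaskState> taskStatus = new AtomicReference<>(TaskState.RUNNING);
    static final AtomicReference<TaskState> taskInfoState = new AtomicReference<>(TaskState.RUNNING);

    static synchronized void fatalUnacknowledgedFailure(Throwable cause, boolean isTransportException)
    {
        if (taskStatus.get().isDone()) {
            return; // guard at the top: any later failure call becomes a no-op
        }
        taskStatus.set(TaskState.FAILED);
        if (isTransportException) {
            // TrinoTransportException branch: both views reach a terminal state
            taskInfoState.set(TaskState.FAILED);
        }
        // else branch: TaskInfo is left to the async cleanup request, which can
        // never succeed once the worker is gone, so taskInfoState stays RUNNING
    }

    public static void main(String[] args)
    {
        fatalUnacknowledgedFailure(new RuntimeException("500 from worker"), false);
        fatalUnacknowledgedFailure(new RuntimeException("worker unreachable"), true); // swallowed by the guard
        System.out.printf("taskStatus=%s, taskInfo=%s%n", taskStatus.get(), taskInfoState.get());
        // prints: taskStatus=FAILED, taskInfo=RUNNING
    }
}
```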

Investigated it together with @findepi


Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# General
* Fix bug where queries run with `task.retry-policy=TASK` could hang in the presence of worker node failures. ({issue}`18175`)

losipiuk (Member, Author) commented Jul 7, 2023

@arhimondr @jamesgpearce PTAL and help me understand:

  • Why are we observing hangs of FTE queries when we kill worker nodes mid-query? Is this really the fix?
  • If so, is it the proper fix? In particular, doesn't it contradict Graceful Task Termination #15478?

losipiuk requested a review from findepi July 7, 2023 15:21
@@ -1030,8 +1030,8 @@ private synchronized void fatalUnacknowledgedFailure(Throwable cause)
updateTaskInfo(getTaskInfo().withTaskStatus(taskStatus));
}
else {
// Let the status callbacks trigger the cleanup command remotely after switching states
losipiuk (Member, Author) commented:

Where was the "trigger the cleanup command remotely" supposed to take place?

pettyjamesm (Member) commented Jul 7, 2023

I do see that it's possible for a specific sequence of operations around fatalUnacknowledgedFailure to result in a stuck TaskInfo that doesn't align with the state reported by TaskStatus, and from my understanding the necessary chain of events is:

  1. Something calls HttpRemoteTask#fatalUnacknowledgedFailure with an exception that is not a TrinoTransportException. This immediately updates TaskStatus, but not TaskInfo.
  2. The TaskStatus update listener will then be triggered and call HttpRemoteTask#cleanUpTask, which in turn will call HttpRemoteTask#scheduleAsyncCleanupRequest. This is desired, because as far as we know at this point the worker may still be reachable (it wasn't a TrinoTransportException) and we still want to attempt to get the final TaskInfo response from the worker if possible.
  3. The cleanup request does not succeed because the worker is not reachable, and should eventually cause a TrinoTransportException to be thrown and passed into HttpRemoteTask#fatalUnacknowledgedFailure, but that exception is ignored because the check of getTaskStatus().getState().isDone() returns true.

Your change addresses this by not updating the TaskStatus as part of the first failure, so the second failure is recorded correctly. But in doing so we discard the first failure entirely, so the task status may not end up reflecting a FAILED state at all (ABORTED or FINISHED are possible depending on the circumstances); in either case we will have lost the original failure cause, and with your current change we will have delayed responding to the failure until the cleanup request finally fails.

To me, the more correct fix would seem to be a change that handles failures inside scheduleAsyncCleanupRequest as final, updating TaskInfo even when getTaskStatus().getState().isDone(). Does that seem correct based on your investigation?
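
A rough sketch of that suggestion, in terms of the toy model from the PR description above (hypothetical; the PR ultimately introduced fatalAsyncCleanupFailure, shown in the diffs below):

```java
// A cleanup failure is final: always push TaskInfo to a terminal state,
// even when TaskStatus is already done.
static synchronized void fatalAsyncCleanupFailure(Throwable cause)
{
    if (!taskStatus.get().isDone()) {
        taskStatus.set(TaskState.FAILED);
    }
    // No early return on isDone(): the worker is gone, nobody else will ever
    // deliver a final TaskInfo, so set it here and let finalTaskInfo listeners fire.
    taskInfoState.set(TaskState.FAILED);
}
```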

losipiuk (Member, Author) commented Jul 7, 2023

> I do see that it's possible for a specific sequence of operations around fatalUnacknowledgedFailure to result in a stuck TaskInfo that doesn't align with the state reported by TaskStatus, and from my understanding the necessary chain of events is:
>
>   1. Something calls HttpRemoteTask#fatalUnacknowledgedFailure with an exception that is not a TrinoTransportException. This immediately updates TaskStatus, but not TaskInfo.
>   2. The TaskStatus update listener will then be triggered and call HttpRemoteTask#cleanUpTask, which in turn will call HttpRemoteTask#scheduleAsyncCleanupRequest. This is desired, because as far as we know at this point the worker may still be reachable (it wasn't a TrinoTransportException) and we still want to attempt to get the final TaskInfo response from the worker if possible.
>   3. The cleanup request does not succeed because the worker is not reachable, and should eventually cause a TrinoTransportException to be thrown and passed into HttpRemoteTask#fatalUnacknowledgedFailure, but that exception is ignored because the check of getTaskStatus().getState().isDone() returns true.
>
> Your change addresses this by not updating the TaskStatus as part of the first failure, so the second failure is recorded correctly. But in doing so we discard the first failure entirely, so the task status may not end up reflecting a FAILED state at all (ABORTED is possible), and it certainly won't record the original cause in either case.
>
> To me, the more correct fix would seem to be a change that handles failures inside scheduleAsyncCleanupRequest as final, updating TaskInfo even when getTaskStatus().getState().isDone(). Does that seem correct based on your investigation?

Yeah - it makes total sense. I missed HttpRemoteTask#cleanUpTask being called from the listener. I will rework.

Still, I do not understand why, without the fix, we observe those tasks (for which there is a discrepancy between taskStatus and taskInfo) hanging forever and preventing the query from completing (in FTE mode).

losipiuk (Member, Author) commented Jul 7, 2023

@pettyjamesm PTAL if this is more-or-less in line with what you were thinking :)

@@ -1001,14 +1003,18 @@ public void onFailure(Throwable t)
}, executor);
}

private synchronized void fatalUnacknowledgedFailure(Throwable cause) {
pettyjamesm (Member) commented Jul 7, 2023:

I think making this a top-level method is more likely to cause confusion, because it should only be valid to call it from within the failure handler in doScheduleAsyncCleanupRequest. Maybe it should be a method defined there, more specifically named something like failedToCleanUpRemoteTask(TrinoTransportException)?

That way, we know that the update is always guarded by the check for getTaskInfo().getTaskStatus().getState().isDone(), which avoids the potential for replacing some other final state that TaskInfo might already have had set on it, and also avoids accumulating multiple redundant TrinoTransportException failures reported from other calls (which will all likely fail very similarly).
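
In toy-model terms the suggestion might look like this (hypothetical sketch; note the guard is on the TaskInfo state, not on TaskStatus):

```java
// Handler scoped to the cleanup path: guarded by the TaskInfo state itself, so
// an already-final TaskInfo is never replaced and repeated transport failures
// from concurrent cleanup attempts are not accumulated.
static synchronized void failedToCleanUpRemoteTask(Throwable transportFailure)
{
    if (taskInfoState.get().isDone()) {
        return; // TaskInfo already final; keep its state and original failure cause
    }
    taskInfoState.set(TaskState.FAILED);
}
```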

pettyjamesm (Member) commented:

By the way, @losipiuk - it would be great if we knew what the initial failure was in this scenario, because there might be another bug where that exception should be a TrinoTransportException but is currently mishandled / miscategorized.

losipiuk (Member, Author) commented Jul 7, 2023

> By the way, @losipiuk - it would be great if we knew what the initial failure was in this scenario, because there might be another bug where that exception should be a TrinoTransportException but is currently mishandled / miscategorized.

I added logging for debugging, so I know that the exact exception was:

io.trino.spi.TrinoException: Expected response code from http://192.168.111.200:8080/v1/task/20230707_120851_00010_tbby5.4.2.0/status to be 200, but was 500
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@6630a715[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@59dce08f[Wrapped task = com.google.common.util.concurrent.TimeoutFuture$Fire@71143f48]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5d5f10c9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1011]
        at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
        at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
        at com.google.common.util.concurrent.TimeoutFuture.create(TimeoutFuture.java:49)
        at com.google.common.util.concurrent.Futures.withTimeout(Futures.java:413)
        at com.google.common.util.concurrent.FluentFuture.withTimeout(FluentFuture.java:291)
        at io.airlift.concurrent.MoreFutures.addTimeout(MoreFutures.java:431)
        at io.trino.server.TaskResource.getTaskStatus(TaskResource.java:245)
        at jdk.internal.reflect.GeneratedMethodAccessor278.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
        at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
        at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
        at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
        at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
        at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:240)
        at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:697)
        at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
        at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:357)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:764)
        at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1665)
        at io.airlift.http.server.TraceTokenFilter.doFilter(TraceTokenFilter.java:62)
        at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:202)
        at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1635)
        at io.airlift.http.server.TimingFilter.doFilter(TimingFilter.java:51)
        at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:202)
        at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1635)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:527)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:131)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
        at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:822)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:223)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1381)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:176)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:484)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:174)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1303)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:129)
        at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:141)
        at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:51)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122)
        at org.eclipse.jetty.server.Server.handle(Server.java:563)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$0(HttpChannel.java:505)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:762)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:497)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:282)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:314)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100)
        at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
        at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:416)
        at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:385)
        at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:272)
        at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.lambda$new$0(AdaptiveExecutionStrategy.java:140)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149)
        at java.base/java.lang.Thread.run(Thread.java:833)

        at io.trino.server.remotetask.SimpleHttpResponseHandler.onSuccess(SimpleHttpResponseHandler.java:62)
        at io.trino.server.remotetask.SimpleHttpResponseHandler.onSuccess(SimpleHttpResponseHandler.java:27)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133)
        at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

We could fix the code to translate exceptions of this shape to TrinoTransportException.

We still need the original fix in the general mechanism, though.

losipiuk force-pushed the lo/tasks-hang branch 2 times, most recently from ec44e72 to 87a2e80, July 7, 2023 21:49
if (taskStatus.getState().isDone()) {
log.warn("Task %d already in terminal state %s; cannot overwrite with FAILED due to %s", taskStatus.getTaskId(), taskStatus.getState(), cause);
}
TaskStatus failedTaskStatus = failWith(taskStatus, FAILED, failures);
losipiuk (Member, Author) commented:

@pettyjamesm PTAL.

I wonder if we should still use failedTaskStatus in updateTaskInfo if taskStatus.getState().isDone() is true, or if we should just propagate whatever taskStatus is to taskInfo.

losipiuk (Member, Author) commented:

I changed the code to use the original taskStatus if we cannot override it; that seems more correct.
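
In the PR's own terms, the chosen behavior is roughly (assumed from the comment above; the exact diff is not shown here):

```java
// Prefer the existing terminal status over the newly built failedTaskStatus
// when the task is already done, then propagate it to TaskInfo.
TaskStatus statusForInfo = taskStatus.getState().isDone() ? taskStatus : failedTaskStatus;
updateTaskInfo(getTaskInfo().withTaskStatus(statusForInfo));
```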

losipiuk (Member, Author) commented:

> We could fix the code to translate exceptions of this shape to TrinoTransportException.

Actually, the exception which caused the original issue should probably not be treated as a TrinoTransportException. It does not relate to an intermittent node problem.

@@ -960,7 +960,7 @@ public void onSuccess(JsonResponse<TaskInfo> result)
// if cleanup operation has not at least started task termination, mark the task failed
TaskState taskState = getTaskInfo().getTaskStatus().getState();
if (!taskState.isTerminatingOrDone()) {
-fatalUnacknowledgedFailure(new TrinoTransportException(REMOTE_TASK_ERROR, fromUri(request.getUri()), format("Unable to %s task at %s, last known state was: %s", action, request.getUri(), taskState)));
+fatalAsyncCleanupFailure(new TrinoTransportException(REMOTE_TASK_ERROR, fromUri(request.getUri()), format("Unable to %s task at %s, last known state was: %s", action, request.getUri(), taskState)));
Member commented:

Should fatalUnacknowledgedFailure lose the instanceof TrinoTransportException check now?

losipiuk (Member, Author) commented:

It does not matter much, I think. With TrinoTransportException we have extra remoteHost information, which can still be propagated via TaskInfo to the user via the else code path in fatalAsyncCleanupFailure. It may be of some use for debugging, but it is surely not crucial.
I have no strong opinion. @pettyjamesm?

pettyjamesm (Member) commented:

Technically it's still possible for a fatal unacknowledged failure to be triggered with a TrinoTransportException before cleanup is attempted, and when that occurs we can transition the TaskInfo to failed immediately, which should be preferable (cleanup will still be attempted, but we're not anticipating that succeeding, and we don't need to wait for it to fail before reflecting that in TaskInfo).
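
In toy-model terms, the resulting flow keeps the fast path (hypothetical sketch):

```java
// Transport failures that arrive before cleanup finalize TaskInfo immediately;
// all other failures defer TaskInfo to the cleanup path, whose own failure
// handler (failedToCleanUpRemoteTask above) is now final.
static synchronized void fatalUnacknowledgedFailureFixed(Throwable cause, boolean isTransportException)
{
    if (taskStatus.get().isDone()) {
        return;
    }
    taskStatus.set(TaskState.FAILED);
    if (isTransportException) {
        taskInfoState.set(TaskState.FAILED); // worker already known to be unreachable
    }
}
```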

pettyjamesm (Member) commented:

> Actually, the exception which caused the original issue should probably not be treated as a TrinoTransportException. It does not relate to an intermittent node problem.

Agreed, a 5xx response isn't necessarily a fatal exception, and we don't need to reclassify that particular failure.

pettyjamesm (Member) left a review:

LGTM, except for a few minor suggested stylistic / clarity changes.


losipiuk marked this pull request as ready for review July 10, 2023 16:45
losipiuk (Member, Author) commented:

@arhimondr is there any particular reason to use a final task info listener here:

task.addFinalTaskInfoListener(taskExecutionStats::update);
task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.getTaskStatus())));

Updating taskExecutionStats must stay, but it looks like we can change RemoteTaskCompletedEvent to use the task status like this:

    task.addStateChangeListener(taskStatus -> eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)));

losipiuk (Member, Author) commented:

@pettyjamesm updated

losipiuk (Member, Author) commented:

> @arhimondr is there any particular reason to use a final task info listener here:
>
>     task.addFinalTaskInfoListener(taskExecutionStats::update);
>     task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.getTaskStatus())));
>
> Updating taskExecutionStats must stay, but it looks like we can change RemoteTaskCompletedEvent to use the task status like this:
>
>     task.addStateChangeListener(taskStatus -> eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)));

Answering myself for the sake of future readers: when a task completes successfully, we need to wait for finalTaskInfo to get the output data size statistics, which are obtained from finalTaskInfo via retrieveAndDropSpoolingOutputStats.
Technically, we could wire a listener for task failure via task.addStateChangeListener, but it would complicate the codebase.

losipiuk (Member, Author) commented:

CI: #18226
