[PROPOSAL] Worker node failover based on subtask lineage #3308

Open
zhongchun opened this issue Dec 29, 2022 · 0 comments
Problem

Currently, when a MainPool process exits in Mars, the supervisor receives an error report that a subtask failed and cancels the execution of the entire stage. For traditional batch processing, simply rerunning the job is acceptable. For large jobs, however, this is costly: they have many subtasks and run for a long time, so a rerun is expensive. Large jobs also need many more nodes, which raises the probability that at least one node fails. Once a node failure causes the MainPool to exit, the data on that node is lost and the downstream subtasks that depend on it fail. For example, a job that has been running for more than 20 hours and is 90% complete will fail entirely because a single MainPool exits. Large jobs are relatively common in modern data processing; a single job can occupy 1200 nodes or more and take about 40 hours.
To solve the node failure problem and keep jobs running stably, a complete node failover solution is required.

Solution

[Figure: Failover architecture]

In the figure above, green arrows show the normal execution flow and red arrows show the exception handling flow.

  • TaskSupervisorService first tiles a high-level computing graph into a low-level, fine-grained computing graph. Each node in this graph is a subtask. The subtask graph is then handed over to SchedulingSupervisorService for scheduling.
  • Under normal circumstances, TaskManagerActor submits coarse-grained graphs to TaskProcessor. After processing (tiling, fusing, and recording lineage), the result is handed over to SubtaskManagerActor for scheduling and execution. When a subtask finishes executing, TaskManagerActor is notified.
  • When an error occurs, such as a node failure, execution follows the exception handling flow (red arrows). Error detection first determines whether recovery is possible. If it is, error recovery is performed; if not, an error is reported.

Lineage relationship management

Lineage relationship management makes it easy to trace data back to its producer. When a subtask fails because upstream data it depends on is missing (a node failure causes the data on that node to be lost), the lineage relationship lets us quickly find the subtasks that generated the data. We can then rerun those subtasks to restore the data.
There are two kinds of lineage relationship: within the same SubtaskGraph and across different SubtaskGraphs.

  • In the same SubtaskGraph: when a subtask depends only on subtasks of its own task, only one SubtaskGraph is involved, and it is only necessary to find the upstream in the current graph.
  • In different SubtaskGraphs: when a task needs the data of a previous task, two SubtaskGraphs are involved, and the upstream must be found across both.

For the first case, the upstream and downstream information already present in the SubtaskGraph meets the requirements. For the second case, we need to find the upstream across SubtaskGraphs, so we should record a concise lineage relationship to make the search fast.
In fact, lineage relationship management only needs to record the last subtasks of each SubtaskGraph, that is, the result subtasks.
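
The cross-graph lookup described above can be sketched as a small registry. This is only an illustration under stated assumptions: `LineageRegistry`, `record_graph`, and `find_producer` are hypothetical names, not part of Mars, and data is identified here by plain string keys. The registry stores only the result subtasks of each graph, together with the order in which the graph was executed.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class LineageRegistry:
    """Hypothetical registry that records only the result subtasks of each graph."""

    # data key -> (graph execution order, id of the result subtask that produced it)
    _producers: Dict[str, Tuple[int, str]] = field(default_factory=dict)
    _next_order: int = 0

    def record_graph(self, result_subtasks: Dict[str, List[str]]) -> int:
        """Record one SubtaskGraph.

        ``result_subtasks`` maps a result subtask id to the data keys it produces.
        Returns the execution order assigned to this graph.
        """
        order = self._next_order
        self._next_order += 1
        for subtask_id, data_keys in result_subtasks.items():
            for key in data_keys:
                self._producers[key] = (order, subtask_id)
        return order

    def find_producer(self, data_key: str) -> Optional[Tuple[int, str]]:
        """Return (graph order, subtask id) for a data key, or None if unknown."""
        return self._producers.get(data_key)


registry = LineageRegistry()
registry.record_graph({"subtask-a": ["chunk-1", "chunk-2"]})
registry.record_graph({"subtask-b": ["chunk-3"]})
print(registry.find_producer("chunk-2"))  # producer recorded for the first graph
print(registry.find_producer("chunk-3"))  # producer recorded for the second graph
```

Keeping the graph execution order next to the producer is what later allows earlier graphs to be rerun first during recovery.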

Error Detection

Error analysis

After a node exits, which errors occur, how are they detected, and how do we recover?

  • If SubtaskManagerActor continues to submit subtasks to the failed node, a ServerClosed error is reported.
  • Subtasks that depend on data on the failed node fail to execute, with one of two errors:
    • ServerClosed is reported if the node they depend on has exited while the MainPool is preparing data.
    • DataNotExist is reported if the node they depend on has exited while the MainPool is executing.

How to catch these errors

  • In the first case, we can catch ServerClosed directly when SubtaskManagerActor submits the subtask.
  • In the second case, we need to catch the exception in TaskStageProcessor, in set_subtask_result.
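
As a rough sketch of how the two cases could be told apart, the function below maps an error to a recovery path. The exception classes are local stand-ins for the ServerClosed and DataNotExist errors named above (their real import paths in Mars are not assumed here), and `FailureKind`, `classify_failure`, and the `raised_on_submit` flag are hypothetical.

```python
from enum import Enum


class ServerClosed(Exception):
    """Local stand-in for the ServerClosed error named in the proposal."""


class DataNotExist(Exception):
    """Local stand-in for the DataNotExist error named in the proposal."""


class FailureKind(Enum):
    SUBMIT_TO_FAILED_NODE = "submit_to_failed_node"  # first case
    LOST_UPSTREAM_DATA = "lost_upstream_data"        # second case
    UNRECOVERABLE = "unrecoverable"


def classify_failure(error: BaseException, raised_on_submit: bool) -> FailureKind:
    """Map an error to a recovery path.

    ``raised_on_submit`` is True when the error surfaced while submitting the
    subtask, and False when it arrived with the subtask result.
    """
    if isinstance(error, ServerClosed) and raised_on_submit:
        return FailureKind.SUBMIT_TO_FAILED_NODE
    if isinstance(error, (ServerClosed, DataNotExist)):
        return FailureKind.LOST_UPSTREAM_DATA
    return FailureKind.UNRECOVERABLE


print(classify_failure(ServerClosed(), raised_on_submit=True))
print(classify_failure(DataNotExist(), raised_on_submit=False))
print(classify_failure(ValueError("bad input"), raised_on_submit=False))
```

Any error that is not one of the two named types is treated as unrecoverable in this sketch, which corresponds to reporting the error instead of attempting recovery.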

Error Recovery

For the first case:

  • Mark the node that reported ServerClosed as stopped to avoid submitting other subtasks to it.
  • Resubmit the failed subtasks to other nodes.
  • Resubmit the subtasks queued on the failed node to other nodes.

For the second case, recovery proceeds as shown in the figure below:

[Figure: Error recovery]

  • Find the subtasks that the failed subtask depends on. If any of them does not support rerunning, cancel the current stage and report an error. If they all support rerunning, add them to the lost_objects_queue (each subtask has a priority field, which determines the scheduling order).
  • Across subtask graphs, priority alone is not enough to order the subtasks found, because subtasks from the earlier graph must be rerun first. Therefore the execution order of the subtask graph and the subtask priority are used together as the scheduling priority. This means that the execution order of each subtask graph must be recorded along with the lineage relationship.
  • Record the dependent subtasks together with the failed subtasks. Once the dependent subtasks have been rerun successfully, continue to schedule the failed subtasks.
  • On every scheduling round, subtasks in the lost_objects_queue are scheduled first, selected by priority; subtasks in the worker queue are scheduled after that.
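
The second-case recovery steps can be sketched with a heap-based queue. This is a minimal illustration, not the proposed implementation: `Upstream` and `RecoveryScheduler` are hypothetical names, the worker queue is reduced to a plain list, and it is assumed that a larger priority value is scheduled first within one graph.

```python
import heapq
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple


@dataclass(frozen=True)
class Upstream:
    """Hypothetical description of an upstream subtask whose output was lost."""

    subtask_id: str
    graph_order: int  # execution order of its SubtaskGraph; earlier graphs are smaller
    priority: int     # assumption: a larger value is scheduled first within one graph
    rerunnable: bool = True


class RecoveryScheduler:
    def __init__(self) -> None:
        # heap entries: (graph order, negated priority, subtask id)
        self._lost_objects_queue: List[Tuple[int, int, str]] = []
        self._worker_queue: List[str] = []
        # failed subtask id -> upstream subtask ids that still have to be rerun
        self._waiting: Dict[str, Set[str]] = {}

    def submit(self, subtask_id: str) -> None:
        self._worker_queue.append(subtask_id)

    def on_data_lost(self, failed_id: str, upstream: List[Upstream]) -> None:
        if any(not u.rerunnable for u in upstream):
            raise RuntimeError(f"cannot recover {failed_id}: cancel the stage")
        self._waiting[failed_id] = {u.subtask_id for u in upstream}
        for u in upstream:
            entry = (u.graph_order, -u.priority, u.subtask_id)
            heapq.heappush(self._lost_objects_queue, entry)

    def next_subtask(self) -> Optional[str]:
        """Lost objects are always scheduled before the regular worker queue."""
        if self._lost_objects_queue:
            return heapq.heappop(self._lost_objects_queue)[2]
        if self._worker_queue:
            return self._worker_queue.pop(0)
        return None

    def on_rerun_succeeded(self, subtask_id: str) -> List[str]:
        """Requeue and return the failed subtasks whose upstream is fully restored."""
        ready = []
        for failed_id, deps in list(self._waiting.items()):
            deps.discard(subtask_id)
            if not deps:
                ready.append(failed_id)
                del self._waiting[failed_id]
        self._worker_queue.extend(ready)
        return ready


scheduler = RecoveryScheduler()
scheduler.submit("s-normal")
scheduler.on_data_lost(
    "s-failed",
    [Upstream("up-late", graph_order=1, priority=5),
     Upstream("up-early", graph_order=0, priority=1)],
)
rerun_order = [scheduler.next_subtask(), scheduler.next_subtask()]
released = [scheduler.on_rerun_succeeded(s) for s in rerun_order]
remaining = [scheduler.next_subtask(), scheduler.next_subtask()]
print(rerun_order, released, remaining)
```

In the usage above, the subtask from the earlier graph is rerun first even though its priority is lower, and the failed subtask is only requeued after both of its upstream subtasks have succeeded.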

Node Management

Node management covers two aspects:

  • When a node fails, simply mark it as stopped. This is equivalent to removing it from the cluster and prevents subsequent submissions to it.
  • If cluster resources become insufficient, use autoscale to add new nodes to the cluster.
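
A minimal sketch of the stopped-node bookkeeping, combined with the first-case recovery of resubmitting queued subtasks, might look like this. `NodeRegistry` and its methods are hypothetical, placement simply picks the node with the shortest queue, and the point where autoscale would be triggered is only marked by an exception.

```python
from typing import Dict, List, Set


class NodeRegistry:
    """Hypothetical bookkeeping of worker nodes and their queued subtasks."""

    def __init__(self, nodes: List[str]) -> None:
        self._queues: Dict[str, List[str]] = {node: [] for node in nodes}
        self._stopped: Set[str] = set()

    def available(self) -> List[str]:
        return [node for node in self._queues if node not in self._stopped]

    def assign(self, subtask_id: str) -> str:
        """Queue a subtask on the available node with the shortest queue."""
        candidates = self.available()
        if not candidates:
            # This is where autoscale would be asked for new nodes.
            raise RuntimeError("no available nodes")
        target = min(candidates, key=lambda node: len(self._queues[node]))
        self._queues[target].append(subtask_id)
        return target

    def mark_stopped(self, node: str) -> Dict[str, str]:
        """Stop a node and move its queued subtasks to the remaining nodes."""
        self._stopped.add(node)
        orphaned, self._queues[node] = self._queues[node], []
        return {subtask_id: self.assign(subtask_id) for subtask_id in orphaned}


registry = NodeRegistry(["w1", "w2", "w3"])
for subtask_id in ["a", "b", "c", "d"]:
    registry.assign(subtask_id)
moved = registry.mark_stopped("w1")
print(moved)                 # new placement of the subtasks that were queued on w1
print(registry.available())  # w1 is no longer offered for new submissions
```

A stopped node stays in the registry but is never returned by `available`, which mirrors treating it as removed from the cluster.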

Todos

Checkpoint

If the execution chain of a job is long, especially with wide dependencies, recovery involves a lot of backtracking and the cost is relatively high. To address this, we can checkpoint critical stages and materialize their intermediate data to external storage, which avoids excessive lineage searches and subtask backtracking.
The checkpoint function is configurable, and failover does not strictly depend on it.
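
To illustrate how a checkpoint shortens backtracking, the sketch below walks upstream from a lost subtask and stops at any subtask whose output is still available or has been checkpointed. The function name and the dictionary-based graph are assumptions made for the example; recursion is used for brevity and would need an iterative form for very long chains.

```python
from typing import Dict, List, Set


def subtasks_to_rerun(
    lost: str,
    upstream: Dict[str, List[str]],
    available: Set[str],
    checkpointed: Set[str],
) -> List[str]:
    """Return the subtasks to rerun, upstream first, to regenerate ``lost``.

    Backtracking stops at subtasks whose output is still available on a live
    node or was materialized to external storage by a checkpoint.
    """
    order: List[str] = []
    visited: Set[str] = set()

    def visit(subtask_id: str) -> None:
        if subtask_id in visited or subtask_id in available or subtask_id in checkpointed:
            return
        visited.add(subtask_id)
        for dep in upstream.get(subtask_id, []):
            visit(dep)
        order.append(subtask_id)  # appended after its dependencies

    visit(lost)
    return order


# A chain a -> b -> c -> d in which every intermediate result was lost.
chain = {"d": ["c"], "c": ["b"], "b": ["a"], "a": []}
without_checkpoint = subtasks_to_rerun("d", chain, available=set(), checkpointed=set())
with_checkpoint = subtasks_to_rerun("d", chain, available=set(), checkpointed={"b"})
print(without_checkpoint)
print(with_checkpoint)
```

With the output of `b` checkpointed, only `c` and `d` have to be rerun instead of the whole chain.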
