fix resuming subtasks #90

Draft
wants to merge 4 commits into
base: v0.10.5_patched

@snagasawa snagasawa commented Dec 19, 2023

Overview

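A patch to resolve the problem described in "20231218 Investigation of Digdag retry failed and subtasks".
An explanation of the implementation has been written up for reviewers; please refer to "20240227 Approach to handling the Reference Error caused by Digdag retry failed".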

Verification

Ran the integration tests.
(However, some tests require credentials, and the upstream Digdag CI does not reach 100% coverage to begin with, so coverage was not 100% in this verification either.)

CI_ACCEPTANCE_TEST=true DIGDAG_TEST_POSTGRESQL="$(cat digdag_test_postgresql)" ./gradlew clean digdag-tests:test -PwithoutUi --debug --stacktrace > result.log
  • digdag_test_postgresql
host = localhost
port = 5432
user = digdag
password = password
database = digdag_test
idleTimeout = 10
minimumPoolSize = 0

TODO

  • An integration test was added in b79c994, but it errors for a reason unrelated to the implementation, so the cause needs to be investigated.

@snagasawa snagasawa force-pushed the fix-resuming-subtasks branch 5 times, most recently from ee980f4 to faf9d38 Compare December 20, 2023 00:49
@snagasawa snagasawa changed the base branch from master to v0.10.4_patched_2 December 20, 2023 00:49
@snagasawa snagasawa force-pushed the fix-resuming-subtasks branch 3 times, most recently from 25391a7 to 1df6fb4 Compare December 21, 2023 07:10
@snagasawa snagasawa force-pushed the fix-resuming-subtasks branch 7 times, most recently from 584db23 to 2ca6c4e Compare February 15, 2024 06:10
@snagasawa snagasawa changed the base branch from v0.10.4_patched_2 to v0.10.5_patched February 15, 2024 06:10
@snagasawa snagasawa force-pushed the fix-resuming-subtasks branch 2 times, most recently from d702ca8 to 67e5426 Compare February 27, 2024 02:43
@snagasawa snagasawa changed the title Fix resuming subtasks fix resuming subtasks Feb 27, 2024
@snagasawa snagasawa marked this pull request as ready for review February 27, 2024 06:48
ResumingTask resumingTask = ImmutableResumingTask.builder()
.sourceTaskId(archivedTask.getId())
.fullName(archivedTask.getFullName())
.config(TaskConfig.validate(workflowTask.getConfig()))

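Only this config is INSERTed from workflowTask rather than from archivedTask.
This is because, when the task definition has changed, the config must be the one from the new revision.

Specifically, without this fix, the retry in the following integration test fails.
RetryIT.java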

https://github.com/treasure-data/digdag/blob/v0.10.5.1/digdag-tests/src/test/resources/acceptance/retry/retry-1.dig

+step1:
  sh>: touch ${outdir}/1-1.out
+step2:
  +a:
    sh>: touch ${outdir}/1-2a.out
  +b:
    fail>: step2b fail

https://github.com/treasure-data/digdag/blob/v0.10.5.1/digdag-tests/src/test/resources/acceptance/retry/retry-2.dig

+step1:
  sh>: touch ${outdir}/2-1.out
+step2:
  +a:
    sh>: touch ${outdir}/2-2a.out
  +b:
    sh>: touch ${outdir}/2-2b.out
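
To make the retry-1.dig / retry-2.dig case concrete, here is a minimal sketch of choosing the config by task name from the new revision instead of from the archived attempt. The `Task` class and `configsForResume` are hypothetical stand-ins for illustration, not Digdag's actual `ArchivedTask`, `WorkflowTask`, or `ResumingTask` types, and configs are reduced to plain strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; not Digdag's real classes.
final class ResumeConfigSketch {
    static final class Task {
        final String fullName;
        final String config;

        Task(String fullName, String config) {
            this.fullName = fullName;
            this.config = config;
        }
    }

    // For every archived task that still exists in the new revision,
    // pair its full name with the config taken from the new revision.
    static Map<String, String> configsForResume(List<Task> archived, List<Task> workflow) {
        Map<String, String> currentConfigs = new LinkedHashMap<>();
        for (Task t : workflow) {
            currentConfigs.put(t.fullName, t.config);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Task a : archived) {
            String config = currentConfigs.get(a.fullName);
            if (config != null) {
                // The archived config (for example "fail>: step2b fail") is deliberately ignored.
                result.put(a.fullName, config);
            }
        }
        return result;
    }
}
```

With the two revisions above, a task named like `+step2+b` would be re-registered with the `sh>` operator from retry-2.dig rather than the `fail>` operator archived from retry-1.dig.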

id = store.addSubtask(attemptId, task);
} else {
TaskStateCode state;
switch(archivedTask.getState()) {

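The original TaskControl#addTasks branched between BLOCKED and SUCCESS depending on whether resumingTasks were present. However, 6aa7cb9 changed the premise that "only SUCCESS tasks are fetched on resume", so branching on the archived state became necessary.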
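
As a rough illustration of what "branching on the archived state" could look like, the sketch below maps an archived state to the state a task starts with in the new attempt. The enum values and the mapping itself (SUCCESS is kept, everything else starts over as BLOCKED) are assumptions made for the example; the actual cases handled by the patch are not shown in this conversation.

```java
// Hypothetical sketch; the real TaskStateCode and the real mapping may differ.
final class InitialStateSketch {
    enum State { BLOCKED, SUCCESS, ERROR, GROUP_ERROR, CANCELED }

    // Decide the initial state of a re-registered task from its archived state.
    static State initialStateFor(State archivedState) {
        switch (archivedState) {
            case SUCCESS:
                // Assumed: a task that already succeeded is not run again.
                return State.SUCCESS;
            default:
                // Assumed: any other archived state is scheduled to run again.
                return State.BLOCKED;
        }
    }
}
```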

@@ -185,6 +188,133 @@ private static long addTasks(TaskControlStore store,
return rootTaskId;
}

private static long addInitialTasks(TaskControlStore store, long attemptId, long rootTaskId,

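The existing TaskControl#addTasks is also referenced from other places, so its resumingTasks argument could not simply be changed to ArchivedTasks.
For that reason, addInitialTasks was implemented as a new method.
See: "Missing parameters at INSERT time"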

.collect(Collectors.toMap(WorkflowTask::getFullName, task -> indexToId.get(workflowTasks.indexOf(task))));

archivedTasks.stream()
.filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName())

Exclude tasks that have already been INSERTed.


archivedTasks.stream()
.filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName())
&& archivedTask.getFullName().contains("^sub"))

@snagasawa snagasawa Feb 27, 2024

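If the filter merely checked whether the task name contains "^", subtasks that should be excluded, such as "^failure-alert" and "^error", would also be included. The filter therefore has to be limited to subtasks in the narrow sense.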
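
The two filter conditions can be tried in isolation with a small sketch that works on plain task names. `narrowSubtasks` and the sample names in the usage note are hypothetical; only the `contains("^sub")` check and the "not already inserted" check are taken from the snippet above.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper operating on full task names only.
final class SubtaskFilterSketch {
    // Keep names that are not yet inserted and that contain "^sub".
    // Names such as "...^failure-alert" or "...^error" contain "^" but not "^sub",
    // so they are dropped.
    static List<String> narrowSubtasks(List<String> archivedNames, Set<String> insertedNames) {
        return archivedNames.stream()
                .filter(name -> !insertedNames.contains(name) && name.contains("^sub"))
                .collect(Collectors.toList());
    }
}
```

For example, given the names `+main+a`, `+main+a^sub+x`, `+main^failure-alert` and `+main+b^error`, with `+main+a` already inserted, only `+main+a^sub+x` passes the filter.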

archivedTasks.stream()
.filter(archivedTask -> !taskNameAndIds.keySet().contains(archivedTask.getFullName())
&& archivedTask.getFullName().contains("^sub"))
.sorted(Comparator.comparingInt((t) -> (int) t.getId()))

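For the later INSERT into the task_dependencies table, upstream tasks must first be INSERTed into the tasks table so that their IDs are assigned. The tasks are therefore sorted in advance by the task ID they had in the attempt before retry failed.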
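
A minimal sketch of the ordering step, assuming a hypothetical `Archived` type with a `long` ID. It uses `Comparator.comparingLong` instead of casting the ID to `int` as the snippet above does; the cast gives the same order as long as IDs fit in an `int`, but would wrap around for larger values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for an archived task; not Digdag's ArchivedTask.
final class SortByIdSketch {
    static final class Archived {
        final long id;
        final String fullName;

        Archived(long id, String fullName) {
            this.id = id;
            this.fullName = fullName;
        }
    }

    // Return a copy ordered by the ID from the previous attempt, so that
    // upstream tasks (which were created earlier) are inserted first.
    static List<Archived> inInsertOrder(List<Archived> tasks) {
        List<Archived> sorted = new ArrayList<>(tasks);
        sorted.sort(Comparator.comparingLong((Archived t) -> t.id));
        return sorted;
    }
}
```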

return true;
}
return false;
})
.map(archived -> ResumingTask.of(archived))

Changed to return the ArchivedTask as-is instead of converting it to ResumingTask.
See: "Missing parameters at INSERT time"

-tasks, ImmutableList.of(),
-false, true, true,
-resumingTasks);
+long taskId = addInitialTasks(store, attemptId, rootTaskId, tasks, archivedTasks);

Changed to call the addInitialTasks introduced in a463168 (#90).

 .map(task -> {
-if (!task.getParentId().isPresent()) {
+if (!task.getParentId().isPresent() && task.getState() == TaskStateCode.SUCCESS) {

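Resuming an attempt that has already succeeded is prohibited, so this condition was modified to match the change to the filter at L310.
!task.getParentId().isPresent() means "a task with no parentId = the root task".
(If the root task is SUCCESS, the attempt is SUCCESS as well.)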

-List<Long> successTasks = tasks.stream()
-.filter(task -> task.getState() == TaskStateCode.SUCCESS)
+List<Long> ids = tasks.stream()
+.filter(t -> statuses.contains(t.getState()))

Changed the tasks to resume so that they are no longer limited to SUCCESS.
See: "Only SUCCESS tasks are fetched"
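
The widened filter can be sketched as collecting the IDs of tasks whose state is in a caller-supplied set. The `Task` and `State` types here are hypothetical stand-ins, and which states the patch actually passes as `statuses` is not shown in this conversation, so the set used in the usage note is only an example.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in types; not Digdag's StoredTask or TaskStateCode.
final class ResumableIdsSketch {
    enum State { BLOCKED, SUCCESS, ERROR, GROUP_ERROR }

    static final class Task {
        final long id;
        final State state;

        Task(long id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    // Collect the IDs of all tasks whose state is one of the given statuses.
    static List<Long> idsWithStates(List<Task> tasks, Set<State> statuses) {
        return tasks.stream()
                .filter(t -> statuses.contains(t.state))
                .map(t -> t.id)
                .collect(Collectors.toList());
    }
}
```

Calling `idsWithStates(tasks, EnumSet.of(State.SUCCESS))` reproduces the previous SUCCESS-only behaviour, while a larger set such as `EnumSet.of(State.SUCCESS, State.ERROR)` also picks up failed tasks.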


taskNameAndIds.put(archivedSubtask.getFullName(), id);

archivedTasks.stream()

@snagasawa snagasawa Feb 27, 2024

As in TaskControl#addTasks, an INSERT into the task_dependencies table is required, so the upstream IDs are looked up from the task names and then INSERTed.
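
One way to picture the name-based lookup is the sketch below: old upstream IDs are translated to task names, and the names are translated to the IDs assigned in the new attempt. The `Archived` type, its `upstreams` field, and `remap` are hypothetical and only assume that each archived task records the IDs of its upstream tasks; the rows are returned rather than written to a database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types; not Digdag's real store API.
final class DependencyRemapSketch {
    static final class Archived {
        final long id;
        final String fullName;
        final List<Long> upstreams; // upstream task IDs in the previous attempt (assumed)

        Archived(long id, String fullName, List<Long> upstreams) {
            this.id = id;
            this.fullName = fullName;
            this.upstreams = upstreams;
        }
    }

    // Build {newTaskId, newUpstreamId} rows for a task_dependencies-like table.
    static List<long[]> remap(List<Archived> archived, Map<String, Long> nameToNewId) {
        Map<Long, String> oldIdToName = new HashMap<>();
        for (Archived a : archived) {
            oldIdToName.put(a.id, a.fullName);
        }
        List<long[]> rows = new ArrayList<>();
        for (Archived a : archived) {
            Long newId = nameToNewId.get(a.fullName);
            if (newId == null) {
                continue; // this task was not re-registered in the new attempt
            }
            for (Long oldUpstream : a.upstreams) {
                String upstreamName = oldIdToName.get(oldUpstream);
                Long newUpstream = upstreamName == null ? null : nameToNewId.get(upstreamName);
                if (newUpstream != null) {
                    rows.add(new long[] { newId, newUpstream });
                }
            }
        }
        return rows;
    }
}
```

This only works if the upstream task already has a new ID when the row is built, which is the reason the tasks are inserted in ID order beforehand.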

@snagasawa snagasawa requested a review from a team February 27, 2024 09:09

katsuyan commented Mar 7, 2024

I went through everything, and the overall approach looks fine to me.
I will take another look once all of the testing has finished!

@snagasawa snagasawa marked this pull request as draft March 15, 2024 09:53