Skip to content

Commit f612013

Browse files
authored
[FLINK-35051][checkpoint] Treat handling of unaligned checkpoint barrier as the high priority mail (#26660)
1 parent 5334e7d commit f612013

File tree

9 files changed

+322
-72
lines changed

9 files changed

+322
-72
lines changed

flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,26 @@
2222
/** Options to configure behaviour of executing mailbox mails. */
2323
@Internal
2424
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
25-
static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
26-
static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(true);
2725

26+
static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false, false);
27+
static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(false, true);
28+
static final MailboxExecutor.MailOptions URGENT = new MailOptionsImpl(true, false);
29+
30+
private final boolean isUrgent;
2831
private final boolean deferrable;
2932

30-
private MailOptionsImpl(boolean deferrable) {
33+
private MailOptionsImpl(boolean isUrgent, boolean deferrable) {
34+
this.isUrgent = isUrgent;
3135
this.deferrable = deferrable;
3236
}
3337

38+
@Override
3439
public boolean isDeferrable() {
3540
return deferrable;
3641
}
42+
43+
@Override
44+
public boolean isUrgent() {
45+
return isUrgent;
46+
}
3747
}

flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,35 @@ public interface MailboxExecutor {
8989
/** Extra options to configure enqueued mails. */
9090
@PublicEvolving
9191
interface MailOptions {
92-
static MailOptions options() {
93-
return MailOptionsImpl.DEFAULT;
94-
}
9592

9693
/**
97-
* Mark this mail as deferrable.
98-
*
99-
* <p>Runtime can decide to defer execution of deferrable mails. For example, to unblock
94+
* Runtime can decide to defer execution of deferrable mails. For example, to unblock
10095
* subtask thread as quickly as possible, deferrable mails are not executed during {@link
10196
* #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping
10297
* execution of potentially long-running mails.
10398
*/
99+
boolean isDeferrable();
100+
101+
/**
102+
* The urgent mail will be executed first compared to other mails. For example, handling
103+
* unaligned checkpoint barrier or some control mails are expected to be executed as soon as
104+
* possible.
105+
*/
106+
boolean isUrgent();
107+
108+
static MailOptions options() {
109+
return MailOptionsImpl.DEFAULT;
110+
}
111+
112+
/** Mark this mail as deferrable. */
104113
static MailOptions deferrable() {
105114
return MailOptionsImpl.DEFERRABLE;
106115
}
116+
117+
/** Mark this mail as urgent. */
118+
static MailOptions urgent() {
119+
return MailOptionsImpl.URGENT;
120+
}
107121
}
108122

109123
/**

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor mailboxE
130130
() -> {
131131
try {
132132
mailboxExecutor.execute(
133+
MailboxExecutor.MailOptions.urgent(),
133134
this::processPriorityEvents,
134135
"process priority event @ gate %s",
135136
inputGate);

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,8 +1263,14 @@ public CompletableFuture<Boolean> triggerCheckpointAsync(
12631263
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
12641264
checkForcedFullSnapshotSupport(checkpointOptions);
12651265

1266+
MailboxExecutor.MailOptions mailOptions =
1267+
CheckpointOptions.AlignmentType.UNALIGNED == checkpointOptions.getAlignment()
1268+
? MailboxExecutor.MailOptions.urgent()
1269+
: MailboxExecutor.MailOptions.options();
1270+
12661271
CompletableFuture<Boolean> result = new CompletableFuture<>();
12671272
mainMailboxExecutor.execute(
1273+
mailOptions,
12681274
() -> {
12691275
try {
12701276
boolean noUnfinishedInputGates =

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,20 @@ public Mail(
5959
MailboxExecutor.MailOptions.options(),
6060
runnable,
6161
priority,
62+
descriptionFormat,
63+
descriptionArgs);
64+
}
65+
66+
public Mail(
67+
MailboxExecutor.MailOptions mailOptions,
68+
ThrowingRunnable<? extends Exception> runnable,
69+
int priority,
70+
String descriptionFormat,
71+
Object... descriptionArgs) {
72+
this(
73+
mailOptions,
74+
runnable,
75+
priority,
6276
StreamTaskActionExecutor.IMMEDIATE,
6377
descriptionFormat,
6478
descriptionArgs);

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,13 +330,14 @@ private void sendPoisonMail(RunnableWithException mail) {
330330
}
331331

332332
/**
333-
* Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is
334-
* to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;
333+
* Sends the given <code>mail</code> with urgent. Intended use is to control this <code>
334+
* MailboxProcessor</code>; no interaction with tasks should be performed;
335335
*/
336336
private void sendControlMail(
337337
RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {
338-
mailbox.putFirst(
338+
mailbox.put(
339339
new Mail(
340+
MailboxExecutor.MailOptions.urgent(),
340341
mail,
341342
Integer.MAX_VALUE /*not used with putFirst*/,
342343
descriptionFormat,

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@
3333
* <h3>Threading model</h3>
3434
*
3535
* <p>The mailbox is bound to a mailbox thread passed during creation. Most operations may only
36-
* occur through that thread. Write operations ({@link #put(Mail)}, {@link #putFirst(Mail)}) can be
37-
* executed by any thread. All other methods can only be invoked by the mailbox thread, which is
38-
* passed upon construction. To verify that the current thread is allowed to take any mail, use
39-
* {@link #isMailboxThread()}, but all methods will perform the check themselves and fail
40-
* accordingly if called from another thread.
36+
* occur through that thread. Write operations ({@link #put(Mail)} can be executed by any thread.
37+
* All other methods can only be invoked by the mailbox thread, which is passed upon construction.
38+
* To verify that the current thread is allowed to take any mail, use {@link #isMailboxThread()},
39+
* but all methods will perform the check themselves and fail accordingly if called from another
40+
* thread.
4141
*
4242
* <h3>Life cycle</h3>
4343
*
@@ -54,9 +54,9 @@
5454
* smaller logical chunks, such that the task threads cannot be blocked by a mail that enqueues
5555
* itself and thus provides input starvation.
5656
*
57-
* <p>A batch is created with {@link #createBatch()} and consumed with {@link #tryTakeFromBatch()}.
58-
* Note that there is no blocking {@code takeFromBatch} as batches can only be created and consumed
59-
* from the mailbox thread.
57+
* <p>A batch is consumed with {@link #tryTakeFromBatch()}, and batch will be created during taking
58+
* mail. Note that there is no blocking {@code takeFromBatch} as batches can only be created and
59+
* consumed from the mailbox thread.
6060
*
6161
* <p>Also note that a batch can only be created in the {@link MailboxProcessor#runMailboxLoop()}. A
6262
* batch must not be extended in any of the consuming methods as we may run into task input
@@ -160,7 +160,7 @@ public interface TaskMailbox {
160160

161161
/**
162162
* Enqueues the given mail to the mailbox and blocks until there is capacity for a successful
163-
* put.
163+
* put. The Mail with ({@link MailboxExecutor.MailOptions#isUrgent} will be put in the head.
164164
*
165165
* <p>Mails can be added from any thread.
166166
*
@@ -169,16 +169,6 @@ public interface TaskMailbox {
169169
*/
170170
void put(Mail mail);
171171

172-
/**
173-
* Adds the given action to the head of the mailbox.
174-
*
175-
* <p>Mails can be added from any thread.
176-
*
177-
* @param mail the mail to enqueue.
178-
* @throws MailboxClosedException if the mailbox is quiesced or closed.
179-
*/
180-
void putFirst(Mail mail);
181-
182172
// --- Lifecycle methods
183173

184174
/** This enum represents the states of the mailbox lifecycle. */

0 commit comments

Comments
 (0)