forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 3
/
StreamTaskSourceInput.java
165 lines (142 loc) · 6.23 KB
/
StreamTaskSourceInput.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.SourceOperator;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Implementation of {@link StreamTaskInput} that reads data from the {@link SourceOperator} and
* returns the {@link DataInputStatus} to indicate whether the source state is available,
* unavailable or finished.
*/
@Internal
public class StreamTaskSourceInput<T> implements StreamTaskInput<T>, CheckpointableInput {
private final SourceOperator<T, ?> operator;
private final int inputGateIndex;
private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
private final List<InputChannelInfo> inputChannelInfos;
private final int inputIndex;
public StreamTaskSourceInput(
SourceOperator<T, ?> operator, int inputGateIndex, int inputIndex) {
this.operator = checkNotNull(operator);
this.inputGateIndex = inputGateIndex;
inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
isBlockedAvailability.resetAvailable();
this.inputIndex = inputIndex;
}
@Override
public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
/**
* Safe guard against best efforts availability checks. If despite being unavailable someone
* polls the data from this source while it's blocked, it should return {@link
* DataInputStatus.NOTHING_AVAILABLE}.
*/
if (isBlockedAvailability.isApproximatelyAvailable()) {
return operator.emitNext(output);
}
return DataInputStatus.NOTHING_AVAILABLE;
}
@Override
public CompletableFuture<?> getAvailableFuture() {
return isBlockedAvailability.and(operator);
}
@Override
public void blockConsumption(InputChannelInfo channelInfo) {
isBlockedAvailability.resetUnavailable();
}
@Override
public void resumeConsumption(InputChannelInfo channelInfo) {
isBlockedAvailability.getUnavailableToResetAvailable().complete(null);
}
@Override
public List<InputChannelInfo> getChannelInfos() {
return inputChannelInfos;
}
@Override
public int getNumberOfInputChannels() {
return inputChannelInfos.size();
}
/**
* This method is used with unaligned checkpoints to mark the arrival of a first {@link
* CheckpointBarrier}. For chained sources, there is no {@link CheckpointBarrier} per se flowing
* through the job graph. We can assume that an imaginary {@link CheckpointBarrier} was produced
* by the source, at any point of time of our choosing.
*
* <p>We are choosing to interpret it, that {@link CheckpointBarrier} for sources was received
* immediately as soon as we receive either checkpoint start RPC, or {@link CheckpointBarrier}
* from a network input. So that we can checkpoint state of the source and all of the other
* operators at the same time.
*
* <p>Also we are choosing to block the source, as a best effort optimisation as: - either there
* is no backpressure and the checkpoint "alignment" will happen very quickly anyway - or there
* is a backpressure, and it's better to prioritize processing data from the network to speed up
* checkpointing. From the cluster resource utilisation perspective, by blocking chained source
* doesn't block any resources from being used, as this task running the source has a backlog of
* buffered input data waiting to be processed.
*
* <p>However from the correctness point of view, {@link #checkpointStarted(CheckpointBarrier)}
* and {@link #checkpointStopped(long)} methods could be empty no-op.
*/
@Override
public void checkpointStarted(CheckpointBarrier barrier) {
blockConsumption(null);
}
@Override
public void checkpointStopped(long cancelledCheckpointId) {
resumeConsumption(null);
}
@Override
public int getInputGateIndex() {
return inputGateIndex;
}
@Override
public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException {}
@Override
public int getInputIndex() {
return inputIndex;
}
@Override
public void close() {
// SourceOperator is closed via OperatorChain
}
@Override
public CompletableFuture<Void> prepareSnapshot(
ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
return CompletableFuture.completedFuture(null);
}
public OperatorID getOperatorID() {
return operator.getOperatorID();
}
public SourceOperator<T, ?> getOperator() {
return operator;
}
// Configure StreamTaskSourceInput#emitNext to emit at most one record to the given DataOutput.
public void disableEmitNextLoop() {
operator.disableEmitNextLoop();
}
}