Skip to content

Commit 97af1ef

Browse files
authored
[FLINK-37422][state/forst] Respect the maxTransferBytes when using path copying in ForSt (#26700)
1 parent f612013 commit 97af1ef

File tree

2 files changed

+59
-6
lines changed

2 files changed

+59
-6
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ private HandleAndLocalPath copyFileToCheckpoint(
127127
sourceStateHandle,
128128
checkpointStreamFactory,
129129
stateScope,
130-
tmpResourcesRegistry);
130+
tmpResourcesRegistry,
131+
maxTransferBytes);
131132
if (targetStateHandle != null) {
132133
LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, targetStateHandle);
133134
} else {
@@ -153,15 +154,22 @@ private HandleAndLocalPath copyFileToCheckpoint(
153154
* @param checkpointStreamFactory The checkpoint stream factory
154155
* @param stateScope The state scope
155156
* @param tmpResourcesRegistry The temporary resources registry
157+
* @param maxTransferBytes The max transfer bytes
156158
* @return The target state handle if path-copying is successful, otherwise null
157159
*/
158160
private @Nullable StreamStateHandle tryPathCopyingToCheckpoint(
159161
@Nonnull StreamStateHandle sourceHandle,
160162
CheckpointStreamFactory checkpointStreamFactory,
161163
CheckpointedStateScope stateScope,
162-
CloseableRegistry tmpResourcesRegistry) {
164+
CloseableRegistry tmpResourcesRegistry,
165+
long maxTransferBytes) {
163166

164167
try {
168+
// skip if there is a limit of transfer bytes
169+
if (maxTransferBytes > 0 && maxTransferBytes != Long.MAX_VALUE) {
170+
return null;
171+
}
172+
165173
// copy the file by duplicating
166174
if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, stateScope)) {
167175
return null;

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.UUID;
6969

7070
import static org.assertj.core.api.Assertions.assertThat;
71+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
7172

7273
/** Unit test for {@link ReusableDataTransferStrategy}. */
7374
@ExtendWith(ParameterizedTestExtension.class)
@@ -169,6 +170,10 @@ private byte[] genRandomBytes(int length) {
169170
}
170171

171172
private void createDbFiles(List<String> fileNames) throws IOException {
173+
createDbFiles(fileNames, 2048);
174+
}
175+
176+
private void createDbFiles(List<String> fileNames, int fileLength) throws IOException {
172177
for (String fileName : fileNames) {
173178
Path dir =
174179
FileOwnershipDecider.shouldAlwaysBeLocal(new Path(fileName))
@@ -177,7 +182,7 @@ private void createDbFiles(List<String> fileNames) throws IOException {
177182
FSDataOutputStream output =
178183
dbDelegateFileSystem.create(
179184
new Path(dir, fileName), FileSystem.WriteMode.OVERWRITE);
180-
output.write(genRandomBytes(2048));
185+
output.write(genRandomBytes(fileLength));
181186
output.sync();
182187
output.close();
183188
dbFilePaths.put(fileName, new Path(dir, fileName));
@@ -253,13 +258,18 @@ private void assertFilesReusedToCheckpoint(List<HandleAndLocalPath> checkpointHa
253258
}
254259

255260
private DBFilesSnapshot snapshot(DataTransferStrategy strategy) throws IOException {
261+
return snapshot(strategy, Long.MAX_VALUE);
262+
}
263+
264+
private DBFilesSnapshot snapshot(DataTransferStrategy strategy, long maxTransferBytes)
265+
throws IOException {
256266
DBFilesSnapshot snapshot = new DBFilesSnapshot();
257267
for (String fileName : dbFilePaths.keySet()) {
258268
Path dbFilePath = dbFilePaths.get(fileName);
259269
HandleAndLocalPath handleAndLocalPath =
260270
strategy.transferToCheckpoint(
261271
dbFilePath,
262-
MAX_TRANSFER_BYTES,
272+
maxTransferBytes,
263273
checkpointStreamFactory,
264274
CheckpointedStateScope.SHARED,
265275
closeableRegistry,
@@ -395,8 +405,6 @@ public static List<Object[]> parameters() {
395405

396406
@TempDir static java.nio.file.Path tempDir;
397407

398-
private static final long MAX_TRANSFER_BYTES = Long.MAX_VALUE;
399-
400408
private DBFilesContainer createDb(
401409
JobID jobID,
402410
int subtaskIndex,
@@ -638,4 +646,41 @@ void testUncompletedCheckpoint() throws IOException {
638646
lastSnapshot.checkFilesExist(false, dbDirUnderCpDir);
639647
lastSnapshot.checkFilesExist(true, false);
640648
}
649+
650+
private void createDbFilesWithExactSize(
651+
DBFilesContainer db, List<String> newDbFileNames, int fileLength) throws IOException {
652+
db.createDbFiles(newDbFileNames, fileLength);
653+
for (String fileName : newDbFileNames) {
654+
long fileLen =
655+
db.dbDelegateFileSystem.getFileStatus(db.dbFilePaths.get(fileName)).getLen();
656+
assertThat(fileLen).isEqualTo(fileLength);
657+
}
658+
db.checkDbFilesExist(newDbFileNames);
659+
}
660+
661+
@TestTemplate
662+
public void testSnapshotWithMaxTransferBytes() throws IOException {
663+
FileNameGenerator fileNameGenerator = new FileNameGenerator();
664+
JobID jobID = new JobID();
665+
Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy =
666+
createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode, pathCopying);
667+
DBFilesContainer db = dbAndStrategy.f0;
668+
DataTransferStrategy strategy = dbAndStrategy.f1;
669+
670+
// skip the cases when db files are reused for snapshots
671+
assumeFalse(strategy instanceof ReusableDataTransferStrategy);
672+
System.out.println(strategy.getClass());
673+
674+
// create new files for DB
675+
createDbFilesWithExactSize(db, fileNameGenerator.genMultipleFileNames(4, 4), 2048);
676+
createDbFilesWithExactSize(db, fileNameGenerator.genMultipleFileNames(4, 4), 128);
677+
678+
// create a snapshot
679+
DBFilesSnapshot lastSnapshot = db.snapshot(strategy, 1024);
680+
db.assertFilesReusedToCheckpoint(lastSnapshot.getStateHandles());
681+
682+
for (Tuple2<Path, HandleAndLocalPath> tuple : lastSnapshot.dbSnapshotFiles.values()) {
683+
assertThat(tuple.f1.getStateSize()).isLessThanOrEqualTo(1024);
684+
}
685+
}
641686
}

0 commit comments

Comments
 (0)