Skip to content

Commit

Permalink
[HUDI-3180] Include files from completed commits while bootstrapping …
Browse files Browse the repository at this point in the history
…metadata table (apache#4519)
  • Loading branch information
nsivabalan authored and Vinoth Govindarajan committed Apr 3, 2022
1 parent 2c497ff commit 114b191
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
Expand Up @@ -746,9 +746,16 @@ protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String cre
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) {
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
// filter for files that are part of the completed commits
Map<String, Long> validFileNameToSizeMap = fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
return HoodieTimeline.compareTimestamps(commitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord(
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(validFileNameToSizeMap), Option.empty());
});
partitionRecords = partitionRecords.union(fileListRecords);
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
Expand All @@ -36,7 +37,10 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -76,6 +80,36 @@ public void testMetadataBootstrapInsertUpsertClean(HoodieTableType tableType) th
bootstrapAndVerify();
}

/**
* Validate that bootstrap considers only files part of completed commit and ignore any extra files.
*/
@Test
public void testMetadataBootstrapWithExtraFiles() throws Exception {
HoodieTableType tableType = COPY_ON_WRITE;
init(tableType, false);
doPreBootstrapWriteOperation(testTable, INSERT, "0000001");
doPreBootstrapWriteOperation(testTable, "0000002");
doPreBootstrapClean(testTable, "0000003", Arrays.asList("0000001"));
doPreBootstrapWriteOperation(testTable, "0000005");
// add few extra files to table. bootstrap should include those files.
String fileName = UUID.randomUUID().toString();
Path baseFilePath = FileCreateUtils.getBaseFilePath(basePath, "p1", "0000006", fileName);
FileCreateUtils.createBaseFile(basePath, "p1", "0000006", fileName, 100);

writeConfig = getWriteConfig(true, true);
initWriteConfigAndMetatableWriter(writeConfig, true);
syncTableMetadata(writeConfig);

// remove those files from table. and then validate.
Files.delete(baseFilePath);

// validate
validateMetadata(testTable);
// after bootstrap do two writes and validate its still functional.
doWriteInsertAndUpsert(testTable);
validateMetadata(testTable);
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataBootstrapInsertUpsertRollback(HoodieTableType tableType) throws Exception {
Expand Down
Expand Up @@ -304,6 +304,11 @@ public static void createBaseFile(String basePath, String partitionPath, String
Files.setLastModifiedTime(baseFilePath, FileTime.fromMillis(lastModificationTimeMilli));
}

public static Path getBaseFilePath(String basePath, String partitionPath, String instantTime, String fileId) {
Path parentPath = Paths.get(basePath, partitionPath);
return parentPath.resolve(baseFileName(instantTime, fileId));
}

public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version)
throws Exception {
createLogFile(basePath, partitionPath, instantTime, fileId, version, 0);
Expand Down

0 comments on commit 114b191

Please sign in to comment.