Skip to content

Commit

Permalink
[HUDI-4608] Fix upgrade command in Hudi CLI (apache#6374)
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua committed Aug 15, 2022
1 parent 997200f commit 2633e88
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,10 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps())
.forTable(metaClient.getTableConfig().getTableName()).build();
try {
new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
new UpgradeDowngrade(metaClient, updatedConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.valueOf(toVersion), null);
LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
Expand Down Expand Up @@ -83,10 +88,32 @@ public void init() throws Exception {
.withMarkerFile(DEFAULT_THIRD_PARTITION_PATH, "file-3", IOType.MERGE);
}

@Test
public void testDowngradeCommand() throws Exception {
// update hoodie.table.version to 1
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
/**
 * Runs after each test and closes the timeline service, if one is present.
 */
@AfterEach
public void cleanup() {
  // Nothing to release when no timeline service is set.
  if (timelineService == null) {
    return;
  }
  timelineService.close();
}

/**
 * Supplies the {@code (fromVersion, toVersion)} pairs used by the parameterized
 * upgrade / downgrade command test.
 *
 * @return a stream with one {@link Arguments} instance per version pair
 */
private static Stream<Arguments> testArgsForUpgradeDowngradeCommand() {
  HoodieTableVersion[][] versionPairs = {
      {HoodieTableVersion.FIVE, HoodieTableVersion.ZERO},
      {HoodieTableVersion.ZERO, HoodieTableVersion.ONE},
      // The ONE -> TWO pair is intentionally left out: that upgrade requires key generator
      // related configs such as "hoodie.datasource.write.recordkey.field", which are only
      // available when the user configures the write job. Upgrading from version ONE to TWO
      // through the CLI is therefore not supported, and users should rely on the automatic
      // upgrade in the write client instead.
      {HoodieTableVersion.TWO, HoodieTableVersion.FIVE}
  };
  return Arrays.stream(versionPairs)
      .map(pair -> Arguments.of(pair[0], pair[1]));
}

@ParameterizedTest
@MethodSource("testArgsForUpgradeDowngradeCommand")
public void testUpgradeDowngradeCommand(HoodieTableVersion fromVersion, HoodieTableVersion toVersion) throws Exception {
// Start with hoodie.table.version to 5
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FIVE);
try (FSDataOutputStream os = metaClient.getFs().create(new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE), true)) {
metaClient.getTableConfig().getProps().store(os, "");
}
Expand All @@ -97,28 +124,35 @@ public void testDowngradeCommand() throws Exception {
assertEquals(1, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
}

SparkMain.upgradeOrDowngradeTable(jsc(), tablePath, HoodieTableVersion.ZERO.name());

// verify hoodie.table.version got downgraded
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
if (fromVersion != HoodieTableVersion.FIVE) {
SparkMain.upgradeOrDowngradeTable(jsc(), tablePath, fromVersion.name());
}
verifyTableVersion(fromVersion);

// verify hoodie.table.version
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode());
assertTableVersionFromPropertyFile();
SparkMain.upgradeOrDowngradeTable(jsc(), tablePath, toVersion.name());
verifyTableVersion(toVersion);

// verify marker files are non existent
for (String partitionPath : DEFAULT_PARTITION_PATHS) {
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
if (toVersion == HoodieTableVersion.ZERO) {
// verify marker files are non existent
for (String partitionPath : DEFAULT_PARTITION_PATHS) {
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
}
}
}

private void assertTableVersionFromPropertyFile() throws IOException {
/**
 * Reloads the meta client and checks that the table version matches the expected one,
 * both in the reloaded table config and in the table properties file.
 *
 * @param expectedVersion the version the table is expected to be at
 * @throws IOException if the properties file check fails to read the file
 */
private void verifyTableVersion(HoodieTableVersion expectedVersion) throws IOException {
  // Reload so the assertion sees the table config as it is after the command ran.
  metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());

  int expectedCode = expectedVersion.versionCode();
  int actualCode = metaClient.getTableConfig().getTableVersion().versionCode();
  assertEquals(expectedCode, actualCode);

  assertTableVersionFromPropertyFile(expectedVersion);
}

private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException {
Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
// Load the properties and verify
FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile);
HoodieConfig hoodieConfig = HoodieConfig.create(fsDataInputStream);
fsDataInputStream.close();
assertEquals(Integer.toString(HoodieTableVersion.ZERO.versionCode()), hoodieConfig
assertEquals(Integer.toString(expectedVersion.versionCode()), hoodieConfig
.getString(HoodieTableConfig.VERSION));
}
}

0 comments on commit 2633e88

Please sign in to comment.