From 89629469964a434ca924f654b6c2c307b3d529f3 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 19 Sep 2022 16:27:11 +0530 Subject: [PATCH] [HUDI-4832] Fix drop partition meta sync (#6662) --- .../common/table/timeline/TimelineUtils.java | 26 +++++++++++- .../metadata/HoodieTableMetadataUtil.java | 18 -------- .../hudi/common/table/TestTimelineUtils.java | 4 +- .../org/apache/hudi/sync/adb/AdbSyncTool.java | 2 +- .../org/apache/hudi/hive/HiveSyncTool.java | 14 +++---- .../apache/hudi/hive/TestHiveSyncTool.java | 38 ++++++++++++----- .../hudi/hive/testutils/HiveTestUtil.java | 2 +- .../hudi/sync/common/HoodieSyncClient.java | 41 ++++++++++--------- 8 files changed, 84 insertions(+), 61 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 9f5f4c23d076..75493e7b463e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -52,11 +53,34 @@ public class TimelineUtils { * Returns partitions that have new data strictly after commitTime. * Does not include internal operations such as clean in the timeline. */ - public static List getPartitionsWritten(HoodieTimeline timeline) { + public static List getWrittenPartitions(HoodieTimeline timeline) { HoodieTimeline timelineToSync = timeline.getWriteTimeline(); return getAffectedPartitions(timelineToSync); } + /** + * Returns partitions that have been deleted or marked for deletion in the given timeline. + * Does not include internal operations such as clean in the timeline. + */ + public static List getDroppedPartitions(HoodieTimeline timeline) { + HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline(); + + return replaceCommitTimeline.getInstants().flatMap(instant -> { + try { + HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes( + replaceCommitTimeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + if (WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) { + Map> partitionToReplaceFileIds = commitMetadata.getPartitionToReplaceFileIds(); + return partitionToReplaceFileIds.keySet().stream(); + } else { + return Stream.empty(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions modified at " + instant, e); + } + }).distinct().filter(partition -> !partition.isEmpty()).collect(Collectors.toList()); + } + /** * Returns partitions that have been modified including internal operations such as clean in the passed timeline. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index c7a0df5d6ad3..19634700fb92 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1357,22 +1357,4 @@ public static Set getInflightAndCompletedMetadataPartitions(HoodieTableC inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions()); return inflightAndCompletedPartitions; } - - /** - * Get Last commit's Metadata. - */ - public static Option getLatestCommitMetadata(HoodieTableMetaClient metaClient) { - try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - HoodieInstant instant = timeline.lastInstant().get(); - byte[] data = timeline.getInstantDetails(instant).get(); - return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); - } else { - return Option.empty(); - } - } catch (Exception e) { - throw new HoodieException("Failed to get commit metadata", e); - } - } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 380c4c521255..da078372b5c3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -130,11 +130,11 @@ public void testGetPartitions() throws IOException { assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"})); // verify only commit actions - partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(4, partitions.size()); assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); - partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); + partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); assertEquals(3, partitions.size()); assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); } diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java index 1c578b102cfa..5f4a36631a22 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java @@ -194,7 +194,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) { writtenPartitionsSince = new ArrayList<>(); } else { - writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced); } LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size()); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 0374686b7166..adfe52f920d1 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; @@ -199,9 +200,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Check if the necessary table exists boolean tableExists = syncClient.tableExists(tableName); - // check if isDropPartition - boolean isDropPartition = syncClient.isDropPartition(); - // Get the parquet schema for this table looking at the latest commit MessageType schema = syncClient.getStorageSchema(); @@ -225,11 +223,13 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName); } LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")); - List writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + List writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced); LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); // Sync the partitions if needed - boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition); + // find dropped partitions, if any, in the latest commit + Set droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced); + boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { syncClient.updateLastCommitTimeSynced(tableName); @@ -310,12 +310,12 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). */ - private boolean syncPartitions(String tableName, List writtenPartitionsSince, boolean isDropPartition) { + private boolean syncPartitions(String tableName, List writtenPartitionsSince, Set droppedPartitions) { boolean partitionsChanged; try { List hivePartitions = syncClient.getAllPartitions(tableName); List partitionEvents = - syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition); + syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions); List newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD); if (!newPartitions.isEmpty()) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 0673e08489ef..1d454786859e 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -197,8 +197,8 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty()); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, "The one partition event must of type UPDATE"); @@ -475,10 +475,10 @@ public void testSyncIncremental(String syncMode) throws Exception { // Lets do the sync reSyncHiveTable(); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); + List writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(commitTime1)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -754,10 +754,10 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2); reinitHiveSyncClient(); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); + List writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(instantTime)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -784,7 +784,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { "Table partitions should match the number of partitions we wrote"); assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); - assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size()); + assertEquals(1, hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size()); } @ParameterizedTest @@ -854,17 +854,33 @@ public void testDropPartition(String syncMode) throws Exception { "Table partitions should match the number of partitions we wrote"); assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); + // add a partition but do not sync + String instantTime2 = "101"; + String newPartition = "2010/02/01"; + HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2); + HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME); + partitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(1, partitions.size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); + + // create two replace commits to delete current partitions, but do not sync in between String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/"); - // create a replace commit to delete current partitions+ - HiveTestUtil.createReplaceCommit("101", partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true); + String instantTime3 = "102"; + HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true); + String instantTime4 = "103"; + HiveTestUtil.createReplaceCommit(instantTime4, newPartition, WriteOperationType.DELETE_PARTITION, true, true); - // sync drop partitions + // now run hive sync reinitHiveSyncClient(); reSyncHiveTable(); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); assertEquals(0, hivePartitions.size(), - "Table should have 0 partition because of the drop the only one partition"); + "Table should have no partitions"); + assertEquals(instantTime4, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); } @ParameterizedTest diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index f3c8b3da5e38..a58f835dab29 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -493,7 +493,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String fsout.close(); } - public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { + public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeReplaceFileName(instantTime)); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 32ade18d0811..af06f5908ce3 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -20,16 +20,13 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionEvent; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -41,8 +38,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; @@ -83,18 +82,17 @@ public boolean isBootstrap() { return metaClient.getTableConfig().getBootstrapBasePath().isPresent(); } - public boolean isDropPartition() { - try { - Option hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient); - - if (hoodieCommitMetadata.isPresent() - && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) { - return true; - } - } catch (Exception e) { - throw new HoodieSyncException("Failed to get commit metadata", e); - } - return false; + /** + * Get the set of dropped partitions since the last synced commit. + * If last sync time is not known then consider only active timeline. + * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given. + */ + public Set getDroppedPartitionsSince(Option lastCommitTimeSynced) { + HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) + .mergeTimeline(metaClient.getActiveTimeline()) + .getCommitsTimeline() + .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline(); + return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline)); } @Override @@ -106,7 +104,7 @@ public MessageType getStorageSchema() { } } - public List getPartitionsWrittenToSince(Option lastCommitTimeSynced) { + public List getWrittenPartitionsSince(Option lastCommitTimeSynced) { if (!lastCommitTimeSynced.isPresent()) { LOG.info("Last commit time synced is not known, listing all partitions in " + config.getString(META_SYNC_BASE_PATH) @@ -118,8 +116,11 @@ public List getPartitionsWrittenToSince(Option lastCommitTimeSyn config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)); } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); - return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); + return TimelineUtils.getWrittenPartitions( + metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) + .mergeTimeline(metaClient.getActiveTimeline()) + .getCommitsTimeline() + .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); } } @@ -127,7 +128,7 @@ public List getPartitionsWrittenToSince(Option lastCommitTimeSyn * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. * Generate a list of PartitionEvent based on the changes required. */ - public List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDropPartition) { + public List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, Set droppedPartitions) { Map paths = new HashMap<>(); for (Partition tablePartition : tablePartitions) { List hivePartitionValues = tablePartition.getValues(); @@ -143,7 +144,7 @@ public List getPartitionEvents(List tablePartitions, // Check if the partition values or if hdfs path is the same List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - if (isDropPartition) { + if (droppedPartitions.contains(storagePartition)) { events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); } else { if (!storagePartitionValues.isEmpty()) {