Skip to content

Commit

Permalink
[HUDI-4528] Add diff tool to compare commit metadata (apache#6485)
Browse files Browse the repository at this point in the history
* Add diff tool to compare commit metadata
* Add partition level info to commits and compaction command
* Partition support for compaction archived timeline
* Add diff command test
  • Loading branch information
codope committed Sep 5, 2022
1 parent 24dd007 commit edbd7fd
Show file tree
Hide file tree
Showing 15 changed files with 630 additions and 212 deletions.
Expand Up @@ -181,4 +181,36 @@ public class HoodieTableHeaderFields {
public static final String HEADER_MT_REQUESTED_TIME = HEADER_MT_PREFIX + HEADER_REQUESTED_TIME;
public static final String HEADER_MT_INFLIGHT_TIME = HEADER_MT_PREFIX + HEADER_INFLIGHT_TIME;
public static final String HEADER_MT_COMPLETED_TIME = HEADER_MT_PREFIX + HEADER_COMPLETED_TIME;

/**
 * Builds the standard table header used when rendering commit metadata summaries.
 *
 * <p>Columns appear in the order they are added below.
 *
 * @return a {@link TableHeader} with the commit-level summary columns
 */
public static TableHeader getTableHeader() {
  // Constants are members of this class, so no qualifier is needed.
  String[] fields = {
      HEADER_COMMIT_TIME,
      HEADER_TOTAL_BYTES_WRITTEN,
      HEADER_TOTAL_FILES_ADDED,
      HEADER_TOTAL_FILES_UPDATED,
      HEADER_TOTAL_PARTITIONS_WRITTEN,
      HEADER_TOTAL_RECORDS_WRITTEN,
      HEADER_TOTAL_UPDATE_RECORDS_WRITTEN,
      HEADER_TOTAL_ERRORS
  };
  TableHeader header = new TableHeader();
  for (String field : fields) {
    // Reassign in case addTableHeaderField returns a new instance rather than this.
    header = header.addTableHeaderField(field);
  }
  return header;
}

/**
 * Builds the extended table header used when rendering commit metadata with
 * per-file and log-block details (extra metadata).
 *
 * <p>Columns appear in the order they are added below.
 *
 * @return a {@link TableHeader} with the detailed per-file metadata columns
 */
public static TableHeader getTableHeaderWithExtraMetadata() {
  // Constants are members of this class, so no qualifier is needed.
  String[] fields = {
      HEADER_ACTION,
      HEADER_INSTANT,
      HEADER_PARTITION,
      HEADER_FILE_ID,
      HEADER_PREVIOUS_COMMIT,
      HEADER_NUM_WRITES,
      HEADER_NUM_INSERTS,
      HEADER_NUM_DELETES,
      HEADER_NUM_UPDATE_WRITES,
      HEADER_TOTAL_ERRORS,
      HEADER_TOTAL_LOG_BLOCKS,
      HEADER_TOTAL_CORRUPT_LOG_BLOCKS,
      HEADER_TOTAL_ROLLBACK_BLOCKS,
      HEADER_TOTAL_LOG_RECORDS,
      HEADER_TOTAL_UPDATED_RECORDS_COMPACTED,
      HEADER_TOTAL_BYTES_WRITTEN
  };
  TableHeader header = new TableHeader();
  for (String field : fields) {
    // Reassign in case addTableHeaderField returns a new instance rather than this.
    header = header.addTableHeaderField(field);
  }
  return header;
}
}
229 changes: 86 additions & 143 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java

Large diffs are not rendered by default.

Expand Up @@ -25,7 +25,6 @@
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.CompactionAdminClient.RenameOpResult;
Expand Down Expand Up @@ -71,6 +70,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo;

/**
* CLI command to display compaction related options.
*/
Expand Down Expand Up @@ -115,24 +116,25 @@ public String compactionShow(
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
unspecifiedDefaultValue = "false") final boolean headerOnly,
@CliOption(key = {"partition"}, help = "Partition value") final String partition)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
}

@CliCommand(value = "compactions showarchived", help = "Shows compaction details for specified time window")
public String compactionsShowArchived(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for compactions, default: now - 10 days")
@CliOption(key = {"startTs"}, help = "start time for compactions, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for compactions, default: now - 1 day")
@CliOption(key = {"endTs"}, help = "end time for compactions, default: now - 1 day")
String endTs,
@CliOption(key = {"limit"}, help = "Limit compactions",
unspecifiedDefaultValue = "-1") final Integer limit,
Expand All @@ -141,10 +143,10 @@ public String compactionsShowArchived(
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly) {
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = CommitUtil.getTimeDaysAgo(10);
startTs = getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
endTs = CommitUtil.getTimeDaysAgo(1);
endTs = getTimeDaysAgo(1);
}

HoodieTableMetaClient client = checkAndGetMetaClient();
Expand All @@ -168,7 +170,8 @@ public String compactionShowArchived(
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
unspecifiedDefaultValue = "false") final boolean headerOnly,
@CliOption(key = {"partition"}, help = "Partition value") final String partition)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
Expand All @@ -178,7 +181,7 @@ public String compactionShowArchived(
archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
}
Expand Down Expand Up @@ -303,13 +306,13 @@ public String compact(
/**
* Prints all compaction details.
*/
private String printAllCompactions(HoodieDefaultTimeline timeline,
Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
boolean includeExtraMetadata,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {
private static String printAllCompactions(HoodieDefaultTimeline timeline,
Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
boolean includeExtraMetadata,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {

Stream<HoodieInstant> instantsStream = timeline.getWriteTimeline().getReverseOrderedInstants();
List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
Expand Down Expand Up @@ -405,16 +408,19 @@ private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTim
}
}

protected String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {
protected static String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField,
boolean descending,
int limit,
boolean headerOnly,
final String partition) {
List<Comparable[]> rows = new ArrayList<>();
if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
if (StringUtils.isNullOrEmpty(partition) || partition.equals(op.getPartitionPath())) {
rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
}
}
}

Expand Down

0 comments on commit edbd7fd

Please sign in to comment.