@@ -96,7 +96,7 @@ protected void publishStats(IcebergTableStats icebergTableStats) {
}

/**
* Publish commit events. Override this method in li-openhouse to send to Kafka.
* Publish commit events.
*
* @param commitEvents List of commit events to publish
*/
@@ -110,7 +110,7 @@ protected void publishCommitEvents(List<CommitEventTable> commitEvents) {
}

/**
* Publish partition-level commit events. Override this method in li-openhouse to send to Kafka.
* Publish partition-level commit events.
*
* @param partitionEvents List of partition events to publish
*/
@@ -504,6 +504,74 @@ public static List<CommitEventTable> populateCommitEventTable(Table table, Spark
return commitEventTableList;
}

/**
* Builds an enriched DataFrame containing partition data joined with commit metadata.
*
* <p>This shared helper method queries Iceberg metadata tables (all_entries and snapshots) and
* creates a DataFrame with partition information enriched with commit metadata.
*
* <p>This is a pure query builder - it does not manage caching or counting. The caller is
* responsible for the DataFrame lifecycle (cache, count, collect, unpersist).
*
* <p><b>Output Schema:</b>
*
* <ul>
* <li>snapshot_id: long - Iceberg snapshot ID
* <li>committed_at: long - Commit timestamp in epoch seconds
* <li>operation: string - Commit operation (append, overwrite, delete, etc.)
* <li>summary: map&lt;string,string&gt; - Commit summary metadata
* <li>partition: struct - Partition column values as a struct
* </ul>
*
* <p><b>For unpartitioned tables:</b> Returns null to indicate that no partition data is available.
*
* <p><b>Visibility:</b> Package-private for testing purposes.
*
* @param table Iceberg Table
* @param spark SparkSession
* @return DataFrame with enriched partition and commit data, or null if unpartitioned
*/
static Dataset<Row> buildEnrichedPartitionDataFrame(Table table, SparkSession spark) {
String fullTableName = table.name();

// Check if table is partitioned
PartitionSpec spec = table.spec();
if (spec.isUnpartitioned()) {
log.info("Table {} is unpartitioned, no enriched partition data to build", fullTableName);
return null;
}

// Query all_entries metadata table for partitions per commit
// Use DISTINCT to deduplicate (snapshot_id, partition) pairs
// No status filter - captures all affected partitions (ADDED or DELETED files)
String allEntriesQuery =
String.format(
"SELECT DISTINCT snapshot_id, data_file.partition " + "FROM %s.all_entries",
table.name());

log.info("Executing all_entries query for table {}: {}", fullTableName, allEntriesQuery);
Dataset<Row> partitionsPerCommitDF = spark.sql(allEntriesQuery);

// Query snapshots to get commit metadata
String snapshotsQuery =
String.format(
"SELECT snapshot_id, committed_at, operation, summary " + "FROM %s.snapshots",
table.name());

Dataset<Row> snapshotsDF = spark.sql(snapshotsQuery);

// Join partitions with commit metadata and return
// Caller manages the lifecycle (cache, count, collect, unpersist)
return partitionsPerCommitDF
.join(snapshotsDF, "snapshot_id")
.select(
functions.col("snapshot_id"),
functions.col("committed_at").cast("long"), // Cast timestamp to epoch seconds
functions.col("operation"),
functions.col("summary"),
functions.col("partition")); // Keep partition struct for transformation
}
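
// Illustrative sketch: a hypothetical caller of buildEnrichedPartitionDataFrame, following the
// lifecycle contract described in the Javadoc above (the helper only builds the query; the caller
// caches, counts, collects, and unpersists). Variable names here are assumed for illustration.
//
//   Dataset<Row> enriched = buildEnrichedPartitionDataFrame(table, spark);
//   if (enriched == null) {
//     return Collections.emptyList();  // unpartitioned table: no partition data
//   }
//   enriched.cache();                  // materialize the join once, reuse for count + collect
//   long rowCount = enriched.count();
//   List<Row> rows = enriched.collectAsList();  // served from the cached result
//   enriched.unpersist();              // free executor memory after collection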

/**
* Collect partition-level commit events for a table.
*
@@ -532,14 +600,17 @@ public static List<CommitEventTablePartitions> populateCommitEventTablePartition
String fullTableName = table.name();
log.info("Collecting partition-level commit events for table: {}", fullTableName);

// Step 1: Check if table is partitioned
PartitionSpec spec = table.spec();
if (spec.isUnpartitioned()) {
log.info("Table {} is unpartitioned, no partition events to collect", fullTableName);
// Step 1: Build enriched DataFrame with partition and commit data using shared helper
Dataset<Row> enrichedDF = buildEnrichedPartitionDataFrame(table, spark);

// A null result indicates an unpartitioned table (no partition data available)
if (enrichedDF == null) {
log.info("No partition-level commit events found for table: {}", fullTableName);
return Collections.emptyList();
}

// Step 2: Parse table name components
// Step 2: Parse table name components for transformation
PartitionSpec spec = table.spec();
String dbName = getDatabaseName(fullTableName);
if (dbName == null) {
return Collections.emptyList();
@@ -555,61 +626,27 @@ public static List<CommitEventTablePartitions> populateCommitEventTablePartition
List<String> partitionColumnNames =
spec.fields().stream().map(f -> f.name()).collect(Collectors.toList());

// Step 3: Query all_entries metadata table for partitions per commit
// Use DISTINCT to deduplicate (snapshot_id, partition) pairs
// No status filter - captures all affected partitions (ADDED or DELETED files)
// Note: We query snapshots here even though populateCommitEventTable() also queries it.
// This is intentional to maintain parallel execution (both methods run simultaneously).
// Snapshots query is fast (~10-50ms, hits Iceberg metadata cache).
String allEntriesQuery =
String.format(
"SELECT DISTINCT snapshot_id, data_file.partition " + "FROM %s.all_entries",
table.name());

log.info("Executing all_entries query: {}", allEntriesQuery);
Dataset<Row> partitionsPerCommitDF = spark.sql(allEntriesQuery);
// Step 3: Manage DataFrame lifecycle and collect to driver
// Cache BEFORE first action to materialize and reuse for collection
enrichedDF.cache();

// Cache for reuse
partitionsPerCommitDF.cache();
long totalRecords = partitionsPerCommitDF.count();
// Count triggers cache materialization (single join execution)
long totalRecords = enrichedDF.count();

// Early return if no data found (after cache materialization)
if (totalRecords == 0) {
log.info("No partition-level commit events found for table: {}", fullTableName);
partitionsPerCommitDF.unpersist();
log.info("No partition-level records found for table: {}", fullTableName);
enrichedDF.unpersist();
return Collections.emptyList();
}

log.info(
"Found {} partition-level commit event records for table: {}", totalRecords, fullTableName);

// Step 4: Join with snapshots to get commit metadata
String snapshotsQuery =
String.format(
"SELECT snapshot_id, committed_at, operation, summary " + "FROM %s.snapshots",
table.name());

Dataset<Row> snapshotsDF = spark.sql(snapshotsQuery);

// Join partitions with commit metadata
Dataset<Row> enrichedDF =
partitionsPerCommitDF
.join(snapshotsDF, "snapshot_id")
.select(
functions.col("snapshot_id"),
functions.col("committed_at").cast("long"), // Cast timestamp to epoch seconds
functions.col("operation"),
functions.col("summary"),
functions.col("partition")); // Keep partition struct for Java transformation

// Step 6: Collect to driver and transform in Java with type safety
// This matches populateCommitEventTable() pattern which also uses collectAsList()
// Size is manageable: typically 100K rows × 200 bytes = 20MB
log.info("Collecting {} rows to driver for transformation", totalRecords);
List<Row> rows = enrichedDF.collectAsList();
List<Row> rows = enrichedDF.collectAsList(); // Uses cached data

partitionsPerCommitDF.unpersist();
// Unpersist immediately after collection to free memory
enrichedDF.unpersist();

// Step 7: Delegate transformation to helper method
// Step 4: Delegate transformation to helper method
// Separated for testability and readability
List<CommitEventTablePartitions> result =
transformRowsToPartitionEvents(