Extract Delta Lake deletion vectors #661
base: main
Conversation
InternalDeletionVector deletionVector =
    actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
if (deletionVector != null) {
  deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
deletionVector.dataFilePath() points to the path of the associated Parquet Data File.
We should use deletionVector.getPhysicalPath() instead. Thoughts? @ashvina
Thanks for the review @piyushdubey
The intention is to use the path of the data file with which this deletion vector is associated (see the comment on line 118). This is used to update the maps of added and removed files.
Makes sense. I see that on line 168 you are concatenating the deletion vectors to the internal files, which avoids adding both the DV and the data file without skipping.
Thanks for the clarification.
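For context, here is a minimal sketch of the reconciliation being discussed; the surrounding variables (actions, addedFiles, removedFiles) are illustrative names, not necessarily the PR's exact ones:

// Sketch: key the deletion-vector map by the associated data file's path
// (dataFilePath), not the DV's physical path, so entries can be matched
// against the added/removed file maps.
Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();
for (Action action : actions) {
  if (action instanceof AddFile) {
    InternalDeletionVector deletionVector =
        actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
    if (deletionVector != null) {
      deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
    }
  }
}
// A data file that was "removed" and re-"added" only to attach a DV is
// dropped from both maps so it is not reported as a real file change.
for (String dataFilePath : deletionVectors.keySet()) {
  addedFiles.remove(dataFilePath);
  removedFiles.remove(dataFilePath);
}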
Depends on #662
 * binary representation of the deletion vector. The consumer can use the {@link
 * #ordinalsIterator()} to extract the ordinals represented in the binary format.
 */
byte[] binaryRepresentation;
Currently when this field is set to a non-null value, the ordinalsIterator is also set. I think it may be cleaner to remove this and rely directly on the ordinalsIterator. Is there something in the future, though, where this may be used directly?
My main worry is that future developers implementing support for deletion vectors may eagerly parse the data into this field.
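One possible direction, sketched with an assumed supplier-based shape (illustrative only, not the PR's actual API):

import java.util.Iterator;
import java.util.function.Supplier;

// Sketch: expose ordinals lazily instead of storing a parsed byte[].
// The supplier defers reading and decoding the DV file until a consumer
// actually iterates, so nothing is eagerly materialized.
public class InternalDeletionVector {
  private final Supplier<Iterator<Long>> ordinalsSupplier;

  public InternalDeletionVector(Supplier<Iterator<Long>> ordinalsSupplier) {
    this.ordinalsSupplier = ordinalsSupplier;
  }

  // Each call returns a fresh iterator over deleted row ordinals.
  public Iterator<Long> ordinalsIterator() {
    return ordinalsSupplier.get();
  }
}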
Configuration conf = new Configuration();
DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
when(snapshot.deltaLog()).thenReturn(deltaLog);
when(deltaLog.dataPath()).thenReturn(new Path(basePath));
when(deltaLog.newDeltaHadoopConf()).thenReturn(conf);

long[] ordinals = {45, 78, 98};
Mockito.doReturn(ordinals)
    .when(actionsConverter)
    .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset);
Can you pull the common testing setup into a helper method?
Similarly, the assertions below could be moved into a common method so there are fewer places to update if the assertions need to change due to a new field or something like that.
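For example, the shared mock wiring could move into a helper along these lines (the method name is illustrative):

// Sketch: common Mockito setup for the DeltaLog/Snapshot mocks used by
// several tests; returns the Configuration so individual tests can stub
// calls that depend on it.
private Configuration mockDeltaLog(Snapshot snapshot, String basePath) {
  Configuration conf = new Configuration();
  DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
  when(snapshot.deltaLog()).thenReturn(deltaLog);
  when(deltaLog.dataPath()).thenReturn(new Path(basePath));
  when(deltaLog.newDeltaHadoopConf()).thenReturn(conf);
  return conf;
}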
@@ -151,7 +153,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
   // entry which is replaced by a new entry, AddFile with delete vector information. Since the
   // same data file is removed and added, we need to remove it from the added and removed file
   // maps which are used to track actual added and removed data files.
-  for (String deletionVector : deletionVectors) {
+  for (String deletionVector : deletionVectors.keySet()) {
nitpick: the name deletionVector is no longer representative of the actual string. Something like dataFileForDeletionVector would be clearer.
}

private void validateDeletionInfoForCommit(
    TableState tableState,
This is unused in the method, is that intentional?
Iterator<Long> iterator = deleteInfo.ordinalsIterator();
List<Long> deletes = new ArrayList<>();
iterator.forEachRemaining(deletes::add);
assertEquals(deletes.size(), deleteInfo.getRecordCount());
Should we also validate the ordinals are correct here?
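For instance, reusing the ordinals stubbed earlier (45, 78, 98), the check could be extended along these lines (assumes java.util.Arrays is imported):

// Sketch: verify the exact ordinals, not just the count.
assertEquals(Arrays.asList(45L, 78L, 98L), deletes);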
SourceTable tableConfig =
    SourceTable.builder()
        .name(testSparkDeltaTable.getTableName())
-       .basePath(testSparkDeltaTable.getBasePath())
+       .basePath(tableBasePath)
        .formatName(TableFormat.DELTA)
        .build();
DeltaConversionSource conversionSource =
    conversionSourceProvider.getConversionSourceInstance(tableConfig);
InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();

// validateDeltaPartitioning(internalSnapshot);
Can you remove this comment?
@@ -91,11 +99,24 @@ void setUp() {
   conversionSourceProvider.init(hadoopConf);
 }

+private static class TableState {
+  Map<String, AddFile> activeFiles;
+  List<Row> rowsToDelete;
This list looks like it is unused, is that intentional?
Fixes: #343 and #345
This change extracts deletion vectors, represented as roaring bitmaps in Delta Lake files, and converts them into the XTable intermediate representation.
Previously, XTable only detected table changes that added or removed data files. Now the detected table change also includes any deletion vector files added in the commit.
Note that in Delta Lake, deletion vectors are stored in a compressed binary format. Once extracted by XTable, however, the offsets are currently materialized as a list of longs. This representation is not the most efficient for large datasets; optimization has been deferred to prioritize completing the end-to-end conversion.
relates to #627
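As a rough illustration of the decode step described above, here is a sketch using the plain 32-bit org.roaringbitmap library; Delta's actual DV encoding is a portable 64-bit variant, so the format details here are assumptions:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.roaringbitmap.RoaringBitmap;

// Sketch: decode a serialized roaring bitmap and materialize its set bits
// as long ordinals, mirroring the list-of-longs representation mentioned
// in the description. Illustrative only; not Delta's exact on-disk format.
public static List<Long> decodeOrdinals(byte[] serializedBitmap) throws IOException {
  RoaringBitmap bitmap = new RoaringBitmap();
  bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(serializedBitmap)));
  List<Long> ordinals = new ArrayList<>();
  bitmap.forEach((int ordinal) -> ordinals.add((long) ordinal));
  return ordinals;
}

Materializing every ordinal defeats the compression for large DVs, which is the inefficiency the description calls out; streaming the bitmap's iterator directly would avoid holding the full list in memory.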