Build AcidInfo only for ORC-ACID tables #8259

Closed
@@ -483,6 +483,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
List<Path> readPaths;
List<HdfsFileStatusWithId> fileStatusOriginalFiles = ImmutableList.of();
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
+boolean isFullAcid = AcidUtils.isFullAcidTable(table.getParameters());
if (AcidUtils.isTransactionalTable(table.getParameters())) {
AcidUtils.Directory directory = hdfsEnvironment.doAs(hdfsContext.getIdentity().getUser(), () -> AcidUtils.getAcidState(
path,
@@ -491,7 +492,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
false,
true));

-if (AcidUtils.isFullAcidTable(table.getParameters())) {
+if (isFullAcid) {
// From Hive version >= 3.0, delta/base files will always have file '_orc_acid_version' with value >= '2'.
Path baseOrDeltaPath = directory.getBaseDirectory() != null
? directory.getBaseDirectory()
@@ -566,24 +567,35 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
e.getNestedDirectoryPath(),
splitFactory.getPartitionName()));
}
-lastResult = hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfoBuilder.build()));
+Optional<AcidInfo> acidInfo = Optional.empty();
+if (isFullAcid) {
+    acidInfo = acidInfoBuilder.build();
+}
+lastResult = hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
}

for (HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
List<LocatedFileStatus> locatedFileStatuses = ImmutableList.of((LocatedFileStatus) hdfsFileStatusWithId.getFileStatus());
-Optional<AcidInfo> acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath())));
+Optional<AcidInfo> acidInfo = Optional.empty();
+if (isFullAcid) {
+    acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath())));
+}
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(locatedFileStatuses, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
}

return lastResult;
}

for (Path readPath : readPaths) {
-fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfoBuilder.build()));
+Optional<AcidInfo> acidInfo = Optional.empty();
+if (isFullAcid) {
+    acidInfo = acidInfoBuilder.build();
Review comment (Member): since the method is long anyway, let's use ?: for brevity and to have variables final.

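A minimal sketch of the suggested ternary form (assuming, as the surrounding diff shows, that acidInfoBuilder.build() already returns Optional<AcidInfo>):

    // Ternary keeps acidInfo effectively final, as the reviewer suggests
    Optional<AcidInfo> acidInfo = isFullAcid
            ? acidInfoBuilder.build()
            : Optional.empty();

An effectively final variable can also be captured by lambdas later in this long method.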
+}
+fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo));
}

if (!fileStatusOriginalFiles.isEmpty()) {
-fileIterators.addLast(generateOriginalFilesSplits(splitFactory, fileStatusOriginalFiles, splittable, acidInfoBuilder));
+fileIterators.addLast(generateOriginalFilesSplits(splitFactory, fileStatusOriginalFiles, splittable, acidInfoBuilder, isFullAcid));
}

return COMPLETED_FUTURE;
@@ -703,12 +715,16 @@ private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
InternalHiveSplitFactory splitFactory,
List<HdfsFileStatusWithId> originalFileLocations,
boolean splittable,
-AcidInfo.Builder acidInfoBuilder)
+AcidInfo.Builder acidInfoBuilder,
+boolean isFullAcid)
Review comment on lines +718 to +719 (Member): should acidInfoBuilder be Optional?

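One hypothetical shape of what the reviewer may be suggesting: pass the builder as Optional<AcidInfo.Builder> and drop the isFullAcid flag entirely, so the per-file branch collapses to a map call (a sketch against the names in this diff, not the merged code):

    private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
            InternalHiveSplitFactory splitFactory,
            List<HdfsFileStatusWithId> originalFileLocations,
            boolean splittable,
            Optional<AcidInfo.Builder> acidInfoBuilder)  // empty for non-full-ACID tables
    {
        // ...
        // inside the per-file lambda, the if (isFullAcid) check becomes:
        Optional<AcidInfo> acidInfo = acidInfoBuilder
                .map(builder -> builder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())));
        // ...
    }

An empty Optional would then encode "not a full-ACID table" in the type itself, rather than in a boolean that callers must keep consistent with the builder they pass.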
{
return originalFileLocations.stream()
.map(HdfsFileStatusWithId::getFileStatus)
.map(fileStatus -> {
-Optional<AcidInfo> acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())));
+Optional<AcidInfo> acidInfo = Optional.empty();
+if (isFullAcid) {
+    acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())));
+}
return splitFactory.createInternalHiveSplit(
(LocatedFileStatus) fileStatus,
OptionalInt.empty(),
@@ -191,7 +191,38 @@ private void doTestReadFullAcid(boolean isPartitioned, BucketingType bucketingType)

@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "partitioningAndBucketingTypeDataProvider", timeOut = TEST_TIMEOUT)
@Flaky(issue = "https://github.com/trinodb/trino/issues/4927", match = "Hive table .* is corrupt. Found sub-directory in bucket directory for partition")
-public void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType)
+public void testReadInsertOnlyOrc(boolean isPartitioned, BucketingType bucketingType)
+{
+    testReadInsertOnly(isPartitioned, bucketingType, "STORED AS ORC");
+}
+
+@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "partitioningAndBucketingTypeSmokeDataProvider", timeOut = TEST_TIMEOUT)
+@Flaky(issue = "https://github.com/trinodb/trino/issues/4927", match = "Hive table .* is corrupt. Found sub-directory in bucket directory for partition")
+public void testReadInsertOnlyParquet(boolean isPartitioned, BucketingType bucketingType)
+{
+    testReadInsertOnly(isPartitioned, bucketingType, "STORED AS PARQUET");
+}
+
+@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "partitioningAndBucketingTypeSmokeDataProvider", timeOut = TEST_TIMEOUT)
+@Flaky(issue = "https://github.com/trinodb/trino/issues/4927", match = "Hive table .* is corrupt. Found sub-directory in bucket directory for partition")
+public void testReadInsertOnlyText(boolean isPartitioned, BucketingType bucketingType)
+{
+    testReadInsertOnly(isPartitioned, bucketingType, "STORED AS TEXTFILE");
+}
+
+@Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT)
+public void testReadInsertOnlyTextWithCustomFormatProperties()
+{
+    testReadInsertOnly(
+            false,
+            NONE,
+            " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
+                    " WITH SERDEPROPERTIES ('field.delim'=',', 'line.delim'='\\n', 'serialization.format'=',') " +
+                    " STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
+                    " OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+}
+
+private void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType, String hiveTableFormatDefinition)
{
if (getHiveVersionMajor() < 3) {
throw new SkipException("Hive transactional tables are supported with Hive version 3 or above");
@@ -203,7 +234,7 @@ public void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType)
onHive().executeQuery("CREATE TABLE " + tableName + " (col INT) " +
(isPartitioned ? "PARTITIONED BY (part_col INT) " : "") +
bucketingType.getHiveClustering("col", 4) + " " +
"STORED AS ORC " +
hiveTableFormatDefinition + " " +
hiveTableProperties(INSERT_ONLY, bucketingType));

String hivePartitionString = isPartitioned ? " PARTITION (part_col=2) " : "";
@@ -458,6 +489,15 @@ public Object[][] partitioningAndBucketingTypeDataProvider()
};
}

+@DataProvider
+public Object[][] partitioningAndBucketingTypeSmokeDataProvider()
+{
+    return new Object[][] {
+            {false, BucketingType.NONE},
+            {true, BucketingType.BUCKETED_DEFAULT},
+    };
+}

@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "testCreateAcidTableDataProvider")
public void testCtasAcidTable(boolean isPartitioned, BucketingType bucketingType)
{