Build AcidInfo only for ORC-ACID tables

losipiuk authored and findepi committed Jun 11, 2021
1 parent 25295d0 commit 79b34c5
Showing 2 changed files with 65 additions and 9 deletions.
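In short: AcidInfo (which carries delete-delta and original-file metadata for ORC ACID reads) is now built only for full-ACID tables, so insert-only transactional tables stored as Parquet or text no longer get one. A minimal sketch of the pattern the commit applies, reusing the identifiers that appear in the diff below:

    // Sketch of the commit's core pattern (not a verbatim excerpt):
    // an AcidInfo is only meaningful for full-ACID (ORC) tables, so skip
    // building it for insert-only transactional tables in other formats.
    boolean isFullAcid = AcidUtils.isFullAcidTable(table.getParameters());
    Optional<AcidInfo> acidInfo = isFullAcid
            ? acidInfoBuilder.build()   // returns Optional<AcidInfo>; may be empty
            : Optional.empty();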
@@ -483,6 +483,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
         List<Path> readPaths;
         List<HdfsFileStatusWithId> fileStatusOriginalFiles = ImmutableList.of();
         AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
+        boolean isFullAcid = AcidUtils.isFullAcidTable(table.getParameters());
         if (AcidUtils.isTransactionalTable(table.getParameters())) {
             AcidUtils.Directory directory = hdfsEnvironment.doAs(hdfsContext.getIdentity().getUser(), () -> AcidUtils.getAcidState(
                     path,
@@ -491,7 +492,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
                     false,
                     true));

-            if (AcidUtils.isFullAcidTable(table.getParameters())) {
+            if (isFullAcid) {
                 // From Hive version >= 3.0, delta/base files will always have file '_orc_acid_version' with value >= '2'.
                 Path baseOrDeltaPath = directory.getBaseDirectory() != null
                         ? directory.getBaseDirectory()
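(The '_orc_acid_version' comment above refers to a marker file that Hive >= 3.0 writes into every base/delta directory. As a point of reference, a stand-alone check for it with the Hadoop FileSystem API could look like the sketch below; this snippet is illustrative and not code from the commit.)

    // Hypothetical check for Hive's ACID-version marker file; the file's
    // content is the ACID version, '2' or higher for the ORC ACID v2 layout.
    Path versionFile = new Path(baseOrDeltaPath, "_orc_acid_version");
    boolean looksLikeAcidV2 = fs.exists(versionFile);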
@@ -566,24 +567,35 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
                                 e.getNestedDirectoryPath(),
                                 splitFactory.getPartitionName()));
                 }
-                lastResult = hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfoBuilder.build()));
+                Optional<AcidInfo> acidInfo = Optional.empty();
+                if (isFullAcid) {
+                    acidInfo = acidInfoBuilder.build();
+                }
+                lastResult = hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
             }

             for (HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
                 List<LocatedFileStatus> locatedFileStatuses = ImmutableList.of((LocatedFileStatus) hdfsFileStatusWithId.getFileStatus());
-                Optional<AcidInfo> acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath())));
+                Optional<AcidInfo> acidInfo = Optional.empty();
+                if (isFullAcid) {
+                    acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath())));
+                }
                 lastResult = hiveSplitSource.addToQueue(getBucketedSplits(locatedFileStatuses, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
             }

             return lastResult;
         }

         for (Path readPath : readPaths) {
-            fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfoBuilder.build()));
+            Optional<AcidInfo> acidInfo = Optional.empty();
+            if (isFullAcid) {
+                acidInfo = acidInfoBuilder.build();
+            }
+            fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo));
         }

         if (!fileStatusOriginalFiles.isEmpty()) {
-            fileIterators.addLast(generateOriginalFilesSplits(splitFactory, fileStatusOriginalFiles, splittable, acidInfoBuilder));
+            fileIterators.addLast(generateOriginalFilesSplits(splitFactory, fileStatusOriginalFiles, splittable, acidInfoBuilder, isFullAcid));
         }

         return COMPLETED_FUTURE;
@@ -703,12 +715,16 @@ private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
             InternalHiveSplitFactory splitFactory,
             List<HdfsFileStatusWithId> originalFileLocations,
             boolean splittable,
-            AcidInfo.Builder acidInfoBuilder)
+            AcidInfo.Builder acidInfoBuilder,
+            boolean isFullAcid)
     {
         return originalFileLocations.stream()
                 .map(HdfsFileStatusWithId::getFileStatus)
                 .map(fileStatus -> {
-                    Optional<AcidInfo> acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())));
+                    Optional<AcidInfo> acidInfo = Optional.empty();
+                    if (isFullAcid) {
+                        acidInfo = Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(fileStatus.getPath())));
+                    }
                     return splitFactory.createInternalHiveSplit(
                             (LocatedFileStatus) fileStatus,
                             OptionalInt.empty(),
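The isFullAcid guard now appears at several call sites in this file. A hypothetical helper (not part of the commit; names taken from the diff) that would express the original-files variant once:

    // Hypothetical consolidation of the repeated guard above; not in the commit.
    private static Optional<AcidInfo> acidInfoForOriginalFile(boolean isFullAcid, AcidInfo.Builder acidInfoBuilder, Path path)
    {
        if (!isFullAcid) {
            return Optional.empty();
        }
        return Optional.of(acidInfoBuilder.buildWithRequiredOriginalFiles(getRequiredBucketNumber(path)));
    }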
@@ -191,7 +191,38 @@ private void doTestReadFullAcid(boolean isPartitioned, BucketingType bucketingType)

@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "partitioningAndBucketingTypeDataProvider", timeOut = TEST_TIMEOUT)
@Flaky(issue = "https://github.com/trinodb/trino/issues/4927", match = "Hive table .* is corrupt. Found sub-directory in bucket directory for partition")
public void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType)
public void testReadInsertOnlyOrc(boolean isPartitioned, BucketingType bucketingType)
{
testReadInsertOnly(isPartitioned, bucketingType, "STORED AS ORC");
}

@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "partitioningAndBucketingTypeSmokeDataProvider", timeOut = TEST_TIMEOUT)
@Flaky(issue = "https://github.com/trinodb/trino/issues/4927", match = "Hive table .* is corrupt. Found sub-directory in bucket directory for partition")
public void testReadInsertOnlyParquet(boolean isPartitioned, BucketingType bucketingType)
{
testReadInsertOnly(isPartitioned, bucketingType, "STORED AS PARQUET");
}

@Test(groups = HIVE_TRANSACTIONAL, dataProvider = "partitioningAndBucketingTypeSmokeDataProvider", timeOut = TEST_TIMEOUT)
@Flaky(issue = "https://github.com/trinodb/trino/issues/4927", match = "Hive table .* is corrupt. Found sub-directory in bucket directory for partition")
public void testReadInsertOnlyText(boolean isPartitioned, BucketingType bucketingType)
{
testReadInsertOnly(isPartitioned, bucketingType, "STORED AS TEXTFILE");
}

@Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT)
public void testReadInsertOnlyTextWithCustomFormatProperties()
{
testReadInsertOnly(
false,
NONE,
" ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
" WITH SERDEPROPERTIES ('field.delim'=',', 'line.delim'='\\n', 'serialization.format'=',') " +
" STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
" OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
}

private void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType, String hiveTableFormatDefinition)
{
if (getHiveVersionMajor() < 3) {
throw new SkipException("Hive transactional tables are supported with Hive version 3 or above");
@@ -203,7 +234,7 @@ public void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType)
         onHive().executeQuery("CREATE TABLE " + tableName + " (col INT) " +
                 (isPartitioned ? "PARTITIONED BY (part_col INT) " : "") +
                 bucketingType.getHiveClustering("col", 4) + " " +
-                "STORED AS ORC " +
+                hiveTableFormatDefinition + " " +
                 hiveTableProperties(INSERT_ONLY, bucketingType));

         String hivePartitionString = isPartitioned ? " PARTITION (part_col=2) " : "";
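With the format clause extracted into a parameter, each public test above is a one-line delegation. Under the two-row smoke provider (defined below), the Parquet variant, for example, performs calls equivalent to:

    // Invocations made by testReadInsertOnlyParquet via the smoke data provider:
    testReadInsertOnly(false, BucketingType.NONE, "STORED AS PARQUET");
    testReadInsertOnly(true, BucketingType.BUCKETED_DEFAULT, "STORED AS PARQUET");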
@@ -458,6 +489,15 @@ public Object[][] partitioningAndBucketingTypeDataProvider()
         };
     }

+    @DataProvider
+    public Object[][] partitioningAndBucketingTypeSmokeDataProvider()
+    {
+        return new Object[][] {
+                {false, BucketingType.NONE},
+                {true, BucketingType.BUCKETED_DEFAULT},
+        };
+    }
+
     @Test(groups = HIVE_TRANSACTIONAL, dataProvider = "testCreateAcidTableDataProvider")
     public void testCtasAcidTable(boolean isPartitioned, BucketingType bucketingType)
     {
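The smoke provider deliberately covers only the two extreme combinations (unpartitioned/unbucketed and partitioned/bucketed), keeping the new Parquet and text variants cheap while ORC keeps the full matrix. For reference, the text variant ends up issuing Hive DDL of roughly this shape; the table name is illustrative, and the TBLPROPERTIES clause is an assumption about what hiveTableProperties(INSERT_ONLY, NONE) produces, written in the standard Hive insert-only form:

    // Approximate DDL for testReadInsertOnlyText(false, NONE); table name illustrative,
    // TBLPROPERTIES assumed to come from hiveTableProperties(INSERT_ONLY, NONE):
    onHive().executeQuery("CREATE TABLE test_read_insert_only (col INT) " +
            "STORED AS TEXTFILE " +
            "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");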
