Enable FileSplits to be obtained directly from InputFormat #7002

Closed
@@ -44,6 +44,7 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Deque;
@@ -79,6 +80,7 @@ public class BackgroundHiveSplitLoader
implements HiveSplitLoader
{
private static final String CORRUPT_BUCKETING = "Hive table is corrupt. It is declared as being bucketed, but the files do not match the bucketing declaration.";
private static final String USE_FILE_SPLITS_ANNOTATION_SIMPLE_NAME = "UseFileSplitsFromInputFormat";

public static final CompletableFuture<?> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);

@@ -269,6 +271,49 @@ private CompletableFuture<?> loadSplits()
return COMPLETED_FUTURE;
}

private boolean addSplitsToSource(InputSplit[] targetSplits,
        String partitionName,
        Properties schema,
        List<HivePartitionKey> partitionKeys,
        TupleDomain<HiveColumnHandle> effectivePredicate,
        Map<Integer, HiveType> columnCoercions) throws IOException
{
    for (InputSplit inputSplit : targetSplits) {
        FileSplit split = (FileSplit) inputSplit;
        FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(session.getUser(), split.getPath());
        FileStatus file = targetFilesystem.getFileStatus(split.getPath());
        hiveSplitSource.addToQueue(createHiveSplits(
                partitionName,
                file.getPath().toString(),
                targetFilesystem.getFileBlockLocations(file, split.getStart(), split.getLength()),
                split.getStart(),
                split.getLength(),
                schema,
                partitionKeys,
                false,
                session,
                OptionalInt.empty(),
                effectivePredicate,
                columnCoercions));
        if (stopped) {
            return true;
        }
    }
    return false;
}

private boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
{
    Annotation[] annotations = inputFormat.getClass().getAnnotations();
Contributor
We can use streams for this:

return Arrays.stream(inputFormat.getClass().getAnnotations())
        .map(Annotation::annotationType)
        .map(Class::getSimpleName)
        .anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));

    for (Annotation annotation : annotations) {
        String annotationSimpleName = annotation.annotationType().getSimpleName();
        if (USE_FILE_SPLITS_ANNOTATION_SIMPLE_NAME.equals(annotationSimpleName)) {
            return true;
        }
    }
    return false;
}
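
For context, a minimal sketch (annotation and class names are hypothetical, not part of this PR) of a custom input format that this check would recognize. Because the check compares only the annotation's simple name, the annotation can live in any package, so a custom format needs no compile-time dependency on the Hive connector; RUNTIME retention is what makes it visible to getClass().getAnnotations():

import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical annotation: only the simple name "UseFileSplitsFromInputFormat" has to match,
// and RUNTIME retention is required for getClass().getAnnotations() to see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface UseFileSplitsFromInputFormat
{
}

// Hypothetical custom input format opting in: loadPartition() would call its getSplits()
// (inherited here from the mapred FileInputFormat) and queue each returned FileSplit as a HiveSplit.
@UseFileSplitsFromInputFormat
class ExampleCustomInputFormat
        extends FileInputFormat<NullWritable, Text>
{
    @Override
    public RecordReader<NullWritable, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
            throws IOException
    {
        throw new UnsupportedOperationException("record reading is not part of this sketch");
    }
}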

private void loadPartition(HivePartitionMetadata partition)
throws IOException
{
@@ -299,31 +344,23 @@ private void loadPartition(HivePartitionMetadata partition)
FileInputFormat.setInputPaths(targetJob, targetPath);
InputSplit[] targetSplits = targetInputFormat.getSplits(targetJob, 0);

for (InputSplit inputSplit : targetSplits) {
FileSplit split = (FileSplit) inputSplit;
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(session.getUser(), split.getPath());
FileStatus file = targetFilesystem.getFileStatus(split.getPath());
hiveSplitSource.addToQueue(createHiveSplits(
partitionName,
file.getPath().toString(),
targetFilesystem.getFileBlockLocations(file, split.getStart(), split.getLength()),
split.getStart(),
split.getLength(),
schema,
partitionKeys,
false,
session,
OptionalInt.empty(),
effectivePredicate,
partition.getColumnCoercions()));
if (stopped) {
return;
}
if (addSplitsToSource(targetSplits, partitionName, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions())) {
return;
}
}
return;
}

// To support custom input formats, call inputFormat.getSplits() to obtain the file splits.
if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
    JobConf jobConf = new JobConf(configuration);
    FileInputFormat.setInputPaths(jobConf, path);
    InputSplit[] splits = inputFormat.getSplits(jobConf, 0);

    addSplitsToSource(splits, partitionName, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions());
    return;
}
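
To illustrate why delegating to getSplits() matters, a rough sketch (hypothetical class, not from this PR; it reuses the annotation and imports shown above, plus org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path): an annotated format can shape its own splits, for example forcing one split per file, and the loader will queue exactly those splits instead of deriving them from the file listing.

// Hypothetical format whose files must each be read as a single split.
@UseFileSplitsFromInputFormat
class WholeFileInputFormat
        extends FileInputFormat<NullWritable, Text>
{
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename)
    {
        // FileInputFormat.getSplits() honors this flag and emits one FileSplit per file.
        return false;
    }

    @Override
    public RecordReader<NullWritable, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
            throws IOException
    {
        throw new UnsupportedOperationException("record reading is not part of this sketch");
    }
}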

// If only one bucket could match: load that one file
HiveFileIterator iterator = new HiveFileIterator(path, fs, directoryLister, namenodeStats, partitionName, inputFormat, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions());
if (!buckets.isEmpty()) {