Extract Hive split construction to separate factory
dain committed Dec 23, 2017
1 parent 26f3b45 commit 817fa5d
Showing 4 changed files with 264 additions and 291 deletions.
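
This commit moves per-file split construction out of BackgroundHiveSplitLoader and into a new InternalHiveSplitFactory in the com.facebook.presto.hive.util package (see the new import below). The factory's public surface, as implied by the call sites in this diff (a sketch inferred from usage, not the contents of the new file itself):

    public class InternalHiveSplitFactory
    {
        public InternalHiveSplitFactory(
                FileSystem fileSystem,
                String partitionName,
                InputFormat<?, ?> inputFormat,
                Properties schema,
                List<HivePartitionKey> partitionKeys,
                TupleDomain<HiveColumnHandle> effectivePredicate,
                Map<Integer, HiveTypeName> columnCoercions,
                boolean forceLocalScheduling) { /* stores state for the methods below */ }

        // a plain file found by directory listing
        public Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus file) { /* ... */ }

        // a file bound to a specific bucket
        public Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus file, int bucketNumber) { /* ... */ }

        // a split produced by a custom InputFormat.getSplits()
        public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split) throws IOException { /* ... */ }
    }

Each method returns Optional.empty() when the file is ruled out (for example, by a predicate on the synthetic $path column), which is why every call site below checks presence before queueing the split.
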
@@ -14,25 +14,22 @@
package com.facebook.presto.hive;

import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
import com.facebook.presto.hive.InternalHiveSplit.InternalHiveBlock;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.HiveFileIterator;
import com.facebook.presto.hive.util.InternalHiveSplitFactory;
import com.facebook.presto.hive.util.ResumableTask;
import com.facebook.presto.hive.util.ResumableTasks;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -48,39 +45,32 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.lang.annotation.Annotation;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.facebook.presto.hadoop.HadoopFileStatus.isDirectory;
import static com.facebook.presto.hive.HiveBucketing.HiveBucket;
import static com.facebook.presto.hive.HiveColumnHandle.isPathColumnHandle;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static com.facebook.presto.hive.HiveSessionProperties.isForceLocalScheduling;
import static com.facebook.presto.hive.HiveUtil.checkCondition;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
import static com.facebook.presto.hive.HiveUtil.isSplittable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.String.format;
import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;

@@ -233,37 +223,11 @@ private ListenableFuture<?> loadSplits()
LocatedFileStatus file = files.next();
if (isDirectory(file)) {
if (recursiveDirWalkerEnabled) {
HiveFileIterator fileIterator = new HiveFileIterator(
file.getPath(),
files.getFileSystem(),
files.getDirectoryLister(),
files.getNamenodeStats(),
files.getPartitionName(),
files.getInputFormat(),
files.getSchema(),
files.getPartitionKeys(),
files.getEffectivePredicate(),
files.getColumnCoercions());
fileIterators.add(fileIterator);
fileIterators.add(files.withPath(file.getPath()));
}
}
else {
boolean splittable = isSplittable(files.getInputFormat(), hdfsEnvironment.getFileSystem(hdfsContext, file.getPath()), file.getPath());

Optional<InternalHiveSplit> internalHiveSplit = createInternalHiveSplit(
files.getPartitionName(),
file.getPath().toString(),
file.getBlockLocations(),
0,
file.getLen(),
file.getLen(),
files.getSchema(),
files.getPartitionKeys(),
splittable,
session,
OptionalInt.empty(),
files.getColumnCoercions(),
getPathDomain(files.getEffectivePredicate()));
Optional<InternalHiveSplit> internalHiveSplit = files.getSplitFactory().createInternalHiveSplit(file);
if (!internalHiveSplit.isPresent()) {
continue;
}
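
The recursive-walk branch above now derives a child iterator with files.withPath(...) instead of rebuilding one from ten getters. A plausible shape for that helper on HiveFileIterator, assuming it simply forwards the iterator's existing state (hypothetical; the real method is in HiveFileIterator.java, one of the changed files not fully shown here):

    public HiveFileIterator withPath(Path path)
    {
        // same file system, directory lister, namenode stats, partition name,
        // and split factory; only the starting path changes
        return new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, partitionName, splitFactory);
    }
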
@@ -286,7 +250,6 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
Properties schema = getPartitionSchema(table, partition.getPartition());
List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition());
TupleDomain<HiveColumnHandle> effectivePredicate = (TupleDomain<HiveColumnHandle>) compactEffectivePredicate;
Optional<Domain> pathDomain = getPathDomain(effectivePredicate);

Path path = new Path(getPartitionLocation(table, partition.getPartition()));
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
@@ -303,34 +266,38 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
for (Path targetPath : getTargetPathsFromSymlink(fs, path)) {
// The input should be in TextInputFormat.
TextInputFormat targetInputFormat = new TextInputFormat();
// the splits must be generated using the file system for the target path
// get the configuration for the target path -- it may be a different hdfs instance
Configuration targetConfiguration = hdfsEnvironment.getConfiguration(hdfsContext, targetPath);
JobConf targetJob = toJobConf(targetConfiguration);
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, targetPath);
JobConf targetJob = toJobConf(targetFilesystem.getConf());
targetJob.setInputFormat(TextInputFormat.class);
targetInputFormat.configure(targetJob);
FileInputFormat.setInputPaths(targetJob, targetPath);
InputSplit[] targetSplits = targetInputFormat.getSplits(targetJob, 0);

lastResult = addSplitsToSource(targetSplits, partitionName, partitionKeys, schema, effectivePredicate, partition.getColumnCoercions(), pathDomain);
InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(targetFilesystem, partitionName, inputFormat, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions(), isForceLocalScheduling(session));
lastResult = addSplitsToSource(targetSplits, splitFactory);
if (stopped) {
return COMPLETED_FUTURE;
}
}
return lastResult;
}

InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(fs, partitionName, inputFormat, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions(), isForceLocalScheduling(session));

// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
JobConf jobConf = toJobConf(configuration);
FileInputFormat.setInputPaths(jobConf, path);
InputSplit[] splits = inputFormat.getSplits(jobConf, 0);

return addSplitsToSource(splits, partitionName, partitionKeys, schema, effectivePredicate, partition.getColumnCoercions(), pathDomain);
return addSplitsToSource(splits, splitFactory);
}

// If only one bucket could match: load that one file
HiveFileIterator iterator = new HiveFileIterator(path, fs, directoryLister, namenodeStats, partitionName, inputFormat, schema, partitionKeys, effectivePredicate, partition.getColumnCoercions());
HiveFileIterator iterator = new HiveFileIterator(path, fs, directoryLister, namenodeStats, partitionName, splitFactory);
if (!buckets.isEmpty()) {
int bucketCount = buckets.get(0).getBucketCount();
List<LocatedFileStatus> fileList = listAndSortBucketFiles(iterator, bucketCount);
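
With split construction delegated to the factory, the HiveFileIterator constructor used just above shrinks from ten arguments to six. Its implied signature (a sketch; the actual declaration lives in HiveFileIterator.java):

    public HiveFileIterator(
            Path path,
            FileSystem fileSystem,
            DirectoryLister directoryLister,
            NamenodeStats namenodeStats,
            String partitionName,
            InternalHiveSplitFactory splitFactory)
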
@@ -339,23 +306,8 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
for (HiveBucket bucket : buckets) {
int bucketNumber = bucket.getBucketNumber();
LocatedFileStatus file = fileList.get(bucketNumber);
boolean splittable = isSplittable(iterator.getInputFormat(), hdfsEnvironment.getFileSystem(hdfsContext, file.getPath()), file.getPath());

Optional<InternalHiveSplit> internalHiveSplit = createInternalHiveSplit(
iterator.getPartitionName(),
file.getPath().toString(),
file.getBlockLocations(),
0,
file.getLen(),
file.getLen(),
iterator.getSchema(),
iterator.getPartitionKeys(),
splittable,
session,
OptionalInt.of(bucketNumber),
partition.getColumnCoercions(),
pathDomain);
internalHiveSplit.ifPresent(splitList::add);
splitFactory.createInternalHiveSplit(file, bucketNumber)
.ifPresent(splitList::add);
}

return hiveSplitSource.addToQueue(splitList);
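
Both bucket loops in this file now collapse to a single factory call. A plausible implementation of the bucketed overload, assuming an internal variant that threads the bucket number through the same way the removed code passed OptionalInt.of(bucketNumber) (names hypothetical):

    public Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus file, int bucketNumber)
    {
        return createInternalHiveSplit(file, OptionalInt.of(bucketNumber));
    }
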
@@ -370,23 +322,8 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) {
LocatedFileStatus file = list.get(bucketIndex);
boolean splittable = isSplittable(iterator.getInputFormat(), hdfsEnvironment.getFileSystem(hdfsContext, file.getPath()), file.getPath());

Optional<InternalHiveSplit> internalHiveSplit = createInternalHiveSplit(
iterator.getPartitionName(),
file.getPath().toString(),
file.getBlockLocations(),
0,
file.getLen(),
file.getLen(),
iterator.getSchema(),
iterator.getPartitionKeys(),
splittable,
session,
OptionalInt.of(bucketIndex),
partition.getColumnCoercions(),
pathDomain);
internalHiveSplit.ifPresent(splitList::add);
splitFactory.createInternalHiveSplit(file, bucketIndex)
.ifPresent(splitList::add);
}

return hiveSplitSource.addToQueue(splitList);
@@ -396,35 +333,12 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
return COMPLETED_FUTURE;
}

private ListenableFuture<?> addSplitsToSource(
InputSplit[] targetSplits,
String partitionName,
List<HivePartitionKey> partitionKeys,
Properties schema,
TupleDomain<HiveColumnHandle> effectivePredicate,
Map<Integer, HiveTypeName> columnCoercions,
Optional<Domain> pathDomain)
private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
throws IOException
{
ListenableFuture<?> lastResult = COMPLETED_FUTURE;
for (InputSplit inputSplit : targetSplits) {
FileSplit split = (FileSplit) inputSplit;
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, split.getPath());
FileStatus file = targetFilesystem.getFileStatus(split.getPath());
Optional<InternalHiveSplit> internalHiveSplit = createInternalHiveSplit(
partitionName,
file.getPath().toString(),
targetFilesystem.getFileBlockLocations(file, split.getStart(), split.getLength()),
split.getStart(),
split.getLength(),
file.getLen(),
schema,
partitionKeys,
false,
session,
OptionalInt.empty(),
columnCoercions,
pathDomain);
Optional<InternalHiveSplit> internalHiveSplit = splitFactory.createInternalHiveSplit((FileSplit) inputSplit);
if (internalHiveSplit.isPresent()) {
lastResult = hiveSplitSource.addToQueue(internalHiveSplit.get());
}
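
addSplitsToSource now just forwards each FileSplit to the factory. Assuming the factory keeps the lookup logic removed here, the FileSplit overload would look roughly like this (a sketch; fileSystem is the factory's constructor argument):

    public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
            throws IOException
    {
        FileStatus file = fileSystem.getFileStatus(split.getPath());
        return createInternalHiveSplit(
                split.getPath().toString(),
                fileSystem.getFileBlockLocations(file, split.getStart(), split.getLength()),
                split.getStart(),
                split.getLength(),
                file.getLen(),
                false,                // the removed code always passed splittable = false here
                OptionalInt.empty()); // custom-format splits carry no bucket number
    }
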
@@ -485,113 +399,6 @@ private static List<Path> getTargetPathsFromSymlink(FileSystem fileSystem, Path
}
}

private Optional<InternalHiveSplit> createInternalHiveSplit(
String partitionName,
String path,
BlockLocation[] blockLocations,
long start,
long length,
long fileSize,
Properties schema,
List<HivePartitionKey> partitionKeys,
boolean splittable,
ConnectorSession session,
OptionalInt bucketNumber,
Map<Integer, HiveTypeName> columnCoercions,
Optional<Domain> pathDomain)
throws IOException
{
if (!pathMatchesPredicate(pathDomain, path)) {
return Optional.empty();
}

boolean forceLocalScheduling = HiveSessionProperties.isForceLocalScheduling(session);

// For empty files, some filesystems (e.g. LocalFileSystem) produce one empty block
// while others (e.g. hdfs.DistributedFileSystem) produce no block.
// Synthesize an empty block if one does not already exist.
if (fileSize == 0 && blockLocations.length == 0) {
blockLocations = new BlockLocation[] {new BlockLocation()};
// Turn off force local scheduling because hosts list doesn't exist.
forceLocalScheduling = false;
}

ImmutableList.Builder<InternalHiveBlock> blockBuilder = ImmutableList.builder();
for (BlockLocation blockLocation : blockLocations) {
// clamp the block range
long blockStart = Math.max(start, blockLocation.getOffset());
long blockEnd = Math.min(start + length, blockLocation.getOffset() + blockLocation.getLength());
if (blockStart > blockEnd) {
// block is outside split range
continue;
}
if (blockStart == blockEnd && !(blockStart == start && blockEnd == start + length)) {
// skip zero-width blocks, except in the special case where the slice is empty and the block covers the empty slice interval
continue;
}
blockBuilder.add(new InternalHiveBlock(blockStart, blockEnd, getHostAddresses(blockLocation)));
}
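// Worked example for the loop above, with illustrative numbers: for a split
// covering [start, start + length) = [0, 100) and reported blocks [0, 64),
// [64, 128), and [128, 192):
//   [0, 64)    -> max(0, 0) = 0,     min(100, 64) = 64   -> kept as [0, 64)
//   [64, 128)  -> max(0, 64) = 64,   min(100, 128) = 100 -> clamped to [64, 100)
//   [128, 192) -> max(0, 128) = 128, min(100, 192) = 100 -> start > end, skipped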
List<InternalHiveBlock> blocks = blockBuilder.build();
checkBlocks(blocks, start, length);

if (!splittable) {
// not splittable, use the hosts from the first block if it exists
blocks = ImmutableList.of(new InternalHiveBlock(start, start + length, blocks.get(0).getAddresses()));
}

return Optional.of(new InternalHiveSplit(
partitionName,
path,
start,
start + length,
fileSize,
schema,
partitionKeys,
blocks,
bucketNumber,
splittable,
forceLocalScheduling && allBlocksHaveRealAddress(blocks),
columnCoercions));
}

private static void checkBlocks(List<InternalHiveBlock> blocks, long start, long length)
{
checkArgument(length >= 0);
checkArgument(!blocks.isEmpty());
checkArgument(start == blocks.get(0).getStart());
checkArgument(start + length == blocks.get(blocks.size() - 1).getEnd());
for (int i = 1; i < blocks.size(); i++) {
checkArgument(blocks.get(i - 1).getEnd() == blocks.get(i).getStart());
}
}

private static boolean allBlocksHaveRealAddress(List<InternalHiveBlock> blocks)
{
return blocks.stream()
.map(InternalHiveBlock::getAddresses)
.allMatch(BackgroundHiveSplitLoader::hasRealAddress);
}

private static boolean hasRealAddress(List<HostAddress> addresses)
{
// Hadoop FileSystem returns "localhost" as a default
return addresses.stream().anyMatch(address -> !address.getHostText().equals("localhost"));
}

private static List<HostAddress> getHostAddresses(BlockLocation blockLocation)
{
String[] hosts;
try {
hosts = blockLocation.getHosts();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return Arrays.stream(hosts)
.map(HostAddress::fromString)
.collect(toImmutableList());
}

private static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
{
if (!partition.isPresent()) {
@@ -629,25 +436,4 @@ private static String getPartitionLocation(Table table, Optional<Partition> part
}
return partition.get().getStorage().getLocation();
}

private static Optional<Domain> getPathDomain(TupleDomain<HiveColumnHandle> effectivePredicate)
{
if (!effectivePredicate.getDomains().isPresent()) {
return Optional.empty();
}

return effectivePredicate.getDomains().get().entrySet().stream()
.filter(entry -> isPathColumnHandle(entry.getKey()))
.findFirst()
.map(Map.Entry::getValue);
}

private static boolean pathMatchesPredicate(Optional<Domain> pathDomain, String path)
{
if (!pathDomain.isPresent()) {
return true;
}

return pathDomain.get().includesNullableValue(utf8Slice(path));
}
}
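
The two $path helpers removed above presumably move into the factory along with the rest of the split-construction logic. Together they let a predicate on the synthetic $path column prune a file before any split is built, as the removed createInternalHiveSplit did:

    Optional<Domain> pathDomain = getPathDomain(effectivePredicate);
    if (!pathMatchesPredicate(pathDomain, file.getPath().toString())) {
        return Optional.empty(); // the $path predicate excludes this file outright
    }
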
@@ -263,7 +263,7 @@ public static long parseHiveTimestamp(String value, DateTimeZone timeZone)
return HIVE_TIMESTAMP_PARSER.withZone(timeZone).parseMillis(value);
}

static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path)
public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path)
{
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
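
The only change in HiveUtil: isSplittable is widened from package-private to public, presumably so InternalHiveSplitFactory, which lives in the separate com.facebook.presto.hive.util package, can call it. A hypothetical call site inside the factory, mirroring the removed calls in BackgroundHiveSplitLoader:

    boolean splittable = HiveUtil.isSplittable(inputFormat, fileSystem, file.getPath());
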
