Move Hive directory recursion into HiveFileIterator
dain committed Dec 23, 2017
1 parent a7969eb commit 77afd57
Showing 2 changed files with 130 additions and 78 deletions.
presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java

@@ -18,6 +18,7 @@
 import com.facebook.presto.hive.metastore.Partition;
 import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.hive.util.HiveFileIterator;
+import com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryNotAllowedException;
 import com.facebook.presto.hive.util.InternalHiveSplitFactory;
 import com.facebook.presto.hive.util.ResumableTask;
 import com.facebook.presto.hive.util.ResumableTasks;
@@ -26,6 +27,8 @@
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Streams;
 import com.google.common.io.CharStreams;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.conf.Configuration;
@@ -49,6 +52,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -58,7 +62,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.IntPredicate;
 
-import static com.facebook.presto.hadoop.HadoopFileStatus.isDirectory;
 import static com.facebook.presto.hive.HiveBucketing.HiveBucket;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
@@ -69,6 +72,9 @@
 import static com.facebook.presto.hive.HiveUtil.getInputFormat;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
 import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
+import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL;
+import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
+import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -96,7 +102,7 @@ public class BackgroundHiveSplitLoader
     private final Executor executor;
     private final ConnectorSession session;
     private final ConcurrentLazyQueue<HivePartitionMetadata> partitions;
-    private final Deque<HiveFileIterator> fileIterators = new ConcurrentLinkedDeque<>();
+    private final Deque<Iterator<InternalHiveSplit>> fileIterators = new ConcurrentLinkedDeque<>();
 
     // Purpose of this lock:
     // * When write lock is acquired, except the holder, no one can do any of the following:
@@ -210,32 +216,20 @@ private void invokeNoMoreSplitsIfNecessary()
     private ListenableFuture<?> loadSplits()
             throws IOException
     {
-        HiveFileIterator files = fileIterators.poll();
-        if (files == null) {
+        Iterator<InternalHiveSplit> splits = fileIterators.poll();
+        if (splits == null) {
             HivePartitionMetadata partition = partitions.poll();
             if (partition == null) {
                 return COMPLETED_FUTURE;
             }
             return loadPartition(partition);
         }
 
-        while (files.hasNext() && !stopped) {
-            LocatedFileStatus file = files.next();
-            if (isDirectory(file)) {
-                if (recursiveDirWalkerEnabled) {
-                    fileIterators.add(files.withPath(file.getPath()));
-                }
-            }
-            else {
-                Optional<InternalHiveSplit> internalHiveSplit = files.getSplitFactory().createInternalHiveSplit(file);
-                if (!internalHiveSplit.isPresent()) {
-                    continue;
-                }
-                ListenableFuture<?> future = hiveSplitSource.addToQueue(internalHiveSplit.get());
-                if (!future.isDone()) {
-                    fileIterators.addFirst(files);
-                    return future;
-                }
+        while (splits.hasNext() && !stopped) {
+            ListenableFuture<?> future = hiveSplitSource.addToQueue(splits.next());
+            if (!future.isDone()) {
+                fileIterators.addFirst(splits);
+                return future;
             }
         }
@@ -301,7 +295,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
             return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, bucketSplitInfo.get()));
         }
 
-        fileIterators.addLast(new HiveFileIterator(path, fs, directoryLister, namenodeStats, partitionName, splitFactory));
+        fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory));
         return COMPLETED_FUTURE;
     }
@@ -329,37 +323,44 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
                 .anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
     }
 
+    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory)
+    {
+        return Streams.stream(new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED))
+                .map(splitFactory::createInternalHiveSplit)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .iterator();
+    }
+
     private List<InternalHiveSplit> getBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, BucketSplitInfo bucketSplitInfo)
     {
         int bucketCount = bucketSplitInfo.getBucketCount();
 
         // list all files in the partition
-        ArrayList<LocatedFileStatus> list = new ArrayList<>(bucketCount);
-        HiveFileIterator hiveFileIterator = new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, splitFactory.getPartitionName(), splitFactory);
-        while (hiveFileIterator.hasNext()) {
-            LocatedFileStatus next = hiveFileIterator.next();
-            if (isDirectory(next)) {
-                // Fail here to be on the safe side. This seems to be the same as what Hive does
-                throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("%s Found sub-directory in bucket directory for partition: %s", CORRUPT_BUCKETING, splitFactory.getPartitionName()));
-            }
-            list.add(next);
+        ArrayList<LocatedFileStatus> files = new ArrayList<>(bucketCount);
+        try {
+            Iterators.addAll(files, new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, FAIL));
+        }
+        catch (NestedDirectoryNotAllowedException e) {
+            // Fail here to be on the safe side. This seems to be the same as what Hive does
+            throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("%s Found sub-directory in bucket directory for partition: %s", CORRUPT_BUCKETING, splitFactory.getPartitionName()));
         }
 
         // verify we found one file per bucket
-        if (list.size() != bucketCount) {
-            throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("%s The number of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s", CORRUPT_BUCKETING, list.size(),
+        if (files.size() != bucketCount) {
+            throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format("%s The number of files in the directory (%s) does not match the declared bucket count (%s) for partition: %s", CORRUPT_BUCKETING, files.size(),
                     bucketCount,
                     splitFactory.getPartitionName()));
         }
 
         // Sort FileStatus objects (instead of, e.g., fileStatus.getPath().toString). This matches org.apache.hadoop.hive.ql.metadata.Table.getSortedPaths
-        list.sort(null);
+        files.sort(null);
 
         // convert files internal splits
         List<InternalHiveSplit> splitList = new ArrayList<>();
         for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) {
             if (bucketSplitInfo.isBucketEnabled(bucketNumber)) {
-                LocatedFileStatus file = list.get(bucketNumber);
+                LocatedFileStatus file = files.get(bucketNumber);
                 splitFactory.createInternalHiveSplit(file, bucketNumber)
                         .ifPresent(splitList::add);
             }
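The key structural change in BackgroundHiveSplitLoader is that the loader no longer inspects directories itself: createInternalHiveSplitIterator adapts the file listing into a lazy Iterator<InternalHiveSplit> with Guava's Streams.stream, so a file is converted to a split only when the consumer pulls the next element, and the iterator can be parked back on the fileIterators deque when the split queue is full. Below is a minimal, self-contained sketch of that adapter pattern; the String-based file names and the createSplit helper are hypothetical stand-ins for LocatedFileStatus and splitFactory.createInternalHiveSplit, and only the Streams.stream/map/filter/iterator pipeline mirrors the commit.

import com.google.common.collect.Streams;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;

public class LazySplitIteratorSketch
{
    // Hypothetical stand-in for splitFactory.createInternalHiveSplit(file),
    // which returns Optional.empty() for files that yield no split.
    private static Optional<String> createSplit(String fileName)
    {
        return fileName.isEmpty() ? Optional.empty() : Optional.of("split:" + fileName);
    }

    public static void main(String[] args)
    {
        Iterator<String> files = Arrays.asList("000000_0", "", "000001_0").iterator();

        // Same shape as createInternalHiveSplitIterator: Streams.stream adapts
        // the source iterator into a lazy pipeline, so nothing is listed or
        // converted until the consumer asks for the next element.
        Iterator<String> splits = Streams.stream(files)
                .map(LazySplitIteratorSketch::createSplit)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .iterator();

        splits.forEachRemaining(System.out::println); // prints split:000000_0, split:000001_0
    }
}

Because the pipeline is lazy, loadSplits can stop mid-directory: when addToQueue returns an incomplete future, addFirst re-queues the same iterator and no further files are converted until that future completes.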
presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java

@@ -25,103 +25,154 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
 
+import static com.facebook.presto.hadoop.HadoopFileStatus.isDirectory;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
 import static java.util.Objects.requireNonNull;
 
 public class HiveFileIterator
         extends AbstractIterator<LocatedFileStatus>
 {
+    public enum NestedDirectoryPolicy
+    {
+        IGNORED,
+        RECURSE,
+        FAIL
+    }
+
+    private final Deque<Path> paths = new ArrayDeque<>();
     private final FileSystem fileSystem;
     private final DirectoryLister directoryLister;
     private final NamenodeStats namenodeStats;
-    private final Path path;
-    private final String partitionName;
-    private final InternalHiveSplitFactory splitFactory;
+    private final NestedDirectoryPolicy nestedDirectoryPolicy;
 
-    private RemoteIterator<LocatedFileStatus> remoteIterator;
+    private Iterator<LocatedFileStatus> remoteIterator = Collections.emptyIterator();
 
     public HiveFileIterator(
             Path path,
             FileSystem fileSystem,
             DirectoryLister directoryLister,
             NamenodeStats namenodeStats,
-            String partitionName,
-            InternalHiveSplitFactory splitFactory)
+            NestedDirectoryPolicy nestedDirectoryPolicy)
     {
-        this.partitionName = requireNonNull(partitionName, "partitionName is null");
-        this.path = requireNonNull(path, "path is null");
+        paths.addLast(requireNonNull(path, "path is null"));
         this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
         this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
         this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
-        this.splitFactory = requireNonNull(splitFactory, "splitFactory is null");
+        this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null");
    }
 
     @Override
     protected LocatedFileStatus computeNext()
     {
-        try {
-            if (remoteIterator == null) {
-                remoteIterator = getLocatedFileStatusRemoteIterator(path);
-            }
-
+        while (true) {
             while (remoteIterator.hasNext()) {
                 LocatedFileStatus status = getLocatedFileStatus(remoteIterator);
 
-                // ignore hidden files. Hive ignores files starting with _ and . as well.
+                // Ignore hidden files and directories. Hive ignores files starting with _ and . as well.
                 String fileName = status.getPath().getName();
                 if (fileName.startsWith("_") || fileName.startsWith(".")) {
                     continue;
                 }
 
+                if (isDirectory(status)) {
+                    switch (nestedDirectoryPolicy) {
+                        case IGNORED:
+                            continue;
+                        case RECURSE:
+                            paths.add(status.getPath());
+                            continue;
+                        case FAIL:
+                            throw new NestedDirectoryNotAllowedException();
+                    }
+                }
+
                 return status;
             }
-            return endOfData();
-        }
-        catch (FileNotFoundException e) {
-            throw new PrestoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path);
-        }
-        catch (IOException e) {
-            throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + path, e);
+
+            if (paths.isEmpty()) {
+                return endOfData();
+            }
+            remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst());
         }
     }
 
-    private RemoteIterator<LocatedFileStatus> getLocatedFileStatusRemoteIterator(Path path)
-            throws IOException
+    private Iterator<LocatedFileStatus> getLocatedFileStatusRemoteIterator(Path path)
     {
         try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) {
-            return directoryLister.list(fileSystem, path);
-        }
-        catch (IOException | RuntimeException e) {
-            namenodeStats.getListLocatedStatus().recordException(e);
-            throw e;
+            return new FileStatusIterator(path, fileSystem, directoryLister, namenodeStats);
         }
     }
 
-    private LocatedFileStatus getLocatedFileStatus(RemoteIterator<LocatedFileStatus> iterator)
-            throws IOException
+    private LocatedFileStatus getLocatedFileStatus(Iterator<LocatedFileStatus> iterator)
     {
         try (TimeStat.BlockTimer ignored = namenodeStats.getRemoteIteratorNext().time()) {
             return iterator.next();
         }
-        catch (IOException | RuntimeException e) {
-            namenodeStats.getRemoteIteratorNext().recordException(e);
-            throw e;
-        }
     }
 
-    public String getPartitionName()
+    private static class FileStatusIterator
+            implements Iterator<LocatedFileStatus>
     {
-        return partitionName;
-    }
+        private final Path path;
+        private final NamenodeStats namenodeStats;
+        private final RemoteIterator<LocatedFileStatus> fileStatusIterator;
 
-    public InternalHiveSplitFactory getSplitFactory()
-    {
-        return splitFactory;
+        private FileStatusIterator(Path path, FileSystem fileSystem, DirectoryLister directoryLister, NamenodeStats namenodeStats)
+        {
+            this.path = path;
+            this.namenodeStats = namenodeStats;
+            try {
+                this.fileStatusIterator = directoryLister.list(fileSystem, path);
+            }
+            catch (IOException e) {
+                throw processException(e);
+            }
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            try {
+                return fileStatusIterator.hasNext();
+            }
+            catch (IOException e) {
+                throw processException(e);
+            }
+        }
+
+        @Override
+        public LocatedFileStatus next()
+        {
+            try {
+                return fileStatusIterator.next();
+            }
+            catch (IOException e) {
+                throw processException(e);
+            }
+        }
+
+        private PrestoException processException(IOException exception)
+        {
+            namenodeStats.getRemoteIteratorNext().recordException(exception);
+            if (exception instanceof FileNotFoundException) {
+                throw new PrestoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path);
+            }
+            return new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + path, exception);
+        }
     }
 
-    public HiveFileIterator withPath(Path path)
+    public static class NestedDirectoryNotAllowedException
+            extends RuntimeException
     {
-        return new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, partitionName, splitFactory);
+        public NestedDirectoryNotAllowedException()
+        {
+            super("Nested sub-directories are not allowed");
+        }
     }
 }
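The rewritten computeNext() replaces the old recursive withPath() re-instantiation with an explicit work queue: the iterator drains one directory listing at a time, and under RECURSE it appends sub-directories to a Deque<Path> to be listed later. Here is a minimal sketch of the same deque-driven traversal written against java.nio.file instead of Hadoop's FileSystem; the method name, the eager result list, and the IllegalStateException standing in for NestedDirectoryNotAllowedException are illustrative, and only the queue-based control flow mirrors HiveFileIterator.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DequeTraversalSketch
{
    // Illustrative copy of the commit's policy enum.
    enum NestedDirectoryPolicy { IGNORED, RECURSE, FAIL }

    // Same control flow as the new computeNext(): drain the current listing,
    // queue sub-directories under RECURSE, then move on to the next queued path.
    static List<Path> listFiles(Path root, NestedDirectoryPolicy policy)
            throws IOException
    {
        Deque<Path> paths = new ArrayDeque<>();
        paths.addLast(root);
        List<Path> files = new ArrayList<>();
        while (!paths.isEmpty()) {
            try (DirectoryStream<Path> listing = Files.newDirectoryStream(paths.removeFirst())) {
                for (Path entry : listing) {
                    // Skip hidden entries, matching the _ and . rule in HiveFileIterator
                    String name = entry.getFileName().toString();
                    if (name.startsWith("_") || name.startsWith(".")) {
                        continue;
                    }
                    if (Files.isDirectory(entry)) {
                        switch (policy) {
                            case IGNORED:
                                continue;
                            case RECURSE:
                                paths.addLast(entry);
                                continue;
                            case FAIL:
                                // Stand-in for NestedDirectoryNotAllowedException
                                throw new IllegalStateException("Nested sub-directories are not allowed");
                        }
                    }
                    files.add(entry);
                }
            }
        }
        return files;
    }

    public static void main(String[] args)
            throws IOException
    {
        for (Path file : listFiles(Paths.get("."), NestedDirectoryPolicy.RECURSE)) {
            System.out.println(file);
        }
    }
}

Pairing addLast with removeFirst yields a breadth-first walk that keeps at most one directory listing open at a time, which is why getBucketedSplits can simply run the iterator with FAIL and catch the exception instead of checking each entry for directories itself.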
