Skip to content

Commit

Permalink
Fix query hang when partitions are offline for retention
Browse files Browse the repository at this point in the history
Since partitions are lazily evaluated, a call to partitions.isEmpty() can throw, the exception will be handled externally but the hiveSplitSource won't be marked as failed causing the query to hang
  • Loading branch information
ggilfb committed Feb 5, 2018
1 parent 2cfe423 commit 1c6c5d1
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 13 deletions.
Expand Up @@ -104,9 +104,13 @@ public class BackgroundHiveSplitLoader
private final Deque<Iterator<InternalHiveSplit>> fileIterators = new ConcurrentLinkedDeque<>();

// Purpose of this lock:
// * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource.
// * Read lock: when you need to modify any of the above.
// Make sure the lock is held throughout the period during which they may not be consistent with each other.
// Details:
// * When write lock is acquired, except the holder, no one can do any of the following:
// ** poll from partitions
// ** poll from or push to fileIterators
// ** poll from (or check empty) partitions
// ** poll from (or check empty) or push to fileIterators
// ** push to hiveSplitSource
// * When any of the above three operations is carried out, either a read lock or a write lock must be held.
// * When a series of operations involving two or more of the above three operations are carried out, the lock
Expand Down Expand Up @@ -196,20 +200,38 @@ public TaskStatus process()

private void invokeNoMoreSplitsIfNecessary()
{
if (partitions.isEmpty() && fileIterators.isEmpty()) {
taskExecutionLock.writeLock().lock();
try {
// the write lock guarantees that no one is operating on the partitions, fileIterators, or hiveSplitSource, or half way through doing so.
if (partitions.isEmpty() && fileIterators.isEmpty()) {
// It is legal to call `noMoreSplits` multiple times or after `stop` was called.
// Nothing bad will happen if `noMoreSplits` implementation calls methods that will try to obtain a read lock because the lock is re-entrant.
hiveSplitSource.noMoreSplits();
}
taskExecutionLock.readLock().lock();
try {
// This is an opportunistic check to avoid getting the write lock unnecessarily
if (!partitions.isEmpty() || !fileIterators.isEmpty()) {
return;
}
finally {
taskExecutionLock.writeLock().unlock();
}
catch (Exception e) {
hiveSplitSource.fail(e);
checkState(stopped, "Task is not marked as stopped even though it failed");
return;
}
finally {
taskExecutionLock.readLock().unlock();
}

taskExecutionLock.writeLock().lock();
try {
// the write lock guarantees that no one is operating on the partitions, fileIterators, or hiveSplitSource, or half way through doing so.
if (partitions.isEmpty() && fileIterators.isEmpty()) {
// It is legal to call `noMoreSplits` multiple times or after `stop` was called.
// Nothing bad will happen if `noMoreSplits` implementation calls methods that will try to obtain a read lock because the lock is re-entrant.
hiveSplitSource.noMoreSplits();
}
}
catch (Exception e) {
hiveSplitSource.fail(e);
checkState(stopped, "Task is not marked as stopped even though it failed");
}
finally {
taskExecutionLock.writeLock().unlock();
}
}

private ListenableFuture<?> loadSplits()
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.stats.CounterStat;
Expand Down Expand Up @@ -56,12 +57,14 @@
import static com.facebook.presto.spi.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

public class TestBackgroundHiveSplitLoader
{
Expand Down Expand Up @@ -180,6 +183,18 @@ public void testEmptyFileWithNoBlocks()
assertEquals(splits.get(0).getLength(), 0);
}

@Test
public void testNoHangIfPartitionIsOffline()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoaderOfflinePartitions();
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader, TupleDomain.all());
backgroundHiveSplitLoader.start(hiveSplitSource);

assertThrows(RuntimeException.class, () -> drain(hiveSplitSource));
assertThrows(RuntimeException.class, () -> hiveSplitSource.isFinished());
}

private static List<String> drain(HiveSplitSource source)
throws Exception
{
Expand Down Expand Up @@ -243,6 +258,53 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
false);
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions()
{
ConnectorSession connectorSession = new TestingConnectorSession(
new HiveSessionProperties(new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE))).getSessionProperties());

return new BackgroundHiveSplitLoader(
SIMPLE_TABLE,
createPartitionMetadataWithOfflinePartitions(),
TupleDomain.all(),
createBucketSplitInfo(Optional.empty(), ImmutableList.of()),
connectorSession,
new TestingHdfsEnvironment(),
new NamenodeStats(),
new TestingDirectoryLister(TEST_FILES),
directExecutor(),
2,
false);
}

private static Iterable<HivePartitionMetadata> createPartitionMetadataWithOfflinePartitions()
throws RuntimeException
{
return () -> new AbstractIterator<HivePartitionMetadata>()
{
// This iterator is crafted to return a valid partition for the first calls to
// hasNext() and next(), and then it should throw for the second call to hasNext()
private int position = -1;

@Override
protected HivePartitionMetadata computeNext()
{
position++;
switch (position) {
case 0:
return new HivePartitionMetadata(
new HivePartition(new SchemaTableName("testSchema", "table_name"), ImmutableList.of()),
Optional.empty(),
ImmutableMap.of());
case 1:
throw new RuntimeException("OFFLINE");
default:
return endOfData();
}
}
};
}

private static HiveSplitSource hiveSplitSource(
BackgroundHiveSplitLoader backgroundHiveSplitLoader,
TupleDomain<HiveColumnHandle> compactEffectivePredicate)
Expand Down

0 comments on commit 1c6c5d1

Please sign in to comment.