Skip to content

Commit

Permalink
Fix eager loading of Hive partitions
Browse files Browse the repository at this point in the history
Remove defensive copy of Hive partitions to avoid eagerly loading all
the partitions
  • Loading branch information
cberner committed May 8, 2015
1 parent b7499a8 commit 50da260
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
Expand Up @@ -19,7 +19,6 @@
import com.facebook.presto.spi.TupleDomain; import com.facebook.presto.spi.TupleDomain;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
Expand Down Expand Up @@ -47,9 +46,7 @@
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


Expand Down Expand Up @@ -82,7 +79,7 @@ public class BackgroundHiveSplitLoader
private final Executor executor; private final Executor executor;
private final ConnectorSession session; private final ConnectorSession session;
private final AtomicInteger outstandingTasks = new AtomicInteger(); private final AtomicInteger outstandingTasks = new AtomicInteger();
private final Queue<HivePartitionMetadata> partitions = new ConcurrentLinkedQueue<>(); private final ConcurrentLazyQueue<HivePartitionMetadata> partitions;
private final Deque<HiveFileIterator> fileIterators = new ConcurrentLinkedDeque<>(); private final Deque<HiveFileIterator> fileIterators = new ConcurrentLinkedDeque<>();
private final AtomicInteger remainingInitialSplits; private final AtomicInteger remainingInitialSplits;


Expand Down Expand Up @@ -120,7 +117,7 @@ public BackgroundHiveSplitLoader(
this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled; this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled;
this.forceLocalScheduling = forceLocalScheduling; this.forceLocalScheduling = forceLocalScheduling;
this.executor = executor; this.executor = executor;
Iterables.addAll(this.partitions, partitions); this.partitions = new ConcurrentLazyQueue<>(partitions);
} }


@Override @Override
Expand Down
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import javax.annotation.concurrent.GuardedBy;

import java.util.Iterator;

public class ConcurrentLazyQueue<E>
{
@GuardedBy("this")
private final Iterator<E> iterator;

public ConcurrentLazyQueue(Iterable<E> iterable)
{
this.iterator = iterable.iterator();
}

public synchronized boolean isEmpty()
{
return !iterator.hasNext();
}

public synchronized E poll()
{
if (!iterator.hasNext()) {
return null;
}
return iterator.next();
}
}

0 comments on commit 50da260

Please sign in to comment.