Skip to content

Commit

Permalink
ParallelCombiner: Fix buffer leak on exception in "combine". (apache#…
Browse files Browse the repository at this point in the history
…5630) (apache#5633)

Once a buffer is acquired, we need to make sure to release it if an
exception is thrown before the closeable iterator is created.
  • Loading branch information
gianm authored and b-slim committed Apr 12, 2018
1 parent aa37a78 commit ce89de5
Showing 1 changed file with 47 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,44 +136,55 @@ public CloseableIterator<Entry<KeyType>> combine(
)
{
// CombineBuffer is initialized when this method is called and closed after the result iterator is done
final Closer closer = Closer.create();
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
combiningFactories
);
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
// required number of buffers maximizing the parallelism.
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
combineBuffer,
minimumRequiredBufferCapacity,
concurrencyHint,
sortedIterators.size()
);
closer.register(combineBufferHolder);

final int leafCombineDegree = degreeAndNumBuffers.lhs;
final int numBuffers = degreeAndNumBuffers.rhs;
final int sliceSize = combineBuffer.capacity() / numBuffers;
try {
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
combiningFactories
);
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
// required number of buffers maximizing the parallelism.
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
combineBuffer,
minimumRequiredBufferCapacity,
concurrencyHint,
sortedIterators.size()
);

final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);
final int leafCombineDegree = degreeAndNumBuffers.lhs;
final int numBuffers = degreeAndNumBuffers.rhs;
final int sliceSize = combineBuffer.capacity() / numBuffers;

final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
sortedIterators,
bufferSupplier,
combiningFactories,
leafCombineDegree,
mergedDictionary
);
final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);

final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
sortedIterators,
bufferSupplier,
combiningFactories,
leafCombineDegree,
mergedDictionary
);

final Closer closer = Closer.create();
closer.register(combineBufferHolder);
closer.register(() -> checkCombineFutures(combineFutures));
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
closer.register(() -> checkCombineFutures(combineFutures));

return CloseableIterators.wrap(combineIterator, closer);
return CloseableIterators.wrap(combineIterator, closer);
}
catch (Throwable t) {
try {
closer.close();
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
throw t;
}
}

private static void checkCombineFutures(List<Future> combineFutures)
Expand Down Expand Up @@ -289,11 +300,11 @@ private int computeRequiredBufferNum(int numChildNodes, int combineDegree)
* Recursively build a combining tree in a bottom-up manner. Each node of the tree is a task that combines input
* iterators asynchronously.
*
* @param childIterators all iterators of the child level
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree for the current level
* @param dictionary merged dictionary
* @param childIterators all iterators of the child level
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree for the current level
* @param dictionary merged dictionary
*
* @return a pair of a list of iterators of the current level in the combining tree and a list of futures of all
* executed combining tasks
Expand Down

0 comments on commit ce89de5

Please sign in to comment.