From a9f1b8d5ad754a3afd4ea32cea2ef16a8801a5d1 Mon Sep 17 00:00:00 2001 From: Aleksei Statkevich Date: Mon, 10 Oct 2016 11:06:27 -0700 Subject: [PATCH] Make RaptorPageSink#finish() truly asynchronous --- .../presto/raptor/RaptorPageSink.java | 24 +++++++++---------- .../raptor/storage/OrcStorageManager.java | 21 +++++++++------- .../raptor/storage/StoragePageSink.java | 3 ++- .../storage/organization/ShardCompactor.java | 5 ++-- .../raptor/storage/TestOrcStorageManager.java | 9 +++---- .../organization/TestShardCompactor.java | 5 ++-- 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java index 6a353e316efd..b07a7a00d009 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java @@ -35,7 +35,6 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -47,8 +46,8 @@ import static com.facebook.presto.spi.type.DateType.DATE; import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.concurrent.MoreFutures.allAsList; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.toList; @@ -133,18 +132,17 @@ public CompletableFuture appendPage(Page page) @Override public CompletableFuture> finish() { - List shards = new ArrayList<>(); - for (PageBuffer pageBuffer : pageWriter.getPageBuffers()) { + List>> futureSlices = pageWriter.getPageBuffers().stream().map(pageBuffer -> { pageBuffer.flush(); - shards.addAll(pageBuffer.getStoragePageSink().commit()); - } - - ImmutableList.Builder fragments = ImmutableList.builder(); - for (ShardInfo shard : shards) { - fragments.add(Slices.wrappedBuffer(shardInfoCodec.toJsonBytes(shard))); - } - // TODO: process asynchronously - return completedFuture(fragments.build()); + CompletableFuture> futureShards = pageBuffer.getStoragePageSink().commit(); + return futureShards.thenApply(shards -> shards.stream() + .map(shard -> Slices.wrappedBuffer(shardInfoCodec.toJsonBytes(shard))) + .collect(toList())); + }).collect(toList()); + + return allAsList(futureSlices).thenApply(lists -> lists.stream() + .flatMap(Collection::stream) + .collect(toList())); } @Override diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java index a21ba57bbf4a..8b1127c13624 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java @@ -50,7 +50,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import io.airlift.concurrent.MoreFutures; import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -103,6 +102,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.propagateIfInstanceOf; +import static io.airlift.concurrent.MoreFutures.allAsList; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.json.JsonCodec.jsonCodec; @@ -110,6 +110,7 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.stream.Collectors.toList; import static org.joda.time.DateTimeZone.UTC; @@ -134,6 +135,7 @@ public class OrcStorageManager private final DataSize minAvailableSpace; private final TypeManager typeManager; private final ExecutorService deletionExecutor; + private final ExecutorService commitExecutor; @Inject public OrcStorageManager( @@ -200,12 +202,14 @@ public OrcStorageManager( this.shardRecorder = requireNonNull(shardRecorder, "shardRecorder is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.deletionExecutor = newFixedThreadPool(deletionThreads, daemonThreadsNamed("raptor-delete-" + connectorId + "-%s")); + this.commitExecutor = newCachedThreadPool(daemonThreadsNamed("raptor-commit-" + connectorId + "-%s")); } @PreDestroy public void shutdown() { deletionExecutor.shutdownNow(); + commitExecutor.shutdown(); } @Override @@ -593,20 +597,19 @@ public void flush() } @Override - public List commit() + public CompletableFuture> commit() { checkState(!committed, "already committed"); committed = true; flush(); - // backup jobs depend on the staging files, so wait until all backups have finished - futures.forEach(MoreFutures::getFutureValue); - - for (ShardInfo shard : shards) { - writeShard(shard.getShardUuid()); - } - return ImmutableList.copyOf(shards); + return allAsList(futures).thenApplyAsync(ignored -> { + for (ShardInfo shard : shards) { + writeShard(shard.getShardUuid()); + } + return ImmutableList.copyOf(shards); + }, commitExecutor); } @SuppressWarnings("ResultOfMethodCallIgnored") diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StoragePageSink.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StoragePageSink.java index 9487d16f8671..88232ff0f135 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StoragePageSink.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StoragePageSink.java @@ -17,6 +17,7 @@ import com.facebook.presto.spi.Page; import java.util.List; +import java.util.concurrent.CompletableFuture; public interface StoragePageSink { @@ -30,7 +31,7 @@ public interface StoragePageSink void flush(); - List commit(); + CompletableFuture> commit(); void rollback(); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java index 9e49ae48a802..503537495f25 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java @@ -45,6 +45,7 @@ import static com.facebook.presto.raptor.storage.Row.extractRow; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.units.Duration.nanosSince; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -107,7 +108,7 @@ private List compact(StoragePageSink storagePageSink, OptionalInt buc } } } - return storagePageSink.commit(); + return getFutureValue(storagePageSink.commit()); } public List compactSorted(long transactionId, OptionalInt bucketNumber, Set uuids, List columns, List sortColumnIds, List sortOrders) @@ -151,7 +152,7 @@ public List compactSorted(long transactionId, OptionalInt bucketNumbe rowSources.add(rowSource); } outputPageSink.flush(); - List shardInfos = outputPageSink.commit(); + List shardInfos = getFutureValue(outputPageSink.commit()); updateStats(uuids.size(), shardInfos.size(), nanosSince(start).toMillis()); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java index 40371d6a4b1e..7187ff024aa1 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java @@ -84,6 +84,7 @@ import static com.google.common.hash.Hashing.md5; import static com.google.common.io.Files.createTempDir; import static com.google.common.io.Files.hash; +import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.json.JsonCodec.jsonCodec; import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.slice.Slices.wrappedBuffer; @@ -183,7 +184,7 @@ public void testWriter() List recordedShards = shardRecorder.getShards(); assertEquals(recordedShards.size(), 1); - List shards = sink.commit(); + List shards = getFutureValue(sink.commit()); assertEquals(shards.size(), 1); ShardInfo shardInfo = Iterables.getOnlyElement(shards); @@ -262,7 +263,7 @@ public void testReader() .build(); sink.appendPages(pages); - List shards = sink.commit(); + List shards = getFutureValue(sink.commit()); assertEquals(shards.size(), 1); UUID uuid = Iterables.getOnlyElement(shards).getShardUuid(); @@ -321,7 +322,7 @@ public void testRewriter() .row(456L, "bye") .build(); sink.appendPages(pages); - List shards = sink.commit(); + List shards = getFutureValue(sink.commit()); assertEquals(shardRecorder.getShards().size(), 1); @@ -658,7 +659,7 @@ private List columnStats(List columnTypes, Object[]... rows) OrcStorageManager manager = createOrcStorageManager(); StoragePageSink sink = createStoragePageSink(manager, columnIds, columnTypes); sink.appendPages(rowPagesBuilder(columnTypes).rows(rows).build()); - List shards = sink.commit(); + List shards = getFutureValue(sink.commit()); assertEquals(shards.size(), 1); return Iterables.getOnlyElement(shards).getColumnStats(); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java index 1cbeec84363e..2fe23db4243c 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java @@ -57,6 +57,7 @@ import static com.facebook.presto.testing.TestingConnectorSession.SESSION; import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder; import static com.google.common.io.Files.createTempDir; +import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.testing.FileUtils.deleteRecursively; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.Collections.nCopies; @@ -264,7 +265,7 @@ private static List createSortedShards(StorageManager storageManager, for (int shardNum = 0; shardNum < shardCount; shardNum++) { createSortedShard(columnTypes, sortChannels, sortOrders, sink); } - return sink.commit(); + return getFutureValue(sink.commit()); } private static void createSortedShard(List columnTypes, List sortChannels, List sortOrders, StoragePageSink sink) @@ -293,7 +294,7 @@ private static List createShards(StorageManager storageManager, List< sink.appendPages(createPages(columnTypes)); sink.flush(); } - return sink.commit(); + return getFutureValue(sink.commit()); } private static StoragePageSink createStoragePageSink(StorageManager manager, List columnIds, List columnTypes)