Skip to content

Commit

Permalink
Make RaptorPageSink#finish() truly asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Statkevich committed Nov 1, 2016
1 parent 521a9cd commit a9f1b8d
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
Expand Up @@ -35,7 +35,6 @@
import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;


import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
Expand All @@ -47,8 +46,8 @@
import static com.facebook.presto.spi.type.DateType.DATE; import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.concurrent.MoreFutures.allAsList;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;


Expand Down Expand Up @@ -133,18 +132,17 @@ public CompletableFuture<?> appendPage(Page page)
@Override @Override
public CompletableFuture<Collection<Slice>> finish() public CompletableFuture<Collection<Slice>> finish()
{ {
List<ShardInfo> shards = new ArrayList<>(); List<CompletableFuture<? extends List<Slice>>> futureSlices = pageWriter.getPageBuffers().stream().map(pageBuffer -> {
for (PageBuffer pageBuffer : pageWriter.getPageBuffers()) {
pageBuffer.flush(); pageBuffer.flush();
shards.addAll(pageBuffer.getStoragePageSink().commit()); CompletableFuture<List<ShardInfo>> futureShards = pageBuffer.getStoragePageSink().commit();
} return futureShards.thenApply(shards -> shards.stream()

.map(shard -> Slices.wrappedBuffer(shardInfoCodec.toJsonBytes(shard)))
ImmutableList.Builder<Slice> fragments = ImmutableList.builder(); .collect(toList()));
for (ShardInfo shard : shards) { }).collect(toList());
fragments.add(Slices.wrappedBuffer(shardInfoCodec.toJsonBytes(shard)));
} return allAsList(futureSlices).thenApply(lists -> lists.stream()
// TODO: process asynchronously .flatMap(Collection::stream)
return completedFuture(fragments.build()); .collect(toList()));
} }


@Override @Override
Expand Down
Expand Up @@ -50,7 +50,6 @@
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec; import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice; import io.airlift.slice.Slice;
import io.airlift.slice.Slices; import io.airlift.slice.Slices;
Expand Down Expand Up @@ -103,13 +102,15 @@
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.propagateIfInstanceOf; import static com.google.common.base.Throwables.propagateIfInstanceOf;
import static io.airlift.concurrent.MoreFutures.allAsList;
import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.json.JsonCodec.jsonCodec; import static io.airlift.json.JsonCodec.jsonCodec;
import static java.lang.Math.min; import static java.lang.Math.min;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.supplyAsync; import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
Expand All @@ -134,6 +135,7 @@ public class OrcStorageManager
private final DataSize minAvailableSpace; private final DataSize minAvailableSpace;
private final TypeManager typeManager; private final TypeManager typeManager;
private final ExecutorService deletionExecutor; private final ExecutorService deletionExecutor;
private final ExecutorService commitExecutor;


@Inject @Inject
public OrcStorageManager( public OrcStorageManager(
Expand Down Expand Up @@ -200,12 +202,14 @@ public OrcStorageManager(
this.shardRecorder = requireNonNull(shardRecorder, "shardRecorder is null"); this.shardRecorder = requireNonNull(shardRecorder, "shardRecorder is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.deletionExecutor = newFixedThreadPool(deletionThreads, daemonThreadsNamed("raptor-delete-" + connectorId + "-%s")); this.deletionExecutor = newFixedThreadPool(deletionThreads, daemonThreadsNamed("raptor-delete-" + connectorId + "-%s"));
this.commitExecutor = newCachedThreadPool(daemonThreadsNamed("raptor-commit-" + connectorId + "-%s"));
} }


@PreDestroy @PreDestroy
public void shutdown() public void shutdown()
{ {
deletionExecutor.shutdownNow(); deletionExecutor.shutdownNow();
commitExecutor.shutdown();
} }


@Override @Override
Expand Down Expand Up @@ -593,20 +597,19 @@ public void flush()
} }


@Override @Override
public List<ShardInfo> commit() public CompletableFuture<List<ShardInfo>> commit()
{ {
checkState(!committed, "already committed"); checkState(!committed, "already committed");
committed = true; committed = true;


flush(); flush();


// backup jobs depend on the staging files, so wait until all backups have finished return allAsList(futures).thenApplyAsync(ignored -> {
futures.forEach(MoreFutures::getFutureValue); for (ShardInfo shard : shards) {

writeShard(shard.getShardUuid());
for (ShardInfo shard : shards) { }
writeShard(shard.getShardUuid()); return ImmutableList.copyOf(shards);
} }, commitExecutor);
return ImmutableList.copyOf(shards);
} }


@SuppressWarnings("ResultOfMethodCallIgnored") @SuppressWarnings("ResultOfMethodCallIgnored")
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;


import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;


public interface StoragePageSink public interface StoragePageSink
{ {
Expand All @@ -30,7 +31,7 @@ public interface StoragePageSink


void flush(); void flush();


List<ShardInfo> commit(); CompletableFuture<List<ShardInfo>> commit();


void rollback(); void rollback();
} }
Expand Up @@ -45,6 +45,7 @@


import static com.facebook.presto.raptor.storage.Row.extractRow; import static com.facebook.presto.raptor.storage.Row.extractRow;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.units.Duration.nanosSince; import static io.airlift.units.Duration.nanosSince;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -107,7 +108,7 @@ private List<ShardInfo> compact(StoragePageSink storagePageSink, OptionalInt buc
} }
} }
} }
return storagePageSink.commit(); return getFutureValue(storagePageSink.commit());
} }


public List<ShardInfo> compactSorted(long transactionId, OptionalInt bucketNumber, Set<UUID> uuids, List<ColumnInfo> columns, List<Long> sortColumnIds, List<SortOrder> sortOrders) public List<ShardInfo> compactSorted(long transactionId, OptionalInt bucketNumber, Set<UUID> uuids, List<ColumnInfo> columns, List<Long> sortColumnIds, List<SortOrder> sortOrders)
Expand Down Expand Up @@ -151,7 +152,7 @@ public List<ShardInfo> compactSorted(long transactionId, OptionalInt bucketNumbe
rowSources.add(rowSource); rowSources.add(rowSource);
} }
outputPageSink.flush(); outputPageSink.flush();
List<ShardInfo> shardInfos = outputPageSink.commit(); List<ShardInfo> shardInfos = getFutureValue(outputPageSink.commit());


updateStats(uuids.size(), shardInfos.size(), nanosSince(start).toMillis()); updateStats(uuids.size(), shardInfos.size(), nanosSince(start).toMillis());


Expand Down
Expand Up @@ -84,6 +84,7 @@
import static com.google.common.hash.Hashing.md5; import static com.google.common.hash.Hashing.md5;
import static com.google.common.io.Files.createTempDir; import static com.google.common.io.Files.createTempDir;
import static com.google.common.io.Files.hash; import static com.google.common.io.Files.hash;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.json.JsonCodec.jsonCodec; import static io.airlift.json.JsonCodec.jsonCodec;
import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.slice.Slices.wrappedBuffer; import static io.airlift.slice.Slices.wrappedBuffer;
Expand Down Expand Up @@ -183,7 +184,7 @@ public void testWriter()
List<RecordedShard> recordedShards = shardRecorder.getShards(); List<RecordedShard> recordedShards = shardRecorder.getShards();
assertEquals(recordedShards.size(), 1); assertEquals(recordedShards.size(), 1);


List<ShardInfo> shards = sink.commit(); List<ShardInfo> shards = getFutureValue(sink.commit());


assertEquals(shards.size(), 1); assertEquals(shards.size(), 1);
ShardInfo shardInfo = Iterables.getOnlyElement(shards); ShardInfo shardInfo = Iterables.getOnlyElement(shards);
Expand Down Expand Up @@ -262,7 +263,7 @@ public void testReader()
.build(); .build();


sink.appendPages(pages); sink.appendPages(pages);
List<ShardInfo> shards = sink.commit(); List<ShardInfo> shards = getFutureValue(sink.commit());


assertEquals(shards.size(), 1); assertEquals(shards.size(), 1);
UUID uuid = Iterables.getOnlyElement(shards).getShardUuid(); UUID uuid = Iterables.getOnlyElement(shards).getShardUuid();
Expand Down Expand Up @@ -321,7 +322,7 @@ public void testRewriter()
.row(456L, "bye") .row(456L, "bye")
.build(); .build();
sink.appendPages(pages); sink.appendPages(pages);
List<ShardInfo> shards = sink.commit(); List<ShardInfo> shards = getFutureValue(sink.commit());


assertEquals(shardRecorder.getShards().size(), 1); assertEquals(shardRecorder.getShards().size(), 1);


Expand Down Expand Up @@ -658,7 +659,7 @@ private List<ColumnStats> columnStats(List<Type> columnTypes, Object[]... rows)
OrcStorageManager manager = createOrcStorageManager(); OrcStorageManager manager = createOrcStorageManager();
StoragePageSink sink = createStoragePageSink(manager, columnIds, columnTypes); StoragePageSink sink = createStoragePageSink(manager, columnIds, columnTypes);
sink.appendPages(rowPagesBuilder(columnTypes).rows(rows).build()); sink.appendPages(rowPagesBuilder(columnTypes).rows(rows).build());
List<ShardInfo> shards = sink.commit(); List<ShardInfo> shards = getFutureValue(sink.commit());


assertEquals(shards.size(), 1); assertEquals(shards.size(), 1);
return Iterables.getOnlyElement(shards).getColumnStats(); return Iterables.getOnlyElement(shards).getColumnStats();
Expand Down
Expand Up @@ -57,6 +57,7 @@
import static com.facebook.presto.testing.TestingConnectorSession.SESSION; import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder; import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder;
import static com.google.common.io.Files.createTempDir; import static com.google.common.io.Files.createTempDir;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.testing.FileUtils.deleteRecursively; import static io.airlift.testing.FileUtils.deleteRecursively;
import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Collections.nCopies; import static java.util.Collections.nCopies;
Expand Down Expand Up @@ -264,7 +265,7 @@ private static List<ShardInfo> createSortedShards(StorageManager storageManager,
for (int shardNum = 0; shardNum < shardCount; shardNum++) { for (int shardNum = 0; shardNum < shardCount; shardNum++) {
createSortedShard(columnTypes, sortChannels, sortOrders, sink); createSortedShard(columnTypes, sortChannels, sortOrders, sink);
} }
return sink.commit(); return getFutureValue(sink.commit());
} }


private static void createSortedShard(List<Type> columnTypes, List<Integer> sortChannels, List<SortOrder> sortOrders, StoragePageSink sink) private static void createSortedShard(List<Type> columnTypes, List<Integer> sortChannels, List<SortOrder> sortOrders, StoragePageSink sink)
Expand Down Expand Up @@ -293,7 +294,7 @@ private static List<ShardInfo> createShards(StorageManager storageManager, List<
sink.appendPages(createPages(columnTypes)); sink.appendPages(createPages(columnTypes));
sink.flush(); sink.flush();
} }
return sink.commit(); return getFutureValue(sink.commit());
} }


private static StoragePageSink createStoragePageSink(StorageManager manager, List<Long> columnIds, List<Type> columnTypes) private static StoragePageSink createStoragePageSink(StorageManager manager, List<Long> columnIds, List<Type> columnTypes)
Expand Down

0 comments on commit a9f1b8d

Please sign in to comment.