From f0b17d625caaa2aa742cec7e6d17c90b538ec66d Mon Sep 17 00:00:00 2001 From: David Phillips Date: Fri, 13 Nov 2015 21:32:33 -0800 Subject: [PATCH] Use single split per bucket --- .../presto/raptor/RaptorMetadata.java | 8 +- .../raptor/RaptorPageSourceProvider.java | 38 ++++-- .../facebook/presto/raptor/RaptorSplit.java | 37 ++++-- .../presto/raptor/RaptorSplitManager.java | 38 ++++-- .../presto/raptor/RaptorTableHandle.java | 12 +- .../raptor/metadata/DatabaseShardManager.java | 39 +++++-- .../presto/raptor/metadata/IndexInserter.java | 32 +++++- .../presto/raptor/metadata/ShardIterator.java | 71 ++++++++++-- .../presto/raptor/metadata/ShardManager.java | 4 +- .../presto/raptor/metadata/ShardNodes.java | 21 ++-- .../presto/raptor/util/ConcatPageSource.java | 108 ++++++++++++++++++ .../raptor/TestRaptorDistributedQueries.java | 4 + .../metadata/TestDatabaseShardManager.java | 72 ++++++++---- .../storage/TestShardCompactionDiscovery.java | 2 +- .../raptor/storage/TestShardEjector.java | 4 +- 15 files changed, 396 insertions(+), 94 deletions(-) create mode 100644 presto-raptor/src/main/java/com/facebook/presto/raptor/util/ConcatPageSource.java diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorMetadata.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorMetadata.java index 17db68e8ed21..c2e8ef1dec6e 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorMetadata.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorMetadata.java @@ -166,7 +166,8 @@ private ConnectorTableHandle getTableHandle(SchemaTableName tableName) table.getDistributionId(), table.getBucketCount(), OptionalLong.empty(), - Optional.ofNullable(sampleWeightColumnHandle)); + Optional.ofNullable(sampleWeightColumnHandle), + false); } @Override @@ -571,7 +572,7 @@ public void finishCreateTable(ConnectorSession session, ConnectorOutputTableHand List columns = table.getColumnHandles().stream().map(ColumnInfo::fromHandle).collect(toList()); // TODO: refactor this to avoid creating an empty table on failure - shardManager.createTable(newTableId, columns); + shardManager.createTable(newTableId, columns, table.getBucketCount().isPresent()); shardManager.commitShards(transactionId, newTableId, columns, parseFragments(fragments), Optional.empty()); clearRollback(); @@ -660,7 +661,8 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable handle.getDistributionId(), handle.getBucketCount(), OptionalLong.of(transactionId), - handle.getSampleWeightColumnHandle()); + handle.getSampleWeightColumnHandle(), + true); } @Override diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java index 5cd3b7530dd6..929c215d70d7 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java @@ -15,18 +15,22 @@ import com.facebook.presto.raptor.storage.ReaderAttributes; import com.facebook.presto.raptor.storage.StorageManager; +import com.facebook.presto.raptor.util.ConcatPageSource; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.Type; import javax.inject.Inject; +import java.util.Iterator; import java.util.List; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.UUID; import java.util.function.Function; @@ -50,20 +54,36 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti { RaptorSplit raptorSplit = checkType(split, RaptorSplit.class, "split"); - UUID shardUuid = raptorSplit.getShardUuid(); OptionalInt bucketNumber = raptorSplit.getBucketNumber(); + TupleDomain predicate = raptorSplit.getEffectivePredicate(); + ReaderAttributes attributes = ReaderAttributes.from(session); + OptionalLong transactionId = raptorSplit.getTransactionId(); + + if (raptorSplit.getShardUuids().size() == 1) { + UUID shardUuid = raptorSplit.getShardUuids().iterator().next(); + return createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId); + } + + Iterator iterator = raptorSplit.getShardUuids().stream() + .map(shardUuid -> createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId)) + .iterator(); + + return new ConcatPageSource(iterator); + } + + private ConnectorPageSource createPageSource( + UUID shardUuid, + OptionalInt bucketNumber, + List columns, + TupleDomain predicate, + ReaderAttributes attributes, + OptionalLong transactionId) + { List columnHandles = columns.stream().map(toRaptorColumnHandle()).collect(toList()); List columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList()); List columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList()); - return storageManager.getPageSource( - shardUuid, - bucketNumber, - columnIds, - columnTypes, - raptorSplit.getEffectivePredicate(), - ReaderAttributes.from(session), - raptorSplit.getTransactionId()); + return storageManager.getPageSource(shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId); } private static Function toRaptorColumnHandle() diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplit.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplit.java index 22f417af127d..b31188d0ca26 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplit.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplit.java @@ -19,10 +19,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.OptionalInt; import java.util.OptionalLong; +import java.util.Set; import java.util.UUID; import static com.google.common.base.MoreObjects.toStringHelper; @@ -32,7 +34,7 @@ public class RaptorSplit implements ConnectorSplit { private final String connectorId; - private final UUID shardUuid; + private final Set shardUuids; private final OptionalInt bucketNumber; private final List addresses; private final TupleDomain effectivePredicate; @@ -41,24 +43,45 @@ public class RaptorSplit @JsonCreator public RaptorSplit( @JsonProperty("connectorId") String connectorId, - @JsonProperty("shardUuid") UUID shardUuid, + @JsonProperty("shardUuids") Set shardUuids, @JsonProperty("bucketNumber") OptionalInt bucketNumber, @JsonProperty("effectivePredicate") TupleDomain effectivePredicate, @JsonProperty("transactionId") OptionalLong transactionId) { - this(connectorId, shardUuid, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId); + this(connectorId, shardUuids, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId); } public RaptorSplit( String connectorId, UUID shardUuid, + List addresses, + TupleDomain effectivePredicate, + OptionalLong transactionId) + { + this(connectorId, ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses, effectivePredicate, transactionId); + } + + public RaptorSplit( + String connectorId, + Set shardUuids, + int bucketNumber, + List addresses, + TupleDomain effectivePredicate, + OptionalLong transactionId) + { + this(connectorId, shardUuids, OptionalInt.of(bucketNumber), addresses, effectivePredicate, transactionId); + } + + private RaptorSplit( + String connectorId, + Set shardUuids, OptionalInt bucketNumber, List addresses, TupleDomain effectivePredicate, OptionalLong transactionId) { this.connectorId = requireNonNull(connectorId, "connectorId is null"); - this.shardUuid = requireNonNull(shardUuid, "shardUuid is null"); + this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuid is null")); this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null"); this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null"); @@ -84,9 +107,9 @@ public String getConnectorId() } @JsonProperty - public UUID getShardUuid() + public Set getShardUuids() { - return shardUuid; + return shardUuids; } @JsonProperty @@ -117,7 +140,7 @@ public Object getInfo() public String toString() { return toStringHelper(this) - .add("shardUuid", shardUuid) + .add("shardUuids", shardUuids) .add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null) .add("hosts", addresses) .omitNullValues() diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java index 421c147d5fb2..ff88fb739cc0 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java @@ -38,7 +38,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.UUID; @@ -51,6 +50,7 @@ import static com.facebook.presto.raptor.util.Types.checkType; import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.Maps.uniqueIndex; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static java.lang.String.format; @@ -94,7 +94,11 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co RaptorTableLayoutHandle handle = checkType(layout, RaptorTableLayoutHandle.class, "layout"); RaptorTableHandle table = handle.getTable(); TupleDomain effectivePredicate = toRaptorTupleDomain(handle.getConstraint()); - return new RaptorSplitSource(table.getTableId(), effectivePredicate, table.getTransactionId()); + long tableId = table.getTableId(); + boolean bucketed = table.getBucketCount().isPresent(); + boolean merged = bucketed && !table.isDelete(); + OptionalLong transactionId = table.getTransactionId(); + return new RaptorSplitSource(tableId, bucketed, merged, effectivePredicate, transactionId); } private static List getAddressesForNodes(Map nodeMap, Iterable nodeIdentifiers) @@ -133,12 +137,17 @@ private class RaptorSplitSource @GuardedBy("this") private CompletableFuture> future; - public RaptorSplitSource(long tableId, TupleDomain effectivePredicate, OptionalLong transactionId) + public RaptorSplitSource( + long tableId, + boolean bucketed, + boolean merged, + TupleDomain effectivePredicate, + OptionalLong transactionId) { this.tableId = tableId; this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null"); this.transactionId = requireNonNull(transactionId, "transactionId is null"); - this.iterator = new SynchronizedResultIterator<>(shardManager.getShardNodes(tableId, effectivePredicate)); + this.iterator = new SynchronizedResultIterator<>(shardManager.getShardNodes(tableId, bucketed, merged, effectivePredicate)); } @Override @@ -190,19 +199,26 @@ private Supplier> batchSupplier(int maxSize) private ConnectorSplit createSplit(ShardNodes shard) { - UUID shardId = shard.getShardUuid(); - OptionalInt bucketNumber = shard.getBucketNumber(); + Set shardUuids = shard.getShardUuids(); Collection nodeIds = shard.getNodeIdentifiers(); - List addresses = getAddressesForNodes(nodesById, nodeIds); + if (shard.getBucketNumber().isPresent()) { + int bucketNumber = shard.getBucketNumber().getAsInt(); + if (addresses.isEmpty()) { + // TODO: reassign + throw new PrestoException(NO_NODES_AVAILABLE, "Reassignment not yet implemented"); + } + return new RaptorSplit(connectorId, shardUuids, bucketNumber, addresses, effectivePredicate, transactionId); + } + + verify(shardUuids.size() == 1, "wrong shard count for non-bucketed table: %s", shardUuids.size()); + UUID shardId = shardUuids.iterator().next(); + if (addresses.isEmpty()) { if (!backupAvailable) { throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds)); } - if (bucketNumber.isPresent()) { - throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, "No host for shard of bucketed table"); - } // Pick a random node and optimistically assign the shard to it. // That node will restore the shard from the backup location. @@ -215,7 +231,7 @@ private ConnectorSplit createSplit(ShardNodes shard) addresses = ImmutableList.of(node.getHostAndPort()); } - return new RaptorSplit(connectorId, shardId, bucketNumber, addresses, effectivePredicate, transactionId); + return new RaptorSplit(connectorId, shardId, addresses, effectivePredicate, transactionId); } } } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorTableHandle.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorTableHandle.java index dffff5f94adc..eb07641671ae 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorTableHandle.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorTableHandle.java @@ -38,6 +38,7 @@ public final class RaptorTableHandle private final OptionalInt bucketCount; private final OptionalLong transactionId; private final Optional sampleWeightColumnHandle; + private final boolean delete; @JsonCreator public RaptorTableHandle( @@ -48,7 +49,8 @@ public RaptorTableHandle( @JsonProperty("distributionId") OptionalLong distributionId, @JsonProperty("bucketCount") OptionalInt bucketCount, @JsonProperty("transactionId") OptionalLong transactionId, - @JsonProperty("sampleWeightColumnHandle") Optional sampleWeightColumnHandle) + @JsonProperty("sampleWeightColumnHandle") Optional sampleWeightColumnHandle, + @JsonProperty("delete") boolean delete) { this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.schemaName = checkSchemaName(schemaName); @@ -61,6 +63,8 @@ public RaptorTableHandle( this.distributionId = requireNonNull(distributionId, "distributionId is null"); this.bucketCount = requireNonNull(bucketCount, "bucketCount is null"); this.transactionId = requireNonNull(transactionId, "transactionId is null"); + + this.delete = delete; } @JsonProperty @@ -111,6 +115,12 @@ public Optional getSampleWeightColumnHandle() return sampleWeightColumnHandle; } + @JsonProperty + public boolean isDelete() + { + return delete; + } + @Override public String toString() { diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java index 2a3ccfb1e5ce..ad02d787277a 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java @@ -118,7 +118,7 @@ public DatabaseShardManager(@ForMetadata IDBI dbi, NodeSupplier nodeSupplier) } @Override - public void createTable(long tableId, List columns) + public void createTable(long tableId, List columns, boolean bucketed) { StringJoiner tableColumns = new StringJoiner(",\n ", " ", ",\n").setEmptyValue(""); @@ -130,15 +130,30 @@ public void createTable(long tableId, List columns) } } - String sql = "" + - "CREATE TABLE " + shardIndexTable(tableId) + " (\n" + - " shard_id BIGINT NOT NULL PRIMARY KEY,\n" + - " shard_uuid BINARY(16) NOT NULL,\n" + - " node_ids VARBINARY(128) NOT NULL,\n" + - " bucket_number INT,\n" + - tableColumns + - " UNIQUE (shard_uuid)\n" + - ")"; + String sql; + if (bucketed) { + sql = "" + + "CREATE TABLE " + shardIndexTable(tableId) + " (\n" + + " shard_id BIGINT NOT NULL,\n" + + " shard_uuid BINARY(16) NOT NULL,\n" + + " bucket_number INT NOT NULL\n," + + " node_ids VARBINARY(128) NOT NULL,\n" + + tableColumns + + " PRIMARY KEY (bucket_number, shard_uuid),\n" + + " UNIQUE (shard_id),\n" + + " UNIQUE (shard_uuid)\n" + + ")"; + } + else { + sql = "" + + "CREATE TABLE " + shardIndexTable(tableId) + " (\n" + + " shard_id BIGINT NOT NULL PRIMARY KEY,\n" + + " shard_uuid BINARY(16) NOT NULL,\n" + + " node_ids VARBINARY(128) NOT NULL,\n" + + tableColumns + + " UNIQUE (shard_uuid)\n" + + ")"; + } try (Handle handle = dbi.open()) { handle.execute(sql); @@ -361,9 +376,9 @@ public Set getNodeShards(String nodeIdentifier) } @Override - public ResultIterator getShardNodes(long tableId, TupleDomain effectivePredicate) + public ResultIterator getShardNodes(long tableId, boolean bucketed, boolean merged, TupleDomain effectivePredicate) { - return new ShardIterator(tableId, effectivePredicate, dbi); + return new ShardIterator(tableId, bucketed, merged, effectivePredicate, dbi); } @Override diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/IndexInserter.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/IndexInserter.java index d3f329fcc5db..c7a996cce221 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/IndexInserter.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/IndexInserter.java @@ -15,6 +15,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.util.BooleanMapper; import java.sql.Connection; import java.sql.JDBCType; @@ -35,13 +37,14 @@ import static com.facebook.presto.raptor.metadata.ShardPredicate.bindValue; import static com.facebook.presto.raptor.metadata.ShardPredicate.jdbcType; import static com.facebook.presto.raptor.util.ArrayUtil.intArrayToBytes; -import static com.facebook.presto.raptor.util.DatabaseUtil.bindOptionalInt; import static com.facebook.presto.raptor.util.UuidUtil.uuidToBytes; +import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.Slices.utf8Slice; class IndexInserter implements AutoCloseable { + private final boolean bucketed; private final List columns; private final Map indexes; private final Map types; @@ -50,6 +53,12 @@ class IndexInserter public IndexInserter(Connection connection, long tableId, List columns) throws SQLException { + this.bucketed = DBI.open(connection) + .createQuery("SELECT distribution_id IS NOT NULL FROM tables WHERE table_id = ?") + .bind(0, tableId) + .map(BooleanMapper.FIRST) + .first(); + ImmutableList.Builder columnBuilder = ImmutableList.builder(); ImmutableMap.Builder indexBuilder = ImmutableMap.builder(); ImmutableMap.Builder typeBuilder = ImmutableMap.builder(); @@ -57,9 +66,15 @@ public IndexInserter(Connection connection, long tableId, List colum StringJoiner valueJoiner = new StringJoiner(", "); int index = 1; - nameJoiner.add("shard_id").add("shard_uuid").add("node_ids").add("bucket_number"); - valueJoiner.add("?").add("?").add("?").add("?"); - index += 4; + nameJoiner.add("shard_id").add("shard_uuid").add("node_ids"); + valueJoiner.add("?").add("?").add("?"); + index += 3; + + if (bucketed) { + nameJoiner.add("bucket_number"); + valueJoiner.add("?"); + index++; + } for (ColumnInfo column : columns) { JDBCType jdbcType = jdbcType(column.getType()); @@ -108,7 +123,14 @@ public void insert(long shardId, UUID shardUuid, OptionalInt bucketNumber, Set nodeMap = new HashMap<>(); + private final boolean bucketed; + private final boolean merged; private final ShardManagerDao dao; private final Connection connection; private final PreparedStatement statement; private final ResultSet resultSet; - public ShardIterator(long tableId, TupleDomain effectivePredicate, IDBI dbi) + public ShardIterator(long tableId, boolean bucketed, boolean merged, TupleDomain effectivePredicate, IDBI dbi) { + this.bucketed = bucketed; + this.merged = merged; ShardPredicate predicate = ShardPredicate.create(effectivePredicate); - String sql = format( - "SELECT shard_uuid, bucket_number, node_ids FROM %s WHERE %s", - shardIndexTable(tableId), - predicate.getPredicate()); + String sql; + if (bucketed) { + sql = "SELECT shard_uuid, node_ids, bucket_number FROM %s WHERE %s ORDER BY bucket_number"; + } + else { + sql = "SELECT shard_uuid, node_ids FROM %s WHERE %s"; + } + sql = format(sql, shardIndexTable(tableId), predicate.getPredicate()); dao = onDemandDao(dbi, ShardManagerDao.class); fetchNodes(); @@ -86,7 +95,7 @@ public ShardIterator(long tableId, TupleDomain effectivePred protected ShardNodes computeNext() { try { - return compute(); + return merged ? computeMerged() : compute(); } catch (SQLException e) { throw metadataError(e); @@ -107,6 +116,9 @@ public void close() } } + /** + * Compute split-per-shard (separate split for each shard). + */ private ShardNodes compute() throws SQLException { @@ -115,15 +127,52 @@ private ShardNodes compute() } UUID shardUuid = uuidFromBytes(resultSet.getBytes("shard_uuid")); - OptionalInt bucketNumber = getOptionalInt(resultSet, "bucket_number"); List nodeIds = intArrayFromBytes(resultSet.getBytes("node_ids")); + OptionalInt bucketNumber = bucketed ? OptionalInt.of(resultSet.getInt("bucket_number")) : OptionalInt.empty(); + + return new ShardNodes(ImmutableSet.of(shardUuid), bucketNumber, getNodeIdentifiers(nodeIds, shardUuid)); + } + + /** + * Compute split-per-bucket (single split for all shards in a bucket). + */ + private ShardNodes computeMerged() + throws SQLException + { + if (resultSet.isAfterLast()) { + return endOfData(); + } + if (resultSet.getRow() == 0) { + if (!resultSet.next()) { + return endOfData(); + } + } + + int bucketNumber = resultSet.getInt("bucket_number"); + byte[] nodeIdBytes = resultSet.getBytes("node_ids"); + ImmutableSet.Builder shardBuilder = ImmutableSet.builder(); + + do { + if (!Arrays.equals(nodeIdBytes, resultSet.getBytes("node_ids"))) { + throw new PrestoException(RAPTOR_ERROR, "Shards in same bucket have different node assignments"); + } + shardBuilder.add(uuidFromBytes(resultSet.getBytes("shard_uuid"))); + } + while (resultSet.next() && resultSet.getInt("bucket_number") == bucketNumber); + + List nodeIds = intArrayFromBytes(nodeIdBytes); + Set shardUuids = shardBuilder.build(); + Set nodeIdentifiers = getNodeIdentifiers(nodeIds, shardUuids.iterator().next()); + return new ShardNodes(shardUuids, OptionalInt.of(bucketNumber), nodeIdentifiers); + } + + private Set getNodeIdentifiers(List nodeIds, UUID shardUuid) + { Function fetchNode = id -> fetchNode(id, shardUuid); - Set nodeIdentifiers = nodeIds.stream() + return nodeIds.stream() .map(id -> nodeMap.computeIfAbsent(id, fetchNode)) .collect(toSet()); - - return new ShardNodes(shardUuid, bucketNumber, nodeIdentifiers); } private String fetchNode(int id, UUID shardUuid) diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java index 9e9680aa65b3..ad3b6aacb286 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java @@ -30,7 +30,7 @@ public interface ShardManager /** * Create a table. */ - void createTable(long tableId, List columns); + void createTable(long tableId, List columns, boolean bucketed); /** * Drop a table. @@ -60,7 +60,7 @@ public interface ShardManager /** * Return the shard nodes a given table. */ - ResultIterator getShardNodes(long tableId, TupleDomain effectivePredicate); + ResultIterator getShardNodes(long tableId, boolean bucketed, boolean merged, TupleDomain effectivePredicate); /** * Assign a shard to a node. diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardNodes.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardNodes.java index 778b49cd71e7..84135f589bdd 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardNodes.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardNodes.java @@ -25,20 +25,25 @@ public class ShardNodes { - private final UUID shardUuid; + private final Set shardUuids; private final OptionalInt bucketNumber; private final Set nodeIdentifiers; - public ShardNodes(UUID shardUuid, OptionalInt bucketNumber, Set nodeIdentifiers) + public ShardNodes(UUID shardUuid, Set nodeIdentifiers) { - this.shardUuid = requireNonNull(shardUuid, "shardUuid is null"); + this(ImmutableSet.of(shardUuid), OptionalInt.empty(), nodeIdentifiers); + } + + public ShardNodes(Set shardUuids, OptionalInt bucketNumber, Set nodeIdentifiers) + { + this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuids is null")); this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null"); this.nodeIdentifiers = ImmutableSet.copyOf(requireNonNull(nodeIdentifiers, "nodeIdentifiers is null")); } - public UUID getShardUuid() + public Set getShardUuids() { - return shardUuid; + return shardUuids; } public OptionalInt getBucketNumber() @@ -61,7 +66,7 @@ public boolean equals(Object obj) return false; } ShardNodes other = (ShardNodes) obj; - return Objects.equals(this.shardUuid, other.shardUuid) && + return Objects.equals(this.shardUuids, other.shardUuids) && Objects.equals(this.bucketNumber, other.bucketNumber) && Objects.equals(this.nodeIdentifiers, other.nodeIdentifiers); } @@ -69,14 +74,14 @@ public boolean equals(Object obj) @Override public int hashCode() { - return Objects.hash(shardUuid, bucketNumber, nodeIdentifiers); + return Objects.hash(shardUuids, bucketNumber, nodeIdentifiers); } @Override public String toString() { return toStringHelper(this) - .add("shardUuid", shardUuid) + .add("shardUuids", shardUuids) .add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null) .add("nodeIdentifiers", nodeIdentifiers) .omitNullValues() diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/util/ConcatPageSource.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/util/ConcatPageSource.java new file mode 100644 index 000000000000..da1137b17f02 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/util/ConcatPageSource.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.util; + +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.Page; + +import java.io.IOException; +import java.util.Iterator; + +import static java.util.Objects.requireNonNull; + +public class ConcatPageSource + implements ConnectorPageSource +{ + private final Iterator iterator; + + private ConnectorPageSource current; + private long totalBytes; + private long completedBytes; + private long readTimeNanos; + + public ConcatPageSource(Iterator iterator) + { + this.iterator = requireNonNull(iterator, "iterator is null"); + } + + @Override + public long getTotalBytes() + { + setup(); + return totalBytes + ((current != null) ? current.getTotalBytes() : 0); + } + + @Override + public long getCompletedBytes() + { + setup(); + return completedBytes + ((current != null) ? current.getCompletedBytes() : 0); + } + + @Override + public long getReadTimeNanos() + { + setup(); + return readTimeNanos + ((current != null) ? current.getReadTimeNanos() : 0); + } + + @Override + public boolean isFinished() + { + setup(); + return current == null; + } + + @Override + public Page getNextPage() + { + while (true) { + setup(); + + if (current == null) { + return null; + } + if (!current.isFinished()) { + return current.getNextPage(); + } + + totalBytes += current.getTotalBytes(); + completedBytes += current.getCompletedBytes(); + readTimeNanos += current.getReadTimeNanos(); + current = null; + } + } + + @Override + public long getSystemMemoryUsage() + { + return (current != null) ? current.getSystemMemoryUsage() : 0; + } + + @Override + public void close() + throws IOException + { + if (current != null) { + current.close(); + } + } + + private void setup() + { + if ((current == null) && iterator.hasNext()) { + current = iterator.next(); + } + } +} diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorDistributedQueries.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorDistributedQueries.java index f69ac3ec3d53..a15215996d09 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorDistributedQueries.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorDistributedQueries.java @@ -121,6 +121,10 @@ public void testCreateBucketedTable() assertQuery("SELECT count(*) FROM orders_bucketed a JOIN orders_bucketed b USING (orderkey)", "SELECT count(*) * 4 FROM orders"); + assertUpdate("DELETE FROM orders_bucketed WHERE orderkey = 37", 2); + assertQuery("SELECT count(*) FROM orders_bucketed", "SELECT (count(*) * 2) - 2 FROM orders"); + assertQuery("SELECT count(DISTINCT \"$shard_uuid\") FROM orders_bucketed", "SELECT 50 * 2"); + assertUpdate("DROP TABLE orders_bucketed"); } } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java index 7e9023673986..6211d50dd800 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java @@ -39,6 +39,7 @@ import java.io.File; import java.time.LocalDate; import java.time.ZonedDateTime; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -67,6 +68,7 @@ import static java.time.ZoneOffset.UTC; import static java.util.stream.Collectors.toSet; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.fail; @Test(singleThreaded = true) @@ -107,7 +109,7 @@ public void testCommit() List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shards, Optional.empty()); @@ -123,7 +125,7 @@ public void testRollback() List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); List shards = ImmutableList.of(shardInfo(UUID.randomUUID(), "node1")); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.rollbackTransaction(transactionId); @@ -141,24 +143,23 @@ public void testRollback() public void testAssignShard() { long tableId = createTable("test"); - OptionalInt bucketNumber = OptionalInt.empty(); UUID shard = UUID.randomUUID(); List shardNodes = ImmutableList.of(shardInfo(shard, "node1")); List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shardNodes, Optional.empty()); ShardNodes actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all())); - assertEquals(actual, new ShardNodes(shard, bucketNumber, ImmutableSet.of("node1"))); + assertEquals(actual, new ShardNodes(shard, ImmutableSet.of("node1"))); shardManager.assignShard(tableId, shard, "node2"); // assign shard to another node actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all())); - assertEquals(actual, new ShardNodes(shard, bucketNumber, ImmutableSet.of("node1", "node2"))); + assertEquals(actual, new ShardNodes(shard, ImmutableSet.of("node1", "node2"))); // assigning a shard should be idempotent shardManager.assignShard(tableId, shard, "node2"); @@ -167,7 +168,7 @@ public void testAssignShard() shardManager.unassignShard(tableId, shard, "node1"); actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all())); - assertEquals(actual, new ShardNodes(shard, bucketNumber, ImmutableSet.of("node2"))); + assertEquals(actual, new ShardNodes(shard, ImmutableSet.of("node2"))); // removing an assignment should be idempotent shardManager.unassignShard(tableId, shard, "node1"); @@ -186,22 +187,22 @@ public void testGetNodeBytes() new ShardInfo(shard2, bucketNumber, ImmutableSet.of("node1"), ImmutableList.of(), 5, 55, 555)); List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shardNodes, Optional.empty()); assertEquals(getShardNodes(tableId, TupleDomain.all()), ImmutableSet.of( - new ShardNodes(shard1, bucketNumber, ImmutableSet.of("node1")), - new ShardNodes(shard2, bucketNumber, ImmutableSet.of("node1")))); + new ShardNodes(shard1, ImmutableSet.of("node1")), + new ShardNodes(shard2, ImmutableSet.of("node1")))); assertEquals(shardManager.getNodeBytes(), ImmutableMap.of("node1", 88L)); shardManager.assignShard(tableId, shard1, "node2"); assertEquals(getShardNodes(tableId, TupleDomain.all()), ImmutableSet.of( - new ShardNodes(shard1, bucketNumber, ImmutableSet.of("node1", "node2")), - new ShardNodes(shard2, bucketNumber, ImmutableSet.of("node1")))); + new ShardNodes(shard1, ImmutableSet.of("node1", "node2")), + new ShardNodes(shard2, ImmutableSet.of("node1")))); assertEquals(shardManager.getNodeBytes(), ImmutableMap.of("node1", 88L, "node2", 33L)); } @@ -222,7 +223,7 @@ public void testGetNodeTableShards() inputShards.add(shardInfo(uuid, node)); } - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, inputShards.build(), Optional.empty()); @@ -250,7 +251,7 @@ public void testReplaceShardUuids() .add(shardInfo(originalUuids.get(2), nodes.get(2))) .build(); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, oldShards, Optional.empty()); @@ -277,8 +278,11 @@ public void testReplaceShardUuids() expectedAllUuids.addAll(expectedUuids); // check that shards are replaced in index table as well - Set shardNodes = ImmutableSet.copyOf(shardManager.getShardNodes(tableId, TupleDomain.all())); - Set actualAllUuids = shardNodes.stream().map(ShardNodes::getShardUuid).collect(toSet()); + Set shardNodes = ImmutableSet.copyOf(shardManager.getShardNodes(tableId, false, false, TupleDomain.all())); + Set actualAllUuids = shardNodes.stream() + .map(ShardNodes::getShardUuids) + .flatMap(Collection::stream) + .collect(toSet()); assertEquals(actualAllUuids, expectedAllUuids); // verify that conflicting updates are handled @@ -303,7 +307,7 @@ public void testExternalBatches() List shards = ImmutableList.of(shardInfo(UUID.randomUUID(), "node1")); List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shards, externalBatchId); @@ -320,6 +324,30 @@ public void testExternalBatches() } } + @Test + public void testEmptyTable() + { + long tableId = createTable("test"); + List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); + shardManager.createTable(tableId, columns, false); + + try (ResultIterator iterator = shardManager.getShardNodes(tableId, false, false, TupleDomain.all())) { + assertFalse(iterator.hasNext()); + } + } + + @Test + public void testEmptyTableBucketed() + { + long tableId = createTable("test"); + List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); + shardManager.createTable(tableId, columns, true); + + try (ResultIterator iterator = shardManager.getShardNodes(tableId, true, true, TupleDomain.all())) { + assertFalse(iterator.hasNext()); + } + } + @Test public void testShardPruning() throws Exception @@ -384,7 +412,7 @@ public void testShardPruning() RaptorColumnHandle c6 = new RaptorColumnHandle("raptor", "c6", 6, BOOLEAN); long tableId = createTable("test"); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shards, Optional.empty()); @@ -466,7 +494,7 @@ public void testShardPruningTruncatedValues() RaptorColumnHandle c1 = new RaptorColumnHandle("raptor", "c1", 1, VARCHAR); long tableId = createTable("test"); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shards, Optional.empty()); @@ -502,7 +530,7 @@ public void testShardPruningNoStats() List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); RaptorColumnHandle c1 = new RaptorColumnHandle("raptor", "c1", 1, BIGINT); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shards, Optional.empty()); @@ -513,7 +541,7 @@ public void testShardPruningNoStats() private Set getShardNodes(long tableId, TupleDomain predicate) { - try (ResultIterator iterator = shardManager.getShardNodes(tableId, predicate)) { + try (ResultIterator iterator = shardManager.getShardNodes(tableId, false, false, predicate)) { return ImmutableSet.copyOf(iterator); } } @@ -536,7 +564,7 @@ public static ShardInfo shardInfo(UUID shardUuid, String nodeId, List toShardNodes(List shards) { return shards.stream() - .map(shard -> new ShardNodes(shard.getShardUuid(), shard.getBucketNumber(), shard.getNodeIdentifiers())) + .map(shard -> new ShardNodes(shard.getShardUuid(), shard.getNodeIdentifiers())) .collect(toSet()); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardCompactionDiscovery.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardCompactionDiscovery.java index 7710e0d5ee23..1c38c77ca67e 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardCompactionDiscovery.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardCompactionDiscovery.java @@ -80,7 +80,7 @@ public void testTemporalShardDiscovery() { List columns = ImmutableList.of(new ColumnInfo(1, BIGINT), new ColumnInfo(2, BIGINT)); long tableId = createTable("test"); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); dbi.onDemand(MetadataDao.class).updateTemporalColumnId(1, 1); Set nonTimeRangeShards = ImmutableSet.builder() diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java index 8df4e3c608fb..839f821f2ea6 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java @@ -120,7 +120,7 @@ public void testEjector() long tableId = createTable("test"); List columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); - shardManager.createTable(tableId, columns); + shardManager.createTable(tableId, columns, false); long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableId, columns, shards, Optional.empty()); @@ -133,7 +133,7 @@ public void testEjector() ejector.process(); - shardManager.getShardNodes(tableId, TupleDomain.all()); + shardManager.getShardNodes(tableId, false, false, TupleDomain.all()); Set ejectedShards = shards.subList(0, 4).stream() .map(ShardInfo::getShardUuid)