Skip to content

Commit

Permalink
Do not store node assignments for bucketed shards
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jun 21, 2016
1 parent 4aebb7a commit b079a6d
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 44 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
boolean merged = bucketed && !table.isDelete() && (table.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session)); boolean merged = bucketed && !table.isDelete() && (table.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session));
OptionalLong transactionId = table.getTransactionId(); OptionalLong transactionId = table.getTransactionId();
Optional<Map<Integer, String>> bucketToNode = handle.getPartitioning().map(RaptorPartitioningHandle::getBucketToNode); Optional<Map<Integer, String>> bucketToNode = handle.getPartitioning().map(RaptorPartitioningHandle::getBucketToNode);
return new RaptorSplitSource(tableId, bucketed, merged, effectivePredicate, transactionId, bucketToNode); verify(bucketed == bucketToNode.isPresent(), "mismatched bucketCount and bucketToNode presence");
return new RaptorSplitSource(tableId, merged, effectivePredicate, transactionId, bucketToNode);
} }


private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers) private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
Expand Down Expand Up @@ -145,7 +146,6 @@ private class RaptorSplitSource


public RaptorSplitSource( public RaptorSplitSource(
long tableId, long tableId,
boolean bucketed,
boolean merged, boolean merged,
TupleDomain<RaptorColumnHandle> effectivePredicate, TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId, OptionalLong transactionId,
Expand All @@ -155,7 +155,15 @@ public RaptorSplitSource(
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null"); this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null"); this.transactionId = requireNonNull(transactionId, "transactionId is null");
this.bucketToNode = requireNonNull(bucketToNode, "bucketToNode is null"); this.bucketToNode = requireNonNull(bucketToNode, "bucketToNode is null");
this.iterator = new SynchronizedResultIterator<>(shardManager.getShardNodes(tableId, bucketed, merged, effectivePredicate));
ResultIterator<BucketShards> iterator;
if (bucketToNode.isPresent()) {
iterator = shardManager.getShardNodesBucketed(tableId, merged, bucketToNode.get(), effectivePredicate);
}
else {
iterator = shardManager.getShardNodes(tableId, effectivePredicate);
}
this.iterator = new SynchronizedResultIterator<>(iterator);
} }


@Override @Override
Expand Down Expand Up @@ -238,26 +246,15 @@ private ConnectorSplit createSplit(BucketShards bucketShards)


private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shards) private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shards)
{ {
// Bucket splits contain all the shards for the bucket
// and run on the node assigned to the bucket.

String nodeId = bucketToNode.get().get(bucketNumber); String nodeId = bucketToNode.get().get(bucketNumber);
Node node = nodesById.get(nodeId); Node node = nodesById.get(nodeId);
if (node == null) { if (node == null) {
throw new PrestoException(NO_NODES_AVAILABLE, "Node for bucket is offline: " + nodeId); throw new PrestoException(NO_NODES_AVAILABLE, "Node for bucket is offline: " + nodeId);
} }


// Bucket splits contain all the shards for the bucket and run on
// the node assigned to the bucket. Reassign any of the bucket
// shards not currently assigned to the node for the bucket.
for (ShardNodes shard : shards) {
if (!shard.getNodeIdentifiers().contains(nodeId)) {
shardManager.assignShard(tableId, shard.getShardUuid(), nodeId, false);
}
for (String shardNodeId : shard.getNodeIdentifiers()) {
if (!shardNodeId.equals(nodeId)) {
shardManager.unassignShard(tableId, shard.getShardUuid(), shardNodeId);
}
}
}

Set<UUID> shardUuids = shards.stream() Set<UUID> shardUuids = shards.stream()
.map(ShardNodes::getShardUuid) .map(ShardNodes::getShardUuid)
.collect(toSet()); .collect(toSet());
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ public void createTable(long tableId, List<ColumnInfo> columns, boolean bucketed
" shard_id BIGINT NOT NULL,\n" + " shard_id BIGINT NOT NULL,\n" +
" shard_uuid BINARY(16) NOT NULL,\n" + " shard_uuid BINARY(16) NOT NULL,\n" +
" bucket_number INT NOT NULL\n," + " bucket_number INT NOT NULL\n," +
" node_ids VARBINARY(128) NOT NULL,\n" +
tableColumns + tableColumns +
" PRIMARY KEY (bucket_number, shard_uuid),\n" + " PRIMARY KEY (bucket_number, shard_uuid),\n" +
" UNIQUE (shard_id),\n" + " UNIQUE (shard_id),\n" +
Expand Down Expand Up @@ -402,11 +401,19 @@ private static void bindLongs(PreparedStatement statement, Iterable<Long> values
private static void insertShardsAndIndex(long tableId, List<ColumnInfo> columns, Collection<ShardInfo> shards, Map<String, Integer> nodeIds, Handle handle) private static void insertShardsAndIndex(long tableId, List<ColumnInfo> columns, Collection<ShardInfo> shards, Map<String, Integer> nodeIds, Handle handle)
throws SQLException throws SQLException
{ {
if (shards.isEmpty()) {
return;
}
boolean bucketed = shards.iterator().next().getBucketNumber().isPresent();

Connection connection = handle.getConnection(); Connection connection = handle.getConnection();
try (IndexInserter indexInserter = new IndexInserter(connection, tableId, columns)) { try (IndexInserter indexInserter = new IndexInserter(connection, tableId, columns)) {
for (List<ShardInfo> batch : partition(shards, batchSize(connection))) { for (List<ShardInfo> batch : partition(shards, batchSize(connection))) {
List<Long> shardIds = insertShards(connection, tableId, batch); List<Long> shardIds = insertShards(connection, tableId, batch);
insertShardNodes(connection, nodeIds, shardIds, batch);
if (!bucketed) {
insertShardNodes(connection, nodeIds, shardIds, batch);
}


for (int i = 0; i < batch.size(); i++) { for (int i = 0; i < batch.size(); i++) {
ShardInfo shard = batch.get(i); ShardInfo shard = batch.get(i);
Expand Down Expand Up @@ -448,9 +455,15 @@ public Set<ShardMetadata> getNodeShards(String nodeIdentifier)
} }


@Override @Override
public ResultIterator<BucketShards> getShardNodes(long tableId, boolean bucketed, boolean merged, TupleDomain<RaptorColumnHandle> effectivePredicate) public ResultIterator<BucketShards> getShardNodes(long tableId, TupleDomain<RaptorColumnHandle> effectivePredicate)
{
return new ShardIterator(tableId, false, Optional.empty(), effectivePredicate, dbi);
}

@Override
public ResultIterator<BucketShards> getShardNodesBucketed(long tableId, boolean merged, Map<Integer, String> bucketToNode, TupleDomain<RaptorColumnHandle> effectivePredicate)
{ {
return new ShardIterator(tableId, bucketed, merged, effectivePredicate, dbi); return new ShardIterator(tableId, merged, Optional.of(bucketToNode), effectivePredicate, dbi);
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ public IndexInserter(Connection connection, long tableId, List<ColumnInfo> colum
StringJoiner valueJoiner = new StringJoiner(", "); StringJoiner valueJoiner = new StringJoiner(", ");
int index = 1; int index = 1;


nameJoiner.add("shard_id").add("shard_uuid").add("node_ids"); nameJoiner.add("shard_id").add("shard_uuid");
valueJoiner.add("?").add("?").add("?"); valueJoiner.add("?").add("?").add("?");
index += 3; index += 3;


if (bucketed) { if (bucketed) {
nameJoiner.add("bucket_number"); nameJoiner.add("bucket_number");
valueJoiner.add("?"); }
index++; else {
nameJoiner.add("node_ids");
} }


for (ColumnInfo column : columns) { for (ColumnInfo column : columns) {
Expand Down Expand Up @@ -121,14 +122,14 @@ public void insert(long shardId, UUID shardUuid, OptionalInt bucketNumber, Set<I
{ {
statement.setLong(1, shardId); statement.setLong(1, shardId);
statement.setBytes(2, uuidToBytes(shardUuid)); statement.setBytes(2, uuidToBytes(shardUuid));
statement.setBytes(3, intArrayToBytes(nodeIds));


if (bucketed) { if (bucketed) {
checkArgument(bucketNumber.isPresent(), "shard bucket missing for bucketed table"); checkArgument(bucketNumber.isPresent(), "shard bucket missing for bucketed table");
statement.setInt(4, bucketNumber.getAsInt()); statement.setInt(3, bucketNumber.getAsInt());
} }
else { else {
checkArgument(!bucketNumber.isPresent(), "shard bucket present for non-bucketed table"); checkArgument(!bucketNumber.isPresent(), "shard bucket present for non-bucketed table");
statement.setBytes(3, intArrayToBytes(nodeIds));
} }


for (ColumnInfo column : columns) { for (ColumnInfo column : columns) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt; import java.util.OptionalInt;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
Expand All @@ -51,23 +52,28 @@ final class ShardIterator
private static final Logger log = Logger.get(ShardIterator.class); private static final Logger log = Logger.get(ShardIterator.class);
private final Map<Integer, String> nodeMap = new HashMap<>(); private final Map<Integer, String> nodeMap = new HashMap<>();


private final boolean bucketed;
private final boolean merged; private final boolean merged;
private final Map<Integer, String> bucketToNode;
private final ShardDao dao; private final ShardDao dao;
private final Connection connection; private final Connection connection;
private final PreparedStatement statement; private final PreparedStatement statement;
private final ResultSet resultSet; private final ResultSet resultSet;
private boolean first = true; private boolean first = true;


public ShardIterator(long tableId, boolean bucketed, boolean merged, TupleDomain<RaptorColumnHandle> effectivePredicate, IDBI dbi) public ShardIterator(
long tableId,
boolean merged,
Optional<Map<Integer, String>> bucketToNode,
TupleDomain<RaptorColumnHandle> effectivePredicate,
IDBI dbi)
{ {
this.bucketed = bucketed;
this.merged = merged; this.merged = merged;
ShardPredicate predicate = ShardPredicate.create(effectivePredicate, bucketed); this.bucketToNode = bucketToNode.orElse(null);
ShardPredicate predicate = ShardPredicate.create(effectivePredicate, bucketToNode.isPresent());


String sql; String sql;
if (bucketed) { if (bucketToNode.isPresent()) {
sql = "SELECT shard_uuid, node_ids, bucket_number FROM %s WHERE %s ORDER BY bucket_number"; sql = "SELECT shard_uuid, bucket_number FROM %s WHERE %s ORDER BY bucket_number";
} }
else { else {
sql = "SELECT shard_uuid, node_ids FROM %s WHERE %s"; sql = "SELECT shard_uuid, node_ids FROM %s WHERE %s";
Expand Down Expand Up @@ -127,10 +133,20 @@ private BucketShards compute()
} }


UUID shardUuid = uuidFromBytes(resultSet.getBytes("shard_uuid")); UUID shardUuid = uuidFromBytes(resultSet.getBytes("shard_uuid"));
List<Integer> nodeIds = intArrayFromBytes(resultSet.getBytes("node_ids")); Set<String> nodeIdentifiers;
OptionalInt bucketNumber = bucketed ? OptionalInt.of(resultSet.getInt("bucket_number")) : OptionalInt.empty(); OptionalInt bucketNumber = OptionalInt.empty();


ShardNodes shard = new ShardNodes(shardUuid, getNodeIdentifiers(nodeIds, shardUuid)); if (bucketToNode != null) {
int bucket = resultSet.getInt("bucket_number");
bucketNumber = OptionalInt.of(bucket);
nodeIdentifiers = ImmutableSet.of(getBucketNode(bucket));
}
else {
List<Integer> nodeIds = intArrayFromBytes(resultSet.getBytes("node_ids"));
nodeIdentifiers = getNodeIdentifiers(nodeIds, shardUuid);
}

ShardNodes shard = new ShardNodes(shardUuid, nodeIdentifiers);
return new BucketShards(bucketNumber, ImmutableSet.of(shard)); return new BucketShards(bucketNumber, ImmutableSet.of(shard));
} }


Expand All @@ -155,8 +171,8 @@ private BucketShards computeMerged()


do { do {
UUID shardUuid = uuidFromBytes(resultSet.getBytes("shard_uuid")); UUID shardUuid = uuidFromBytes(resultSet.getBytes("shard_uuid"));
List<Integer> nodeIds = intArrayFromBytes(resultSet.getBytes("node_ids")); int bucket = resultSet.getInt("bucket_number");
Set<String> nodeIdentifiers = getNodeIdentifiers(nodeIds, shardUuid); Set<String> nodeIdentifiers = ImmutableSet.of(getBucketNode(bucket));


shards.add(new ShardNodes(shardUuid, nodeIdentifiers)); shards.add(new ShardNodes(shardUuid, nodeIdentifiers));
} }
Expand All @@ -165,6 +181,15 @@ private BucketShards computeMerged()
return new BucketShards(OptionalInt.of(bucketNumber), shards.build()); return new BucketShards(OptionalInt.of(bucketNumber), shards.build());
} }


private String getBucketNode(int bucket)
{
String node = bucketToNode.get(bucket);
if (node == null) {
throw new PrestoException(RAPTOR_ERROR, "No node mapping for bucket: " + bucket);
}
return node;
}

private Set<String> getNodeIdentifiers(List<Integer> nodeIds, UUID shardUuid) private Set<String> getNodeIdentifiers(List<Integer> nodeIds, UUID shardUuid)
{ {
Function<Integer, String> fetchNode = id -> fetchNode(id, shardUuid); Function<Integer, String> fetchNode = id -> fetchNode(id, shardUuid);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ public interface ShardManager
Set<ShardMetadata> getNodeShards(String nodeIdentifier); Set<ShardMetadata> getNodeShards(String nodeIdentifier);


/** /**
* Return the shard nodes a given table. * Return the shard nodes for a non-bucketed table.
*/ */
ResultIterator<BucketShards> getShardNodes(long tableId, boolean bucketed, boolean merged, TupleDomain<RaptorColumnHandle> effectivePredicate); ResultIterator<BucketShards> getShardNodes(long tableId, TupleDomain<RaptorColumnHandle> effectivePredicate);

/**
* Return the shard nodes for a bucketed table.
*/
ResultIterator<BucketShards> getShardNodesBucketed(long tableId, boolean merged, Map<Integer, String> bucketToNode, TupleDomain<RaptorColumnHandle> effectivePredicate);


/** /**
* Assign a shard to a node. * Assign a shard to a node.
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void testReplaceShardUuids()
expectedAllUuids.addAll(expectedUuids); expectedAllUuids.addAll(expectedUuids);


// check that shards are replaced in index table as well // check that shards are replaced in index table as well
Set<BucketShards> shardNodes = ImmutableSet.copyOf(shardManager.getShardNodes(tableId, false, false, TupleDomain.all())); Set<BucketShards> shardNodes = ImmutableSet.copyOf(shardManager.getShardNodes(tableId, TupleDomain.all()));
Set<UUID> actualAllUuids = shardNodes.stream() Set<UUID> actualAllUuids = shardNodes.stream()
.map(BucketShards::getShards) .map(BucketShards::getShards)
.flatMap(Collection::stream) .flatMap(Collection::stream)
Expand Down Expand Up @@ -391,7 +391,7 @@ public void testEmptyTable()
List<ColumnInfo> columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); List<ColumnInfo> columns = ImmutableList.of(new ColumnInfo(1, BIGINT));
shardManager.createTable(tableId, columns, false); shardManager.createTable(tableId, columns, false);


try (ResultIterator<BucketShards> iterator = shardManager.getShardNodes(tableId, false, false, TupleDomain.all())) { try (ResultIterator<BucketShards> iterator = shardManager.getShardNodes(tableId, TupleDomain.all())) {
assertFalse(iterator.hasNext()); assertFalse(iterator.hasNext());
} }
} }
Expand All @@ -403,7 +403,7 @@ public void testEmptyTableBucketed()
List<ColumnInfo> columns = ImmutableList.of(new ColumnInfo(1, BIGINT)); List<ColumnInfo> columns = ImmutableList.of(new ColumnInfo(1, BIGINT));
shardManager.createTable(tableId, columns, true); shardManager.createTable(tableId, columns, true);


try (ResultIterator<BucketShards> iterator = shardManager.getShardNodes(tableId, true, true, TupleDomain.all())) { try (ResultIterator<BucketShards> iterator = shardManager.getShardNodesBucketed(tableId, true, ImmutableMap.of(), TupleDomain.all())) {
assertFalse(iterator.hasNext()); assertFalse(iterator.hasNext());
} }
} }
Expand Down Expand Up @@ -601,7 +601,7 @@ public void testShardPruningNoStats()


private Set<ShardNodes> getShardNodes(long tableId, TupleDomain<RaptorColumnHandle> predicate) private Set<ShardNodes> getShardNodes(long tableId, TupleDomain<RaptorColumnHandle> predicate)
{ {
try (ResultIterator<BucketShards> iterator = shardManager.getShardNodes(tableId, false, false, predicate)) { try (ResultIterator<BucketShards> iterator = shardManager.getShardNodes(tableId, predicate)) {
return ImmutableSet.copyOf(concat(transform(iterator, i -> i.getShards().iterator()))); return ImmutableSet.copyOf(concat(transform(iterator, i -> i.getShards().iterator())));
} }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testEjector()


ejector.process(); ejector.process();


shardManager.getShardNodes(tableId, false, false, TupleDomain.all()); shardManager.getShardNodes(tableId, TupleDomain.all());


Set<UUID> ejectedShards = shards.subList(0, 4).stream() Set<UUID> ejectedShards = shards.subList(0, 4).stream()
.map(ShardInfo::getShardUuid) .map(ShardInfo::getShardUuid)
Expand Down

0 comments on commit b079a6d

Please sign in to comment.