Commit

Use single split per bucket
electrum committed Feb 3, 2016
1 parent ed6685f commit f0b17d6
Showing 15 changed files with 396 additions and 94 deletions.
@@ -166,7 +166,8 @@ private ConnectorTableHandle getTableHandle(SchemaTableName tableName)
table.getDistributionId(),
table.getBucketCount(),
OptionalLong.empty(),
Optional.ofNullable(sampleWeightColumnHandle));
Optional.ofNullable(sampleWeightColumnHandle),
false);
}

@Override
@@ -571,7 +572,7 @@ public void finishCreateTable(ConnectorSession session, ConnectorOutputTableHand
List<ColumnInfo> columns = table.getColumnHandles().stream().map(ColumnInfo::fromHandle).collect(toList());

// TODO: refactor this to avoid creating an empty table on failure
shardManager.createTable(newTableId, columns);
shardManager.createTable(newTableId, columns, table.getBucketCount().isPresent());
shardManager.commitShards(transactionId, newTableId, columns, parseFragments(fragments), Optional.empty());

clearRollback();
@@ -660,7 +661,8 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
handle.getDistributionId(),
handle.getBucketCount(),
OptionalLong.of(transactionId),
handle.getSampleWeightColumnHandle());
handle.getSampleWeightColumnHandle(),
true);
}

@Override
@@ -15,18 +15,22 @@

import com.facebook.presto.raptor.storage.ReaderAttributes;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.util.ConcatPageSource;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.Type;

import javax.inject.Inject;

import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.function.Function;

@@ -50,20 +54,36 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
{
RaptorSplit raptorSplit = checkType(split, RaptorSplit.class, "split");

UUID shardUuid = raptorSplit.getShardUuid();
OptionalInt bucketNumber = raptorSplit.getBucketNumber();
TupleDomain<RaptorColumnHandle> predicate = raptorSplit.getEffectivePredicate();
ReaderAttributes attributes = ReaderAttributes.from(session);
OptionalLong transactionId = raptorSplit.getTransactionId();

if (raptorSplit.getShardUuids().size() == 1) {
UUID shardUuid = raptorSplit.getShardUuids().iterator().next();
return createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId);
}

Iterator<ConnectorPageSource> iterator = raptorSplit.getShardUuids().stream()
.map(shardUuid -> createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId))
.iterator();

return new ConcatPageSource(iterator);
}

private ConnectorPageSource createPageSource(
UUID shardUuid,
OptionalInt bucketNumber,
List<ColumnHandle> columns,
TupleDomain<RaptorColumnHandle> predicate,
ReaderAttributes attributes,
OptionalLong transactionId)
{
List<RaptorColumnHandle> columnHandles = columns.stream().map(toRaptorColumnHandle()).collect(toList());
List<Long> columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList());
List<Type> columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList());

return storageManager.getPageSource(
shardUuid,
bucketNumber,
columnIds,
columnTypes,
raptorSplit.getEffectivePredicate(),
ReaderAttributes.from(session),
raptorSplit.getTransactionId());
return storageManager.getPageSource(shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId);
}

private static Function<ColumnHandle, RaptorColumnHandle> toRaptorColumnHandle()
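The provider above now takes two paths: a single-shard split goes straight to the storage manager, while a multi-shard (bucketed) split is wrapped in the new ConcatPageSource from com.facebook.presto.raptor.util, whose implementation is not part of the hunks shown here. The following is only a sketch of what such a concatenating page source could look like: it delegates getNextPage, isFinished, and close, omits the SPI's byte and memory accounting methods, and therefore does not claim to match the class added by this commit.

// Sketch only: a page source that drains an iterator of per-shard sources in order.
// The real ConcatPageSource added elsewhere in this commit may differ.
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.Page;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

public class ConcatPageSourceSketch
{
    private final Iterator<ConnectorPageSource> sources;
    private ConnectorPageSource current;

    public ConcatPageSourceSketch(Iterator<ConnectorPageSource> sources)
    {
        this.sources = sources;
    }

    public Page getNextPage()
    {
        while (true) {
            if (current == null) {
                if (!sources.hasNext()) {
                    return null;
                }
                current = sources.next();
            }
            Page page = current.getNextPage();
            if (page != null) {
                return page;
            }
            if (current.isFinished()) {
                closeSource(current);
                current = null;
            }
            else {
                // The current source has no page ready yet but is not finished
                return null;
            }
        }
    }

    public boolean isFinished()
    {
        return current == null && !sources.hasNext();
    }

    private static void closeSource(ConnectorPageSource source)
    {
        try {
            source.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

Chaining the per-shard sources this way is what lets a single split per bucket expose all of that bucket's shards behind one ConnectorPageSource.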
@@ -19,10 +19,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;

import static com.google.common.base.MoreObjects.toStringHelper;
@@ -32,7 +34,7 @@ public class RaptorSplit
implements ConnectorSplit
{
private final String connectorId;
private final UUID shardUuid;
private final Set<UUID> shardUuids;
private final OptionalInt bucketNumber;
private final List<HostAddress> addresses;
private final TupleDomain<RaptorColumnHandle> effectivePredicate;
@@ -41,24 +43,45 @@ public class RaptorSplit
@JsonCreator
public RaptorSplit(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("shardUuid") UUID shardUuid,
@JsonProperty("shardUuids") Set<UUID> shardUuids,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("effectivePredicate") TupleDomain<RaptorColumnHandle> effectivePredicate,
@JsonProperty("transactionId") OptionalLong transactionId)
{
this(connectorId, shardUuid, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId);
this(connectorId, shardUuids, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId);
}

public RaptorSplit(
String connectorId,
UUID shardUuid,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this(connectorId, ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses, effectivePredicate, transactionId);
}

public RaptorSplit(
String connectorId,
Set<UUID> shardUuids,
int bucketNumber,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this(connectorId, shardUuids, OptionalInt.of(bucketNumber), addresses, effectivePredicate, transactionId);
}

private RaptorSplit(
String connectorId,
Set<UUID> shardUuids,
OptionalInt bucketNumber,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.shardUuid = requireNonNull(shardUuid, "shardUuid is null");
this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuids is null"));
this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
@@ -84,9 +107,9 @@ public String getConnectorId()
}

@JsonProperty
public UUID getShardUuid()
public Set<UUID> getShardUuids()
{
return shardUuid;
return shardUuids;
}

@JsonProperty
@@ -117,7 +140,7 @@ public Object getInfo()
public String toString()
{
return toStringHelper(this)
.add("shardUuid", shardUuid)
.add("shardUuids", shardUuids)
.add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null)
.add("hosts", addresses)
.omitNullValues()
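Taken together, the constructors above give the split manager two shapes of split: a single-shard split for unbucketed tables and a multi-shard split pinned to one bucket. A hypothetical construction of each is shown below; the connector id, host names, and UUIDs are invented for illustration, and the package names for RaptorSplit and RaptorColumnHandle are assumptions, not confirmed by the hunks shown.

// Illustrative usage of the two public RaptorSplit constructors; all values are made up.
import com.facebook.presto.raptor.RaptorColumnHandle;
import com.facebook.presto.raptor.RaptorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.OptionalLong;
import java.util.UUID;

public class RaptorSplitExample
{
    public static void main(String[] args)
    {
        TupleDomain<RaptorColumnHandle> predicate = TupleDomain.all();

        // Unbucketed table: one shard per split, no bucket number
        RaptorSplit singleShard = new RaptorSplit(
                "raptor",
                UUID.randomUUID(),
                ImmutableList.of(HostAddress.fromParts("worker1.example.com", 8080)),
                predicate,
                OptionalLong.empty());

        // Bucketed table: every shard of bucket 7 travels in the same split
        RaptorSplit bucket = new RaptorSplit(
                "raptor",
                ImmutableSet.of(UUID.randomUUID(), UUID.randomUUID()),
                7,
                ImmutableList.of(HostAddress.fromParts("worker2.example.com", 8080)),
                predicate,
                OptionalLong.empty());

        System.out.println(singleShard);
        System.out.println(bucket);
    }
}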
@@ -38,7 +38,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
@@ -51,6 +50,7 @@
import static com.facebook.presto.raptor.util.Types.checkType;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Maps.uniqueIndex;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.lang.String.format;
@@ -94,7 +94,11 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
RaptorTableLayoutHandle handle = checkType(layout, RaptorTableLayoutHandle.class, "layout");
RaptorTableHandle table = handle.getTable();
TupleDomain<RaptorColumnHandle> effectivePredicate = toRaptorTupleDomain(handle.getConstraint());
return new RaptorSplitSource(table.getTableId(), effectivePredicate, table.getTransactionId());
long tableId = table.getTableId();
boolean bucketed = table.getBucketCount().isPresent();
boolean merged = bucketed && !table.isDelete();
OptionalLong transactionId = table.getTransactionId();
return new RaptorSplitSource(tableId, bucketed, merged, effectivePredicate, transactionId);
}

private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
@@ -133,12 +137,17 @@ private class RaptorSplitSource
@GuardedBy("this")
private CompletableFuture<List<ConnectorSplit>> future;

public RaptorSplitSource(long tableId, TupleDomain<RaptorColumnHandle> effectivePredicate, OptionalLong transactionId)
public RaptorSplitSource(
long tableId,
boolean bucketed,
boolean merged,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this.tableId = tableId;
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
this.iterator = new SynchronizedResultIterator<>(shardManager.getShardNodes(tableId, effectivePredicate));
this.iterator = new SynchronizedResultIterator<>(shardManager.getShardNodes(tableId, bucketed, merged, effectivePredicate));
}

@Override
@@ -190,19 +199,26 @@ private Supplier<List<ConnectorSplit>> batchSupplier(int maxSize)

private ConnectorSplit createSplit(ShardNodes shard)
{
UUID shardId = shard.getShardUuid();
OptionalInt bucketNumber = shard.getBucketNumber();
Set<UUID> shardUuids = shard.getShardUuids();
Collection<String> nodeIds = shard.getNodeIdentifiers();

List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds);

if (shard.getBucketNumber().isPresent()) {
int bucketNumber = shard.getBucketNumber().getAsInt();
if (addresses.isEmpty()) {
// TODO: reassign
throw new PrestoException(NO_NODES_AVAILABLE, "Reassignment not yet implemented");
}
return new RaptorSplit(connectorId, shardUuids, bucketNumber, addresses, effectivePredicate, transactionId);
}

verify(shardUuids.size() == 1, "wrong shard count for non-bucketed table: %s", shardUuids.size());
UUID shardId = shardUuids.iterator().next();

if (addresses.isEmpty()) {
if (!backupAvailable) {
throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds));
}
if (bucketNumber.isPresent()) {
throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, "No host for shard of bucketed table");
}

// Pick a random node and optimistically assign the shard to it.
// That node will restore the shard from the backup location.
@@ -215,7 +231,7 @@ private ConnectorSplit createSplit(ShardNodes shard)
addresses = ImmutableList.of(node.getHostAndPort());
}

return new RaptorSplit(connectorId, shardId, bucketNumber, addresses, effectivePredicate, transactionId);
return new RaptorSplit(connectorId, shardId, addresses, effectivePredicate, transactionId);
}
}
}
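createSplit above only relies on ShardNodes exposing getShardUuids(), getBucketNumber(), and getNodeIdentifiers(); the matching change to ShardNodes itself is part of the commit but outside the hunks shown. Inferred purely from those call sites, a plausible shape for the value class is sketched below; the real class may carry extra fields or validation.

// Sketch inferred from the accessors used in createSplit(); not the actual ShardNodes.
import com.google.common.collect.ImmutableSet;

import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;

import static java.util.Objects.requireNonNull;

public class ShardNodesSketch
{
    private final Set<UUID> shardUuids;
    private final OptionalInt bucketNumber;
    private final Set<String> nodeIdentifiers;

    public ShardNodesSketch(Set<UUID> shardUuids, OptionalInt bucketNumber, Set<String> nodeIdentifiers)
    {
        this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuids is null"));
        this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
        this.nodeIdentifiers = ImmutableSet.copyOf(requireNonNull(nodeIdentifiers, "nodeIdentifiers is null"));
    }

    public Set<UUID> getShardUuids()
    {
        return shardUuids;
    }

    public OptionalInt getBucketNumber()
    {
        return bucketNumber;
    }

    public Set<String> getNodeIdentifiers()
    {
        return nodeIdentifiers;
    }
}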
@@ -38,6 +38,7 @@ public final class RaptorTableHandle
private final OptionalInt bucketCount;
private final OptionalLong transactionId;
private final Optional<RaptorColumnHandle> sampleWeightColumnHandle;
private final boolean delete;

@JsonCreator
public RaptorTableHandle(
@@ -48,7 +49,8 @@ public RaptorTableHandle(
@JsonProperty("distributionId") OptionalLong distributionId,
@JsonProperty("bucketCount") OptionalInt bucketCount,
@JsonProperty("transactionId") OptionalLong transactionId,
@JsonProperty("sampleWeightColumnHandle") Optional<RaptorColumnHandle> sampleWeightColumnHandle)
@JsonProperty("sampleWeightColumnHandle") Optional<RaptorColumnHandle> sampleWeightColumnHandle,
@JsonProperty("delete") boolean delete)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.schemaName = checkSchemaName(schemaName);
@@ -61,6 +63,8 @@
this.distributionId = requireNonNull(distributionId, "distributionId is null");
this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");

this.delete = delete;
}

@JsonProperty
@@ -111,6 +115,12 @@ public Optional<RaptorColumnHandle> getSampleWeightColumnHandle()
return sampleWeightColumnHandle;
}

@JsonProperty
public boolean isDelete()
{
return delete;
}

@Override
public String toString()
{
@@ -118,7 +118,7 @@ public DatabaseShardManager(@ForMetadata IDBI dbi, NodeSupplier nodeSupplier)
}

@Override
public void createTable(long tableId, List<ColumnInfo> columns)
public void createTable(long tableId, List<ColumnInfo> columns, boolean bucketed)
{
StringJoiner tableColumns = new StringJoiner(",\n ", " ", ",\n").setEmptyValue("");

@@ -130,15 +130,30 @@ public void createTable(long tableId, List<ColumnInfo> columns)
}
}

String sql = "" +
"CREATE TABLE " + shardIndexTable(tableId) + " (\n" +
" shard_id BIGINT NOT NULL PRIMARY KEY,\n" +
" shard_uuid BINARY(16) NOT NULL,\n" +
" node_ids VARBINARY(128) NOT NULL,\n" +
" bucket_number INT,\n" +
tableColumns +
" UNIQUE (shard_uuid)\n" +
")";
String sql;
if (bucketed) {
sql = "" +
"CREATE TABLE " + shardIndexTable(tableId) + " (\n" +
" shard_id BIGINT NOT NULL,\n" +
" shard_uuid BINARY(16) NOT NULL,\n" +
" bucket_number INT NOT NULL,\n" +
" node_ids VARBINARY(128) NOT NULL,\n" +
tableColumns +
" PRIMARY KEY (bucket_number, shard_uuid),\n" +
" UNIQUE (shard_id),\n" +
" UNIQUE (shard_uuid)\n" +
")";
}
else {
sql = "" +
"CREATE TABLE " + shardIndexTable(tableId) + " (\n" +
" shard_id BIGINT NOT NULL PRIMARY KEY,\n" +
" shard_uuid BINARY(16) NOT NULL,\n" +
" node_ids VARBINARY(128) NOT NULL,\n" +
tableColumns +
" UNIQUE (shard_uuid)\n" +
")";
}

try (Handle handle = dbi.open()) {
handle.execute(sql);
@@ -361,9 +376,9 @@ public Set<ShardMetadata> getNodeShards(String nodeIdentifier)
}

@Override
public ResultIterator<ShardNodes> getShardNodes(long tableId, TupleDomain<RaptorColumnHandle> effectivePredicate)
public ResultIterator<ShardNodes> getShardNodes(long tableId, boolean bucketed, boolean merged, TupleDomain<RaptorColumnHandle> effectivePredicate)
{
return new ShardIterator(tableId, effectivePredicate, dbi);
return new ShardIterator(tableId, bucketed, merged, effectivePredicate, dbi);
}

@Override
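ShardIterator also gains the new bucketed and merged flags, but its implementation is not among the hunks shown. Conceptually, merged mode is what makes "single split per bucket" work: instead of yielding one ShardNodes per shard row of the index table, the iterator yields one per bucket, with all of that bucket's shard UUIDs collected together. The snippet below only illustrates that grouping on in-memory rows; it is not the actual ShardIterator query logic, and the ShardRow type is invented for the example.

// Conceptual illustration of "merged" grouping; the real ShardIterator reads these
// rows from the per-table shard index and may work quite differently.
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class MergedGroupingSketch
{
    // One row of the shard index for a bucketed table (hypothetical type)
    public static class ShardRow
    {
        final UUID shardUuid;
        final int bucketNumber;

        ShardRow(UUID shardUuid, int bucketNumber)
        {
            this.shardUuid = shardUuid;
            this.bucketNumber = bucketNumber;
        }
    }

    // Collapse per-shard rows into one shard set per bucket, which the
    // split manager can then turn into one RaptorSplit per bucket
    public static Map<Integer, Set<UUID>> mergeByBucket(List<ShardRow> rows)
    {
        return rows.stream().collect(Collectors.groupingBy(
                row -> row.bucketNumber,
                Collectors.mapping(row -> row.shardUuid, Collectors.toSet())));
    }
}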
