Skip to content

Commit

Permalink
Add delete support to Raptor
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jun 4, 2015
1 parent cd0258b commit 790c4c3
Show file tree
Hide file tree
Showing 14 changed files with 429 additions and 25 deletions.
Expand Up @@ -20,7 +20,7 @@

import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.google.common.base.Preconditions.checkNotNull;

public final class RaptorColumnHandle
Expand All @@ -29,6 +29,10 @@ public final class RaptorColumnHandle
// This is intentionally not named "$sampleWeight" because column names are lowercase and case insensitive
public static final String SAMPLE_WEIGHT_COLUMN_NAME = "$sample_weight";

// Generated rowId column for updates
private static final long SHARD_ROW_ID_COLUMN_ID = -1;
private static final String SHARD_ROW_ID_COLUMN_NAME = "$shard_row_id";

private final String connectorId;
private final String columnName;
private final long columnId;
Expand All @@ -43,7 +47,6 @@ public RaptorColumnHandle(
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.columnName = checkNotNull(columnName, "columnName is null");
checkArgument(columnId > 0, "columnId must be greater than zero");
this.columnId = columnId;
this.columnType = checkNotNull(columnType, "columnType is null");
}
Expand Down Expand Up @@ -96,4 +99,19 @@ public int hashCode()
{
return Objects.hash(columnId);
}

public boolean isShardRowId()
{
return isShardRowIdColumn(columnId);
}

public static boolean isShardRowIdColumn(long columnId)
{
return columnId == SHARD_ROW_ID_COLUMN_ID;
}

public static RaptorColumnHandle shardRowIdHandle(String connectorId)
{
return new RaptorColumnHandle(connectorId, SHARD_ROW_ID_COLUMN_NAME, SHARD_ROW_ID_COLUMN_ID, BIGINT);
}
}
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.raptor.metadata.ForMetadata;
import com.facebook.presto.raptor.metadata.MetadataDao;
import com.facebook.presto.raptor.metadata.MetadataDaoUtils;
import com.facebook.presto.raptor.metadata.ShardDelta;
import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.metadata.ShardManager;
import com.facebook.presto.raptor.metadata.Table;
Expand All @@ -38,6 +39,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
Expand All @@ -51,9 +53,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Predicate;

import static com.facebook.presto.raptor.RaptorColumnHandle.SAMPLE_WEIGHT_COLUMN_NAME;
import static com.facebook.presto.raptor.RaptorColumnHandle.shardRowIdHandle;
import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR;
import static com.facebook.presto.raptor.metadata.MetadataDaoUtils.createMetadataTablesWithRetry;
import static com.facebook.presto.raptor.util.Types.checkType;
Expand All @@ -74,10 +78,16 @@ public class RaptorMetadata
private final MetadataDao dao;
private final ShardManager shardManager;
private final JsonCodec<ShardInfo> shardInfoCodec;
private final JsonCodec<ShardDelta> shardDeltaCodec;
private final String connectorId;

@Inject
public RaptorMetadata(RaptorConnectorId connectorId, @ForMetadata IDBI dbi, ShardManager shardManager, JsonCodec<ShardInfo> shardInfoCodec)
public RaptorMetadata(
RaptorConnectorId connectorId,
@ForMetadata IDBI dbi,
ShardManager shardManager,
JsonCodec<ShardInfo> shardInfoCodec,
JsonCodec<ShardDelta> shardDeltaCodec)
{
checkNotNull(connectorId, "connectorId is null");

Expand All @@ -86,6 +96,7 @@ public RaptorMetadata(RaptorConnectorId connectorId, @ForMetadata IDBI dbi, Shar
this.dao = dbi.onDemand(MetadataDao.class);
this.shardManager = checkNotNull(shardManager, "shardManager is null");
this.shardInfoCodec = checkNotNull(shardInfoCodec, "shardInfoCodec is null");
this.shardDeltaCodec = checkNotNull(shardDeltaCodec, "shardDeltaCodec is null");

createMetadataTablesWithRetry(dao);
}
Expand Down Expand Up @@ -185,8 +196,13 @@ public boolean canCreateSampledTables(ConnectorSession session)
public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
long tableId = checkType(tableHandle, RaptorTableHandle.class, "tableHandle").getTableId();
long columnId = checkType(columnHandle, RaptorColumnHandle.class, "columnHandle").getColumnId();
RaptorColumnHandle column = checkType(columnHandle, RaptorColumnHandle.class, "columnHandle");

if (column.isShardRowId()) {
return new ColumnMetadata(column.getColumnName(), column.getColumnType(), false, null, true);
}

long columnId = column.getColumnId();
TableColumn tableColumn = dao.getTableColumn(tableId, columnId);
if (tableColumn == null) {
throw new PrestoException(NOT_FOUND, format("Column ID %s does not exist for table ID %s", columnId, tableId));
Expand Down Expand Up @@ -341,6 +357,40 @@ public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<Sli
shardManager.commitShards(tableId, columns, parseFragments(fragments), externalBatchId);
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorTableHandle tableHandle)
{
return shardRowIdHandle(connectorId);
}

@Override
public ConnectorTableHandle beginDelete(ConnectorTableHandle tableHandle)
{
return tableHandle;
}

@Override
public void commitDelete(ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
long tableId = checkType(tableHandle, RaptorTableHandle.class, "tableHandle").getTableId();

List<ColumnInfo> columns = getColumnHandles(tableHandle).values().stream()
.map(handle -> checkType(handle, RaptorColumnHandle.class, "columnHandle"))
.map(ColumnInfo::fromHandle).collect(toList());

ImmutableSet.Builder<UUID> oldShardUuids = ImmutableSet.builder();
ImmutableList.Builder<ShardInfo> newShards = ImmutableList.builder();

fragments.stream()
.map(fragment -> shardDeltaCodec.fromJson(fragment.getBytes()))
.forEach(delta -> {
oldShardUuids.addAll(delta.getOldShardUuids());
newShards.addAll(delta.getNewShards());
});

shardManager.replaceShardUuids(tableId, columns, oldShardUuids.build(), newShards.build());
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace)
{
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.raptor.metadata.DatabaseShardManager;
import com.facebook.presto.raptor.metadata.ForMetadata;
import com.facebook.presto.raptor.metadata.ShardDelta;
import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.metadata.ShardManager;
import com.facebook.presto.raptor.metadata.TableColumn;
Expand Down Expand Up @@ -56,6 +57,7 @@ public void configure(Binder binder)
binder.bind(ShardManager.class).to(DatabaseShardManager.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindJsonCodec(ShardInfo.class);
jsonCodecBinder(binder).bindJsonCodec(ShardDelta.class);
}

@ForMetadata
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.raptor.RaptorColumnHandle;
import com.facebook.presto.raptor.util.CloseableIterator;
import com.facebook.presto.raptor.util.UuidUtil.UuidArgument;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TupleDomain;
import com.facebook.presto.spi.type.Type;
Expand All @@ -30,9 +31,11 @@
import io.airlift.log.Logger;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.LongMapper;

import javax.inject.Inject;

Expand Down Expand Up @@ -60,6 +63,7 @@
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagateIfInstanceOf;
import static com.google.common.collect.Iterables.partition;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.nCopies;
Expand Down Expand Up @@ -158,6 +162,40 @@ public void replaceShardIds(long tableId, List<ColumnInfo> columns, Set<Long> ol
});
}

@Override
public void replaceShardUuids(long tableId, List<ColumnInfo> columns, Set<UUID> oldShardUuids, Collection<ShardInfo> newShards)
{
Map<String, Integer> nodeIds = toNodeIdMap(newShards);

runTransaction((handle, status) -> {
ShardManagerDao dao = handle.attach(ShardManagerDao.class);
for (List<ShardInfo> shards : partition(newShards, 1000)) {
insertShardsAndIndex(tableId, columns, shards, nodeIds, handle, dao);
}
for (List<UUID> uuids : partition(oldShardUuids, 1000)) {
Set<Long> ids = getShardIds(handle, ImmutableSet.copyOf(uuids));
if (ids.size() != uuids.size()) {
throw new PrestoException(TRANSACTION_CONFLICT, "Shard was updated by a different transaction. Please retry the operation.");
}
deleteShardsAndIndex(tableId, ids, handle);
}
return null;
});
}

private static Set<Long> getShardIds(Handle handle, Set<UUID> shardUuids)
{
String args = Joiner.on(",").join(nCopies(shardUuids.size(), "?"));
String sql = "SELECT shard_id FROM shards WHERE shard_uuid IN (" + args + ")";
Query<Map<String, Object>> query = handle.createQuery(sql);
int i = 0;
for (UUID uuid : shardUuids) {
query.bind(i, new UuidArgument(uuid));
i++;
}
return ImmutableSet.copyOf(query.map(LongMapper.FIRST).list());
}

private static void deleteShardsAndIndex(long tableId, Set<Long> shardIds, Handle handle)
throws SQLException
{
Expand Down
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.metadata;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.UUID;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class ShardDelta
{
private final List<UUID> oldShardUuids;
private final List<ShardInfo> newShards;

public ShardDelta(
@JsonProperty("oldShardUuids") List<UUID> oldShardUuids,
@JsonProperty("newShards") List<ShardInfo> newShards)
{
this.oldShardUuids = ImmutableList.copyOf(requireNonNull(oldShardUuids, "oldShardUuids is null"));
this.newShards = ImmutableList.copyOf(requireNonNull(newShards, "newShards is null"));
}

@JsonProperty
public List<UUID> getOldShardUuids()
{
return oldShardUuids;
}

@JsonProperty
public List<ShardInfo> getNewShards()
{
return newShards;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("oldShardUuids", oldShardUuids)
.add("newShards", newShards)
.toString();
}
}
Expand Up @@ -40,6 +40,11 @@ public interface ShardManager
*/
void replaceShardIds(long tableId, List<ColumnInfo> columns, Set<Long> oldShardIds, Collection<ShardInfo> newShards);

/**
* Replace oldShardsUuids with newShards.
*/
void replaceShardUuids(long tableId, List<ColumnInfo> columns, Set<UUID> oldShardUuids, Collection<ShardInfo> newShards);

/**
* Get shard metadata for table shards on a given node.
*/
Expand Down

0 comments on commit 790c4c3

Please sign in to comment.