Skip to content

Commit

Permalink
Categorize Raptor metadata database errors
Browse files Browse the repository at this point in the history
This also uses explicit connection management for JDBI SQL objects.
  • Loading branch information
electrum committed Oct 5, 2015
1 parent d6f5ef9 commit 357f579
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 93 deletions.
Expand Up @@ -23,7 +23,8 @@ public enum RaptorErrorCode
RAPTOR_EXTERNAL_BATCH_ALREADY_EXISTS(0x0300_0001),
RAPTOR_NO_HOST_FOR_SHARD(0x0300_0002),
RAPTOR_RECOVERY_ERROR(0x0300_0003),
RAPTOR_BACKUP_TIMEOUT(0x0300_0004);
RAPTOR_BACKUP_TIMEOUT(0x0300_0004),
RAPTOR_METADATA_ERROR(0x0300_0005);

private final ErrorCode errorCode;

Expand Down
Expand Up @@ -49,7 +49,6 @@
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;

import javax.annotation.Nullable;
import javax.inject.Inject;
Expand All @@ -72,6 +71,8 @@
import static com.facebook.presto.raptor.RaptorTableProperties.getTemporalColumn;
import static com.facebook.presto.raptor.metadata.DatabaseShardManager.shardIndexTable;
import static com.facebook.presto.raptor.metadata.MetadataDaoUtils.createMetadataTablesWithRetry;
import static com.facebook.presto.raptor.util.DatabaseUtil.onDemandDao;
import static com.facebook.presto.raptor.util.DatabaseUtil.runTransaction;
import static com.facebook.presto.raptor.util.Types.checkType;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
Expand Down Expand Up @@ -112,12 +113,12 @@ public RaptorMetadata(

this.connectorId = connectorId.toString();
this.dbi = requireNonNull(dbi, "dbi is null");
this.dao = dbi.onDemand(MetadataDao.class);
this.dao = onDemandDao(dbi, MetadataDao.class);
this.shardManager = requireNonNull(shardManager, "shardManager is null");
this.shardInfoCodec = requireNonNull(shardInfoCodec, "shardInfoCodec is null");
this.shardDeltaCodec = requireNonNull(shardDeltaCodec, "shardDeltaCodec is null");

createMetadataTablesWithRetry(dao);
createMetadataTablesWithRetry(dbi);
}

@Override
Expand Down Expand Up @@ -257,7 +258,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
{
RaptorTableHandle raptorHandle = checkType(tableHandle, RaptorTableHandle.class, "tableHandle");
long tableId = raptorHandle.getTableId();
dbi.inTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
ShardManagerDao shardManagerDao = handle.attach(ShardManagerDao.class);
shardManagerDao.dropShardNodes(tableId);
shardManagerDao.dropShards(tableId);
Expand All @@ -282,7 +283,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
{
RaptorTableHandle table = checkType(tableHandle, RaptorTableHandle.class, "tableHandle");
dbi.inTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
MetadataDao dao = handle.attach(MetadataDao.class);
dao.renameTable(table.getTableId(), newTableName.getSchemaName(), newTableName.getTableName());
return null;
Expand Down Expand Up @@ -374,7 +375,7 @@ public void commitCreateTable(ConnectorSession session, ConnectorOutputTableHand
}
}

long newTableId = dbi.inTransaction((dbiHandle, status) -> {
long newTableId = runTransaction(dbi, (dbiHandle, status) -> {
MetadataDao dao = dbiHandle.attach(MetadataDao.class);
long tableId = dao.insertTable(table.getSchemaName(), table.getTableName(), true);
List<RaptorColumnHandle> sortColumnHandles = table.getSortColumnHandles();
Expand Down Expand Up @@ -486,7 +487,7 @@ public void createView(ConnectorSession session, SchemaTableName viewName, Strin
String tableName = viewName.getTableName();

if (replace) {
dbi.inTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
MetadataDao dao = handle.attach(MetadataDao.class);
dao.dropView(schemaName, tableName);
dao.insertView(schemaName, tableName, viewData);
Expand All @@ -498,7 +499,7 @@ public void createView(ConnectorSession session, SchemaTableName viewName, Strin
try {
dao.insertView(schemaName, tableName, viewData);
}
catch (UnableToExecuteStatementException e) {
catch (PrestoException e) {
if (viewExists(session, viewName)) {
throw new PrestoException(ALREADY_EXISTS, "View already exists: " + viewName);
}
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.LongMapper;
Expand Down Expand Up @@ -63,11 +62,13 @@
import static com.facebook.presto.raptor.storage.ShardStats.MAX_BINARY_INDEX_SIZE;
import static com.facebook.presto.raptor.util.ArrayUtil.intArrayFromBytes;
import static com.facebook.presto.raptor.util.ArrayUtil.intArrayToBytes;
import static com.facebook.presto.raptor.util.DatabaseUtil.metadataError;
import static com.facebook.presto.raptor.util.DatabaseUtil.onDemandDao;
import static com.facebook.presto.raptor.util.DatabaseUtil.runTransaction;
import static com.facebook.presto.raptor.util.UuidUtil.uuidToBytes;
import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.propagateIfInstanceOf;
import static com.google.common.collect.Iterables.partition;
import static java.lang.String.format;
import static java.sql.Statement.RETURN_GENERATED_KEYS;
Expand Down Expand Up @@ -99,10 +100,10 @@ public Integer load(String nodeIdentifier)
public DatabaseShardManager(@ForMetadata IDBI dbi)
{
this.dbi = requireNonNull(dbi, "dbi is null");
this.dao = dbi.onDemand(ShardManagerDao.class);
this.dao = onDemandDao(dbi, ShardManagerDao.class);

// keep retrying if database is unavailable when the server starts
createShardTablesWithRetry(dao);
createShardTablesWithRetry(dbi);
}

@Override
Expand Down Expand Up @@ -130,6 +131,9 @@ public void createTable(long tableId, List<ColumnInfo> columns)
try (Handle handle = dbi.open()) {
handle.execute(sql);
}
catch (DBIException e) {
throw metadataError(e);
}
}

@Override
Expand All @@ -142,7 +146,7 @@ public void commitShards(long tableId, List<ColumnInfo> columns, Collection<Shar

Map<String, Integer> nodeIds = toNodeIdMap(shards);

dbi.inTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
ShardManagerDao dao = handle.attach(ShardManagerDao.class);

insertShardsAndIndex(tableId, columns, shards, nodeIds, handle);
Expand All @@ -159,7 +163,7 @@ public void replaceShardIds(long tableId, List<ColumnInfo> columns, Set<Long> ol
{
Map<String, Integer> nodeIds = toNodeIdMap(newShards);

runTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
insertShardsAndIndex(tableId, columns, newShards, nodeIds, handle);
deleteShardsAndIndex(tableId, oldShardIds, handle);
return null;
Expand All @@ -171,7 +175,7 @@ public void replaceShardUuids(long tableId, List<ColumnInfo> columns, Set<UUID>
{
Map<String, Integer> nodeIds = toNodeIdMap(newShards);

runTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
for (List<ShardInfo> shards : partition(newShards, 1000)) {
insertShardsAndIndex(tableId, columns, shards, nodeIds, handle);
}
Expand Down Expand Up @@ -287,7 +291,7 @@ public void assignShard(long tableId, UUID shardUuid, String nodeIdentifier)
{
int nodeId = getOrCreateNodeId(nodeIdentifier);

runTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
ShardManagerDao dao = handle.attach(ShardManagerDao.class);

Set<Integer> nodes = new HashSet<>(fetchLockedNodeIds(handle, tableId, shardUuid));
Expand All @@ -305,7 +309,7 @@ public void unassignShard(long tableId, UUID shardUuid, String nodeIdentifier)
{
int nodeId = getOrCreateNodeId(nodeIdentifier);

runTransaction((handle, status) -> {
runTransaction(dbi, (handle, status) -> {
ShardManagerDao dao = handle.attach(ShardManagerDao.class);

Set<Integer> nodes = new HashSet<>(fetchLockedNodeIds(handle, tableId, shardUuid));
Expand Down Expand Up @@ -339,16 +343,8 @@ public Map<String, Long> getNodeBytes()
})
.build();
}
}

private <T> T runTransaction(TransactionCallback<T> callback)
{
try {
return dbi.inTransaction(callback);
}
catch (DBIException e) {
propagateIfInstanceOf(e.getCause(), PrestoException.class);
throw new PrestoException(RAPTOR_ERROR, "Failed to perform metadata operation", e);
throw metadataError(e);
}
}

Expand Down
Expand Up @@ -16,6 +16,8 @@
import com.google.common.base.Throwables;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;

import java.util.concurrent.TimeUnit;
Expand All @@ -26,12 +28,12 @@ public final class MetadataDaoUtils

private MetadataDaoUtils() {}

public static void createMetadataTablesWithRetry(MetadataDao dao)
public static void createMetadataTablesWithRetry(IDBI dbi)
{
Duration delay = new Duration(10, TimeUnit.SECONDS);
while (true) {
try {
createMetadataTables(dao);
try (Handle handle = dbi.open()) {
createMetadataTables(handle.attach(MetadataDao.class));
return;
}
catch (UnableToObtainConnectionException e) {
Expand Down
Expand Up @@ -20,7 +20,6 @@
import io.airlift.log.Logger;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.exceptions.DBIException;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -37,6 +36,8 @@
import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR;
import static com.facebook.presto.raptor.metadata.DatabaseShardManager.shardIndexTable;
import static com.facebook.presto.raptor.util.ArrayUtil.intArrayFromBytes;
import static com.facebook.presto.raptor.util.DatabaseUtil.metadataError;
import static com.facebook.presto.raptor.util.DatabaseUtil.onDemandDao;
import static com.facebook.presto.raptor.util.UuidUtil.uuidFromBytes;
import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;
Expand All @@ -62,7 +63,7 @@ public ShardIterator(long tableId, TupleDomain<RaptorColumnHandle> effectivePred
shardIndexTable(tableId),
predicate.getPredicate());

dao = dbi.onDemand(ShardManagerDao.class);
dao = onDemandDao(dbi, ShardManagerDao.class);
fetchNodes();

try {
Expand All @@ -75,7 +76,7 @@ public ShardIterator(long tableId, TupleDomain<RaptorColumnHandle> effectivePred
}
catch (SQLException e) {
close();
throw new PrestoException(RAPTOR_ERROR, e);
throw metadataError(e);
}
}

Expand All @@ -86,7 +87,7 @@ protected ShardNodes computeNext()
return compute();
}
catch (SQLException e) {
throw new PrestoException(RAPTOR_ERROR, e);
throw metadataError(e);
}
}

Expand Down Expand Up @@ -124,32 +125,17 @@ private ShardNodes compute()

private String fetchNode(int id, UUID shardUuid)
{
String node = fetchNode(id);
String node = dao.getNodeIdentifier(id);
if (node == null) {
throw new PrestoException(RAPTOR_ERROR, format("Missing node ID [%s] for shard: %s", id, shardUuid));
}
return node;
}

private String fetchNode(int id)
{
try {
return dao.getNodeIdentifier(id);
}
catch (DBIException e) {
throw new PrestoException(RAPTOR_ERROR, e);
}
}

private void fetchNodes()
{
try {
for (Node node : dao.getNodes()) {
nodeMap.put(node.getNodeId(), node.getNodeIdentifier());
}
}
catch (DBIException e) {
throw new PrestoException(RAPTOR_ERROR, e);
for (Node node : dao.getNodes()) {
nodeMap.put(node.getNodeId(), node.getNodeIdentifier());
}
}

Expand Down
Expand Up @@ -16,6 +16,8 @@
import com.google.common.base.Throwables;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;

import java.util.concurrent.TimeUnit;
Expand All @@ -26,12 +28,12 @@ public final class ShardManagerDaoUtils

private ShardManagerDaoUtils() {}

public static void createShardTablesWithRetry(ShardManagerDao dao)
public static void createShardTablesWithRetry(IDBI dbi)
{
Duration delay = new Duration(10, TimeUnit.SECONDS);
while (true) {
try {
createShardTables(dao);
try (Handle handle = dbi.open()) {
createShardTables(handle.attach(ShardManagerDao.class));
return;
}
catch (UnableToObtainConnectionException e) {
Expand Down
Expand Up @@ -22,7 +22,6 @@
import com.facebook.presto.raptor.metadata.TableColumn;
import com.facebook.presto.raptor.metadata.TableMetadata;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.type.Type;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -60,10 +59,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR;
import static com.facebook.presto.raptor.metadata.DatabaseShardManager.maxColumn;
import static com.facebook.presto.raptor.metadata.DatabaseShardManager.minColumn;
import static com.facebook.presto.raptor.metadata.DatabaseShardManager.shardIndexTable;
import static com.facebook.presto.raptor.util.DatabaseUtil.metadataError;
import static com.facebook.presto.raptor.util.DatabaseUtil.onDemandDao;
import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_FIRST;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.partition;
Expand Down Expand Up @@ -138,7 +138,7 @@ public ShardCompactionManager(
boolean compactionEnabled)
{
this.dbi = requireNonNull(dbi, "dbi is null");
this.metadataDao = dbi.onDemand(MetadataDao.class);
this.metadataDao = onDemandDao(dbi, MetadataDao.class);

this.currentNodeIdentifier = requireNonNull(currentNodeIdentifier, "currentNodeIdentifier is null");
this.shardManager = requireNonNull(shardManager, "shardManager is null");
Expand Down Expand Up @@ -296,7 +296,7 @@ Set<ShardMetadata> filterShardsWithTemporalMetadata(Iterable<ShardMetadata> allS
}
}
catch (SQLException e) {
throw new PrestoException(RAPTOR_ERROR, e);
throw metadataError(e);
}
return temporalShards.build();
}
Expand Down

0 comments on commit 357f579

Please sign in to comment.