Small refactor for DatabaseShardManager
nileema committed May 5, 2015
1 parent b1af9ce commit a06ea61
Showing 1 changed file with 30 additions and 17 deletions.
@@ -35,6 +35,7 @@
import javax.inject.Inject;

import java.sql.JDBCType;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -122,27 +123,12 @@ public void commitShards(long tableId, List<ColumnInfo> columns, Collection<Shar
throw new PrestoException(RAPTOR_EXTERNAL_BATCH_ALREADY_EXISTS, "External batch already exists: " + externalBatchId.get());
}

-        Set<String> identifiers = shards.stream()
-                .map(ShardInfo::getNodeIdentifiers)
-                .flatMap(Collection::stream)
-                .collect(toSet());
-        Map<String, Integer> nodeIds = Maps.toMap(identifiers, this::getOrCreateNodeId);
+        Map<String, Integer> nodeIds = toNodeIdMap(shards);

dbi.inTransaction((handle, status) -> {
ShardManagerDao dao = handle.attach(ShardManagerDao.class);

-            try (IndexInserter indexInserter = new IndexInserter(handle.getConnection(), tableId, columns)) {
-                for (ShardInfo shard : shards) {
-                    long shardId = dao.insertShard(shard.getShardUuid(), tableId, shard.getRowCount(), shard.getDataSize());
-
-                    Set<Integer> shardNodes = shard.getNodeIdentifiers().stream().map(nodeIds::get).collect(toSet());
-                    for (int nodeId : shardNodes) {
-                        dao.insertShardNode(shardId, nodeId);
-                    }
-
-                    indexInserter.insert(shardId, shard.getShardUuid(), shardNodes, shard.getColumnStats());
-                }
-            }
+            insertShardsAndIndex(tableId, columns, shards, nodeIds, handle, dao);

if (externalBatchId.isPresent()) {
dao.insertExternalBatch(externalBatchId.get());
@@ -151,6 +137,33 @@ public void commitShards(long tableId, List<ColumnInfo> columns, Collection<Shar
});
}

+    private void insertShardsAndIndex(long tableId, List<ColumnInfo> columns, Collection<ShardInfo> shards, Map<String, Integer> nodeIds, Handle handle, ShardManagerDao dao)
+            throws SQLException
+    {
+        try (IndexInserter indexInserter = new IndexInserter(handle.getConnection(), tableId, columns)) {
+            for (ShardInfo shard : shards) {
+                long shardId = dao.insertShard(shard.getShardUuid(), tableId, shard.getRowCount(), shard.getDataSize());
+                Set<Integer> shardNodes = shard.getNodeIdentifiers().stream()
+                        .map(nodeIds::get)
+                        .collect(toSet());
+                for (int nodeId : shardNodes) {
+                    dao.insertShardNode(shardId, nodeId);
+                }
+
+                indexInserter.insert(shardId, shard.getShardUuid(), shardNodes, shard.getColumnStats());
+            }
+        }
+    }
+
+    private Map<String, Integer> toNodeIdMap(Collection<ShardInfo> shards)
+    {
+        Set<String> identifiers = shards.stream()
+                .map(ShardInfo::getNodeIdentifiers)
+                .flatMap(Collection::stream)
+                .collect(toSet());
+        return Maps.toMap(identifiers, this::getOrCreateNodeId);
+    }

@Override
public CloseableIterator<ShardNodes> getShardNodes(long tableId, TupleDomain<RaptorColumnHandle> effectivePredicate)
{

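For reference, a minimal, self-contained sketch of what the extracted toNodeIdMap helper computes, using the same Guava Maps.toMap call as the diff. The NodeIdMapSketch class, its in-memory getOrCreateNodeId counter, and the plain Set<String> node identifiers are simplified stand-ins for illustration, not the actual ShardInfo or DatabaseShardManager implementations:

import com.google.common.collect.Maps;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.stream.Collectors.toSet;

public class NodeIdMapSketch
{
    // In-memory stand-in for the node table: assigns a new id the first time an
    // identifier is seen and reuses it afterwards.
    private final Map<String, Integer> assignedIds = new HashMap<>();
    private final AtomicInteger nextNodeId = new AtomicInteger(1);

    private int getOrCreateNodeId(String nodeIdentifier)
    {
        return assignedIds.computeIfAbsent(nodeIdentifier, ignored -> nextNodeId.getAndIncrement());
    }

    // Same shape as the extracted toNodeIdMap helper: flatten every shard's node
    // identifiers into one set, then build an immutable identifier -> id map so
    // getOrCreateNodeId runs only once per distinct identifier.
    private Map<String, Integer> toNodeIdMap(Collection<Set<String>> shards)
    {
        Set<String> identifiers = shards.stream()
                .flatMap(Collection::stream)
                .collect(toSet());
        return Maps.toMap(identifiers, this::getOrCreateNodeId);
    }

    public static void main(String[] args)
    {
        NodeIdMapSketch sketch = new NodeIdMapSketch();
        // Two shards share "node2"; only three ids are assigned in total.
        List<Set<String>> shards = Arrays.asList(
                new LinkedHashSet<>(Arrays.asList("node1", "node2")),
                new LinkedHashSet<>(Arrays.asList("node2", "node3")));
        System.out.println(sketch.toNodeIdMap(shards));
    }
}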