Add grace period for bucket and shard reassignment
electrum committed Feb 23, 2016
1 parent c28d942 commit a42e5a5
Showing 11 changed files with 165 additions and 23 deletions.
@@ -320,7 +320,7 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess

private RaptorPartitioningHandle getPartitioningHandle(long distributionId)
{
return new RaptorPartitioningHandle(distributionId, shardManager.getBucketAssignments(distributionId));
return new RaptorPartitioningHandle(distributionId, shardManager.getBucketAssignments(distributionId, true));
}

private Optional<DistributionInfo> getOrCreateDistribution(Map<String, RaptorColumnHandle> columnHandleMap, Map<String, Object> properties)
@@ -228,7 +228,7 @@ private ConnectorSplit createSplit(BucketShards bucketShards)
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
Node node = selectRandom(availableNodes);
shardManager.assignShard(tableId, shardId, node.getNodeIdentifier());
shardManager.assignShard(tableId, shardId, node.getNodeIdentifier(), true);
addresses = ImmutableList.of(node.getHostAndPort());
}

@@ -248,7 +248,7 @@ private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shard
// shards not currently assigned to the node for the bucket.
for (ShardNodes shard : shards) {
if (!shard.getNodeIdentifiers().contains(nodeId)) {
shardManager.assignShard(tableId, shard.getShardUuid(), nodeId);
shardManager.assignShard(tableId, shard.getShardUuid(), nodeId, false);
}
}

@@ -33,6 +33,7 @@
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import org.h2.jdbc.JdbcConnection;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@@ -74,9 +75,11 @@
import static com.facebook.presto.raptor.util.UuidUtil.uuidToBytes;
import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.StandardErrorCode.SERVER_STARTING_UP;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.partition;
import static io.airlift.units.Duration.nanosSince;
import static java.lang.String.format;
import static java.sql.Statement.RETURN_GENERATED_KEYS;
import static java.util.Arrays.asList;
@@ -91,10 +94,12 @@ public class DatabaseShardManager

private static final String INDEX_TABLE_PREFIX = "x_shards_t";

private final long startTime = System.nanoTime();
private final IDBI dbi;
private final DaoSupplier<ShardDao> shardDaoSupplier;
private final ShardDao dao;
private final NodeSupplier nodeSupplier;
private final Duration startupGracePeriod;

private final LoadingCache<String, Integer> nodeIdCache = CacheBuilder.newBuilder()
.maximumSize(10_000)
@@ -108,12 +113,26 @@ public Integer load(String nodeIdentifier)
});

@Inject
public DatabaseShardManager(@ForMetadata IDBI dbi, DaoSupplier<ShardDao> shardDaoSupplier, NodeSupplier nodeSupplier)
public DatabaseShardManager(
@ForMetadata IDBI dbi,
DaoSupplier<ShardDao> shardDaoSupplier,
NodeSupplier nodeSupplier,
MetadataConfig config)
{
this(dbi, shardDaoSupplier, nodeSupplier, config.getStartupGracePeriod());
}

public DatabaseShardManager(
IDBI dbi,
DaoSupplier<ShardDao> shardDaoSupplier,
NodeSupplier nodeSupplier,
Duration startupGracePeriod)
{
this.dbi = requireNonNull(dbi, "dbi is null");
this.shardDaoSupplier = requireNonNull(shardDaoSupplier, "shardDaoSupplier is null");
this.dao = shardDaoSupplier.onDemand();
this.nodeSupplier = requireNonNull(nodeSupplier, "nodeSupplier is null");
this.startupGracePeriod = requireNonNull(startupGracePeriod, "startupGracePeriod is null");

createTablesWithRetry(dbi);
}
@@ -383,8 +402,12 @@ public ResultIterator<BucketShards> getShardNodes(long tableId, boolean bucketed
}

@Override
public void assignShard(long tableId, UUID shardUuid, String nodeIdentifier)
public void assignShard(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod)
{
if (gracePeriod && (nanosSince(startTime).compareTo(startupGracePeriod) < 0)) {
throw new PrestoException(SERVER_STARTING_UP, "Cannot reassign shards while server is starting");
}

int nodeId = getOrCreateNodeId(nodeIdentifier);

runTransaction(dbi, (handle, status) -> {
@@ -491,7 +514,7 @@ public void createBuckets(long distributionId, int bucketCount)
}

@Override
public Map<Integer, String> getBucketAssignments(long distributionId)
public Map<Integer, String> getBucketAssignments(long distributionId, boolean gracePeriod)
{
Set<String> nodeIds = getNodeIdentifiers();
Iterator<String> nodeIterator = cyclingShuffledIterator(nodeIds);
@@ -503,6 +526,9 @@ public Map<Integer, String> getBucketAssignments(long distributionId)
String nodeId = bucketNode.getNodeIdentifier();

if (!nodeIds.contains(nodeId)) {
if (gracePeriod && (nanosSince(startTime).compareTo(startupGracePeriod) < 0)) {
throw new PrestoException(SERVER_STARTING_UP, "Cannot reassign buckets while server is starting");
}
String oldNodeId = nodeId;
// TODO: use smarter system to choose replacement node
nodeId = nodeIterator.next();
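The guard added to assignShard and getBucketAssignments above compares the server's uptime against the configured grace period and fails with SERVER_STARTING_UP while the server is still inside that window. A minimal standalone sketch of the same pattern, assuming airlift's Duration utilities (the class and method names below are illustrative, not part of the commit):

import io.airlift.units.Duration;

import static io.airlift.units.Duration.nanosSince;

public class StartupGraceGuard
{
    // captured once at construction, like DatabaseShardManager's startTime field
    private final long startTime = System.nanoTime();
    private final Duration gracePeriod;

    public StartupGraceGuard(Duration gracePeriod)
    {
        this.gracePeriod = gracePeriod;
    }

    // true while uptime is still shorter than the grace period, i.e. callers
    // that opted into the check should be refused with SERVER_STARTING_UP
    public boolean isStarting()
    {
        return nanosSince(startTime).compareTo(gracePeriod) < 0;
    }
}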
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.metadata;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;

import javax.validation.constraints.NotNull;

import static java.util.concurrent.TimeUnit.MINUTES;

public class MetadataConfig
{
private Duration startupGracePeriod = new Duration(5, MINUTES);

@NotNull
public Duration getStartupGracePeriod()
{
return startupGracePeriod;
}

@Config("raptor.startup-grace-period")
@ConfigDescription("Minimum uptime before allowing bucket or shard reassignments")
public MetadataConfig setStartupGracePeriod(Duration startupGracePeriod)
{
this.startupGracePeriod = startupGracePeriod;
return this;
}
}
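The class above is bound through configBinder in the storage module change further down, so the five-minute default can be overridden with the raptor.startup-grace-period property (the config test below exercises "42m"). A hedged sketch of programmatic use, mirroring how the injected DatabaseShardManager constructor consumes the value (the example class is illustrative, not part of the commit):

package com.facebook.presto.raptor.metadata;

import io.airlift.units.Duration;

import static java.util.concurrent.TimeUnit.MINUTES;

public class MetadataConfigExample
{
    public static void main(String[] args)
    {
        MetadataConfig config = new MetadataConfig()
                .setStartupGracePeriod(new Duration(10, MINUTES)); // default is 5 minutes

        Duration grace = config.getStartupGracePeriod();
        System.out.println("startup grace period: " + grace); // a human-readable duration, e.g. "10.00m"
    }
}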
@@ -64,7 +64,7 @@ public interface ShardManager
/**
* Assign a shard to a node.
*/
void assignShard(long tableId, UUID shardUuid, String nodeIdentifier);
void assignShard(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod);

/**
* Remove shard assignment from a node.
@@ -96,5 +96,5 @@ public interface ShardManager
/**
* Get map of buckets to node identifiers for a table.
*/
Map<Integer, String> getBucketAssignments(long distributionId);
Map<Integer, String> getBucketAssignments(long distributionId, boolean gracePeriod);
}
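The boolean added to both methods above decides whether the startup check applies: the metadata and split-manager call sites pass true, so a freshly restarted server refuses to shuffle data before the grace period elapses, while the bucketed-split fixup and the background balancer pass false and proceed regardless of uptime. An illustrative caller sketch (this helper class is an assumption, not part of the commit):

package com.facebook.presto.raptor.metadata;

import java.util.UUID;

public class ReassignmentExample
{
    static void reassign(ShardManager shardManager, long tableId, UUID shardUuid, String nodeId, boolean queryDriven)
    {
        // true  -> enforce the startup grace period (throws SERVER_STARTING_UP while the server is starting)
        // false -> internal bookkeeping path, always allowed
        shardManager.assignShard(tableId, shardUuid, nodeId, queryDriven);
    }
}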
@@ -235,7 +235,7 @@ void process()
nodeSize -= shardSize;

// move assignment
shardManager.assignShard(shard.getTableId(), shardUuid, target);
shardManager.assignShard(shard.getTableId(), shardUuid, target, false);
shardManager.unassignShard(shard.getTableId(), shardUuid, currentNode);

// delete local file
@@ -15,6 +15,7 @@

import com.facebook.presto.raptor.backup.BackupManager;
import com.facebook.presto.raptor.metadata.DatabaseShardManager;
import com.facebook.presto.raptor.metadata.MetadataConfig;
import com.facebook.presto.raptor.metadata.ShardCleaner;
import com.facebook.presto.raptor.metadata.ShardCleanerConfig;
import com.facebook.presto.raptor.metadata.ShardManager;
@@ -43,6 +44,7 @@ public void configure(Binder binder)
{
configBinder(binder).bindConfig(StorageManagerConfig.class);
configBinder(binder).bindConfig(ShardCleanerConfig.class);
configBinder(binder).bindConfig(MetadataConfig.class);

binder.bind(StorageManager.class).to(OrcStorageManager.class).in(Scopes.SINGLETON);
binder.bind(StorageService.class).to(FileStorageService.class).in(Scopes.SINGLETON);
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.raptor.metadata;

import com.facebook.presto.raptor.NodeSupplier;
import com.facebook.presto.raptor.RaptorColumnHandle;
import com.facebook.presto.raptor.util.DaoSupplier;
import com.facebook.presto.spi.HostAddress;
@@ -31,6 +32,7 @@
import com.google.common.io.Files;
import io.airlift.slice.Slice;
import io.airlift.testing.FileUtils;
import io.airlift.units.Duration;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@@ -55,6 +57,7 @@

import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_EXTERNAL_BATCH_ALREADY_EXISTS;
import static com.facebook.presto.raptor.storage.ShardStats.MAX_BINARY_INDEX_SIZE;
import static com.facebook.presto.spi.StandardErrorCode.SERVER_STARTING_UP;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.facebook.presto.spi.predicate.Range.greaterThan;
import static com.facebook.presto.spi.predicate.Range.greaterThanOrEqual;
@@ -72,6 +75,7 @@
import static com.google.common.collect.Iterators.transform;
import static io.airlift.slice.Slices.utf8Slice;
import static java.time.ZoneOffset.UTC;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.stream.Collectors.toSet;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -161,14 +165,22 @@ public void testAssignShard()
ShardNodes actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all()));
assertEquals(actual, new ShardNodes(shard, ImmutableSet.of("node1")));

shardManager.assignShard(tableId, shard, "node2");
try {
shardManager.assignShard(tableId, shard, "node2", true);
fail("expected exception");
}
catch (PrestoException e) {
assertEquals(e.getErrorCode(), SERVER_STARTING_UP.toErrorCode());
}

shardManager.assignShard(tableId, shard, "node2", false);

// assign shard to another node
actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all()));
assertEquals(actual, new ShardNodes(shard, ImmutableSet.of("node1", "node2")));

// assigning a shard should be idempotent
shardManager.assignShard(tableId, shard, "node2");
shardManager.assignShard(tableId, shard, "node2", false);

// remove assignment from first node
shardManager.unassignShard(tableId, shard, "node1");
@@ -204,7 +216,7 @@ public void testGetNodeBytes()

assertEquals(shardManager.getNodeBytes(), ImmutableMap.of("node1", 88L));

shardManager.assignShard(tableId, shard1, "node2");
shardManager.assignShard(tableId, shard1, "node2", false);

assertEquals(getShardNodes(tableId, TupleDomain.all()), ImmutableSet.of(
new ShardNodes(shard1, ImmutableSet.of("node1", "node2")),
@@ -342,21 +354,27 @@ public void testBucketAssignments()
int bucketCount = 13;
long distributionId = metadataDao.insertDistribution(null, "test", bucketCount);

DaoSupplier<ShardDao> shardDaoDaoSupplier = new DaoSupplier<>(dbi, ShardDao.class);

Set<Node> originalNodes = ImmutableSet.of(node1, node2);
ShardManager shardManager = new DatabaseShardManager(dbi, shardDaoDaoSupplier, () -> originalNodes);
ShardManager shardManager = createShardManager(dbi, () -> originalNodes);

shardManager.createBuckets(distributionId, bucketCount);

Map<Integer, String> assignments = shardManager.getBucketAssignments(distributionId);
Map<Integer, String> assignments = shardManager.getBucketAssignments(distributionId, false);
assertEquals(assignments.size(), bucketCount);
assertEquals(ImmutableSet.copyOf(assignments.values()), nodeIds(originalNodes));

Set<Node> newNodes = ImmutableSet.of(node1, node3);
shardManager = new DatabaseShardManager(dbi, shardDaoDaoSupplier, () -> newNodes);
shardManager = createShardManager(dbi, () -> newNodes);

try {
shardManager.getBucketAssignments(distributionId, true);
fail("expected exception");
}
catch (PrestoException e) {
assertEquals(e.getErrorCode(), SERVER_STARTING_UP.toErrorCode());
}

assignments = shardManager.getBucketAssignments(distributionId);
assignments = shardManager.getBucketAssignments(distributionId, false);
assertEquals(assignments.size(), bucketCount);
assertEquals(ImmutableSet.copyOf(assignments.values()), nodeIds(newNodes));
}
@@ -607,7 +625,12 @@ private static Set<ShardNodes> toShardNodes(List<ShardInfo> shards)

public static ShardManager createShardManager(IDBI dbi)
{
return new DatabaseShardManager(dbi, new DaoSupplier<>(dbi, ShardDao.class), ImmutableSet::of);
return createShardManager(dbi, ImmutableSet::of);
}

public static ShardManager createShardManager(IDBI dbi, NodeSupplier nodeSupplier)
{
return new DatabaseShardManager(dbi, new DaoSupplier<>(dbi, ShardDao.class), nodeSupplier, new Duration(1, DAYS));
}

private static Domain createDomain(Range first, Range... ranges)
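Note the 1-day grace period in the createShardManager helper above: within a test run the manager is always still inside its grace window, so any call made with the new flag set to true deterministically raises SERVER_STARTING_UP (which testAssignShard and testBucketAssignments assert), while calls passing false behave as before.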
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.metadata;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.concurrent.TimeUnit.MINUTES;

public class TestMetadataConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(MetadataConfig.class)
.setStartupGracePeriod(new Duration(5, MINUTES)));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("raptor.startup-grace-period", "42m")
.build();

MetadataConfig expected = new MetadataConfig()
.setStartupGracePeriod(new Duration(42, MINUTES));

assertFullMapping(properties, expected);
}
}
@@ -24,7 +24,6 @@
import com.facebook.presto.raptor.RaptorSessionProperties;
import com.facebook.presto.raptor.RaptorTableHandle;
import com.facebook.presto.raptor.storage.StorageManagerConfig;
import com.facebook.presto.raptor.util.DaoSupplier;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
@@ -66,6 +65,7 @@
import static com.facebook.presto.raptor.RaptorTableProperties.ORDERING_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.TEMPORAL_COLUMN_PROPERTY;
import static com.facebook.presto.raptor.metadata.SchemaDaoUtil.createTablesWithRetry;
import static com.facebook.presto.raptor.metadata.TestDatabaseShardManager.createShardManager;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.DateType.DATE;
@@ -110,7 +110,7 @@ public void setupDatabase()
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addCurrentNodeDatasource(connectorId.toString());
NodeSupplier nodeSupplier = new RaptorNodeSupplier(nodeManager, connectorId);
shardManager = new DatabaseShardManager(dbi, new DaoSupplier<>(dbi, ShardDao.class), nodeSupplier);
shardManager = createShardManager(dbi, nodeSupplier);
metadata = new RaptorMetadata(connectorId.toString(), dbi, shardManager, SHARD_INFO_CODEC, SHARD_DELTA_CODEC);
}

