diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index c4ab1ad27a..dadd59332d 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -673,7 +673,7 @@ public void stopAsyncRequest(int nodeId, int requestId) { * @param nodeId Id of the node to poll * @param requestId Id of the request to check * @param maxWait Maximum time we'll keep checking a request until we - * give up + * give up. Pass in 0 or less to wait "forever". * @param timeUnit Unit in which maxWait is expressed. * @param higherStatus A higher level async operation object. If this * waiting is being run another async operation this helps us @@ -688,7 +688,10 @@ public String waitForCompletion(int nodeId, TimeUnit timeUnit, AsyncOperationStatus higherStatus) { long delay = INITIAL_DELAY; - long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait); + long waitUntil = Long.MAX_VALUE; + if(maxWait > 0) { + waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait); + } String description = null; String oldStatus = ""; @@ -765,7 +768,7 @@ public String waitForCompletion(int nodeId, int requestId, long maxWait, TimeUni * maxWait time. */ public String waitForCompletion(int nodeId, int requestId) { - return waitForCompletion(nodeId, requestId, Long.MAX_VALUE, TimeUnit.SECONDS, null); + return waitForCompletion(nodeId, requestId, 0, TimeUnit.SECONDS, null); } /** diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index b6debd447c..227f2251bd 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -123,6 +123,10 @@ public RebalancePlan getPlan(Cluster finalCluster, RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs); RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster); + // TODO: (currentCluster vs interimCluster) Add more validation before + // constructing plan? Given that currentCluster was polled from prod + // cluster, should confirm that it is an "interim cluster" i.e., has + // same (superset?) of nodes as are in finalCluster. String outputDir = null; return new RebalancePlan(currentCluster, currentStoreDefs, diff --git a/src/java/voldemort/client/rebalance/RebalancePlan.java b/src/java/voldemort/client/rebalance/RebalancePlan.java index eb22fc20f5..d5dc85d21a 100644 --- a/src/java/voldemort/client/rebalance/RebalancePlan.java +++ b/src/java/voldemort/client/rebalance/RebalancePlan.java @@ -99,6 +99,10 @@ public RebalancePlan(final Cluster currentCluster, this.batchSize = batchSize; this.outputDir = outputDir; + // TODO: (currentCluster vs interimCluster) Instead of divining + // interimCluster from currentCluster and finalCluster, should we + // require that interimCluster be passed in? + // Derive the targetCluster from current & final cluster xml RebalanceUtils.validateCurrentFinalCluster(this.currentCluster, this.finalCluster); Cluster interimCluster = RebalanceUtils.getInterimCluster(this.currentCluster, diff --git a/src/java/voldemort/routing/StoreRoutingPlan.java b/src/java/voldemort/routing/StoreRoutingPlan.java index 26129b8cb4..8b1ed8217f 100644 --- a/src/java/voldemort/routing/StoreRoutingPlan.java +++ b/src/java/voldemort/routing/StoreRoutingPlan.java @@ -26,6 +26,7 @@ import voldemort.cluster.Cluster; import voldemort.cluster.Node; import voldemort.store.StoreDefinition; +import voldemort.store.system.SystemStoreConstants; import voldemort.utils.ByteUtils; import voldemort.utils.ClusterUtils; import voldemort.utils.NodeUtils; @@ -52,6 +53,8 @@ public class StoreRoutingPlan { public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) { this.cluster = cluster; this.storeDefinition = storeDefinition; + verifyClusterStoreDefinition(); + this.partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster); this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster); @@ -78,7 +81,50 @@ public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) { } } } + } + + /** + * Verify that cluster is congruent to store def wrt zones. + */ + private void verifyClusterStoreDefinition() { + if(SystemStoreConstants.isSystemStore(storeDefinition.getName())) { + // TODO: Once "todo" in StorageService.initSystemStores is complete, + // this early return can be removed and verification can be enabled + // for system stores. + return; + } + + Set clusterZoneIds = cluster.getZoneIds(); + if(clusterZoneIds.size() > 1) { // Zoned + Map zoneRepFactor = storeDefinition.getZoneReplicationFactor(); + Set storeDefZoneIds = zoneRepFactor.keySet(); + + if(!clusterZoneIds.equals(storeDefZoneIds)) { + throw new VoldemortException("Zone IDs in cluster (" + clusterZoneIds + + ") are incongruent with zone IDs in store defs (" + + storeDefZoneIds + ")"); + } + + for(int zoneId: clusterZoneIds) { + if(zoneRepFactor.get(zoneId) > cluster.getNumberOfNodesInZone(zoneId)) { + throw new VoldemortException("Not enough nodes (" + + cluster.getNumberOfNodesInZone(zoneId) + + ") in zone with id " + zoneId + + " for replication factor of " + + zoneRepFactor.get(zoneId) + "."); + } + } + } else { // Non-zoned + + if(storeDefinition.getReplicationFactor() > cluster.getNumberOfNodes()) { + System.err.println(storeDefinition); + System.err.println(cluster); + throw new VoldemortException("Not enough nodes (" + cluster.getNumberOfNodes() + + ") for replication factor of " + + storeDefinition.getReplicationFactor() + "."); + } + } } public Cluster getCluster() { diff --git a/src/java/voldemort/server/storage/StorageService.java b/src/java/voldemort/server/storage/StorageService.java index 7fb0e076c9..3874728fa1 100644 --- a/src/java/voldemort/server/storage/StorageService.java +++ b/src/java/voldemort/server/storage/StorageService.java @@ -249,7 +249,8 @@ private void initSystemStores() { } private void updateRepFactor(List storesDefs) { - // need impl + // TODO: need implementation. Once implemented, see related todo in + // StoreRoutingPlan.verifyClusterStoreDefinition } @Override diff --git a/src/java/voldemort/store/system/SystemStoreConstants.java b/src/java/voldemort/store/system/SystemStoreConstants.java index 0bd7dfe416..c0034721ea 100644 --- a/src/java/voldemort/store/system/SystemStoreConstants.java +++ b/src/java/voldemort/store/system/SystemStoreConstants.java @@ -37,6 +37,8 @@ public static enum SystemStoreName { voldsys$_metadata_version_persistence; } + // TODO: Verify that this hard coded system store works in three zones + // and/or extend it to have zone 2. public static final String SYSTEM_STORE_SCHEMA = "" + " " + " voldsys$_client_registry" diff --git a/src/java/voldemort/tools/PartitionBalance.java b/src/java/voldemort/tools/PartitionBalance.java index 5ccd9a93f1..f77bfaad37 100644 --- a/src/java/voldemort/tools/PartitionBalance.java +++ b/src/java/voldemort/tools/PartitionBalance.java @@ -35,8 +35,6 @@ import com.google.common.collect.Maps; -; - public class PartitionBalance { /** @@ -207,15 +205,16 @@ private Map getNodeIdToNaryCount(Cluster cluster, return nodeIdToNaryCount; } - // TODO: (refactor) When/if "replica type" is exorcised from the code base, + // TODO: (replicaType) When replicaType is exorcized from the code base, // this detailed dump method should be removed. /** * Dumps the partition IDs per node in terms of "replica type". * * @param cluster * @param storeDefinition - * @return pretty printed string of detailed replica tyep dump. + * @return pretty printed string of detailed replica type dump. */ + @Deprecated private String dumpReplicaTypeDetails(Cluster cluster, StoreDefinition storeDefinition) { StringBuilder sb = new StringBuilder(); Map>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(cluster, @@ -281,7 +280,9 @@ private String dumpZoneNAryDetails(StoreRoutingPlan storeRoutingPlan) { List naries = storeRoutingPlan.getZoneNAryPartitionIds(nodeId); Map> zoneNaryTypeToPartitionIds = new HashMap>(); for(int nary: naries) { - int zoneReplicaType = storeRoutingPlan.getZoneNaryForNodesPartition(zoneId, nodeId, nary); + int zoneReplicaType = storeRoutingPlan.getZoneNaryForNodesPartition(zoneId, + nodeId, + nary); if(!zoneNaryTypeToPartitionIds.containsKey(zoneReplicaType)) { zoneNaryTypeToPartitionIds.put(zoneReplicaType, new ArrayList()); } diff --git a/test/common/voldemort/ClusterTestUtils.java b/test/common/voldemort/ClusterTestUtils.java index 5c96e3054a..c4c4aff109 100644 --- a/test/common/voldemort/ClusterTestUtils.java +++ b/test/common/voldemort/ClusterTestUtils.java @@ -110,21 +110,50 @@ public static List getZZ322StoreDefs(String storageType) { return storeDefs; } + public static List getZZ322StoreDefsWithNonContiguousZoneIds(String storageType) { + + List storeDefs = new LinkedList(); + HashMap zoneRep322 = new HashMap(); + zoneRep322.put(0, 3); + zoneRep322.put(2, 3); + StoreDefinition storeDef322 = new StoreDefinitionBuilder().setName("ZZ322") + .setType(storageType) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY) + .setKeySerializer(new SerializerDefinition("string")) + .setValueSerializer(new SerializerDefinition("string")) + .setReplicationFactor(6) + .setZoneReplicationFactor(zoneRep322) + .setRequiredReads(2) + .setRequiredWrites(2) + .setZoneCountReads(0) + .setZoneCountWrites(0) + .build(); + storeDefs.add(storeDef322); + return storeDefs; + } + /** * Store defs for zoned clusters with 2 zones. Covers the three store * definitions of interest: 3/2/2, 2/1/1, and */ public static List getZZStoreDefsInMemory() { List storeDefs = new LinkedList(); - storeDefs.addAll(ClusterTestUtils.getZZ111StoreDefs(InMemoryStorageConfiguration.TYPE_NAME)); + storeDefs.addAll(getZZ111StoreDefs(InMemoryStorageConfiguration.TYPE_NAME)); storeDefs.addAll(getZZ211StoreDefs(InMemoryStorageConfiguration.TYPE_NAME)); storeDefs.addAll(getZZ322StoreDefs(InMemoryStorageConfiguration.TYPE_NAME)); return storeDefs; } + public static List getZZStoreDefsWithNonContiguousZoneIDsInMemory() { + List storeDefs = new LinkedList(); + storeDefs.addAll(getZZ322StoreDefsWithNonContiguousZoneIds(InMemoryStorageConfiguration.TYPE_NAME)); + return storeDefs; + } + public static List getZZStoreDefsBDB() { List storeDefs = new LinkedList(); - storeDefs.addAll(ClusterTestUtils.getZZ111StoreDefs(BdbStorageConfiguration.TYPE_NAME)); + storeDefs.addAll(getZZ111StoreDefs(BdbStorageConfiguration.TYPE_NAME)); storeDefs.addAll(getZZ211StoreDefs(BdbStorageConfiguration.TYPE_NAME)); storeDefs.addAll(getZZ322StoreDefs(BdbStorageConfiguration.TYPE_NAME)); return storeDefs; @@ -511,6 +540,7 @@ public static Cluster getZZClusterWithNonContiguousZoneIDsButContiguousNodeIDs() node.getHttpPort(), node.getSocketPort(), node.getAdminPort(), + node.getZoneId(), node.getPartitionIds()); nodeList.add(newNode); nodeId++; diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java index ef4915321d..612d281ea7 100644 --- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java @@ -41,7 +41,6 @@ import org.apache.log4j.Logger; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import voldemort.ClusterTestUtils; @@ -120,8 +119,13 @@ public AbstractZonedRebalanceTest(boolean useNio, boolean useDonorBased) { super(useNio, useDonorBased); } - @BeforeClass - public static void generalSetup() throws IOException { + @Before + public void setUp() throws IOException { + setUpRWStuff(); + setupZZandZZZ(); + } + + public static void setupZZandZZZ() throws IOException { zzCurrent = ClusterTestUtils.getZZCluster(); zzShuffle = ClusterTestUtils.getZZClusterWithSwappedPartitions(); zzClusterExpansionNN = ClusterTestUtils.getZZClusterWithNN(); @@ -145,8 +149,7 @@ public static void generalSetup() throws IOException { zzzStoresXml = zzzfile.getAbsolutePath(); } - @Before - public void setUp() throws IOException { + public void setUpRWStuff() throws IOException { // First without replication HashMap zrfRWStoreWithoutReplication = new HashMap(); zrfRWStoreWithoutReplication.put(0, 1); @@ -237,6 +240,14 @@ public void tearDown() { socketStoreFactory = null; } + // TODO: (currentCluster vs interimCluster) Ideally, we could go from + // cCluster to fCluster for zone expansion. Unfortunately, to start a + // VoldemortServer, you need a cluster xml that includes that server. For + // now, we assume interim cluster is deployed (i.e., cluster with empty + // nodes in new zones). Either, deploying interim cluster with empty nodes + // must be codified in run book and tested as a pre-condition or servers + // need to be able to start without a cluster xml that includes them. + // TODO: The tests based on this method are susceptible to TOCTOU // BindException issue since findFreePorts is used to determine the ports // for localhost:PORT of each node. @@ -349,6 +360,9 @@ public void testClusterExpansionZZZ() throws Exception { @Test(timeout = 600000) public void testZoneExpansionZZ2ZZZ() throws Exception { + // TODO: see todo for method testZonedRebalance to understand why we + // cannot invoke the following: + /*- testZonedRebalance("TestZoneExpansionZZ2ZZZ", zzCurrent, zzzZoneExpansionXXP, @@ -356,6 +370,12 @@ public void testZoneExpansionZZ2ZZZ() throws Exception { zzzStoresXml, zzStores, zzzStores); + */ + testZonedRebalance("TestZoneExpansionZZ2ZZZ", + zzeZoneExpansion, + zzzZoneExpansionXXP, + zzzStoresXml, + zzzStores); } @Test(timeout = 600000) diff --git a/test/unit/voldemort/tools/PartitionBalanceTest.java b/test/unit/voldemort/tools/PartitionBalanceTest.java index ffe0085cdb..dd73a21d0f 100644 --- a/test/unit/voldemort/tools/PartitionBalanceTest.java +++ b/test/unit/voldemort/tools/PartitionBalanceTest.java @@ -60,9 +60,6 @@ public void testEmptyZoneThingsThatShouldWork() { new PartitionBalance(ClusterTestUtils.getZEZCluster(), ClusterTestUtils.getZZZStoreDefsInMemory()); - - new PartitionBalance(ClusterTestUtils.getZEZClusterWithOnlyOneNodeInNewZone(), - ClusterTestUtils.getZZZStoreDefsInMemory()); } @Test @@ -114,9 +111,9 @@ public void testClusterWithZoneThatCannotFullyReplicate() { * to shrink zones. */ @Test - public void testNonContiguousZonesThatShouldWork() { + public void testNonContiguousZoneIds() { new PartitionBalance(ClusterTestUtils.getZZClusterWithNonContiguousZoneIDsButContiguousNodeIDs(), - ClusterTestUtils.getZZStoreDefsInMemory()); + ClusterTestUtils.getZZStoreDefsWithNonContiguousZoneIDsInMemory()); } // TODO: Fix handling of node Ids so that they do not need to be contiguous. diff --git a/test/unit/voldemort/tools/RepartitionerTest.java b/test/unit/voldemort/tools/RepartitionerTest.java index 47c221e52e..4474a401c4 100644 --- a/test/unit/voldemort/tools/RepartitionerTest.java +++ b/test/unit/voldemort/tools/RepartitionerTest.java @@ -361,7 +361,7 @@ public void verifyGreedySwapsImproveBalance(Cluster currentCluster, } @Test - public void testRebalance() { + public void testShuffle() { // Two zone cluster Cluster currentCluster = ClusterTestUtils.getZZCluster(); List storeDefs = ClusterTestUtils.getZZStoreDefsInMemory(); @@ -402,8 +402,8 @@ public void testClusterExpansion() { @Test public void testZoneExpansion() { - Cluster currentCluster = ClusterTestUtils.getZZCluster(); - List currentStoreDefs = ClusterTestUtils.getZZStoreDefsInMemory(); + Cluster currentCluster = ClusterTestUtils.getZZECluster(); + List currentStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory(); Cluster targetCluster = ClusterTestUtils.getZZZClusterWithNNN(); List targetStoreDefs = ClusterTestUtils.getZZZStoreDefsInMemory();