Skip to content

Commit

Permalink
1. Enabling proxy puts by default
Browse files Browse the repository at this point in the history
2. Bug fix in proxy put stats
3. Changing order of state change updates for correctness
4. Setting proxy put tests to do one batch rebalancing
  • Loading branch information
vinothchandar committed May 13, 2013
1 parent b3147d3 commit e7ecec1
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -470,7 +470,7 @@ public VoldemortConfig(Props props) {
this.usePartitionScanForRebalance = props.getBoolean("use.partition.scan.for.rebalance",
true);
this.maxProxyPutThreads = props.getInt("max.proxy.put.threads", 1);
this.proxyPutsDuringRebalance = props.getBoolean("proxy.puts.during.rebalance", false);
this.proxyPutsDuringRebalance = props.getBoolean("proxy.puts.during.rebalance", true);

this.failureDetectorImplementation = props.getString("failuredetector.implementation",
FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME);
Expand Down
50 changes: 39 additions & 11 deletions src/java/voldemort/server/rebalance/Rebalancer.java
Expand Up @@ -167,24 +167,38 @@ public void rebalanceStateChange(Cluster cluster,
Cluster previousRebalancingSourceCluster = null;

try {
// CHANGE CLUSTER METADATA
if(changeClusterMetadata) {
logger.info("Switching metadata from " + currentCluster + " to " + cluster);
changeCluster(MetadataStore.CLUSTER_KEY, cluster);
completedClusterChange = true;
}

// SWAP RO DATA FOR ALL STORES
if(swapRO) {
swapROStores(swappedStoreNames, false);
}

/*
* Do the rebalancing state changes. It is important that this
* happens before the actual cluster metadata is changed. Here's
* what could happen otherwise. When a batch completes with
* {current_cluster c2, rebalancing_source_cluster c1} and the next
* rebalancing state change sets {current_cluster c3,
* rebalancing_source_cluster c2} for the next batch, then
* there could be a window during which the state is
* {current_cluster c3, rebalancing_source_cluster c1}. On the other
* hand, when we update the rebalancing source cluster first, there
* is a window where the state is {current_cluster c2,
* rebalancing_source_cluster c2}, which is still fine, because of the
* following. Successful completion of a batch means the cluster is
* finalized, so it's okay to stop proxying based on {current_cluster
* c2, rebalancing_source_cluster c1}. And since the cluster
* metadata has not yet been updated to c3, the writes will happen
* based on c2.
*
* Even if some clients have already seen the {current_cluster c3,
* rebalancing_source_cluster c2} state from other servers, the
* operation will be rejected with InvalidMetadataException since
* this server itself is not aware of c3.
*/
// CHANGE REBALANCING STATE
if(changeRebalanceState) {
try {
previousRebalancingSourceCluster = metadataStore.getRebalancingSourceCluster();
if(!rollback) {
// Save up the current cluster for Redirecting store
logger.info("Setting rebalancing source cluster xml from "
+ previousRebalancingSourceCluster + "to " + currentCluster);
changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, currentCluster);
completedRebalanceSourceClusterChange = true;

Expand All @@ -194,6 +208,8 @@ public void rebalanceStateChange(Cluster cluster,
}
} else {
// Reset the rebalancing source cluster back to null
logger.info("Resetting rebalancing source cluster xml from "
+ previousRebalancingSourceCluster + "to null");
changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, null);
completedRebalanceSourceClusterChange = true;

Expand All @@ -206,6 +222,18 @@ public void rebalanceStateChange(Cluster cluster,
throw new VoldemortException(e);
}
}

// CHANGE CLUSTER METADATA
if(changeClusterMetadata) {
logger.info("Switching metadata from " + currentCluster + " to " + cluster);
changeCluster(MetadataStore.CLUSTER_KEY, cluster);
completedClusterChange = true;
}

// SWAP RO DATA FOR ALL STORES
if(swapRO) {
swapROStores(swappedStoreNames, false);
}
} catch(VoldemortException e) {

logger.error("Got exception while changing state, now rolling back changes", e);
Expand Down
3 changes: 2 additions & 1 deletion src/java/voldemort/store/rebalancing/AsyncProxyPutTask.java
Expand Up @@ -79,9 +79,10 @@ public void run() {

socketStore.put(key, value, transforms);
redirectingStore.recordSuccess(proxyNode, startNs);
redirectingStore.reportProxyPutSuccess();
if(logger.isTraceEnabled()) {
logger.trace("Proxy write for store " + redirectingStore.getName() + " key "
+ ByteUtils.toBinaryString(key.get()) + " to destinationNode:"
+ ByteUtils.toHexString(key.get()) + " to destinationNode:"
+ destinationNode);
}
} catch(UnreachableStoreException e) {
Expand Down
5 changes: 4 additions & 1 deletion src/java/voldemort/store/rebalancing/RedirectingStore.java
Expand Up @@ -682,7 +682,6 @@ protected void recordException(Node node, long startNs, UnreachableStoreExceptio
}

protected void recordSuccess(Node node, long startNs) {
proxyPutStats.reportProxyPutCompletion();
failureDetector.recordSuccess(node, (System.nanoTime() - startNs) / Time.NS_PER_MS);
}

Expand All @@ -694,6 +693,10 @@ protected void reportProxyPutFailure() {
proxyPutStats.reportProxyPutFailure();
}

/**
 * Records a proxy put completion on this store's proxy put stats by
 * delegating to {@code proxyPutStats.reportProxyPutCompletion()}.
 */
protected void reportProxyPutSuccess() {
proxyPutStats.reportProxyPutCompletion();
}

/**
 * @return the proxy put stats object held by this store
 */
public ProxyPutStats getProxyPutStats() {
return this.proxyPutStats;
}
Expand Down
Expand Up @@ -928,6 +928,11 @@ public void testProxyPutDuringRebalancing() throws Exception {

RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
rebalanceClientConfig.setMaxParallelRebalancing(2);
// It is imperative that we test in a single shot since multiple
// batches would mean the proxy bridges being torn down and
// established multiple times and we cannot test against the source
// cluster topology then.
rebalanceClientConfig.setPrimaryPartitionBatchSize(Integer.MAX_VALUE);
rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased);

final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
Expand Down
Expand Up @@ -622,6 +622,11 @@ public void testProxyPutDuringRebalancing() throws Exception {

RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
rebalanceClientConfig.setMaxParallelRebalancing(2);
// It is imperative that we test in a single shot since multiple
// batches would mean the proxy bridges being torn down and
// established multiple times and we cannot test against the source
// cluster topology then.
rebalanceClientConfig.setPrimaryPartitionBatchSize(Integer.MAX_VALUE);
rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased);

final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
Expand Down

0 comments on commit e7ecec1

Please sign in to comment.