Skip to content

Commit

Permalink
atomic update of stores and cluster xml during rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed May 17, 2013
1 parent 6250c10 commit 818d96b
Show file tree
Hide file tree
Showing 12 changed files with 863 additions and 279 deletions.
6 changes: 3 additions & 3 deletions META-INF/MANIFEST.MF
@@ -1,8 +1,8 @@
Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.2-b06 (Sun Microsystems Inc.)
Voldemort-Implementation-Version: 1.3.1
Created-By: 20.1-b02 (Sun Microsystems Inc.)
Voldemort-Implementation-Version: 1.3.3
Implementation-Title: Voldemort
Implementation-Version: 1.3.1
Implementation-Version: 1.3.3
Implementation-Vendor: LinkedIn

14 changes: 14 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -863,6 +863,14 @@ public void updateRemoteMetadata(int remoteNodeId, String key, Versioned<String>
updateRemoteMetadata(remoteNodeId, keyValueMap);
}

/*
* remoteNodeId the nodeId of the server keyValueMap a Map of metadata
* keys to their versioned value
*
* This method passes multiple metadata keys to the server atomic update
* of stores and cluster xml during rebalance
*/

public void updateRemoteMetadata(int remoteNodeId,
HashMap<String, Versioned<String>> keyValueMap) {

Expand Down Expand Up @@ -2419,6 +2427,8 @@ public Versioned<RebalancerState> getRemoteRebalancerState(int nodeId) {
*/
public void rebalanceStateChange(Cluster existingCluster,
Cluster transitionCluster,
List<StoreDefinition> existingStoreDefs,
List<StoreDefinition> targetStoreDefs,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean swapRO,
boolean changeClusterMetadata,
Expand All @@ -2438,6 +2448,7 @@ public void rebalanceStateChange(Cluster existingCluster,
try {
individualStateChange(nodeId,
transitionCluster,
targetStoreDefs,
stealerNodeToPlan.get(nodeId),
swapRO,
changeClusterMetadata,
Expand Down Expand Up @@ -2482,6 +2493,7 @@ public void rebalanceStateChange(Cluster existingCluster,
try {
individualStateChange(completedNodeId,
existingCluster,
existingStoreDefs,
stealerNodeToPlan.get(completedNodeId),
swapRO,
changeClusterMetadata,
Expand Down Expand Up @@ -2521,6 +2533,7 @@ public void rebalanceStateChange(Cluster existingCluster,
*/
private void individualStateChange(int nodeId,
Cluster cluster,
List<StoreDefinition> storeDefs,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean swapRO,
boolean changeClusterMetadata,
Expand Down Expand Up @@ -2559,6 +2572,7 @@ private void individualStateChange(int nodeId,
.setChangeRebalanceState(changeRebalanceState)
.setClusterString(clusterMapper.writeCluster(cluster))
.setRollback(rollback)
.setStoresString(new StoreDefinitionsMapper().writeStoreList(storeDefs))
.build();
VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
.setRebalanceStateChange(getRebalanceStateChangeRequest)
Expand Down
258 changes: 151 additions & 107 deletions src/java/voldemort/client/protocol/pb/VAdminProto.java

Large diffs are not rendered by default.

45 changes: 41 additions & 4 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -389,6 +389,7 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
// Flatten the node plans to partition plans
List<RebalancePartitionsInfo> rebalancePartitionPlanList = rebalancePartitionsInfoList;

List<StoreDefinition> allStoreDefs = orderedClusterTransition.getStoreDefs();
// Split the store definitions
List<StoreDefinition> readOnlyStoreDefs = StoreDefinitionUtils.filterStores(orderedClusterTransition.getStoreDefs(),
true);
Expand All @@ -403,9 +404,14 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
List<RebalancePartitionsInfo> filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList,
readOnlyStoreDefs);

// TODO this method right nowtakes just the source stores definition
// the 2nd argument needs to be fixed
// ATTENTION JAY
rebalanceStateChange(orderedClusterTransition.getId(),
orderedClusterTransition.getCurrentCluster(),
orderedClusterTransition.getTargetCluster(),
allStoreDefs,
allStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
Expand All @@ -426,9 +432,14 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList,
readWriteStoreDefs);

// TODO this method right nowtakes just the source stores definition
// the 2nd argument needs to be fixed
// ATTENTION JAY
rebalanceStateChange(orderedClusterTransition.getId(),
orderedClusterTransition.getCurrentCluster(),
orderedClusterTransition.getTargetCluster(),
allStoreDefs,
allStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
Expand Down Expand Up @@ -463,6 +474,8 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
}

/**
* TODO JAY -- This interface expects the source stores definition and
* target stores def
*
* Perform a group of state change actions. Also any errors + rollback
* procedures are performed at this level itself.
Expand Down Expand Up @@ -493,6 +506,8 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
private void rebalanceStateChange(final int taskId,
Cluster currentCluster,
Cluster transitionCluster,
List<StoreDefinition> existingStoreDefs,
List<StoreDefinition> targetStoreDefs,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean hasReadOnlyStores,
boolean hasReadWriteStores,
Expand All @@ -514,6 +529,8 @@ private void rebalanceStateChange(final int taskId,
if(!rebalanceConfig.isShowPlanEnabled())
adminClient.rebalanceOps.rebalanceStateChange(currentCluster,
transitionCluster,
existingStoreDefs,
targetStoreDefs,
rebalancePartitionPlanList,
false,
true,
Expand All @@ -526,6 +543,8 @@ private void rebalanceStateChange(final int taskId,
if(!rebalanceConfig.isShowPlanEnabled())
adminClient.rebalanceOps.rebalanceStateChange(currentCluster,
transitionCluster,
existingStoreDefs,
targetStoreDefs,
rebalancePartitionPlanList,
false,
false,
Expand All @@ -538,6 +557,8 @@ private void rebalanceStateChange(final int taskId,
if(!rebalanceConfig.isShowPlanEnabled())
adminClient.rebalanceOps.rebalanceStateChange(currentCluster,
transitionCluster,
existingStoreDefs,
targetStoreDefs,
rebalancePartitionPlanList,
true,
true,
Expand All @@ -552,6 +573,8 @@ private void rebalanceStateChange(final int taskId,
if(!rebalanceConfig.isShowPlanEnabled())
adminClient.rebalanceOps.rebalanceStateChange(currentCluster,
transitionCluster,
existingStoreDefs,
targetStoreDefs,
rebalancePartitionPlanList,
true,
true,
Expand Down Expand Up @@ -679,8 +702,15 @@ private void rebalancePerTaskTransition(final int taskId,

if(hasReadOnlyStores && hasReadWriteStores && finishedReadOnlyStores) {
// Case 0
adminClient.rebalanceOps.rebalanceStateChange(null,
currentCluster,

// TODO this method right nowtakes just the source stores
// definition
// the 2nd argument needs to be fixed
// ATTENTION JAY
adminClient.rebalanceOps.rebalanceStateChange(null, currentCluster, null, null, // pass
// current
// store
// def
null,
true,
true,
Expand All @@ -689,8 +719,15 @@ private void rebalancePerTaskTransition(final int taskId,
false);
} else if(hasReadWriteStores && finishedReadOnlyStores) {
// Case 4
adminClient.rebalanceOps.rebalanceStateChange(null,
currentCluster,

// TODO this method right nowtakes just the source stores
// definition
// the 2nd argument needs to be fixed
// ATTENTION JAY
adminClient.rebalanceOps.rebalanceStateChange(null, currentCluster, null, null, // pass
// current
// store
// def
null,
false,
true,
Expand Down
Expand Up @@ -323,12 +323,14 @@ public VAdminProto.RebalanceStateChangeResponse handleRebalanceStateChange(VAdmi

Cluster cluster = new ClusterMapper().readCluster(new StringReader(request.getClusterString()));

List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(request.getStoresString()));
boolean swapRO = request.getSwapRo();
boolean changeClusterMetadata = request.getChangeClusterMetadata();
boolean changeRebalanceState = request.getChangeRebalanceState();
boolean rollback = request.getRollback();

rebalancer.rebalanceStateChange(cluster,
storeDefs,
rebalancePartitionsInfo,
swapRO,
changeClusterMetadata,
Expand Down Expand Up @@ -380,7 +382,7 @@ public VAdminProto.AsyncOperationStatusResponse handleRebalanceNode(VAdminProto.
+ metadataStore.getNodeId());

// We should be in rebalancing state to run this function
if(!metadataStore.getServerState()
if(!metadataStore.getServerStateUnlocked()
.equals(MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER)) {
response.setError(ProtoUtils.encodeError(errorCodeMapper,
new VoldemortException("Voldemort server "
Expand Down Expand Up @@ -725,7 +727,8 @@ public VAdminProto.SwapStoreResponse handleSwapROStore(VAdminProto.SwapStoreRequ
final String storeName = request.getStoreName();
VAdminProto.SwapStoreResponse.Builder response = VAdminProto.SwapStoreResponse.newBuilder();

if(!metadataStore.getServerState().equals(MetadataStore.VoldemortState.NORMAL_SERVER)) {
if(!metadataStore.getServerStateUnlocked()
.equals(MetadataStore.VoldemortState.NORMAL_SERVER)) {
response.setError(ProtoUtils.encodeError(errorCodeMapper,
new VoldemortException("Voldemort server "
+ metadataStore.getNodeId()
Expand Down Expand Up @@ -1141,27 +1144,32 @@ public VAdminProto.DeletePartitionEntriesResponse handleDeletePartitionEntries(V
public VAdminProto.UpdateMetadataResponse handleUpdateMetadata(VAdminProto.UpdateMetadataRequest request) {
VAdminProto.UpdateMetadataResponse.Builder response = VAdminProto.UpdateMetadataResponse.newBuilder();

for(KeyedVersions keyValue: request.getKeyValueList()) {
metadataStore.writeLock.lock();
try {
for(KeyedVersions keyValue: request.getKeyValueList()) {

try {
ByteArray key = ProtoUtils.decodeBytes(keyValue.getKey());
String keyString = ByteUtils.getString(key.get(), "UTF-8");
if(MetadataStore.METADATA_KEYS.contains(keyString)) {
Versioned<byte[]> versionedValue = ProtoUtils.decodeVersioned(keyValue.getVersions(0));
logger.info("Updating metadata for key '" + keyString + "'");
metadataStore.put(new ByteArray(ByteUtils.getBytes(keyString, "UTF-8")),
versionedValue,
null);
logger.info("Successfully updated metadata for key '" + keyString + "'");
try {
ByteArray key = ProtoUtils.decodeBytes(keyValue.getKey());
String keyString = ByteUtils.getString(key.get(), "UTF-8");
if(MetadataStore.METADATA_KEYS.contains(keyString)) {
Versioned<byte[]> versionedValue = ProtoUtils.decodeVersioned(keyValue.getVersions(0));
logger.info("Updating metadata for key '" + keyString + "'");
metadataStore.put(new ByteArray(ByteUtils.getBytes(keyString, "UTF-8")),
versionedValue,
null);
logger.info("Successfully updated metadata for key '" + keyString + "'");
}
} catch(VoldemortException e) {
response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
logger.error("handleUpdateMetadata failed for request(" + request.toString()
+ ")", e);
}
} catch(VoldemortException e) {
response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
logger.error("handleUpdateMetadata failed for request(" + request.toString() + ")",
e);
}
}

return response.build();
return response.build();
} finally {
metadataStore.writeLock.unlock();
}
}

public VAdminProto.GetMetadataResponse handleGetMetadata(VAdminProto.GetMetadataRequest request) {
Expand Down Expand Up @@ -1211,7 +1219,8 @@ public VAdminProto.DeleteStoreResponse handleDeleteStore(VAdminProto.DeleteStore
VAdminProto.DeleteStoreResponse.Builder response = VAdminProto.DeleteStoreResponse.newBuilder();

// don't try to delete a store in the middle of rebalancing
if(!metadataStore.getServerState().equals(MetadataStore.VoldemortState.NORMAL_SERVER)) {
if(!metadataStore.getServerStateUnlocked()
.equals(MetadataStore.VoldemortState.NORMAL_SERVER)) {
response.setError(ProtoUtils.encodeError(errorCodeMapper,
new VoldemortException("Voldemort server is not in normal state")));
return response.build();
Expand Down Expand Up @@ -1288,7 +1297,8 @@ public VAdminProto.AddStoreResponse handleAddStore(VAdminProto.AddStoreRequest r
VAdminProto.AddStoreResponse.Builder response = VAdminProto.AddStoreResponse.newBuilder();

// don't try to add a store when not in normal state
if(!metadataStore.getServerState().equals(MetadataStore.VoldemortState.NORMAL_SERVER)) {
if(!metadataStore.getServerStateUnlocked()
.equals(MetadataStore.VoldemortState.NORMAL_SERVER)) {
response.setError(ProtoUtils.encodeError(errorCodeMapper,
new VoldemortException("Voldemort server is not in normal state")));
return response.build();
Expand Down

0 comments on commit 818d96b

Please sign in to comment.