Skip to content

Commit

Permalink
Added new end to end test for verifying the atomic update is consistent
Browse files Browse the repository at this point in the history
on bootstrap
cleaned up code based off last code review
  • Loading branch information
abh1nay committed May 17, 2013
1 parent 818d96b commit 11ea2f5
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 270 deletions.
14 changes: 5 additions & 9 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -874,14 +874,10 @@ public void updateRemoteMetadata(int remoteNodeId, String key, Versioned<String>
public void updateRemoteMetadata(int remoteNodeId,
HashMap<String, Versioned<String>> keyValueMap) {

Iterator it = keyValueMap.entrySet().iterator();
ArrayList<KeyedVersions> allKeyVersions = new ArrayList();
while(it.hasNext()) {
Map.Entry pairs = (Map.Entry) it.next();

String key = (String) pairs.getKey();

Versioned<String> value = (Versioned<String>) pairs.getValue();
ArrayList<KeyedVersions> allKeyVersions = new ArrayList<KeyedVersions>();
for(Entry<String, Versioned<String>> entry: keyValueMap.entrySet()) {
String key = entry.getKey();
Versioned<String> value = entry.getValue();
ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(key, "UTF-8"));

Versioned<byte[]> valueBytes = new Versioned<byte[]>(ByteUtils.getBytes(value.getValue(),
Expand All @@ -897,7 +893,7 @@ public void updateRemoteMetadata(int remoteNodeId,
VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder()
.setType(VAdminProto.AdminRequestType.UPDATE_METADATA)
.setUpdateMetadata(VAdminProto.UpdateMetadataRequest.newBuilder()
.addAllKeyValue(allKeyVersions)
.addAllMetadataEntry(allKeyVersions)

.build())
.build();
Expand Down
12 changes: 12 additions & 0 deletions src/java/voldemort/client/protocol/pb/ProtoUtils.java
Expand Up @@ -31,6 +31,7 @@
import voldemort.client.protocol.pb.VAdminProto.PerStorePartitionTuple;
import voldemort.client.protocol.pb.VAdminProto.ROStoreVersionDirMap;
import voldemort.client.protocol.pb.VAdminProto.RebalancePartitionInfoMap;
import voldemort.client.protocol.pb.VProto.KeyedVersions;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.store.ErrorCodeMapper;
import voldemort.utils.ByteArray;
Expand Down Expand Up @@ -198,6 +199,17 @@ public static Versioned<byte[]> decodeVersioned(VProto.Versioned versioned) {
decodeClock(versioned.getVersion()));
}

/**
* Given a list of value versions for the metadata keys we are just
* interested in the value at index 0 This is because even if we have to
* update the cluster.xml we marshall a single key into a versioned list
* Hence we just look at the value at index 0
*
*/
public static Versioned<byte[]> decodeVersionedMetadataKeyValue(KeyedVersions keyValue) {
return decodeVersioned(keyValue.getVersions(0));
}

public static List<Versioned<byte[]>> decodeVersions(List<VProto.Versioned> versioned) {
List<Versioned<byte[]>> values = new ArrayList<Versioned<byte[]>>(versioned.size());
for(VProto.Versioned v: versioned)
Expand Down
474 changes: 237 additions & 237 deletions src/java/voldemort/client/protocol/pb/VAdminProto.java

Large diffs are not rendered by default.

Expand Up @@ -1146,13 +1146,13 @@ public VAdminProto.UpdateMetadataResponse handleUpdateMetadata(VAdminProto.Updat

metadataStore.writeLock.lock();
try {
for(KeyedVersions keyValue: request.getKeyValueList()) {
for(KeyedVersions keyValue: request.getMetadataEntryList()) {

try {
ByteArray key = ProtoUtils.decodeBytes(keyValue.getKey());
String keyString = ByteUtils.getString(key.get(), "UTF-8");
if(MetadataStore.METADATA_KEYS.contains(keyString)) {
Versioned<byte[]> versionedValue = ProtoUtils.decodeVersioned(keyValue.getVersions(0));
Versioned<byte[]> versionedValue = ProtoUtils.decodeVersionedMetadataKeyValue(keyValue);
logger.info("Updating metadata for key '" + keyString + "'");
metadataStore.put(new ByteArray(ByteUtils.getBytes(keyString, "UTF-8")),
versionedValue,
Expand Down
21 changes: 11 additions & 10 deletions src/java/voldemort/server/rebalance/Rebalancer.java
Expand Up @@ -140,8 +140,8 @@ public synchronized void releaseRebalancingPermit(int nodeId) {
* @param cluster Cluster metadata to change
* @param rebalancePartitionsInfo List of rebalance partitions info
* @param swapRO Boolean to indicate swapping of RO store
* @param changeClusterMetadata Boolean to indicate a change of cluster
* metadata
* @param changeClusterAndStoresMetadata Boolean to indicate a change of
* cluster metadata
* @param changeRebalanceState Boolean to indicate a change in rebalance
* state
* @param rollback Boolean to indicate that we are rolling back or not
Expand All @@ -150,21 +150,21 @@ public void rebalanceStateChange(Cluster cluster,
List<StoreDefinition> storeDefs,
List<RebalancePartitionsInfo> rebalancePartitionsInfo,
boolean swapRO,
boolean changeClusterMetadata,
boolean changeClusterAndStoresMetadata,
boolean changeRebalanceState,
boolean rollback) {
Cluster currentCluster = metadataStore.getCluster();
List<StoreDefinition> currentStoreDefs = metadataStore.getStoreDefList();

logger.info("Server doing rebalance state change with options [ cluster metadata change - "
+ changeClusterMetadata + " ], [ changing rebalancing state - "
+ changeClusterAndStoresMetadata + " ], [ changing rebalancing state - "
+ changeRebalanceState + " ], [ changing swapping RO - " + swapRO
+ " ], [ rollback - " + rollback + " ]");

// Variables to track what has completed
List<RebalancePartitionsInfo> completedRebalancePartitionsInfo = Lists.newArrayList();
List<String> swappedStoreNames = Lists.newArrayList();
boolean completedClusterChange = false;
boolean completedClusterAndStoresChange = false;
boolean completedRebalanceSourceClusterChange = false;
Cluster previousRebalancingSourceCluster = null;
List<StoreDefinition> previousRebalancingSourceStores = null;
Expand Down Expand Up @@ -239,7 +239,7 @@ public void rebalanceStateChange(Cluster cluster,
}

// CHANGE CLUSTER METADATA AND STORE METADATA
if(changeClusterMetadata) {
if(changeClusterAndStoresMetadata) {
logger.info("Switching cluster metadata from " + currentCluster + " to " + cluster);
logger.info("Switching stores metadata from " + currentStoreDefs + " to "
+ storeDefs);
Expand All @@ -248,7 +248,8 @@ public void rebalanceStateChange(Cluster cluster,
MetadataStore.STORES_KEY,
storeDefs);

completedClusterChange = true;
completedClusterAndStoresChange = true;

}

// SWAP RO DATA FOR ALL STORES
Expand All @@ -261,7 +262,7 @@ public void rebalanceStateChange(Cluster cluster,
logger.error("Got exception while changing state, now rolling back changes", e);

// ROLLBACK CLUSTER AND STORES CHANGE
if(completedClusterChange) {
if(completedClusterAndStoresChange) {
try {
logger.info("Rolling back cluster.xml to " + currentCluster);
logger.info("Rolling back stores.xml to " + currentStoreDefs);
Expand All @@ -270,8 +271,8 @@ public void rebalanceStateChange(Cluster cluster,
MetadataStore.STORES_KEY,
currentStoreDefs);
} catch(Exception exception) {
logger.error("Error while rolling back cluster metadata to " + currentCluster,
exception);
logger.error("Error while rolling back cluster metadata to " + currentCluster
+ " Stores metadata to " + currentStoreDefs, exception);
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/java/voldemort/store/metadata/MetadataStore.java
Expand Up @@ -413,6 +413,17 @@ public StoreDefinition getStoreDef(String storeName) {
}
}

public VoldemortState getServerStateLocked() {
// acquire read lock
readLock.lock();
try {
return VoldemortState.valueOf(metadataCache.get(SERVER_STATE_KEY).getValue().toString());
} finally {
readLock.unlock();

}
}

public VoldemortState getServerState() {
// acquire read lock
readLock.lock();
Expand Down
14 changes: 4 additions & 10 deletions src/java/voldemort/store/rebalancing/RedirectingStore.java
Expand Up @@ -463,10 +463,7 @@ private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan,
}
if(sourceStoreDefs == null) {
/*
* This is more for defensive coding purposes. The update of the
* source stores key happens before the server is put in REBALANCING
* mode and is reset to null after the server goes back to NORMAL
* mode.
* similar to the above for stores xml
*/

if(logger.isTraceEnabled()) {
Expand All @@ -477,15 +474,12 @@ private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan,
}

StoreDefinition sourceStoreDef = null;
for(StoreDefinition sourceStoreDefIt: sourceStoreDefs) {
if(storeDef.getName().equals(sourceStoreDefIt.getName()))
sourceStoreDef = storeDef;
}
sourceStoreDef = StoreUtils.getStoreDef(sourceStoreDefs, storeDef.getName());

Integer nodeId = metadata.getNodeId();
Integer zoneId = currentRoutingPlan.getCluster().getNodeById(nodeId).getZoneId();

// Use the old store definition to get the routing object instead of the
// new one
// Use the old store definition to get the routing object
StoreRoutingPlan oldRoutingPlan = new StoreRoutingPlan(sourceCluster, sourceStoreDef);
// Check the current node's relationship to the key.
int zoneReplicaType = currentRoutingPlan.getZoneReplicaType(zoneId, nodeId, key);
Expand Down
2 changes: 1 addition & 1 deletion src/proto/voldemort-admin.proto
Expand Up @@ -21,7 +21,7 @@ message GetMetadataResponse {
}

message UpdateMetadataRequest {
repeated KeyedVersions keyValue = 1;
repeated KeyedVersions metadataEntry = 1;
}

message UpdateMetadataResponse {
Expand Down
Expand Up @@ -123,7 +123,7 @@ public void setUp() {
@Test
public void testThreading() {

for(int i = 0; i < 30; i++) {
for(int i = 0; i < 3000; i++) {

Cluster cluster;
StoreDefinition storeDef;
Expand Down Expand Up @@ -189,8 +189,10 @@ class ThreadReader implements Runnable {
@Override
public void run() {

metadataStore.readLock.lock();
checkCluster = metadataStore.getCluster();
checkstores = metadataStore.getStoreDefList();
metadataStore.readLock.unlock();

if(checkCluster.equals(currentCluster)) {
Assert.assertEquals(checkstores.get(0), rwStoreDefWithReplication);
Expand Down

0 comments on commit 11ea2f5

Please sign in to comment.