Skip to content

Commit

Permalink
Merge remote branch 'leigao/client-registry' into creg
Browse files Browse the repository at this point in the history
  • Loading branch information
Lei Gao committed Jul 10, 2012
2 parents 05f23ec + e90fa82 commit 35a887e
Show file tree
Hide file tree
Showing 6 changed files with 445 additions and 39 deletions.
14 changes: 9 additions & 5 deletions src/java/voldemort/client/SystemStoreRepository.java
Expand Up @@ -18,13 +18,17 @@ public SystemStoreRepository() {
sysStoreMap = new ConcurrentHashMap<String, SystemStore>();
}

public void addSystemStore(SystemStore newSysStore, String storeName) {
this.sysStoreMap.put(storeName, newSysStore);
}

public void createSystemStores(ClientConfig config, String clusterXml) {
for(SystemStoreConstants.SystemStoreName storeName: SystemStoreConstants.SystemStoreName.values()) {
SystemStore<String, Long> sysVersionStore = new SystemStore<String, Long>(storeName.name(),
config.getBootstrapUrls(),
config.getClientZoneId(),
clusterXml);
this.sysStoreMap.put(storeName.name(), sysVersionStore);
SystemStore sysStore = new SystemStore(storeName.name(),
config.getBootstrapUrls(),
config.getClientZoneId(),
clusterXml);
this.sysStoreMap.put(storeName.name(), sysStore);
}
}

Expand Down
@@ -1,10 +1,8 @@
package voldemort.client.scheduler;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Random;
import java.util.concurrent.Callable;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.client.SystemStoreRepository;
Expand All @@ -15,16 +13,16 @@
* cluster and if necessary Re-bootstrap the client.
*
* During initialization, it will retrieve the current version of the store (or
* the entire stores.xml depending upon granularity) and then periodically check
* whether this has been updated. During init if the initial version turns out
* to be null, it means that no change has been done to that store since it was
* created. In this case, we assume version '0'.
* the entire stores.xml depending upon granularity) and cluster.xml and then
* periodically check whether this has been updated. During init if the initial
* version turns out to be null, it means that no change has been done to that
* store since it was created. In this case, we assume version '0'.
*/

public class AsyncMetadataVersionManager implements Runnable {

private static final String STORES_VERSION_KEY = "stores.xml";
private static final String CLUSTER_VERSION_KEY = "cluster.xml";
public static final String STORES_VERSION_KEY = "stores.xml";
public static final String CLUSTER_VERSION_KEY = "cluster.xml";

private final Logger logger = Logger.getLogger(this.getClass());
private Versioned<Long> currentStoreVersion;
Expand All @@ -36,6 +34,8 @@ public class AsyncMetadataVersionManager implements Runnable {
private final int DELTA_MAX = 2000;
private final Random randomGenerator = new Random(System.currentTimeMillis());

public boolean isActive = false;

public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
Callable<Void> storeClientThunk) {
this.sysRepository = sysRepository;
Expand All @@ -56,22 +56,18 @@ public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
}

// If the received version is null, assume version 0
if(currentStoreVersion == null)
if(currentStoreVersion == null) {
currentStoreVersion = new Versioned<Long>((long) 0);
if(currentClusterVersion == null)
}
if(currentClusterVersion == null) {
currentClusterVersion = new Versioned<Long>((long) 0);
}

Thread checkVersionThread = new Thread(this, "AsyncVersionCheckThread");
checkVersionThread.setDaemon(true);
checkVersionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {

public void uncaughtException(Thread t, Throwable e) {
if(logger.isEnabledFor(Level.ERROR))
logger.error("Uncaught exception in Metadata Version check thread:", e);
}
});
logger.debug("Initial stores.xml version = " + this.currentStoreVersion);
logger.debug("Initial cluster.xml version = " + this.currentClusterVersion);

this.storeClientThunk = storeClientThunk;
this.isActive = true;
}

/*
Expand Down Expand Up @@ -126,15 +122,17 @@ public void run() {
}

try {
this.storeClientThunk.call();

if(newStoresVersion != null) {
logger.info("Updating stores version");
currentStoreVersion = newStoresVersion;
}

if(newClusterVersion != null) {
logger.info("Updating cluster version");
currentClusterVersion = newClusterVersion;
}

this.storeClientThunk.call();
} catch(Exception e) {
e.printStackTrace();
logger.info(e.getMessage());
Expand Down
65 changes: 52 additions & 13 deletions src/java/voldemort/store/routed/PipelineRoutedStore.java
Expand Up @@ -153,30 +153,31 @@ public PipelineRoutedStore(String name,
}
}

private ConfigureNodesType getNodeConfigurationType(BasicPipelineData<List<Versioned<byte[]>>> pipelineData) {
private ConfigureNodesType obtainNodeConfigurationType(Integer zonesRequired) {
// If Zone and local preference required
if(pipelineData.getZonesRequired() != null
if(zonesRequired != null
&& routingStrategy.getType().equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY))
return ConfigureNodesType.BYZONE_LOCAL;

// If only local preference required
else if(pipelineData.getZonesRequired() == null
else if(zonesRequired == null
&& routingStrategy.getType().equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY))
return ConfigureNodesType.DEFAULT_LOCAL;

// If only Zone required
else if(pipelineData.getZonesRequired() != null
else if(zonesRequired != null
&& !routingStrategy.getType()
.equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY))
return ConfigureNodesType.BYZONE;

// Default case
return ConfigureNodesType.DEFAULT;

}

private AbstractConfigureNodes<ByteArray, List<Versioned<byte[]>>, BasicPipelineData<List<Versioned<byte[]>>>> getNodeConfiguration(BasicPipelineData<List<Versioned<byte[]>>> pipelineData,
ByteArray key) {
switch(getNodeConfigurationType(pipelineData)) {
switch(obtainNodeConfigurationType(pipelineData.getZonesRequired())) {
case DEFAULT:
return new ConfigureNodesDefault<List<Versioned<byte[]>>, BasicPipelineData<List<Versioned<byte[]>>>>(pipelineData,
Event.CONFIGURED,
Expand Down Expand Up @@ -566,6 +567,45 @@ public boolean isHintedHandoffEnabled() {
return slopStores != null;
}

private AbstractConfigureNodes<ByteArray, Void, PutPipelineData> putNodeConfiguration(PutPipelineData pipelineData,
ByteArray key) {
switch(obtainNodeConfigurationType(pipelineData.getZonesRequired())) {
case DEFAULT:
return new ConfigureNodesDefault<Void, PutPipelineData>(pipelineData,
Event.CONFIGURED,
failureDetector,
storeDef.getRequiredWrites(),
routingStrategy,
key);
case BYZONE:
return new ConfigureNodesByZone<Void, PutPipelineData>(pipelineData,
Event.CONFIGURED,
failureDetector,
storeDef.getRequiredWrites(),
routingStrategy,
key,
clientZone);
case DEFAULT_LOCAL:
return new ConfigureNodesLocalHost<Void, PutPipelineData>(pipelineData,
Event.CONFIGURED,
failureDetector,
storeDef.getRequiredWrites(),
routingStrategy,
key);
case BYZONE_LOCAL:
return new ConfigureNodesLocalHostByZone<Void, PutPipelineData>(pipelineData,
Event.CONFIGURED,
failureDetector,
storeDef.getRequiredWrites(),
routingStrategy,
key,
clientZone);
default:
return null;
}

}

public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
throws VoldemortException {
StoreUtils.assertValidKey(key);
Expand All @@ -585,6 +625,11 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)

HintedHandoff hintedHandoff = null;

// Get the correct type of configure nodes action depending on the store
// requirements
AbstractConfigureNodes<ByteArray, Void, PutPipelineData> configureNodes = putNodeConfiguration(pipelineData,
key);

if(isHintedHandoffEnabled())
hintedHandoff = new HintedHandoff(failureDetector,
slopStores,
Expand All @@ -593,14 +638,8 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
pipelineData.getFailedNodes(),
timeoutConfig.getOperationTimeout(VoldemortOpCode.PUT_OP_CODE));

pipeline.addEventAction(Event.STARTED,
new ConfigureNodes<Void, PutPipelineData>(pipelineData,
Event.CONFIGURED,
failureDetector,
storeDef.getRequiredWrites(),
routingStrategy,
key,
clientZone));
pipeline.addEventAction(Event.STARTED, configureNodes);

pipeline.addEventAction(Event.CONFIGURED,
new PerformSerialPutRequests(pipelineData,
isHintedHandoffEnabled() ? Event.RESPONSES_RECEIVED
Expand Down
@@ -0,0 +1,80 @@
package voldemort.store.routed.action;

import static org.junit.Assert.assertEquals;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import voldemort.TestUtils;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.ThresholdFailureDetector;
import voldemort.routing.RouteToAllLocalPrefStrategy;
import voldemort.routing.RoutingStrategy;
import voldemort.store.routed.BasicPipelineData;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.Pipeline.Operation;
import voldemort.utils.ByteArray;

import com.google.common.collect.ImmutableList;

public class ConfigureNodesLocalHostTest {

protected final ByteArray aKey = TestUtils.toByteArray("vold");
protected String currentHost = "";

private List<Node> getTestNodes() {
try {
currentHost = InetAddress.getLocalHost().getHostName();
} catch(UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return ImmutableList.of(node(0, "some-node-1", 2, 7, 14),
node(1, "some-node-2", 1, 10, 13),
node(2, currentHost, 3, 5, 17),
node(3, "some-node-3", 0, 11, 16),
node(4, "some-node-4", 6, 9, 15),
node(5, "some-node-5", 4, 8, 12));
}

private Node node(int id, String hostName, int... tags) {
List<Integer> list = new ArrayList<Integer>(tags.length);
for(int tag: tags)
list.add(tag);
return new Node(id, hostName, 8080, 6666, 6667, list);
}

@Test
public void testConfigureNodesLocalHost() throws Exception {
List<Node> nodes = getTestNodes();
Cluster cluster = new Cluster("test-route-all-local-pref-cluster", nodes);
FailureDetector failureDetector = new ThresholdFailureDetector(new FailureDetectorConfig().setNodes(nodes));
RoutingStrategy routingStrategy = new RouteToAllLocalPrefStrategy(cluster.getNodes());
BasicPipelineData<byte[]> pipelineData = new BasicPipelineData<byte[]>();
ConfigureNodesLocalHost<byte[], BasicPipelineData<byte[]>> action = new ConfigureNodesLocalHost<byte[], BasicPipelineData<byte[]>>(pipelineData,
Event.COMPLETED,
failureDetector,
1,
routingStrategy,
aKey);
Pipeline pipeline = new Pipeline(Operation.GET, 10000, TimeUnit.MILLISECONDS);
pipeline.addEventAction(Event.STARTED, action);
pipeline.addEvent(Event.STARTED);
pipeline.execute();

if(pipelineData.getFatalError() != null)
throw pipelineData.getFatalError();

assertEquals(cluster.getNodes().size(), pipelineData.getNodes().size());
assertEquals(pipelineData.getNodes().get(0).getHost(), currentHost);
}
}

0 comments on commit 35a887e

Please sign in to comment.