Skip to content

Commit

Permalink
Adding System store functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman authored and Lei Gao committed Jun 27, 2012
1 parent b59973e commit a3e9359
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 7 deletions.
12 changes: 11 additions & 1 deletion src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -158,6 +158,14 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId) {
return getRawStore(storeName, resolver, clientId, null);
}

@SuppressWarnings("unchecked")
public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId,
String customStoresXml) {

logger.info("Client zone-id [" + clientZoneId
+ "] Attempting to obtain metadata for store [" + storeName + "] ");
Expand All @@ -169,7 +177,9 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
// Get cluster and store metadata
String clusterXml = bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY, bootstrapUrls);
Cluster cluster = clusterMapper.readCluster(new StringReader(clusterXml), false);
String storesXml = bootstrapMetadataWithRetries(MetadataStore.STORES_KEY, bootstrapUrls);
String storesXml = customStoresXml;
if(storesXml == null)
storesXml = bootstrapMetadataWithRetries(MetadataStore.STORES_KEY, bootstrapUrls);

if(logger.isDebugEnabled()) {
logger.debug("Obtained cluster metadata xml" + clusterXml);
Expand Down
7 changes: 7 additions & 0 deletions src/java/voldemort/client/CachingStoreClientFactory.java
Expand Up @@ -123,4 +123,11 @@ else if(client instanceof LazyStoreClient<?, ?>) {
logger.warn("Exception during bootstrapAllClients", e);
}
}

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId,
String storesXml) {
return inner.getRawStore(storeName, resolver, clientId, storesXml);
}
}
10 changes: 9 additions & 1 deletion src/java/voldemort/client/DefaultStoreClient.java
Expand Up @@ -18,8 +18,8 @@

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Map.Entry;
import java.util.UUID;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -65,6 +65,7 @@ public class DefaultStoreClient<K, V> implements StoreClient<K, V> {
private final InconsistencyResolver<Versioned<V>> resolver;
private volatile Store<K, V, Object> store;
private final UUID clientId;
private SystemStore<String, String> sysStore;

public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
Expand Down Expand Up @@ -103,6 +104,13 @@ public DefaultStoreClient(String storeName,
public void bootStrap() {
logger.info("Bootstrapping metadata for store " + this.storeName);
this.store = storeFactory.getRawStore(storeName, resolver, clientId);

logger.info("Creating System store");
String systemKey = storeName + "-client";
this.sysStore = new SystemStore<String, String>("voldsys$_client_registry",
this.storeFactory);
sysStore.putSysStore(systemKey, "Registered");
logger.info("Getting value - " + sysStore.getSysStore(systemKey));
}

public boolean delete(K key) {
Expand Down
7 changes: 7 additions & 0 deletions src/java/voldemort/client/MockStoreClientFactory.java
Expand Up @@ -213,4 +213,11 @@ public FailureDetector getFailureDetector() {
return failureDetector;
}

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId,
String storesXml) {
return null;
}

}
4 changes: 4 additions & 0 deletions src/java/voldemort/client/SocketStoreClientFactory.java
Expand Up @@ -32,6 +32,7 @@
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.FailureDetectorListener;
import voldemort.server.RequestRoutingType;
import voldemort.server.SystemStoreConstants;
import voldemort.store.Store;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.socket.SocketDestination;
Expand Down Expand Up @@ -182,4 +183,7 @@ public void close() {
super.close();
}

public <K, V, T> Store<K, V, T> getSystemStore(String storeName) {
return getRawStore(storeName, null, null, SystemStoreConstants.SYSTEM_STORE_SCHEMA);
}
}
14 changes: 14 additions & 0 deletions src/java/voldemort/client/StoreClientFactory.java
Expand Up @@ -86,6 +86,20 @@ <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId);

/**
* Get the underlying store, not the public StoreClient interface
*
* @param storeName The name of the store
* @param resolver The inconsistency resolver
* @param clientId The unique id of the client
* @param storesXml Custom set of stores containing storeName
* @return The appropriate store
*/
<K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId,
String customStoresXml);

/**
* Close the store client
*/
Expand Down
88 changes: 88 additions & 0 deletions src/java/voldemort/client/SystemStore.java
@@ -0,0 +1,88 @@
package voldemort.client;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.server.SystemStoreConstants;
import voldemort.store.Store;
import voldemort.versioning.InconsistentDataException;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

public class SystemStore<K, V> {

private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
private final SocketStoreClientFactory systemStoreFactory;
private final String storeName;
private volatile Store<K, V, Object> sysStore;

SystemStore(String storeName, StoreClientFactory factory) {
String prefix = storeName.substring(0, SystemStoreConstants.NAME_PREFIX.length());
if(!SystemStoreConstants.NAME_PREFIX.equals(prefix))
throw new VoldemortException("Illegal system store : " + storeName);
if(!(factory instanceof SocketStoreClientFactory))
throw new VoldemortException("System store cannot be created without a Socket store client factory");

SocketStoreClientFactory clientFactory = (SocketStoreClientFactory) factory;
ClientConfig config = new ClientConfig();
config.setSelectors(1)
.setBootstrapUrls(config.getBootstrapUrls())
.setMaxConnectionsPerNode(2)
.setConnectionTimeout(1500, TimeUnit.MILLISECONDS)
.setSocketTimeout(5000, TimeUnit.MILLISECONDS)
.setRoutingTimeout(5000, TimeUnit.MILLISECONDS)
.setEnableJmx(false)
.setEnablePipelineRoutedStore(true)
.setClientZoneId(config.getClientZoneId());
this.systemStoreFactory = new SocketStoreClientFactory(config);
this.storeName = storeName;
this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName);
}

public void putSysStore(K key, V value) throws VoldemortException {
logger.info("Invoking Put for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = getSysStore(key);
if(versioned == null)
versioned = Versioned.value(value, new VectorClock());
else
versioned.setObject(value);
this.sysStore.put(key, versioned, null);
}

public void putSysStore(K key, Versioned<V> value) throws VoldemortException {
logger.info("Invoking Put for key : " + key + " on store name : " + this.storeName);
this.sysStore.put(key, value, null);
}

public Versioned<V> getSysStore(K key) throws VoldemortException {
logger.info("Invoking Get for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = null;
List<Versioned<V>> items = this.sysStore.get(key, null);
if(items.size() == 1)
versioned = items.get(0);
else if(items.size() > 1)
throw new InconsistentDataException("Unresolved versions returned from get(" + key
+ ") = " + items, items);
if(versioned != null)
logger.info("Value for key : " + key + " = " + versioned.getValue()
+ " on store name : " + this.storeName);
else
logger.info("Got null value");
return versioned;
}

public V getValueSysStore(K key) throws VoldemortException {
logger.info("Invoking Get for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = getSysStore(key);
if(versioned != null) {
logger.info("Value for key : " + key + " = " + versioned.getValue()
+ " on store name : " + this.storeName);
return versioned.getValue();
}
return null;
}

}
2 changes: 1 addition & 1 deletion src/java/voldemort/server/SystemStoreConstants.java
Expand Up @@ -5,7 +5,7 @@
*/
public class SystemStoreConstants {

private static final String NAME_PREFIX = "voldsys$_";
public static final String NAME_PREFIX = "voldsys$_";

public static enum SystemStoreName {
voldsys$_client_registry,
Expand Down
49 changes: 45 additions & 4 deletions src/java/voldemort/store/metadata/MetadataStore.java
Expand Up @@ -41,6 +41,7 @@
import voldemort.routing.RouteToAllStrategy;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.server.SystemStoreConstants;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.StorageEngine;
import voldemort.store.Store;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class MetadataStore implements StorageEngine<ByteArray, byte[], byte[]> {

public static final String CLUSTER_KEY = "cluster.xml";
public static final String STORES_KEY = "stores.xml";
public static final String SYSTEM_STORES_KEY = "systemStores";
public static final String SERVER_STATE_KEY = "server.state";
public static final String NODE_ID_KEY = "node.id";
public static final String REBALANCING_STEAL_INFO = "rebalancing.steal.info.key";
Expand All @@ -92,6 +94,7 @@ public class MetadataStore implements StorageEngine<ByteArray, byte[], byte[]> {

// helper keys for metadataCacheOnly
private static final String ROUTING_STRATEGY_KEY = "routing.strategy";
private static final String SYSTEM_ROUTING_STRATEGY_KEY = "system.routing.strategy";

public static enum VoldemortState {
NORMAL_SERVER,
Expand Down Expand Up @@ -174,7 +177,8 @@ public synchronized void put(String key, Versioned<Object> value) {
updateRoutingStrategies((Cluster) value.getValue(), getStoreDefList());
} else if(STORES_KEY.equals(key)) {
updateRoutingStrategies(getCluster(), (List<StoreDefinition>) value.getValue());
}
} else if(SYSTEM_STORES_KEY.equals(key))
throw new VoldemortException("Cannot overwrite system store definitions");

} else {
throw new VoldemortException("Unhandled Key:" + key + " for MetadataStore put()");
Expand All @@ -191,8 +195,9 @@ public synchronized void put(String key, Versioned<Object> value) {
public void put(String key, Object value) {
if(METADATA_KEYS.contains(key)) {
VectorClock version = (VectorClock) get(key, null).get(0).getVersion();
put(key, new Versioned<Object>(value, version.incremented(getNodeId(),
System.currentTimeMillis())));
put(key,
new Versioned<Object>(value, version.incremented(getNodeId(),
System.currentTimeMillis())));
} else {
throw new VoldemortException("Unhandled Key:" + key + " for MetadataStore put()");
}
Expand Down Expand Up @@ -297,6 +302,11 @@ public List<StoreDefinition> getStoreDefList() {
return (List<StoreDefinition>) metadataCache.get(STORES_KEY).getValue();
}

@SuppressWarnings("unchecked")
public List<StoreDefinition> getSystemStoreDefList() {
return (List<StoreDefinition>) metadataCache.get(SYSTEM_STORES_KEY).getValue();
}

public int getNodeId() {
return (Integer) (metadataCache.get(NODE_ID_KEY).getValue());
}
Expand All @@ -319,11 +329,21 @@ public RebalancerState getRebalancerState() {
return (RebalancerState) metadataCache.get(REBALANCING_STEAL_INFO).getValue();
}

/*
* First check in the map of regular stores. If not present, check in the
* system stores map.
*/
@SuppressWarnings("unchecked")
public RoutingStrategy getRoutingStrategy(String storeName) {
Map<String, RoutingStrategy> routingStrategyMap = (Map<String, RoutingStrategy>) metadataCache.get(ROUTING_STRATEGY_KEY)
.getValue();
return routingStrategyMap.get(storeName);
RoutingStrategy strategy = routingStrategyMap.get(storeName);
if(strategy == null) {
Map<String, RoutingStrategy> systemRoutingStrategyMap = (Map<String, RoutingStrategy>) metadataCache.get(SYSTEM_ROUTING_STRATEGY_KEY)
.getValue();
strategy = systemRoutingStrategyMap.get(storeName);
}
return strategy;
}

/**
Expand Down Expand Up @@ -362,6 +382,17 @@ private void updateRoutingStrategies(Cluster cluster, List<StoreDefinition> stor
}
}

/*
* Initialize the routing strategy map for system stores. This is used
* during get / put on system stores.
*/
private void initSystemRoutingStrategies(Cluster cluster) {
HashMap<String, RoutingStrategy> routingStrategyMap = createRoutingStrategyMap(cluster,
getSystemStoreDefList());
this.metadataCache.put(SYSTEM_ROUTING_STRATEGY_KEY,
new Versioned<Object>(routingStrategyMap));
}

/**
* Add the steal information to the rebalancer state
*
Expand Down Expand Up @@ -452,6 +483,10 @@ private void init(int nodeId) {
initCache(CLUSTER_KEY);
initCache(STORES_KEY);

// Initialize system store in the metadata cache
initSystemCache();
initSystemRoutingStrategies(getCluster());

initCache(NODE_ID_KEY, nodeId);
if(getNodeId() != nodeId)
throw new RuntimeException("Attempt to start previous node:"
Expand All @@ -473,6 +508,12 @@ private synchronized void initCache(String key) {
metadataCache.put(key, convertStringToObject(key, getInnerValue(key)));
}

// Initialize the metadata cache with system store list
private synchronized void initSystemCache() {
List<StoreDefinition> value = storeMapper.readStoreList(new StringReader(SystemStoreConstants.SYSTEM_STORE_SCHEMA));
metadataCache.put(SYSTEM_STORES_KEY, new Versioned<Object>(value));
}

private void initCache(String key, Object defaultValue) {
try {
initCache(key);
Expand Down
7 changes: 7 additions & 0 deletions test/common/voldemort/StaticStoreClientFactory.java
Expand Up @@ -74,4 +74,11 @@ public FailureDetector getFailureDetector() {
return failureDetector;
}

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId,
String storesXml) {
return null;
}

}

0 comments on commit a3e9359

Please sign in to comment.