Skip to content

Commit

Permalink
add a java file to define all system store constants, including syste…
Browse files Browse the repository at this point in the history
…m store defs
  • Loading branch information
Lei Gao committed Jun 27, 2012
1 parent ea0c80b commit cd8b942
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 2 deletions.
58 changes: 58 additions & 0 deletions src/java/voldemort/server/SystemStoreConstants.java
@@ -0,0 +1,58 @@
package voldemort.server;

/**
* The various system stores
*/
public class SystemStoreConstants {

private static final String NAME_PREFIX = "voldsys$_";

public static enum SystemStoreName {
voldsys$_client_registry,
voldsys$_client_store_definition;
}

public static final String SYSTEM_STORE_SCHEMA = "<stores>"
+ " <store>"
+ " <name>voldsys$_client_registry</name>"
+ " <routing-strategy>zone-routing</routing-strategy>"
+ " <hinted-handoff-strategy>proximity-handoff</hinted-handoff-strategy>"
+ " <persistence>memory</persistence>"
+ " <routing>client</routing>"
+ " <replication-factor>4</replication-factor>"
+ " <zone-replication-factor>"
+ " <replication-factor zone-id=\"0\">2</replication-factor>"
+ " <replication-factor zone-id=\"1\">2</replication-factor>"
+ " </zone-replication-factor>"
+ " <required-reads>1</required-reads>"
+ " <required-writes>1</required-writes>"
+ " <key-serializer>"
+ " <type>string</type>"
+ " </key-serializer>"
+ " <value-serializer>"
+ " <type>string</type>"
+ " </value-serializer>"
+ " <retention-days>7</retention-days>"
+ " </store>"
+ " <store>"
+ " <name>voldsys$_client_store_definition</name>"
+ " <routing-strategy>zone-routing</routing-strategy>"
+ " <hinted-handoff-strategy>proximity-handoff</hinted-handoff-strategy>"
+ " <persistence>memory</persistence>"
+ " <routing>client</routing>"
+ " <replication-factor>1</replication-factor>"
+ " <required-reads>1</required-reads>"
+ " <required-writes>1</required-writes>"
+ " <key-serializer>"
+ " <type>string</type>"
+ " </key-serializer>"
+ " <value-serializer>"
+ " <type>string</type>"
+ " </value-serializer>"
+ " <retention-days>7</retention-days>"
+ " </store>" + "</stores>";

public static boolean isSystemStore(String storeName) {
return (null == storeName ? false : storeName.startsWith(NAME_PREFIX));
}
}
Expand Up @@ -32,6 +32,7 @@ protected StoreRepository getStoreRepository() {
}

protected Store<ByteArray, byte[], byte[]> getStore(String name, RequestRoutingType type) {

switch(type) {
case ROUTED:
return storeRepository.getRoutedStore(name);
Expand Down
200 changes: 200 additions & 0 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -18,6 +18,7 @@

import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;

import java.io.StringReader;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Calendar;
Expand Down Expand Up @@ -54,6 +55,7 @@
import voldemort.server.RequestRoutingType;
import voldemort.server.ServiceType;
import voldemort.server.StoreRepository;
import voldemort.server.SystemStoreConstants;
import voldemort.server.VoldemortConfig;
import voldemort.server.scheduler.DataCleanupJob;
import voldemort.server.scheduler.SchedulerService;
Expand All @@ -65,6 +67,7 @@
import voldemort.store.StoreDefinition;
import voldemort.store.invalidmetadata.InvalidMetadataCheckingStore;
import voldemort.store.logging.LoggingStore;
import voldemort.store.memory.InMemoryStorageConfiguration;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStoreListener;
import voldemort.store.nonblockingstore.NonblockingStore;
Expand Down Expand Up @@ -97,6 +100,7 @@
import voldemort.versioning.VectorClock;
import voldemort.versioning.VectorClockInconsistencyResolver;
import voldemort.versioning.Versioned;
import voldemort.xml.StoreDefinitionsMapper;

/**
* The service responsible for managing all storage types
Expand Down Expand Up @@ -170,6 +174,7 @@ public StorageService(StoreRepository storeRepository,
}

private void initStorageConfig(String configClassName) {
// add the configurations of the storage engines needed by user stores
try {
Class<?> configClass = ReflectUtils.loadClass(configClassName);
StorageConfiguration configuration = (StorageConfiguration) ReflectUtils.callConstructor(configClass,
Expand All @@ -187,6 +192,37 @@ private void initStorageConfig(String configClassName) {

if(storageConfigs.size() == 0)
throw new ConfigurationException("No storage engine has been enabled!");

// now, add the configurations of the storage engines needed by system
// stores, if not yet exist
initSystemStorageConfig();
}

private void initSystemStorageConfig() {
// add InMemoryStorage used by voldsys$_client_registry
if(!storageConfigs.containsKey(InMemoryStorageConfiguration.TYPE_NAME)) {
storageConfigs.put(InMemoryStorageConfiguration.TYPE_NAME,
new InMemoryStorageConfiguration());
}

// add FileStorage config here
}

private void initSystemStores() {
List<StoreDefinition> storesDefs = (new StoreDefinitionsMapper()).readStoreList(new StringReader(SystemStoreConstants.SYSTEM_STORE_SCHEMA));

// TODO: replication factor can't now be determined unless the
// cluster.xml is made available to the server at runtime. So we need to
// set them here after load they are loaded
updateRepFactor(storesDefs);

for(StoreDefinition storeDef: storesDefs) {
openSystemStore(storeDef);
}
}

private void updateRepFactor(List<StoreDefinition> storesDefs) {
// need impl
}

@Override
Expand All @@ -203,6 +239,9 @@ protected void startInner() {
metadata.getStoreDefList(),
storeRepository));

/* Initialize system stores */
initSystemStores();

/* Register slop store */
if(voldemortConfig.isSlopEnabled()) {

Expand Down Expand Up @@ -282,6 +321,167 @@ protected void startInner() {
logger.info("All stores initialized.");
}

public void openSystemStore(StoreDefinition storeDef) {

logger.info("Opening system store '" + storeDef.getName() + "' (" + storeDef.getType()
+ ").");

StorageConfiguration config = storageConfigs.get(storeDef.getType());
if(config == null)
throw new ConfigurationException("Attempt to open system store " + storeDef.getName()
+ " but " + storeDef.getType()
+ " storage engine has not been enabled.");

final StorageEngine<ByteArray, byte[], byte[]> engine = config.getStore(storeDef.getName());

// Noted that there is no read-only processing as for user stores.

// openStore() should have atomic semantics
try {
registerSystemEngine(engine);

if(voldemortConfig.isServerRoutingEnabled())
registerNodeStores(storeDef, metadata.getCluster(), voldemortConfig.getNodeId());

if(storeDef.hasRetentionPeriod())
scheduleCleanupJob(storeDef, engine);
} catch(Exception e) {
unregisterSystemEngine(engine);
throw new VoldemortException(e);
}
}

public void registerSystemEngine(StorageEngine<ByteArray, byte[], byte[]> engine) {

Cluster cluster = this.metadata.getCluster();
storeRepository.addStorageEngine(engine);

/* Now add any store wrappers that are enabled */
Store<ByteArray, byte[], byte[]> store = engine;

if(voldemortConfig.isVerboseLoggingEnabled())
store = new LoggingStore<ByteArray, byte[], byte[]>(store,
cluster.getName(),
SystemTime.INSTANCE);
/* TODO: Do we really need rebalancing for system stores? */
if(voldemortConfig.isEnableRebalanceService()) {
store = new RedirectingStore(store,
metadata,
storeRepository,
failureDetector,
storeFactory);
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(cluster.getName()
+ "."
+ JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
JmxUtils.registerMbean(mbeanServer, JmxUtils.createModelMBean(store), name);
}

}
}

if(voldemortConfig.isMetadataCheckingEnabled())
store = new InvalidMetadataCheckingStore(metadata.getNodeId(), store, metadata);

if(voldemortConfig.isStatTrackingEnabled()) {
StatTrackingStore statStore = new StatTrackingStore(store, this.storeStats);
store = statStore;
if(voldemortConfig.isJmxEnabled()) {

MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(metadata.getCluster().getName()
+ "."
+ JmxUtils.getPackageName(store.getClass()),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);

JmxUtils.registerMbean(mbeanServer,
JmxUtils.createModelMBean(new StoreStatsJmx(statStore.getStats())),
name);
}
}
}

storeRepository.addLocalStore(store);
}

public void unregisterSystemEngine(StorageEngine<ByteArray, byte[], byte[]> engine) {
String storeName = engine.getName();
Store<ByteArray, byte[], byte[]> store = storeRepository.removeLocalStore(storeName);

if(store != null) {
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();

if(voldemortConfig.isEnableRebalanceService()) {

ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(metadata.getCluster().getName()
+ "."
+ JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
}

}

if(voldemortConfig.isStatTrackingEnabled()) {
ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(metadata.getCluster().getName()
+ "."
+ JmxUtils.getPackageName(store.getClass()),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
}

}
}
if(voldemortConfig.isServerRoutingEnabled()) {
this.storeRepository.removeRoutedStore(storeName);
for(Node node: metadata.getCluster().getNodes())
this.storeRepository.removeNodeStore(storeName, node.getId());
}
}

storeRepository.removeStorageEngine(storeName);
// engine.truncate(); why truncate here when unregister? Isn't close
// good enough?
engine.close();
}

public void openStore(StoreDefinition storeDef) {

logger.info("Opening store '" + storeDef.getName() + "' (" + storeDef.getType() + ").");
Expand Down
4 changes: 3 additions & 1 deletion src/java/voldemort/store/StoreDefinition.java
Expand Up @@ -22,6 +22,7 @@

import voldemort.client.RoutingTier;
import voldemort.serialization.SerializerDefinition;
import voldemort.server.SystemStoreConstants;
import voldemort.store.slop.strategy.HintedHandoffStrategyType;
import voldemort.utils.Utils;

Expand Down Expand Up @@ -139,7 +140,8 @@ else if(requiredWrites > replicationFactor)
if(retentionPeriodDays != null && retentionPeriodDays < 0)
throw new IllegalArgumentException("Retention days must be non-negative.");

if(zoneReplicationFactor != null && zoneReplicationFactor.size() != 0) {
if(!SystemStoreConstants.isSystemStore(name) && zoneReplicationFactor != null
&& zoneReplicationFactor.size() != 0) {

if(zoneCountReads == null || zoneCountReads < 0)
throw new IllegalArgumentException("Zone Counts reads must be non-negative / non-null");
Expand Down
4 changes: 3 additions & 1 deletion src/java/voldemort/xml/StoreDefinitionsMapper.java
Expand Up @@ -46,6 +46,7 @@
import voldemort.routing.RoutingStrategyType;
import voldemort.serialization.Compression;
import voldemort.serialization.SerializerDefinition;
import voldemort.server.SystemStoreConstants;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.StoreUtils;
Expand Down Expand Up @@ -227,7 +228,8 @@ private StoreDefinition readStore(Element store) {
retentionThrottleRate = Integer.parseInt(throttleRate.getText());
}

if(routingStrategyType.compareTo(RoutingStrategyType.ZONE_STRATEGY) == 0) {
if(routingStrategyType.compareTo(RoutingStrategyType.ZONE_STRATEGY) == 0
&& !SystemStoreConstants.isSystemStore(name)) {
if(zoneCountReads == null || zoneCountWrites == null || zoneReplicationFactor == null) {
throw new MappingException("Have not set one of the following correctly for store '"
+ name
Expand Down

0 comments on commit cd8b942

Please sign in to comment.