Skip to content

Commit

Permalink
Added option to stop redirecting store using JMX
Browse files Browse the repository at this point in the history
  • Loading branch information
rsumbaly committed Jun 2, 2011
1 parent f299001 commit e000d52
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 43 deletions.
6 changes: 3 additions & 3 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -192,9 +192,9 @@ public static void main(String[] args) throws Exception {
if(missing.size() > 0) {
// Not the most elegant way to do this
if(!(missing.equals(ImmutableSet.of("node"))
&& (options.has("add-stores") || options.has("ro-metadata")
|| options.has("set-metadata") || options.has("get-metadata")
|| options.has("check-metadata") || options.has("key-distribution")) || options.has("truncate"))) {
&& (options.has("add-stores") || options.has("delete-store")
|| options.has("ro-metadata") || options.has("set-metadata")
|| options.has("get-metadata") || options.has("check-metadata") || options.has("key-distribution")) || options.has("truncate"))) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
printHelp(System.err, parser);
System.exit(1);
Expand Down
Expand Up @@ -959,30 +959,32 @@ public VAdminProto.DeleteStoreResponse handleDeleteStore(VAdminProto.DeleteStore

if(storeRepository.hasLocalStore(storeName)) {
if(storeName.compareTo(SlopStorageEngine.SLOP_STORE_NAME) == 0) {
storageService.unregisterEngine(storeName,
"slop",
storeRepository.getStorageEngine(storeName));
storageService.unregisterEngine(storeRepository.getStorageEngine(storeName),
false,
"slop");
} else {
// update stores list in metadata store
List<StoreDefinition> oldStoreDefList = metadataStore.getStoreDefList();
List<StoreDefinition> newStoreDefList = new ArrayList<StoreDefinition>();

for(StoreDefinition storeDef: oldStoreDefList) {
boolean isReadOnly = storeDef.getType()
.compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0;
if(storeDef.isView()) {
if(storeDef.getViewTargetStoreName().compareTo(storeName) != 0) {
newStoreDefList.add(storeDef);
} else {
storageService.unregisterEngine(storeDef.getName(),
storeDef.getType(),
storeRepository.getStorageEngine(storeDef.getName()));
storageService.unregisterEngine(storeRepository.getStorageEngine(storeDef.getName()),
isReadOnly,
storeDef.getType());
}
} else {
if(storeDef.getName().compareTo(storeName) != 0) {
newStoreDefList.add(storeDef);
} else {
storageService.unregisterEngine(storeDef.getName(),
storeDef.getType(),
storeRepository.getStorageEngine(storeDef.getName()));
storageService.unregisterEngine(storeRepository.getStorageEngine(storeDef.getName()),
isReadOnly,
storeDef.getType());
}
}
}
Expand Down
81 changes: 58 additions & 23 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -177,7 +177,7 @@ private void initStorageConfig(String configClassName) {

@Override
protected void startInner() {
registerEngine(metadata, false);
registerEngine(metadata, false, "metadata");

/* Initialize storage configurations */
for(String configClassName: voldemortConfig.getStorageConfigurations())
Expand All @@ -202,7 +202,7 @@ protected void startInner() {

SlopStorageEngine slopEngine = new SlopStorageEngine(config.getStore(SlopStorageEngine.SLOP_STORE_NAME),
metadata.getCluster());
registerEngine(slopEngine, false);
registerEngine(slopEngine, false, "slop");
storeRepository.setSlopStore(slopEngine);

if(voldemortConfig.isSlopPusherJobEnabled()) {
Expand Down Expand Up @@ -298,56 +298,73 @@ public void updateRoutingStrategy(RoutingStrategy updatedRoutingStrategy) {

// openStore() should have atomic semantics
try {
registerEngine(engine, isReadOnly);
registerEngine(engine, isReadOnly, storeDef.getType());

if(voldemortConfig.isServerRoutingEnabled())
registerNodeStores(storeDef, metadata.getCluster(), voldemortConfig.getNodeId());

if(storeDef.hasRetentionPeriod())
scheduleCleanupJob(storeDef, engine);
} catch(Exception e) {
unregisterEngine(storeDef.getName(), storeDef.getType(), engine);
unregisterEngine(engine, isReadOnly, storeDef.getType());
throw new VoldemortException(e);
}
}

/**
* Unregister and remove the engine from the storage repository
*
* @param storeName The name of the store to remote
* @param storeType The storage type of the store
* @param engine The actual engine to remove
* @param isReadOnly Is this read-only?
* @param storeType The storage type of the store
*/
public void unregisterEngine(String storeName,
String storeType,
StorageEngine<ByteArray, byte[], byte[]> engine) {
String engineName = engine.getName();
Store<ByteArray, byte[], byte[]> store = storeRepository.removeLocalStore(engineName);
public void unregisterEngine(StorageEngine<ByteArray, byte[], byte[]> engine,
boolean isReadOnly,
String storeType) {
String storeName = engine.getName();
Store<ByteArray, byte[], byte[]> store = storeRepository.removeLocalStore(storeName);

boolean isSlop = storeType.compareTo("slop") == 0;
boolean isView = storeType.compareTo(ViewStorageConfiguration.TYPE_NAME) == 0;
boolean isMetadata = storeName.compareTo(MetadataStore.METADATA_STORE_NAME) == 0;

if(store != null) {
if(voldemortConfig.isStatTrackingEnabled() && voldemortConfig.isJmxEnabled()) {

if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
if(!isSlop && voldemortConfig.isEnableRebalanceService() && !isReadOnly
&& !isMetadata && !isView) {

ObjectName name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
}

}

if(voldemortConfig.isStatTrackingEnabled()) {

ObjectName name = JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
}

}
}
if(voldemortConfig.isServerRoutingEnabled() && !isSlop) {
this.storeRepository.removeRoutedStore(engineName);
this.storeRepository.removeRoutedStore(storeName);
for(Node node: metadata.getCluster().getNodes())
this.storeRepository.removeNodeStore(storeName, node.getId());
}
}

storeRepository.removeStorageEngine(engineName);
storeRepository.removeStorageEngine(storeName);
if(!isView)
engine.truncate();
engine.close();
Expand All @@ -358,28 +375,45 @@ public void unregisterEngine(String storeName,
*
* @param engine Register the storage engine
* @param isReadOnly Boolean indicating if this store is read-only
* @param storeType The type of the store
*/
public void registerEngine(StorageEngine<ByteArray, byte[], byte[]> engine, boolean isReadOnly) {
public void registerEngine(StorageEngine<ByteArray, byte[], byte[]> engine,
boolean isReadOnly,
String storeType) {
Cluster cluster = this.metadata.getCluster();
storeRepository.addStorageEngine(engine);

/* Now add any store wrappers that are enabled */
Store<ByteArray, byte[], byte[]> store = engine;

boolean isSlop = store.getName().compareTo(SlopStorageEngine.SLOP_STORE_NAME) == 0;
boolean isMetadata = store.getName().compareTo(MetadataStore.METADATA_STORE_NAME) == 0;
boolean isSlop = storeType.compareTo("slop") == 0;
boolean isView = storeType.compareTo(ViewStorageConfiguration.TYPE_NAME) == 0;

if(voldemortConfig.isVerboseLoggingEnabled())
store = new LoggingStore<ByteArray, byte[], byte[]>(store,
cluster.getName(),
SystemTime.INSTANCE);
if(!isSlop) {
if(voldemortConfig.isEnableRebalanceService() && !isReadOnly && !isMetadata)
if(voldemortConfig.isEnableRebalanceService() && !isReadOnly && !isMetadata && !isView) {
store = new RedirectingStore(store,
metadata,
storeRepository,
failureDetector,
storeFactory);
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);

JmxUtils.registerMbean(mbeanServer, JmxUtils.createModelMBean(store), name);
}

}
}

if(voldemortConfig.isMetadataCheckingEnabled())
store = new InvalidMetadataCheckingStore(metadata.getNodeId(), store, metadata);
Expand All @@ -397,6 +431,7 @@ public void registerEngine(StorageEngine<ByteArray, byte[], byte[]> engine, bool
synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);

JmxUtils.registerMbean(mbeanServer,
JmxUtils.createModelMBean(new StoreStatsJmx(statStore.getStats())),
name);
Expand Down
33 changes: 26 additions & 7 deletions src/java/voldemort/store/rebalancing/RedirectingStore.java
Expand Up @@ -18,10 +18,13 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxSetter;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Node;
Expand Down Expand Up @@ -61,6 +64,7 @@ public class RedirectingStore extends DelegatingStore<ByteArray, byte[], byte[]>
private final StoreRepository storeRepository;
private final SocketStoreFactory storeFactory;
private FailureDetector failureDetector;
private AtomicBoolean isRedirectingStoreEnabled;

public RedirectingStore(Store<ByteArray, byte[], byte[]> innerStore,
MetadataStore metadata,
Expand All @@ -72,6 +76,17 @@ public RedirectingStore(Store<ByteArray, byte[], byte[]> innerStore,
this.storeRepository = storeRepository;
this.storeFactory = storeFactory;
this.failureDetector = detector;
this.isRedirectingStoreEnabled = new AtomicBoolean(true);
}

@JmxSetter(name = "setRedirectingStoreEnabled", description = "Enable the redirecting store for this store")
public void setIsRedirectingStoreEnabled(boolean isRedirectingStoreEnabled) {
this.isRedirectingStoreEnabled.set(isRedirectingStoreEnabled);
}

@JmxGetter(name = "isRedirectingStoreEnabled", description = "Get the redirecting store state for this store")
public boolean getIsRedirectingStoreEnabled() {
return this.isRedirectingStoreEnabled.get();
}

@Override
Expand All @@ -91,12 +106,16 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
}

private RebalancePartitionsInfo redirectingKey(ByteArray key) {
return metadata.getRebalancerState().find(getName(),
metadata.getRoutingStrategy(getName())
.getPartitionList(key.get()),
metadata.getCluster()
.getNodeById(metadata.getNodeId())
.getPartitionIds());
if(VoldemortState.REBALANCING_MASTER_SERVER.equals(metadata.getServerState())
&& isRedirectingStoreEnabled.get()) {
return metadata.getRebalancerState().find(getName(),
metadata.getRoutingStrategy(getName())
.getPartitionList(key.get()),
metadata.getCluster()
.getNodeById(metadata.getNodeId())
.getPartitionIds());
}
return null;
}

@Override
Expand Down Expand Up @@ -156,7 +175,7 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
* The options are:
* <ol>
* <li>
* Delete locally and on remote node as well. The issue iscursor is open in
* Delete locally and on remote node as well. The issue is cursor is open in
* READ_UNCOMMITED mode while rebalancing and can push the value back.</li>
* <li>
* Keep the operation in separate slop store and apply all deletes after
Expand Down
4 changes: 3 additions & 1 deletion src/java/voldemort/utils/RebalanceUtils.java
Expand Up @@ -570,6 +570,7 @@ public static void propagateCluster(AdminClient adminClient, Cluster cluster) {
Set<Integer> completedNodeIds = Sets.newHashSet();
try {
for(Node node: cluster.getNodes()) {
logger.info("Updating cluster definition on remote node " + node);
adminClient.updateRemoteCluster(node.getId(), cluster, latestClock);
logger.info("Updated cluster definition " + cluster + " on remote node "
+ node.getId());
Expand All @@ -583,7 +584,8 @@ public static void propagateCluster(AdminClient adminClient, Cluster cluster) {
currentClusters.get(completedNodeId),
latestClock);
} catch(VoldemortException exception) {
logger.error("Could not revert back on node " + completedNodeId);
logger.error("Could not revert cluster metadata back on node "
+ completedNodeId);
}
}
throw e;
Expand Down

0 comments on commit e000d52

Please sign in to comment.