Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added tests for auto-rebootstrap, Failure detector fix to track just …

…one state of the topology (instead of immutable states), added ZenStoreClient
  • Loading branch information...
commit a86a9e5a05cb8afbe94bbd0eb23999e39508b8a8 2 parents fcffeb5 + e77ee0f
Chinmay Soman authored
Showing with 1,041 additions and 706 deletions.
  1. +5 −5 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
  2. +117 −64 src/java/voldemort/VoldemortAdminTool.java
  3. +101 −77 src/java/voldemort/client/AbstractStoreClientFactory.java
  4. +1 −16 src/java/voldemort/client/CachingStoreClientFactory.java
  5. +45 −3 src/java/voldemort/client/ClientConfig.java
  6. +70 −9 src/java/voldemort/client/ClientInfo.java
  7. +3 −4 src/java/voldemort/client/HttpStoreClientFactory.java
  8. +1 −17 src/java/voldemort/client/MockStoreClientFactory.java
  9. +9 −13 src/java/voldemort/client/SocketStoreClientFactory.java
  10. +0 −44 src/java/voldemort/client/StoreClientFactory.java
  11. +25 −12 src/java/voldemort/client/SystemStore.java
  12. +14 −0 src/java/voldemort/client/SystemStoreClientFactory.java
  13. +4 −2 src/java/voldemort/client/SystemStoreRepository.java
  14. +85 −64 src/java/voldemort/client/ZenStoreClient.java
  15. +118 −2 src/java/voldemort/client/protocol/admin/AdminClient.java
  16. +18 −45 src/java/voldemort/client/scheduler/AsyncMetadataVersionManager.java
  17. +52 −8 src/java/voldemort/client/scheduler/ClientRegistryRefresher.java
  18. +18 −25 src/java/voldemort/cluster/failuredetector/AbstractFailureDetector.java
  19. +1 −1  src/java/voldemort/cluster/failuredetector/AsyncRecoveryFailureDetector.java
  20. +1 −1  src/java/voldemort/cluster/failuredetector/BannagePeriodFailureDetector.java
  21. +4 −0 src/java/voldemort/cluster/failuredetector/BasicStoreVerifier.java
  22. +6 −0 src/java/voldemort/cluster/failuredetector/ClientStoreVerifier.java
  23. +37 −1 src/java/voldemort/cluster/failuredetector/FailureDetectorConfig.java
  24. +4 −0 src/java/voldemort/cluster/failuredetector/ServerStoreVerifier.java
  25. +5 −0 src/java/voldemort/cluster/failuredetector/StoreVerifier.java
  26. +1 −1  src/java/voldemort/cluster/failuredetector/ThresholdFailureDetector.java
  27. +1 −2  src/java/voldemort/server/storage/StorageService.java
  28. +42 −34 src/java/voldemort/store/configuration/FileBackedCachingStorageEngine.java
  29. +19 −2 src/java/voldemort/store/system/SystemStoreConstants.java
  30. +23 −0 src/java/voldemort/utils/JmxUtils.java
  31. +20 −0 src/java/voldemort/utils/ManifestFileReader.java
  32. +18 −6 src/java/voldemort/utils/MetadataVersionStoreUtils.java
  33. +1 −18 test/common/voldemort/StaticStoreClientFactory.java
  34. +1 −1  test/integration/voldemort/cluster/failuredetector/FlappingTest.java
  35. +1 −1  test/integration/voldemort/cluster/failuredetector/TimedUnavailabilityTest.java
  36. +48 −94 test/unit/voldemort/client/ClientJmxTest.java
  37. +63 −20 test/unit/voldemort/client/ClientRegistryTest.java
  38. +0 −15 test/unit/voldemort/client/DefaultSocketStoreClientTest.java
  39. +40 −81 test/unit/voldemort/client/EndToEndRebootstrapTest.java
  40. +5 −6 test/unit/voldemort/store/system/AsyncMetadataVersionManagerTest.java
  41. +14 −12 test/unit/voldemort/store/system/SystemStoreTest.java
View
10 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
@@ -150,7 +150,7 @@ public void testAllNodesOffline() throws Exception {
test(store);
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertTrue(failureDetector.isAvailable(n));
// 2. Stop all the nodes, then test enough that we can cause the nodes
@@ -159,19 +159,19 @@ public void testAllNodesOffline() throws Exception {
test(store);
assertEquals(0, failureDetector.getAvailableNodeCount());
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertFalse(failureDetector.isAvailable(n));
// 3. Now start the cluster up, test, and make sure everything's OK.
startClusterAsync(hostNames, ec2FailureDetectorTestConfig, nodeIds);
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
failureDetector.waitForAvailability(n);
test(store);
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertTrue(failureDetector.isAvailable(n));
}
@@ -252,7 +252,7 @@ private Node getNodeByHostName(String hostName, FailureDetector failureDetector)
throws Exception {
Integer offlineNodeId = nodeIds.get(hostName);
- for(Node n: failureDetector.getConfig().getNodes()) {
+ for(Node n: failureDetector.getConfig().getCluster().getNodes()) {
if(offlineNodeId.equals(n.getId()))
return n;
}
View
181 src/java/voldemort/VoldemortAdminTool.java
@@ -19,6 +19,7 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -36,6 +37,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
@@ -47,7 +49,6 @@
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
-import voldemort.client.SystemStore;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
@@ -56,6 +57,7 @@
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
+import voldemort.serialization.StringSerializer;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
@@ -227,7 +229,10 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("version")
.ofType(Long.class);
-
+ parser.accepts("verify-metadata-version",
+ "Verify the version of Metadata on all the cluster nodes");
+ parser.accepts("synchronize-metadata-version",
+ "Synchronize the metadata versions across all the nodes.");
OptionSet options = parser.parse(args);
if(options.has("help")) {
@@ -243,7 +248,7 @@ public static void main(String[] args) throws Exception {
|| options.has("ro-metadata") || options.has("set-metadata")
|| options.has("get-metadata") || options.has("check-metadata") || options.has("key-distribution"))
|| options.has("truncate") || options.has("clear-rebalancing-metadata")
- || options.has("async") || options.has("native-backup") || options.has("rollback"))) {
+ || options.has("async") || options.has("native-backup") || options.has("rollback") || options.has("verify-metadata-version"))) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
printHelp(System.err, parser);
System.exit(1);
@@ -255,14 +260,13 @@ public static void main(String[] args) throws Exception {
int parallelism = CmdUtils.valueOf(options, "restore", 5);
Integer zoneId = CmdUtils.valueOf(options, "zone", -1);
- AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
+ int zone = zoneId == -1 ? 0 : zoneId;
+ AdminClient adminClient = new AdminClient(url, new AdminClientConfig(), zone);
- // Initialize the system store for stores.xml version
- String[] bootstrapUrls = new String[1];
- bootstrapUrls[0] = url;
- SystemStore<String, String> sysStoreVersion = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
- bootstrapUrls,
- 0);
+ if(options.has("verify-metadata-version")) {
+ checkMetadataVersion(adminClient);
+ return;
+ }
String ops = "";
if(options.has("delete-partitions")) {
@@ -325,11 +329,15 @@ public static void main(String[] args) throws Exception {
}
ops += "o";
}
+ if(options.has("synchronize-metadata-version")) {
+ ops += "z";
+ }
+
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
- + "repair-job, native-backup) must be specified");
+ + "repair-job, native-backup, verify-metadata-version) must be specified");
}
List<String> storeNames = null;
@@ -432,9 +440,6 @@ public static void main(String[] args) throws Exception {
adminClient,
MetadataStore.CLUSTER_KEY,
mapper.writeCluster(newCluster));
-
- // Update the cluster.xml version info
- updateMetadataversion(CLUSTER_VERSION_KEY, sysStoreVersion);
} else if(metadataKey.compareTo(MetadataStore.SERVER_STATE_KEY) == 0) {
VoldemortState newState = VoldemortState.valueOf(metadataValue);
executeSetMetadata(nodeId,
@@ -450,16 +455,6 @@ public static void main(String[] args) throws Exception {
adminClient,
MetadataStore.STORES_KEY,
mapper.writeStoreList(storeDefs));
-
- /*
- * Update the store metadata version
- *
- * TODO: For now write one version for the entire
- * stores.xml When we split the stores.xml, make this
- * more granular
- */
- updateMetadataversion(STORES_VERSION_KEY, sysStoreVersion);
-
} else if(metadataKey.compareTo(MetadataStore.REBALANCING_STEAL_INFO) == 0) {
if(!Utils.isReadableFile(metadataValue))
throw new VoldemortException("Rebalancing steal info file path incorrect");
@@ -507,12 +502,97 @@ public static void main(String[] args) throws Exception {
long pushVersion = (Long) options.valueOf("version");
executeRollback(nodeId, storeName, pushVersion, adminClient);
}
+ if(ops.contains("z")) {
+ synchronizeMetadataVersion(adminClient, nodeId);
+ }
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
}
}
+ private static String getMetadataVersionsForNode(AdminClient adminClient, int nodeId) {
+ List<Integer> partitionIdList = Lists.newArrayList();
+ for(Node node: adminClient.getAdminClientCluster().getNodes()) {
+ partitionIdList.addAll(node.getPartitionIds());
+ }
+
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
+ SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
+ partitionIdList,
+ null,
+ true);
+ Serializer<String> serializer = new StringSerializer("UTF8");
+ String keyObject = null;
+ String valueObject = null;
+
+ while(entriesIterator.hasNext()) {
+ try {
+ Pair<ByteArray, Versioned<byte[]>> kvPair = entriesIterator.next();
+ byte[] keyBytes = kvPair.getFirst().get();
+ byte[] valueBytes = kvPair.getSecond().getValue();
+ keyObject = serializer.toObject(keyBytes);
+ if(!keyObject.equals(MetadataVersionStoreUtils.VERSIONS_METADATA_KEY)) {
+ continue;
+ }
+ valueObject = serializer.toObject(valueBytes);
+ } catch(Exception e) {
+ System.err.println("Error while retrieving Metadata versions from node : " + nodeId
+ + ". Exception = \n");
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+
+ return valueObject;
+ }
+
+ private static void checkMetadataVersion(AdminClient adminClient) {
+ Map<Properties, Integer> versionsNodeMap = new HashMap<Properties, Integer>();
+
+ for(Node node: adminClient.getAdminClientCluster().getNodes()) {
+ String valueObject = getMetadataVersionsForNode(adminClient, node.getId());
+ Properties props = new Properties();
+ try {
+ props.load(new ByteArrayInputStream(valueObject.getBytes()));
+ } catch(IOException e) {
+ System.err.println("Error while parsing Metadata versions for node : "
+ + node.getId() + ". Exception = \n");
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ versionsNodeMap.put(props, node.getId());
+ }
+
+ if(versionsNodeMap.keySet().size() > 1) {
+ System.err.println("Mismatching versions detected !!!");
+ for(Entry<Properties, Integer> entry: versionsNodeMap.entrySet()) {
+ System.out.println("**************************** Node: " + entry.getValue()
+ + " ****************************");
+ System.out.println(entry.getKey());
+ }
+ } else {
+ System.err.println("All the nodes have the same metadata versions .");
+ }
+
+ }
+
+ private static void synchronizeMetadataVersion(AdminClient adminClient, int baseNodeId) {
+ String valueObject = getMetadataVersionsForNode(adminClient, baseNodeId);
+ Properties props = new Properties();
+ try {
+ props.load(new ByteArrayInputStream(valueObject.getBytes()));
+ adminClient.setMetadataversion(props);
+ System.out.println("Metadata versions synchronized successfully.");
+ } catch(IOException e) {
+ System.err.println("Error while retrieving Metadata versions from node : " + baseNodeId
+ + ". Exception = \n");
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+
private static void executeRollback(Integer nodeId,
String storeName,
long pushVersion,
@@ -749,21 +829,6 @@ private static void executeCheckMetadata(AdminClient adminClient, String metadat
}
}
- // Get the metadata version for the given key (cluster or store)
- public static void updateMetadataversion(String versionKey,
- SystemStore<String, String> sysStoreVersion) {
- Properties props = MetadataVersionStoreUtils.getProperties(sysStoreVersion);
- if(props.getProperty(versionKey) != null) {
- System.out.println("Version obtained = " + props.getProperty(versionKey));
- long newValue = Long.parseLong(props.getProperty(versionKey)) + 1;
- props.setProperty(versionKey, Long.toString(newValue));
- } else {
- System.err.println("Current version is null. Assuming version 0.");
- props.setProperty(versionKey, "0");
- }
- MetadataVersionStoreUtils.setProperties(sysStoreVersion, props);
- }
-
public static void executeSetMetadata(Integer nodeId,
AdminClient adminClient,
String key,
@@ -792,21 +857,9 @@ public static void executeSetMetadata(Integer nodeId,
System.currentTimeMillis());
nodeIds.add(nodeId);
}
- for(Integer currentNodeId: nodeIds) {
- System.out.println("Setting "
- + key
- + " for "
- + adminClient.getAdminClientCluster()
- .getNodeById(currentNodeId)
- .getHost()
- + ":"
- + adminClient.getAdminClientCluster()
- .getNodeById(currentNodeId)
- .getId());
- adminClient.updateRemoteMetadata(currentNodeId,
- key,
- Versioned.value(value.toString(), updatedVersion));
- }
+ adminClient.updateRemoteMetadata(nodeIds,
+ key,
+ Versioned.value(value.toString(), updatedVersion));
}
private static void executeROMetadata(Integer nodeId,
@@ -1002,9 +1055,9 @@ private static void executeFetchEntries(Integer nodeId,
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
} else {
- // add system store to the map so they can be fetched when specified
- // explicitly
- storeDefinitionMap.putAll(getSystemStoreDef());
+ // add system stores to the map so they can be fetched when
+ // specified explicitly
+ storeDefinitionMap.putAll(getSystemStoreDefs());
}
// Pick up all the partitions
@@ -1049,13 +1102,13 @@ private static void executeFetchEntries(Integer nodeId,
}
}
- private static Map<String, StoreDefinition> getSystemStoreDef() {
- Map<String, StoreDefinition> sysStoreDef = Maps.newHashMap();
+ private static Map<String, StoreDefinition> getSystemStoreDefs() {
+ Map<String, StoreDefinition> sysStoreDefMap = Maps.newHashMap();
List<StoreDefinition> storesDefs = SystemStoreConstants.getAllSystemStoreDefs();
for(StoreDefinition def: storesDefs) {
- sysStoreDef.put(def.getName(), def);
+ sysStoreDefMap.put(def.getName(), def);
}
- return sysStoreDef;
+ return sysStoreDefMap;
}
private static void executeUpdateEntries(Integer nodeId,
@@ -1255,9 +1308,9 @@ private static void executeFetchKeys(Integer nodeId,
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
} else {
- // add system store to the map so they can be fetched when specified
- // explicitly
- storeDefinitionMap.putAll(getSystemStoreDef());
+ // add system stores to the map so they can be fetched when
+ // specified explicitly
+ storeDefinitionMap.putAll(getSystemStoreDefs());
}
// Pick up all the partitions
View
178 src/java/voldemort/client/AbstractStoreClientFactory.java
@@ -19,10 +19,10 @@
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -33,6 +33,7 @@
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
+import voldemort.common.service.SchedulerService;
import voldemort.serialization.ByteArraySerializer;
import voldemort.serialization.IdentitySerializer;
import voldemort.serialization.SerializationException;
@@ -58,6 +59,7 @@
import voldemort.store.versioned.InconsistencyResolvingStore;
import voldemort.utils.ByteArray;
import voldemort.utils.JmxUtils;
+import voldemort.utils.SystemTime;
import voldemort.versioning.ChainedResolver;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.TimeBasedInconsistencyResolver;
@@ -100,7 +102,10 @@
private final RoutedStoreFactory routedStoreFactory;
private final int clientZoneId;
private final String clientContextName;
- private final AtomicInteger sequencer;
+ private final AtomicInteger clientSequencer;
+ private final HashSet<SchedulerService> clientAsyncServiceRepo;
+
+ private Cluster cluster;
public AbstractStoreClientFactory(ClientConfig config) {
this.config = config;
@@ -111,68 +116,84 @@ public AbstractStoreClientFactory(ClientConfig config) {
this.bootstrapUrls = validateUrls(config.getBootstrapUrls());
this.isJmxEnabled = config.isJmxEnabled();
this.requestFormatType = config.getRequestFormatType();
- this.jmxId = jmxIdCounter.getAndIncrement();
+ this.jmxId = getNextJmxId();
this.maxBootstrapRetries = config.getMaxBootstrapRetries();
this.stats = new StoreStats();
this.clientZoneId = config.getClientZoneId();
- this.clientContextName = (null == config.getClientContextName() ? ""
- : config.getClientContextName());
+ this.clientContextName = config.getClientContextName();
this.routedStoreFactory = new RoutedStoreFactory(config.isPipelineRoutedStoreEnabled(),
threadPool,
config.getTimeoutConfig());
- this.sequencer = new AtomicInteger(0);
+ this.clientSequencer = new AtomicInteger(0);
+ this.clientAsyncServiceRepo = new HashSet<SchedulerService>();
if(this.isJmxEnabled) {
JmxUtils.registerMbean(threadPool,
JmxUtils.createObjectName(JmxUtils.getPackageName(threadPool.getClass()),
JmxUtils.getClassName(threadPool.getClass())
- + "."
- + clientContextName
+ jmxId()));
JmxUtils.registerMbean(new StoreStatsJmx(stats),
JmxUtils.createObjectName("voldemort.store.stats.aggregate",
- clientContextName + ".aggregate-perf"
- + jmxId()));
+ "aggregate-perf" + jmxId()));
}
}
+ public int getNextJmxId() {
+ return jmxIdCounter.getAndIncrement();
+ }
+
+ public int getCurrentJmxId() {
+ return jmxIdCounter.get();
+ }
+
public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
return getStoreClient(storeName, null);
}
public <K, V> StoreClient<K, V> getStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
+
+ StoreClient<K, V> client = null;
if(this.config.isDefaultClientEnabled()) {
- return new DefaultStoreClient<K, V>(storeName, resolver, this, 3);
+ client = new DefaultStoreClient<K, V>(storeName, resolver, this, 3);
+ } else {
+
+ SchedulerService service = new SchedulerService(config.getAsyncJobThreadPoolSize(),
+ SystemTime.INSTANCE,
+ true);
+ clientAsyncServiceRepo.add(service);
+
+ client = new ZenStoreClient<K, V>(storeName,
+ resolver,
+ this,
+ 3,
+ clientContextName,
+ clientSequencer.getAndIncrement(),
+ config,
+ service);
}
- return new ZenStoreClient<K, V>(storeName,
- resolver,
- this,
- 3,
- clientContextName,
- sequencer.getAndIncrement(),
- config);
+ return client;
}
@SuppressWarnings("unchecked")
public <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId) {
- return getRawStore(storeName, resolver, clientId, null, null);
+ InconsistencyResolver<Versioned<V>> resolver) {
+ return getRawStore(storeName, resolver, null, null, null);
}
@SuppressWarnings("unchecked")
public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId,
String customStoresXml,
- String clusterXmlString) {
+ String clusterXmlString,
+ FailureDetector fd) {
- logger.info("Client zone-id [" + clientZoneId
- + "] Attempting to obtain metadata for store [" + storeName + "] ");
if(logger.isDebugEnabled()) {
+ logger.debug("Client zone-id [" + clientZoneId
+ + "] Attempting to obtain metadata for store [" + storeName + "] ");
+
for(URI uri: bootstrapUrls) {
logger.debug("Client Bootstrap url [" + uri + "]");
}
@@ -180,13 +201,14 @@ public AbstractStoreClientFactory(ClientConfig config) {
// Get cluster and store metadata
String clusterXml = clusterXmlString;
if(clusterXml == null) {
- logger.debug("*************************** Fetching cluster.xml !!! ******************************************");
+ logger.debug("Fetching cluster.xml ...");
clusterXml = bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY, bootstrapUrls);
}
- Cluster cluster = clusterMapper.readCluster(new StringReader(clusterXml), false);
+
+ this.cluster = clusterMapper.readCluster(new StringReader(clusterXml), false);
String storesXml = customStoresXml;
if(storesXml == null) {
- logger.debug("*************************** Fetching stores.xml !!! ******************************************");
+ logger.debug("Fetching stores.xml ...");
storesXml = bootstrapMetadataWithRetries(MetadataStore.STORES_KEY, bootstrapUrls);
}
@@ -207,7 +229,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
}
if(logger.isDebugEnabled()) {
- logger.debug(cluster.toString(true));
+ logger.debug(this.cluster.toString(true));
logger.debug(storeDef.toString());
}
boolean repairReads = !storeDef.isView();
@@ -221,7 +243,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
if(storeDef.hasHintedHandoffStrategyType())
slopStores = Maps.newHashMap();
- for(Node node: cluster.getNodes()) {
+ for(Node node: this.cluster.getNodes()) {
Store<ByteArray, byte[], byte[]> store = getStore(storeDef.getName(),
node.getHost(),
getPort(node),
@@ -246,7 +268,18 @@ public AbstractStoreClientFactory(ClientConfig config) {
}
}
- Store<ByteArray, byte[], byte[]> store = routedStoreFactory.create(cluster,
+ /*
+ * Check if we need to retrieve a reference to the failure detector. For
+ * system stores - the FD reference would be passed in.
+ */
+ FailureDetector failureDetectorRef = fd;
+ if(failureDetectorRef == null) {
+ failureDetectorRef = getFailureDetector();
+ } else {
+ logger.debug("Using existing failure detector.");
+ }
+
+ Store<ByteArray, byte[], byte[]> store = routedStoreFactory.create(this.cluster,
storeDef,
clientMapping,
nonblockingStores,
@@ -254,7 +287,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
nonblockingSlopStores,
repairReads,
clientZoneId,
- getFailureDetector(),
+ failureDetectorRef,
isJmxEnabled);
store = new LoggingStore(store);
@@ -263,13 +296,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
store = statStore;
JmxUtils.registerMbean(new StoreStatsJmx(statStore.getStats()),
JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
- clientContextName
- + "."
- + store.getName()
- + jmxId()
- + (null == clientId ? ""
- : "."
- + clientId.toString())));
+ store.getName() + jmxId()));
}
if(storeDef.getKeySerializer().hasCompression()
@@ -303,36 +330,46 @@ public AbstractStoreClientFactory(ClientConfig config) {
return serializedStore;
}
- public <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
- return getRawStore(storeName, resolver, null);
- }
-
protected ClientConfig getConfig() {
return config;
}
protected abstract FailureDetector initFailureDetector(final ClientConfig config,
- final Collection<Node> nodes);
+ Cluster cluster);
public FailureDetector getFailureDetector() {
+ if(this.cluster == null) {
+ String clusterXml = bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY,
+ bootstrapUrls);
+ this.cluster = clusterMapper.readCluster(new StringReader(clusterXml), false);
+ }
+
// first check: avoids locking as the field is volatile
FailureDetector result = failureDetector;
+
if(result == null) {
- String clusterXml = bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY,
- bootstrapUrls);
- Cluster cluster = clusterMapper.readCluster(new StringReader(clusterXml), false);
+ logger.debug("Failure detector is null. Creating a new FD.");
synchronized(this) {
// second check: avoids double initialization
result = failureDetector;
if(result == null) {
- failureDetector = result = initFailureDetector(config, cluster.getNodes());
+ failureDetector = result = initFailureDetector(config, this.cluster);
JmxUtils.registerMbean(failureDetector,
JmxUtils.createObjectName(JmxUtils.getPackageName(failureDetector.getClass()),
JmxUtils.getClassName(failureDetector.getClass())
+ jmxId()));
}
}
+ } else {
+
+ /*
+ * The existing failure detector might have an old state
+ */
+ logger.debug("Failure detector already exists. Updating the state and flushing cached verifier stores.");
+ synchronized(this) {
+ failureDetector.getConfig().setCluster(this.cluster);
+ failureDetector.getConfig().getStoreVerifier().flushCachedStores();
+ }
}
return result;
@@ -453,38 +490,25 @@ public void close() {
if(failureDetector != null)
failureDetector.destroy();
- }
- /* Give a unique id to avoid jmx clashes */
- public String jmxId() {
- return jmxId == 0 ? "" : Integer.toString(jmxId);
+ stopClientAsyncSchedulers();
}
- /**
- * Generate a unique client ID based on: 0. clientContext, if specified; 1.
- * storeName 2. run path 3. client sequence
- *
- * @param storeName the name of the store the client is created for
- * @param contextName the name of the client context
- * @param clientSequence the client sequence number
- * @return unique client ID
- */
- public static UUID generateClientId(ClientInfo clientInfo) {
- String contextName = clientInfo.getContext();
- int clientSequence = clientInfo.getClientSequence();
-
- String newLine = System.getProperty("line.separator");
- StringBuilder context = new StringBuilder(contextName == null ? "" : contextName);
- context.append(0 == clientSequence ? "" : ("." + clientSequence));
- context.append(".").append(clientInfo.getStoreName());
- context.append("@").append(clientInfo.getLocalHostName()).append(":");
- context.append(clientInfo.getDeploymentPath()).append(newLine);
-
- if(logger.isDebugEnabled()) {
- logger.debug(context.toString());
+ private void stopClientAsyncSchedulers() {
+ Iterator<SchedulerService> it = clientAsyncServiceRepo.iterator();
+ while(it.hasNext()) {
+ it.next().stop();
}
+ clientAsyncServiceRepo.clear();
+ }
- return UUID.nameUUIDFromBytes(context.toString().getBytes());
+ protected String getClientContext() {
+ return clientContextName;
+ }
+
+ /* Give a unique id to avoid jmx clashes */
+ public String jmxId() {
+ return jmxId == 0 ? "" : Integer.toString(jmxId);
}
}
View
17 src/java/voldemort/client/CachingStoreClientFactory.java
@@ -17,7 +17,6 @@
package voldemort.client;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -76,13 +75,7 @@ public CachingStoreClientFactory(StoreClientFactory inner) {
public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
- return getRawStore(storeName, resolver, null);
- }
-
- public <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId) {
- return inner.getRawStore(storeName, resolver, clientId);
+ return getRawStore(storeName, resolver);
}
public void close() {
@@ -123,12 +116,4 @@ else if(client instanceof LazyStoreClient<?, ?>) {
logger.warn("Exception during bootstrapAllClients", e);
}
}
-
- public <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId,
- String storesXml,
- String clusterXml) {
- return inner.getRawStore(storeName, resolver, clientId, storesXml, clusterXml);
- }
}
View
48 src/java/voldemort/client/ClientConfig.java
@@ -81,12 +81,13 @@
private long failureDetectorRequestLengthThreshold = socketTimeoutMs;
private volatile int maxBootstrapRetries = 2;
- private volatile String clientContextName = "default";
+ private volatile String clientContextName = "";
/* 5 second check interval, in ms */
private volatile long asyncCheckMetadataInterval = 5000;
/* 12 hr refresh internval, in seconds */
private volatile int clientRegistryRefreshInterval = 3600 * 12;
+ private volatile int asyncJobThreadPoolSize = 2;
public ClientConfig() {}
@@ -130,6 +131,8 @@ public ClientConfig() {}
public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context";
public static final String ASYNC_CHECK_METADATA_INTERVAL = "check_metadata_interval";
private static final String USE_DEFAULT_CLIENT = "use_default_client";
+ public static final String CLIENT_REGISTRY_REFRESH_INTERVAL = "client_registry_refresh_interval";
+ public static final String ASYNC_JOB_THREAD_POOL_SIZE = "async_job_thread_pool_size";
/**
* Instantiate the client config using a properties file
@@ -293,11 +296,19 @@ private void setProperties(Properties properties) {
this.setMaxBootstrapRetries(props.getInt(MAX_BOOTSTRAP_RETRIES));
if(props.containsKey(CLIENT_CONTEXT_NAME)) {
- this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME, null));
+ this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME));
}
if(props.containsKey(ASYNC_CHECK_METADATA_INTERVAL)) {
- this.setAsyncMetadataRefreshInMs(props.getLong(ASYNC_CHECK_METADATA_INTERVAL, 5000));
+ this.setAsyncMetadataRefreshInMs(props.getLong(ASYNC_CHECK_METADATA_INTERVAL));
+ }
+
+ if(props.containsKey(CLIENT_REGISTRY_REFRESH_INTERVAL)) {
+ this.setClientRegistryUpdateInSecs(props.getInt(CLIENT_REGISTRY_REFRESH_INTERVAL));
+ }
+
+ if(props.containsKey(ASYNC_JOB_THREAD_POOL_SIZE)) {
+ this.setAsyncJobThreadPoolSize(props.getInt(ASYNC_JOB_THREAD_POOL_SIZE));
}
}
@@ -727,6 +738,11 @@ public String getClientContextName() {
return clientContextName;
}
+ /**
+ * Set the client context name
+ *
+ * @param clientContextName The name of client context
+ */
public ClientConfig setClientContextName(String clientContextName) {
this.clientContextName = clientContextName;
return this;
@@ -736,7 +752,13 @@ public long getAsyncMetadataRefreshInMs() {
return asyncCheckMetadataInterval;
}
+ /**
+ * Set the interval on which client checks for metadata change on servers
+ *
+ * @param asyncCheckMetadataInterval The metadata change interval
+ */
public ClientConfig setAsyncMetadataRefreshInMs(long asyncCheckMetadataInterval) {
+
this.asyncCheckMetadataInterval = asyncCheckMetadataInterval;
return this;
}
@@ -745,8 +767,28 @@ public int getClientRegistryUpdateInSecs() {
return this.clientRegistryRefreshInterval;
}
+ /**
+ * Set the interval on which client refreshes its corresponding entry of the
+ * client registry on the servers
+ *
+ * @param clientRegistryRefreshInterval The refresh interval in seconds
+ */
public ClientConfig setClientRegistryUpdateInSecs(int clientRegistryRefrshInterval) {
this.clientRegistryRefreshInterval = clientRegistryRefrshInterval;
return this;
}
+
+ public int getAsyncJobThreadPoolSize() {
+ return asyncJobThreadPoolSize;
+ }
+
+ /**
+ * Set the # of threads for the async. job thread pool
+ *
+ * @param asyncJobThreadPoolSize The max # of threads in the async job
+ */
+ public ClientConfig setAsyncJobThreadPoolSize(int asyncJobThreadPoolSize) {
+ this.asyncJobThreadPoolSize = asyncJobThreadPoolSize;
+ return this;
+ }
}
View
79 src/java/voldemort/client/ClientInfo.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.client;
import java.io.File;
@@ -5,9 +21,15 @@
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+/**
+ * A collection of voldemort client side information what will be populated into
+ * the voldemort cluster when a client is connected to a voldemort cluster
+ *
+ */
public class ClientInfo implements Serializable {
/**
@@ -25,12 +47,14 @@
private String deploymentPath;
private long updateTime;
private String releaseVersion;
+ private ClientConfig config;
public ClientInfo(String storeName,
String clientContext,
int clientSequence,
long bootstrapTime,
- String version) {
+ String version,
+ ClientConfig config) {
this.bootstrapTime = bootstrapTime;
this.storeName = storeName;
this.context = clientContext;
@@ -39,6 +63,7 @@ public ClientInfo(String storeName,
this.deploymentPath = createDeploymentPath();
this.updateTime = bootstrapTime;
this.releaseVersion = version;
+ this.config = config;
if(logger.isDebugEnabled()) {
logger.debug(this.toString());
@@ -132,6 +157,14 @@ public synchronized String getReleaseVersion() {
return this.releaseVersion;
}
+ public synchronized ClientConfig getClientConfig() {
+ return this.config;
+ }
+
+ /**
+ * At the moment we're not checking if the Config objects are similar. TODO:
+ * reevaluate in the future.
+ */
@Override
public boolean equals(Object object) {
if(this == object)
@@ -154,14 +187,42 @@ public boolean equals(Object object) {
@Override
public synchronized String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("boostrapTime[").append(bootstrapTime).append("], ");
- builder.append("context[").append(context).append("], ");
- builder.append("deploymentPath[").append(deploymentPath).append("], ");
- builder.append("localHostName[").append(localHostName).append("], ");
- builder.append("sequence[").append(sequence).append("], ");
- builder.append("storeName[").append(storeName).append("], ");
- builder.append("updateTime[").append(updateTime).append("], ");
- builder.append("releaseVersion[").append(releaseVersion).append("]");
+ builder.append("bootstrapTime=").append(bootstrapTime).append("\n");
+ builder.append("context=").append(context).append("\n");
+ builder.append("deploymentPath=").append(deploymentPath).append("\n");
+ builder.append("localHostName=").append(localHostName).append("\n");
+ builder.append("sequence=").append(sequence).append("\n");
+ builder.append("storeName=").append(storeName).append("\n");
+ builder.append("updateTime=").append(updateTime).append("\n");
+ builder.append("releaseVersion=").append(releaseVersion).append("\n");
+
+ /**
+ * Append the Client Config information. Right now we only track the
+ * following fields max_connections, max_total_connections,
+ * connection_timeout_ms, socket_timeout_ms, routing_timeout_ms,
+ * client_zone_id, failuredetector_implementation
+ *
+ */
+ builder.append("max_connections=")
+ .append(this.config.getMaxConnectionsPerNode())
+ .append("\n");
+ builder.append("max_total_connections=")
+ .append(this.config.getMaxTotalConnections())
+ .append("\n");
+ builder.append("connection_timeout_ms=")
+ .append(this.config.getConnectionTimeout(TimeUnit.MILLISECONDS))
+ .append("\n");
+ builder.append("socket_timeout_ms=")
+ .append(this.config.getSocketTimeout(TimeUnit.MILLISECONDS))
+ .append("\n");
+ builder.append("routing_timeout_ms=")
+ .append(this.config.getRoutingTimeout(TimeUnit.MILLISECONDS))
+ .append("\n");
+ builder.append("client_zone_id=").append(this.config.getClientZoneId()).append("\n");
+ builder.append("failuredetector_implementation=")
+ .append(this.config.getFailureDetectorImplementation())
+ .append("\n");
+
return builder.toString();
}
}
View
7 src/java/voldemort/client/HttpStoreClientFactory.java
@@ -19,7 +19,6 @@
import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;
import java.net.URI;
-import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpVersion;
@@ -35,6 +34,7 @@
import voldemort.client.protocol.RequestFormatFactory;
import voldemort.client.protocol.RequestFormatType;
+import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.ClientStoreVerifier;
import voldemort.cluster.failuredetector.FailureDetector;
@@ -102,8 +102,7 @@ public HttpStoreClientFactory(ClientConfig config) {
}
@Override
- protected FailureDetector initFailureDetector(final ClientConfig config,
- final Collection<Node> nodes) {
+ protected FailureDetector initFailureDetector(final ClientConfig config, Cluster cluster) {
ClientStoreVerifier storeVerifier = new ClientStoreVerifier() {
@Override
@@ -116,7 +115,7 @@ protected FailureDetector initFailureDetector(final ClientConfig config,
};
- FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(config).setNodes(nodes)
+ FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(config).setCluster(cluster)
.setStoreVerifier(storeVerifier);
return create(failureDetectorConfig, config.isJmxEnabled());
View
18 src/java/voldemort/client/MockStoreClientFactory.java
@@ -18,7 +18,6 @@
import java.io.StringReader;
import java.util.List;
-import java.util.UUID;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.NoopFailureDetector;
@@ -107,8 +106,7 @@ public MockStoreClientFactory(Serializer<?> keySerializer,
}
public <K1, V1, T1> Store<K1, V1, T1> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V1>> resolver,
- UUID clientId) {
+ InconsistencyResolver<Versioned<V1>> resolver) {
if(this.storesXml != null)
return getRawStore(storeName);
@@ -133,11 +131,6 @@ public MockStoreClientFactory(Serializer<?> keySerializer,
return consistentStore;
}
- public <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver) {
- return getRawStore(storeName, resolver, null);
- }
-
private <K1, V1, T1> Store<K1, V1, T1> getRawStore(String storeName) {
List<StoreDefinition> storeDefs = storeMapper.readStoreList(new StringReader(storesXml));
StoreDefinition storeDef = null;
@@ -212,13 +205,4 @@ public void close() {
public FailureDetector getFailureDetector() {
return failureDetector;
}
-
- public <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId,
- String storesXml,
- String clusterXml) {
- return null;
- }
-
}
View
22 src/java/voldemort/client/SocketStoreClientFactory.java
@@ -19,13 +19,13 @@
import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;
import java.net.URI;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
+import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.ClientStoreVerifier;
import voldemort.cluster.failuredetector.FailureDetector;
@@ -39,7 +39,6 @@
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.store.system.SystemStoreConstants;
import voldemort.utils.ByteArray;
-import voldemort.utils.JmxUtils;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.Versioned;
@@ -70,11 +69,6 @@ public SocketStoreClientFactory(ClientConfig config) {
config.getSocketBufferSize(),
config.getSocketKeepAlive(),
config.isJmxEnabled());
- if(config.isJmxEnabled())
- JmxUtils.registerMbean(storeFactory,
- JmxUtils.createObjectName(JmxUtils.getPackageName(storeFactory.getClass()),
- JmxUtils.getClassName(storeFactory.getClass())
- + jmxId()));
}
@Override
@@ -120,8 +114,7 @@ public SocketStoreClientFactory(ClientConfig config) {
}
@Override
- protected FailureDetector initFailureDetector(final ClientConfig config,
- final Collection<Node> nodes) {
+ protected FailureDetector initFailureDetector(final ClientConfig config, Cluster cluster) {
failureDetectorListener = new FailureDetectorListener() {
public void nodeAvailable(Node node) {
@@ -145,6 +138,7 @@ public void nodeUnavailable(Node node) {
@Override
protected Store<ByteArray, byte[], byte[]> getStoreInternal(Node node) {
+ logger.debug("Returning a new store verifier for node: " + node);
return SocketStoreClientFactory.this.getStore(MetadataStore.METADATA_STORE_NAME,
node.getHost(),
node.getSocketPort(),
@@ -153,7 +147,7 @@ public void nodeUnavailable(Node node) {
};
- FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(config).setNodes(nodes)
+ FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(config).setCluster(cluster)
.setStoreVerifier(storeVerifier);
return create(failureDetectorConfig, true, failureDetectorListener);
@@ -183,11 +177,13 @@ public void close() {
super.close();
}
- public <K, V, T> Store<K, V, T> getSystemStore(String storeName, String clusterXml) {
+ public <K, V, T> Store<K, V, T> getSystemStore(String storeName,
+ String clusterXml,
+ FailureDetector fd) {
return getRawStore(storeName,
null,
- null,
SystemStoreConstants.SYSTEM_STORE_SCHEMA,
- clusterXml);
+ clusterXml,
+ fd);
}
}
View
44 src/java/voldemort/client/StoreClientFactory.java
@@ -16,8 +16,6 @@
package voldemort.client;
-import java.util.UUID;
-
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.Store;
import voldemort.versioning.InconsistencyResolver;
@@ -75,48 +73,6 @@
InconsistencyResolver<Versioned<V>> resolver);
/**
- * Get the underlying store, not the public StoreClient interface
- *
- * @param storeName The name of the store
- * @param resolver The inconsistency resolver
- * @param clientId The unique id of the client
- * @return The appropriate store
- */
- <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId);
-
- /**
-<<<<<<< HEAD
-<<<<<<< HEAD
-<<<<<<< HEAD
-=======
->>>>>>> Adding System store functionality
-=======
->>>>>>> leigao/client-registry
- * Get the underlying store, not the public StoreClient interface
- *
- * @param storeName The name of the store
- * @param resolver The inconsistency resolver
- * @param clientId The unique id of the client
- * @param storesXml Custom set of stores containing storeName
- * @return The appropriate store
- */
- <K, V, T> Store<K, V, T> getRawStore(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- UUID clientId,
- String customStoresXml,
- String clusterXmlString);
-
- /**
-<<<<<<< HEAD
-<<<<<<< HEAD
-=======
->>>>>>> add clientId for voldemort client
-=======
->>>>>>> Adding System store functionality
-=======
->>>>>>> leigao/client-registry
* Close the store client
*/
public void close();
View
37 src/java/voldemort/client/SystemStore.java
@@ -6,6 +6,7 @@
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
+import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.Store;
import voldemort.store.system.SystemStoreConstants;
import voldemort.versioning.InconsistentDataException;
@@ -21,10 +22,14 @@
private volatile Store<K, V, Object> sysStore;
public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID) {
- this(storeName, bootstrapUrls, clientZoneID, null);
+ this(storeName, bootstrapUrls, clientZoneID, null, null);
}
- public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID, String clusterXml) {
+ public SystemStore(String storeName,
+ String[] bootstrapUrls,
+ int clientZoneID,
+ String clusterXml,
+ FailureDetector fd) {
String prefix = storeName.substring(0, SystemStoreConstants.NAME_PREFIX.length());
if(!SystemStoreConstants.NAME_PREFIX.equals(prefix))
throw new VoldemortException("Illegal system store : " + storeName);
@@ -39,9 +44,9 @@ public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID, S
.setEnableJmx(false)
.setEnablePipelineRoutedStore(true)
.setClientZoneId(clientZoneID);
- this.systemStoreFactory = new SocketStoreClientFactory(config);
+ this.systemStoreFactory = new SystemStoreClientFactory(config);
this.storeName = storeName;
- this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml);
+ this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
}
public Version putSysStore(K key, V value) {
@@ -56,8 +61,10 @@ public Version putSysStore(K key, V value) {
this.sysStore.put(key, versioned, null);
version = versioned.getVersion();
} catch(Exception e) {
- logger.info("Exception caught during putSysStore:");
- e.printStackTrace();
+ logger.info("Exception caught during putSysStore: " + e);
+ if(logger.isDebugEnabled()) {
+ e.printStackTrace();
+ }
}
return version;
}
@@ -69,8 +76,10 @@ public Version putSysStore(K key, Versioned<V> value) {
this.sysStore.put(key, value, null);
version = value.getVersion();
} catch(Exception e) {
- logger.info("Exception caught during putSysStore:");
- e.printStackTrace();
+ logger.info("Exception caught during putSysStore: " + e);
+ if(logger.isDebugEnabled()) {
+ e.printStackTrace();
+ }
}
return version;
}
@@ -91,8 +100,10 @@ else if(items.size() > 1)
else
logger.debug("Got null value");
} catch(Exception e) {
- logger.info("Exception caught during getSysStore:");
- e.printStackTrace();
+ logger.info("Exception caught during getSysStore: " + e);
+ if(logger.isDebugEnabled()) {
+ e.printStackTrace();
+ }
}
return versioned;
}
@@ -108,8 +119,10 @@ public V getValueSysStore(K key) {
value = versioned.getValue();
}
} catch(Exception e) {
- logger.info("Exception caught during getSysStore:");
- e.printStackTrace();
+ logger.info("Exception caught during getSysStore: " + e);
+ if(logger.isDebugEnabled()) {
+ e.printStackTrace();
+ }
}
return value;
}
View
14 src/java/voldemort/client/SystemStoreClientFactory.java
@@ -0,0 +1,14 @@
+package voldemort.client;
+
+public class SystemStoreClientFactory extends SocketStoreClientFactory {
+
+ public SystemStoreClientFactory(ClientConfig config) {
+ super(config);
+ }
+
+ @Override
+ public int getNextJmxId() {
+ // for system store, we don't increment jmx id
+ return getCurrentJmxId();
+ }
+}
View
6 src/java/voldemort/client/SystemStoreRepository.java
@@ -2,6 +2,7 @@
import java.util.concurrent.ConcurrentHashMap;
+import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.system.SystemStoreConstants;
/*
@@ -22,12 +23,13 @@ public void addSystemStore(SystemStore newSysStore, String storeName) {
this.sysStoreMap.put(storeName, newSysStore);
}
- public void createSystemStores(ClientConfig config, String clusterXml) {
+ public void createSystemStores(ClientConfig config, String clusterXml, FailureDetector fd) {
for(SystemStoreConstants.SystemStoreName storeName: SystemStoreConstants.SystemStoreName.values()) {
SystemStore sysStore = new SystemStore(storeName.name(),
config.getBootstrapUrls(),
config.getClientZoneId(),
- clusterXml);
+ clusterXml,
+ fd);
this.sysStoreMap.put(storeName.name(), sysStore);
}
}
View
149 src/java/voldemort/client/ZenStoreClient.java
@@ -19,8 +19,8 @@
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
-import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -35,7 +35,6 @@
import voldemort.store.system.SystemStoreConstants;
import voldemort.utils.JmxUtils;
import voldemort.utils.ManifestFileReader;
-import voldemort.utils.SystemTime;
import voldemort.utils.Utils;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.Version;
@@ -62,26 +61,27 @@
private final AbstractStoreClientFactory abstractStoreFactory;
private final ClientConfig config;
private final SystemStoreRepository sysRepository;
- private final UUID clientId;
+ private final String clientId;
private final SchedulerService scheduler;
private ClientInfo clientInfo;
private String clusterXml;
private AsyncMetadataVersionManager asyncCheckMetadata = null;
public ZenStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- AbstractStoreClientFactory storeFactory,
- int maxMetadataRefreshAttempts) {
- this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null);
+ InconsistencyResolver<Versioned<V>> resolver,
+ AbstractStoreClientFactory storeFactory,
+ int maxMetadataRefreshAttempts) {
+ this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null, null);
}
public ZenStoreClient(String storeName,
- InconsistencyResolver<Versioned<V>> resolver,
- AbstractStoreClientFactory storeFactory,
- int maxMetadataRefreshAttempts,
- String clientContext,
- int clientSequence,
- ClientConfig config) {
+ InconsistencyResolver<Versioned<V>> resolver,
+ AbstractStoreClientFactory storeFactory,
+ int maxMetadataRefreshAttempts,
+ String clientContext,
+ int clientSequence,
+ ClientConfig config,
+ SchedulerService scheduler) {
super();
this.storeName = Utils.notNull(storeName);
@@ -93,20 +93,18 @@ public ZenStoreClient(String storeName,
clientContext,
clientSequence,
System.currentTimeMillis(),
- ManifestFileReader.getReleaseVersion());
- this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo);
+ ManifestFileReader.getReleaseVersion(),
+ config);
+ this.clientId = generateClientId(clientInfo);
this.config = config;
this.sysRepository = new SystemStoreRepository();
- this.scheduler = new SchedulerService(ASYNC_THREADS_COUNT,
- SystemTime.INSTANCE,
- ALLOW_INTERRUPT_ASYNC);
+ this.scheduler = scheduler;
+
// Registering self to be able to bootstrap client dynamically via JMX
JmxUtils.registerMbean(this,
JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()),
JmxUtils.getClassName(this.getClass())
- + "." + clientContext + "."
- + storeName + "."
- + clientId.toString()));
+ + "." + storeName));
// Bootstrap this client
bootStrap();
@@ -117,28 +115,32 @@ public ZenStoreClient(String storeName,
config.getAsyncMetadataRefreshInMs());
}
- registerClient(clientId.toString(), config.getClientRegistryUpdateInSecs());
- logger.info("Voldemort client created: " + clientId.toString() + "\n" + clientInfo);
+ registerClient(clientId, config.getClientRegistryUpdateInSecs());
+ logger.info("Voldemort client created: " + clientId + "\n" + clientInfo);
}
private void registerClient(String jobId, int interval) {
- SystemStore<String, String> clientRegistry = this.sysRepository.getClientRegistryStore();
- if(null != clientRegistry) {
+ if(this.sysRepository.getClientRegistryStore() != null) {
try {
- Version version = clientRegistry.putSysStore(clientId.toString(),
- clientInfo.toString());
- ClientRegistryRefresher refresher = new ClientRegistryRefresher(clientRegistry,
- clientId.toString(),
+ Version version = this.sysRepository.getClientRegistryStore()
+ .putSysStore(clientId, clientInfo.toString());
+ ClientRegistryRefresher refresher = new ClientRegistryRefresher(this.sysRepository,
+ clientId,
clientInfo,
version);
GregorianCalendar cal = new GregorianCalendar();
cal.add(Calendar.SECOND, interval);
- scheduler.schedule(jobId + refresher.getClass().getName(),
- refresher,
- cal.getTime(),
- interval * 1000);
- logger.info("Client registry refresher thread started, refresh frequency: "
- + interval + " seconds");
+
+ if(scheduler != null) {
+ scheduler.schedule(jobId + refresher.getClass().getName(),
+ refresher,
+ cal.getTime(),
+ TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS));
+ logger.info("Client registry refresher thread started, refresh interval: "
+ + interval + " seconds");
+ } else {
+ logger.warn("Client registry won't run because scheduler service is not configured");
+ }
} catch(Exception e) {
logger.warn("Unable to register with the cluster due to the following error:", e);
}
@@ -151,9 +153,9 @@ private void registerClient(String jobId, int interval) {
private AsyncMetadataVersionManager scheduleMetadataChecker(String jobId, long interval) {
AsyncMetadataVersionManager asyncCheckMetadata = null;
SystemStore<String, Long> versionStore = this.sysRepository.getVersionStore();
- if(versionStore == null)
+ if(versionStore == null) {
logger.warn("Metadata version system store not found. Cannot run Metadata version check thread.");
- else {
+ } else {
// Create a callback for re-bootstrapping the client
Callable<Void> rebootstrapCallback = new Callable<Void>() {
@@ -165,18 +167,20 @@ public Void call() throws Exception {
};
asyncCheckMetadata = new AsyncMetadataVersionManager(this.sysRepository,
- rebootstrapCallback,
- null);
+ rebootstrapCallback);
// schedule the job to run every 'checkInterval' period, starting
// now
- scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(),
- asyncCheckMetadata,
- new Date(),
- interval);
- logger.info("Metadata version check thread started. Frequency = Every " + interval
- + " ms");
-
+ if(scheduler != null) {
+ scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(),
+ asyncCheckMetadata,
+ new Date(),
+ interval);
+ logger.info("Metadata version check thread started. Frequency = Every " + interval
+ + " ms");
+ } else {
+ logger.warn("Metadata version check thread won't start because the scheduler service is not configured.");
+ }
}
return asyncCheckMetadata;
}
@@ -192,15 +196,13 @@ public void bootStrap() {
*/
clusterXml = abstractStoreFactory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY);
- this.store = abstractStoreFactory.getRawStore(storeName,
- resolver,
- clientId,
- null,
- clusterXml);
+ this.store = abstractStoreFactory.getRawStore(storeName, resolver, null, clusterXml, null);
// Create system stores
logger.info("Creating system stores for store " + this.storeName);
- this.sysRepository.createSystemStores(this.config, this.clusterXml);
+ this.sysRepository.createSystemStores(this.config,
+ this.clusterXml,
+ abstractStoreFactory.getFailureDetector());
/*
* Update to the new metadata versions (in case we got here from Invalid
@@ -211,30 +213,49 @@ public void bootStrap() {
asyncCheckMetadata.updateMetadataVersions();
}
+ /*
+ * Every time we bootstrap, update the bootstrap time
+ */
if(this.clientInfo != null) {
this.clientInfo.setBootstrapTime(System.currentTimeMillis());
}
}
- public void close() {
- scheduler.stopInner();
- }
-
- public UUID getClientId() {
+ public String getClientId() {
return clientId;
}
- @JmxGetter(name = "getStoreMetadataVersion")
- public String getStoreMetadataVersion() {
- String result = "Current Store Metadata Version : "
- + this.asyncCheckMetadata.getStoreMetadataVersion();
- return result;
- }
-
@JmxGetter(name = "getClusterMetadataVersion")
public String getClusterMetadataVersion() {
String result = "Current Cluster Metadata Version : "
+ this.asyncCheckMetadata.getClusterMetadataVersion();
return result;
}
+
+ /**
+ * Generate a unique client ID based on: 0. clientContext, if specified; 1.
+ * storeName; 2. deployment path; 3. client sequence
+ *
+ * @param storeName the name of the store the client is created for
+ * @param contextName the name of the client context
+ * @param clientSequence the client sequence number
+ * @return unique client ID
+ */
+ public String generateClientId(ClientInfo clientInfo) {
+ String contextName = clientInfo.getContext();
+ int clientSequence = clientInfo.getClientSequence();
+
+ String newLine = System.getProperty("line.separator");
+ StringBuilder context = new StringBuilder(contextName == null ? "" : contextName);
+ context.append(0 == clientSequence ? "" : ("." + clientSequence));
+ context.append(".").append(clientInfo.getStoreName());
+ context.append("@").append(clientInfo.getLocalHostName()).append(":");
+ context.append(clientInfo.getDeploymentPath()).append(newLine);
+
+ if(logger.isDebugEnabled()) {
+ logger.debug(context.toString());
+ }
+
+ return context.toString();
+ }
}
View
120 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -31,8 +31,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -45,12 +46,13 @@
import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
+import voldemort.client.SystemStore;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
-import voldemort.client.protocol.pb.VProto;
import voldemort.client.protocol.pb.VAdminProto.RebalancePartitionInfoMap;
+import voldemort.client.protocol.pb.VProto;
import voldemort.client.protocol.pb.VProto.RequestType;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
@@ -76,6 +78,7 @@
import voldemort.store.views.ViewStorageConfiguration;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
+import voldemort.utils.MetadataVersionStoreUtils;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
@@ -125,12 +128,17 @@
private static final long PRINT_STATS_INTERVAL = 5 * 60 * 1000; // 5 minutes
private final AdminClientConfig adminClientConfig;
+ private static final String CLUSTER_VERSION_KEY = "cluster.xml";
+ private static final int DEFAULT_ZONE_ID = 0;
+
public final static List<String> restoreStoreEngineBlackList = Arrays.asList(MysqlStorageConfiguration.TYPE_NAME,
ReadOnlyStorageConfiguration.TYPE_NAME,
ViewStorageConfiguration.TYPE_NAME);
private Cluster currentCluster;
+ private SystemStore<String, String> sysStoreVersion = null;
+
/**
* Create an instance of AdminClient given a URL of a node in the cluster.
* The bootstrap URL is used to get the cluster metadata.
@@ -152,6 +160,7 @@ public AdminClient(String bootstrapURL, AdminClientConfig adminClientConfig) {
this.networkClassLoader = new NetworkClassLoader(Thread.currentThread()
.getContextClassLoader());
this.adminClientConfig = adminClientConfig;
+ initSystemStoreClient(bootstrapURL, DEFAULT_ZONE_ID);
}
/**
@@ -175,6 +184,67 @@ public AdminClient(Cluster cluster, AdminClientConfig adminClientConfig) {
this.networkClassLoader = new NetworkClassLoader(Thread.currentThread()
.getContextClassLoader());
this.adminClientConfig = adminClientConfig;
+
+ Node node = cluster.getNodeById(0);
+ String bootstrapURL = "tcp://" + node.getHost() + ":" + node.getSocketPort();
+ initSystemStoreClient(bootstrapURL, DEFAULT_ZONE_ID);
+ }
+
+ /**
+ * Wrapper for the actual AdminClient constructor given the URL of a node in
+ * the cluster.
+ *
+ * @param bootstrapURL URL pointing to the bootstrap node
+ * @param adminClientConfig Configuration for AdminClient specifying client
+ * parameters eg. <br>
+ * <ul>
+ * <t>
+ * <li>number of threads</li>
+ * <li>number of sockets per node</li>
+ * <li>socket buffer size</li>
+ * </ul>
+ * @param zoneID The primary Zone ID for the purpose of the SystemStore
+ */
+ public AdminClient(String bootstrapURL, AdminClientConfig adminClientConfig, int zoneID) {
+ this(bootstrapURL, adminClientConfig);
+ initSystemStoreClient(bootstrapURL, zoneID);
+ }
+
+ private void initSystemStoreClient(String bootstrapURL, int zoneID) {
+ String[] bootstrapUrls = new String[1];
+ bootstrapUrls[0] = bootstrapURL;
+ this.sysStoreVersion = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
+ bootstrapUrls,
+ zoneID);
+ }
+
+ /**
+ * Increment the metadata version for the given key (cluster or store)
+ *
+ * @param versionKey The metadata key for which Version should be
+ * incremented
+ */
+ public void updateMetadataversion(String versionKey) {
+ Properties props = MetadataVersionStoreUtils.getProperties(this.sysStoreVersion);
+ if(props.getProperty(versionKey) != null) {
+ logger.debug("Version obtained = " + props.getProperty(versionKey));
+ long newValue = Long.parseLong(props.getProperty(versionKey)) + 1;
+ props.setProperty(versionKey, Long.toString(newValue));
+ } else {
+ logger.debug("Current version is null. Assuming version 0.");
+ props.setProperty(versionKey, "0");
+ }
+ MetadataVersionStoreUtils.setProperties(this.sysStoreVersion, props);
+ }
+
+ /**
+ * Set the metadata versions to the given set
+ *
+ * @param newProperties The new metadata versions to be set across all the
+ * nodes in the cluster
+ */
+ public void setMetadataversion(Properties newProperties) {
+ MetadataVersionStoreUtils.setProperties(this.sysStoreVersion, newProperties);
}
private Cluster getClusterFromBootstrapURL(String bootstrapURL) {
@@ -1454,6 +1524,44 @@ public void updateRemoteMetadata(int remoteNodeId, String key, Versioned<String>
}
/**
+ * Wrapper for updateRemoteMetadata function used against a single Node It
+ * basically loops over the entire list of Nodes that we need to execute the
+ * required operation against. It also increments the version of the
+ * corresponding metadata in the system store.
+ * <p>
+ *
+ * Metadata keys can be one of {@link MetadataStore#METADATA_KEYS}<br>
+ * eg.<br>
+ * <li>cluster metadata (cluster.xml as string)
+ * <li>stores definitions (stores.xml as string)
+ * <li>Server states <br <br>
+ * See {@link voldemort.store.metadata.MetadataStore} for more information.
+ *
+ * @param remoteNodeId Id of the node
+ * @param key Metadata key to update
+ * @param value Value for the metadata key
+ *
+ * */
+ public void updateRemoteMetadata(List<Integer> remoteNodeIds,
+ String key,
+ Versioned<String> value) {
+ for(Integer currentNodeId: remoteNodeIds) {
+ System.out.println("Setting " + key + " for "
+ + getAdminClientCluster().getNodeById(currentNodeId).getHost() + ":"
+ + getAdminClientCluster().getNodeById(currentNodeId).getId());
+ updateRemoteMetadata(currentNodeId, key, value);
+ }
+
+ /*
+ * Assuming everything is fine, we now increment the metadata version
+ * for the key
+ */
+ if(key.equals(CLUSTER_VERSION_KEY)) {
+ updateMetadataversion(key);
+ }
+ }
+
+ /**
* Get the metadata on a remote node.
* <p>
* Metadata keys can be one of {@link MetadataStore#METADATA_KEYS}<br>
@@ -2289,6 +2397,14 @@ public void rebalanceStateChange(Cluster existingCluster,
throw new VoldemortRebalancingException("Got exceptions from nodes "
+ exceptions.keySet());
}
+
+ /*
+ * If everything went smoothly, update the version of the cluster
+ * metadata
+ */
+ if(changeClusterMetadata) {
+ updateMetadataversion(CLUSTER_VERSION_KEY);
+ }
} catch(Exception e) {
if(rollback) {
View
63 src/java/voldemort/client/scheduler/AsyncMetadataVersionManager.java
@@ -13,21 +13,19 @@
* The AsyncMetadataVersionManager is used to track the Metadata version on the
* cluster and if necessary Re-bootstrap the client.
*
- * During initialization, it will retrieve the current version of the store (or
- * the entire stores.xml depending upon granularity) and cluster.xml and then
- * periodically check whether this has been updated. During init if the initial
- * version turns out to be null, it means that no change has been done to that
- * store since it was created. In this case, we assume version '0'.
+ * During initialization, it will retrieve the current version of the
+ * cluster.xml and then periodically check whether this has been updated. During
+ * init if the initial version turns out to be null, it means that no change has
+ * been done to that store since it was created. In this case, we assume version
+ * '0'.
*/
public class AsyncMetadataVersionManager implements Runnable {
- public static String STORES_VERSION_KEY = "stores.xml";
public static final String CLUSTER_VERSION_KEY = "cluster.xml";
public static final String VERSIONS_METADATA_STORE = "metadata-versions";
private final Logger logger = Logger.getLogger(this.getClass());
- private Long currentStoreVersion;
private Long currentClusterVersion;
private final Callable<Void> storeClientThunk;
private final SystemStoreRepository sysRepository;
@@ -39,38 +37,22 @@
public boolean isActive = false;
public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
- Callable<Void> storeClientThunk,
- String storeName) {
+ Callable<Void> storeClientThunk) {
this.sysRepository = sysRepository;
- if(storeName != null) {
- STORES_VERSION_KEY = storeName;
- }
// Get the properties object from the system store (containing versions)
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.sysRepository.getMetadataVersionStore());
try {
- this.currentStoreVersion = getCurrentVersion(STORES_VERSION_KEY, versionProps);
-
- } catch(Exception e) {
- logger.error("Exception while getting currentStoreVersion : " + e);
- }
-
- try {
this.currentClusterVersion = getCurrentVersion(CLUSTER_VERSION_KEY, versionProps);
} catch(Exception e) {
logger.error("Exception while getting currentClusterVersion : " + e);
}
// If the received version is null, assume version 0
- if(currentStoreVersion == null) {
- currentStoreVersion = new Long(0);
- }
if(currentClusterVersion == null) {
currentClusterVersion = new Long(0);
}
-
- logger.debug("Initial stores.xml version = " + this.currentStoreVersion);
logger.debug("Initial cluster.xml version = " + this.currentClusterVersion);
this.storeClientThunk = storeClientThunk;
@@ -122,16 +104,12 @@ public void run() {
// Get the properties object from the system store (containing versions)
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.sysRepository.getMetadataVersionStore());
-
- Long newStoresVersion = fetchNewVersion(STORES_VERSION_KEY,
- currentStoreVersion,
- versionProps);
Long newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY,
currentClusterVersion,
versionProps);
// If nothing has been updated, continue
- if((newStoresVersion != null) || (newClusterVersion != null)) {
+ if(newClusterVersion != null) {
logger.info("Metadata version mismatch detected.");
@@ -145,30 +123,26 @@ public void run() {
// do nothing, continue.
}
- try {
- if(newStoresVersion != null) {
- logger.info("Updating stores version");
- currentStoreVersion = newStoresVersion;
- }
+ /*
+ * Do another check for mismatch here since the versions might have
+ * been updated while we were sleeping
+ */
+ if(!newClusterVersion.equals(currentClusterVersion)) {
- if(newClusterVersion != null) {
+ try {
logger.info("Updating cluster version");
currentClusterVersion = newClusterVersion;
- }
- this.storeClientThunk.call();
- } catch(Exception e) {
- e.printStackTrace();
- logger.info(e.getMessage());
+ this.storeClientThunk.call();
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.info(e.getMessage());
+ }
}
}
}
- public Long getStoreMetadataVersion() {
- return this.currentStoreVersion;
- }
-
public Long getClusterMetadataVersion() {
return this.currentClusterVersion;
}
@@ -176,7 +150,6 @@ public Long getClusterMetadataVersion() {
// Fetch the latest versions for cluster and store
public void updateMetadataVersions() {
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.sysRepository.getMetadataVersionStore());
- this.currentStoreVersion = fetchNewVersion(STORES_VERSION_KEY, null, versionProps);
this.currentClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY, null, versionProps);
}
}
View
60 src/java/voldemort/client/scheduler/ClientRegistryRefresher.java
@@ -1,42 +1,86 @@
+/*
+ * Copyright 2008-2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.client.scheduler;
import org.apache.log4j.Logger;
import voldemort.client.ClientInfo;
-import voldemort.client.SystemStore;
+import voldemort.client.SystemStoreRepository;
+import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
+/**
+ * An async. job that keeps client registry refreshed while the client is
+ * connected to the cluster
+ *
+ */
public class ClientRegistryRefresher implements Runnable {
private final Logger logger = Logger.getLogger(this.getClass());
- private final SystemStore<String, String> clientRegistry;
+ SystemStoreRepository systemStoreRepository;
private ClientInfo clientInfo;
private final String clientId;
private Version lastVersion;
+ private boolean hadConflict;
- public ClientRegistryRefresher(SystemStore<String, String> clientRegistry,
+ public ClientRegistryRefresher(SystemStoreRepository sysRepository,
String clientId,
ClientInfo clientInfo,
Version version) {
- this.clientRegistry = clientRegistry;
+ this.systemStoreRepository = sysRepository;
this.clientInfo = clientInfo;
this.clientId = clientId;
this.lastVersion = version;
+ this.hadConflict = false;
logger.info("Initial version obtained from client registry: " + version);
}
public void run() {
+
+ if(hadConflict) {
+ // if we previously had a conflict during update, we will try to get
+ // a newer version before update this time. This case shall not
+ // happen under regular circumstances. But it is just avoid update
+ // keeping failing when strange situations occur.
+ lastVersion = this.systemStoreRepository.getClientRegistryStore()
+ .getSysStore(clientId)
+ .getVersion();
+ hadConflict = false;
+ }
clientInfo.setUpdateTime(System.currentTimeMillis());