diff --git a/src/java/voldemort/VoldemortAdminTool.java b/src/java/voldemort/VoldemortAdminTool.java index 7aec774c57..baac31465a 100644 --- a/src/java/voldemort/VoldemortAdminTool.java +++ b/src/java/voldemort/VoldemortAdminTool.java @@ -998,9 +998,9 @@ private static void executeFetchEntries(Integer nodeId, stores = Lists.newArrayList(); stores.addAll(storeDefinitionMap.keySet()); } else { - // add system store to the map so they can be fetched when specified - // explicitly - storeDefinitionMap.putAll(getSystemStoreDef()); + // add system stores to the map so they can be fetched when + // specified explicitly + storeDefinitionMap.putAll(getSystemStoreDefs()); } // Pick up all the partitions @@ -1045,13 +1045,13 @@ private static void executeFetchEntries(Integer nodeId, } } - private static Map getSystemStoreDef() { - Map sysStoreDef = Maps.newHashMap(); + private static Map getSystemStoreDefs() { + Map sysStoreDefMap = Maps.newHashMap(); List storesDefs = SystemStoreConstants.getAllSystemStoreDefs(); for(StoreDefinition def: storesDefs) { - sysStoreDef.put(def.getName(), def); + sysStoreDefMap.put(def.getName(), def); } - return sysStoreDef; + return sysStoreDefMap; } private static void executeUpdateEntries(Integer nodeId, @@ -1251,9 +1251,9 @@ private static void executeFetchKeys(Integer nodeId, stores = Lists.newArrayList(); stores.addAll(storeDefinitionMap.keySet()); } else { - // add system store to the map so they can be fetched when specified - // explicitly - storeDefinitionMap.putAll(getSystemStoreDef()); + // add system stores to the map so they can be fetched when + // specified explicitly + storeDefinitionMap.putAll(getSystemStoreDefs()); } // Pick up all the partitions diff --git a/src/java/voldemort/client/AbstractStoreClientFactory.java b/src/java/voldemort/client/AbstractStoreClientFactory.java index af455040a5..cc14819937 100644 --- a/src/java/voldemort/client/AbstractStoreClientFactory.java +++ b/src/java/voldemort/client/AbstractStoreClientFactory.java @@ -458,7 +458,7 @@ public String jmxId() { /** * Generate a unique client ID based on: 0. clientContext, if specified; 1. - * storeName 2. run path 3. client sequence + * storeName; 2. deployment path; 3. client sequence * * @param storeName the name of the store the client is created for * @param contextName the name of the client context diff --git a/src/java/voldemort/client/ClientConfig.java b/src/java/voldemort/client/ClientConfig.java index 8d8e27d755..04b8b41e0b 100644 --- a/src/java/voldemort/client/ClientConfig.java +++ b/src/java/voldemort/client/ClientConfig.java @@ -84,6 +84,7 @@ public class ClientConfig { private volatile long asyncCheckMetadataInterval = 5000; /* 12 hr refresh internval, in seconds */ private volatile int clientRegistryRefreshInterval = 3600 * 12; + private volatile int asyncJobThreadPoolSize = 2; public ClientConfig() {} @@ -126,6 +127,8 @@ public ClientConfig() {} public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries"; public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context"; public static final String ASYNC_CHECK_METADATA_INTERVAL = "check_metadata_interval"; + public static final String CLIENT_REGISTRY_REFRESH_INTERVAL = "client_registry_refresh_interval"; + public static final String ASYNC_JOB_THREAD_POOL_SIZE = "async_job_thread_pool_size"; /** * Instantiate the client config using a properties file @@ -286,11 +289,19 @@ private void setProperties(Properties properties) { this.setMaxBootstrapRetries(props.getInt(MAX_BOOTSTRAP_RETRIES)); if(props.containsKey(CLIENT_CONTEXT_NAME)) { - this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME, null)); + this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME)); } if(props.containsKey(ASYNC_CHECK_METADATA_INTERVAL)) { - this.setAsyncCheckMetadataInterval(props.getLong(ASYNC_CHECK_METADATA_INTERVAL, 5000)); + this.setAsyncCheckMetadataInterval(props.getLong(ASYNC_CHECK_METADATA_INTERVAL)); + } + + if(props.containsKey(CLIENT_REGISTRY_REFRESH_INTERVAL)) { + this.setClientRegistryRefreshInterval(props.getInt(CLIENT_REGISTRY_REFRESH_INTERVAL)); + } + + if(props.containsKey(ASYNC_JOB_THREAD_POOL_SIZE)) { + this.setClientRegistryRefreshInterval(props.getInt(ASYNC_JOB_THREAD_POOL_SIZE)); } } @@ -712,6 +723,11 @@ public String getClientContextName() { return clientContextName; } + /** + * Set the client context name + * + * @param clientContextName The name of client context + */ public ClientConfig setClientContextName(String clientContextName) { this.clientContextName = clientContextName; return this; @@ -721,6 +737,11 @@ public long getAsyncCheckMetadataInterval() { return asyncCheckMetadataInterval; } + /** + * Set the interval on which client checks for metadata change on servers + * + * @param asyncCheckMetadataInterval The metadata change interval + */ public ClientConfig setAsyncCheckMetadataInterval(long asyncCheckMetadataInterval) { this.asyncCheckMetadataInterval = asyncCheckMetadataInterval; return this; @@ -730,8 +751,28 @@ public int getClientRegistryRefreshInterval() { return this.clientRegistryRefreshInterval; } + /** + * Set the interval on which client refreshes its corresponding entry of the + * client registry on the servers + * + * @param clientRegistryRefreshInterval The refresh interval in seconds + */ public ClientConfig setClientRegistryRefreshInterval(int clientRegistryRefrshInterval) { this.clientRegistryRefreshInterval = clientRegistryRefrshInterval; return this; } + + public int getAsyncJobThreadPoolSize() { + return asyncJobThreadPoolSize; + } + + /** + * Set the # of threads for the async. job thread pool + * + * @param asyncJobThreadPoolSize The max # of threads in the async job + */ + public ClientConfig setAsyncJobThreadPoolSize(int asyncJobThreadPoolSize) { + this.asyncJobThreadPoolSize = asyncJobThreadPoolSize; + return this; + } } diff --git a/src/java/voldemort/client/ClientInfo.java b/src/java/voldemort/client/ClientInfo.java index b9e92ff225..d87f98b0a8 100644 --- a/src/java/voldemort/client/ClientInfo.java +++ b/src/java/voldemort/client/ClientInfo.java @@ -1,3 +1,19 @@ +/* + * Copyright 2008-2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.client; import java.io.File; @@ -8,6 +24,11 @@ import org.apache.log4j.Logger; +/** + * A collection of voldemort client side information what will be populated into + * the voldemort cluster when a client is connected to a voldemort cluster + * + */ public class ClientInfo implements Serializable { /** diff --git a/src/java/voldemort/client/DefaultStoreClient.java b/src/java/voldemort/client/DefaultStoreClient.java index ccd8749890..3d507e71f7 100644 --- a/src/java/voldemort/client/DefaultStoreClient.java +++ b/src/java/voldemort/client/DefaultStoreClient.java @@ -21,9 +21,10 @@ import java.util.GregorianCalendar; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.UUID; +import java.util.Map.Entry; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -69,9 +70,6 @@ @JmxManaged(description = "A voldemort client") public class DefaultStoreClient implements StoreClient { - private static final int ASYNC_THREADS_COUNT = 2; - private static final boolean ALLOW_INTERRUPT_ASYNC = true; - private final Logger logger = Logger.getLogger(DefaultStoreClient.class); private final StoreClientFactory storeFactory; @@ -114,9 +112,9 @@ public DefaultStoreClient(String storeName, this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo); this.config = config; this.sysRepository = new SystemStoreRepository(); - this.scheduler = new SchedulerService(ASYNC_THREADS_COUNT, + this.scheduler = new SchedulerService(config.getAsyncJobThreadPoolSize(), SystemTime.INSTANCE, - ALLOW_INTERRUPT_ASYNC); + true); // Registering self to be able to bootstrap client dynamically via JMX JmxUtils.registerMbean(this, JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), @@ -152,15 +150,15 @@ private void registerClient(String jobId, int interval) { scheduler.schedule(jobId + refresher.getClass().getName(), refresher, cal.getTime(), - interval * 1000); - logger.info("Client registry refresher thread started, refresh frequency: " + TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS)); + logger.info("Client registry refresher thread started, refresh interval: " + interval + " seconds"); } catch(Exception e) { logger.warn("Unable to register with the cluster due to the following error:", e); } } else { logger.warn(SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name() - + "not found. Unable to registry with voldemort cluster."); + + " not found. Unable to registry with voldemort cluster."); } } diff --git a/src/java/voldemort/client/StoreClientFactory.java b/src/java/voldemort/client/StoreClientFactory.java index 5cff508ffa..71ebbef7dd 100644 --- a/src/java/voldemort/client/StoreClientFactory.java +++ b/src/java/voldemort/client/StoreClientFactory.java @@ -87,13 +87,6 @@ Store getRawStore(String storeName, UUID clientId); /** -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> Adding System store functionality -======= ->>>>>>> leigao/client-registry * Get the underlying store, not the public StoreClient interface * * @param storeName The name of the store @@ -109,14 +102,6 @@ Store getRawStore(String storeName, String clusterXmlString); /** -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> add clientId for voldemort client -======= ->>>>>>> Adding System store functionality -======= ->>>>>>> leigao/client-registry * Close the store client */ public void close(); diff --git a/src/java/voldemort/client/scheduler/ClientRegistryRefresher.java b/src/java/voldemort/client/scheduler/ClientRegistryRefresher.java index 2b57926b49..ed49a600a9 100644 --- a/src/java/voldemort/client/scheduler/ClientRegistryRefresher.java +++ b/src/java/voldemort/client/scheduler/ClientRegistryRefresher.java @@ -1,12 +1,34 @@ +/* + * Copyright 2008-2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.client.scheduler; import org.apache.log4j.Logger; import voldemort.client.ClientInfo; import voldemort.client.SystemStore; +import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.Version; import voldemort.versioning.Versioned; +/** + * An async. job that keeps client registry refreshed while the client is + * connected to the cluster + * + */ public class ClientRegistryRefresher implements Runnable { private final Logger logger = Logger.getLogger(this.getClass()); @@ -15,6 +37,7 @@ public class ClientRegistryRefresher implements Runnable { private ClientInfo clientInfo; private final String clientId; private Version lastVersion; + private boolean hadConflict; public ClientRegistryRefresher(SystemStore clientRegistry, String clientId, @@ -24,10 +47,20 @@ public ClientRegistryRefresher(SystemStore clientRegistry, this.clientInfo = clientInfo; this.clientId = clientId; this.lastVersion = version; + this.hadConflict = false; logger.info("Initial version obtained from client registry: " + version); } public void run() { + + if(hadConflict) { + // if we previously had a conflict during update, we will try to get + // a newer version before update this time. This case shall not + // happen under regular circumstances. But it is just avoid update + // keeping failing when strange situations occur. + lastVersion = clientRegistry.getSysStore(clientId).getVersion(); + hadConflict = false; + } clientInfo.setUpdateTime(System.currentTimeMillis()); logger.info("updating client registry with the following info for client: " + clientId + "\n" + clientInfo); @@ -35,8 +68,15 @@ public void run() { lastVersion = clientRegistry.putSysStore(clientId, new Versioned(clientInfo, lastVersion)); + } catch(ObsoleteVersionException e) { + Versioned existingValue = clientRegistry.getSysStore(clientId); + logger.warn("Multiple clients are updating the same client registry entry"); + logger.warn(" current value: " + clientInfo + " " + lastVersion); + logger.warn(" existing value: " + existingValue.getValue() + " " + + existingValue.getVersion()); + hadConflict = true; } catch(Exception e) { - logger.warn("encounted the following error while trying to update client registry: " + logger.warn("encountered the following error while trying to update client registry: " + e); } } diff --git a/src/java/voldemort/store/system/SystemStoreConstants.java b/src/java/voldemort/store/system/SystemStoreConstants.java index c818ed51fd..3bbb2562d3 100644 --- a/src/java/voldemort/store/system/SystemStoreConstants.java +++ b/src/java/voldemort/store/system/SystemStoreConstants.java @@ -1,3 +1,19 @@ +/* + * Copyright 2008-2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.store.system; import java.io.StringReader; @@ -8,7 +24,9 @@ import voldemort.xml.StoreDefinitionsMapper; /** - * The various system stores + * A file that defines all constants for system stores, including the store + * definitions + * */ public class SystemStoreConstants { @@ -39,10 +57,6 @@ public static enum SystemStoreName { + " utf8" + " " + " " - // + - // " avro-specific" - // + - // " java=voldemort.client.ClientInfo" + " java-serialization" + " " + " 7" diff --git a/src/java/voldemort/utils/ManifestFileReader.java b/src/java/voldemort/utils/ManifestFileReader.java index a3287a3681..8ba26d7515 100644 --- a/src/java/voldemort/utils/ManifestFileReader.java +++ b/src/java/voldemort/utils/ManifestFileReader.java @@ -1,3 +1,19 @@ +/* + * Copyright 2008-2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package voldemort.utils; import java.io.FileInputStream; @@ -6,6 +22,10 @@ import org.apache.log4j.Logger; +/** + * A utility class that abstract out fields from manifest file + * + */ public class ManifestFileReader { protected static final Logger logger = Logger.getLogger(ManifestFileReader.class); diff --git a/test/unit/voldemort/client/DefaultSocketStoreClientTest.java b/test/unit/voldemort/client/DefaultSocketStoreClientTest.java index 6d2bc94d2e..d5cce3cac7 100644 --- a/test/unit/voldemort/client/DefaultSocketStoreClientTest.java +++ b/test/unit/voldemort/client/DefaultSocketStoreClientTest.java @@ -101,19 +101,4 @@ public void test() { assertNull("After a successful delete(k), get(k) should return null.", client.get("k")); } - @Test - public void testClientRegistryHappyPath() { - ClientConfig clientConfig = new ClientConfig().setMaxThreads(4) - .setMaxTotalConnections(4) - .setMaxConnectionsPerNode(4) - .setBootstrapUrls(socketUrl) - .setClientContextName("testClientRegistryHappyPath"); - SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); - this.client = socketFactory.getStoreClient(testStoreName); - client.put("k", "v"); - adminClient.fetchEntries(0, testStoreName, null, null, false); - adminClient.fetchEntries(1, testStoreName, null, null, false); - // TODO: verify that the values in registry are correct. - } - }