Skip to content

Commit

Permalink
code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Lei Gao committed Jul 10, 2012
1 parent 20b47b7 commit 05f23ec
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 58 deletions.
20 changes: 10 additions & 10 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -998,9 +998,9 @@ private static void executeFetchEntries(Integer nodeId,
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
} else {
// add system store to the map so they can be fetched when specified
// explicitly
storeDefinitionMap.putAll(getSystemStoreDef());
// add system stores to the map so they can be fetched when
// specified explicitly
storeDefinitionMap.putAll(getSystemStoreDefs());
}

// Pick up all the partitions
Expand Down Expand Up @@ -1045,13 +1045,13 @@ private static void executeFetchEntries(Integer nodeId,
}
}

private static Map<String, StoreDefinition> getSystemStoreDef() {
Map<String, StoreDefinition> sysStoreDef = Maps.newHashMap();
private static Map<String, StoreDefinition> getSystemStoreDefs() {
Map<String, StoreDefinition> sysStoreDefMap = Maps.newHashMap();
List<StoreDefinition> storesDefs = SystemStoreConstants.getAllSystemStoreDefs();
for(StoreDefinition def: storesDefs) {
sysStoreDef.put(def.getName(), def);
sysStoreDefMap.put(def.getName(), def);
}
return sysStoreDef;
return sysStoreDefMap;
}

private static void executeUpdateEntries(Integer nodeId,
Expand Down Expand Up @@ -1251,9 +1251,9 @@ private static void executeFetchKeys(Integer nodeId,
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
} else {
// add system store to the map so they can be fetched when specified
// explicitly
storeDefinitionMap.putAll(getSystemStoreDef());
// add system stores to the map so they can be fetched when
// specified explicitly
storeDefinitionMap.putAll(getSystemStoreDefs());
}

// Pick up all the partitions
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -458,7 +458,7 @@ public String jmxId() {

/**
* Generate a unique client ID based on: 0. clientContext, if specified; 1.
* storeName 2. run path 3. client sequence
* storeName; 2. deployment path; 3. client sequence
*
* @param storeName the name of the store the client is created for
* @param contextName the name of the client context
Expand Down
45 changes: 43 additions & 2 deletions src/java/voldemort/client/ClientConfig.java
Expand Up @@ -84,6 +84,7 @@ public class ClientConfig {
private volatile long asyncCheckMetadataInterval = 5000;
/* 12 hr refresh internval, in seconds */
private volatile int clientRegistryRefreshInterval = 3600 * 12;
private volatile int asyncJobThreadPoolSize = 2;

public ClientConfig() {}

Expand Down Expand Up @@ -126,6 +127,8 @@ public ClientConfig() {}
public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries";
public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context";
public static final String ASYNC_CHECK_METADATA_INTERVAL = "check_metadata_interval";
public static final String CLIENT_REGISTRY_REFRESH_INTERVAL = "client_registry_refresh_interval";
public static final String ASYNC_JOB_THREAD_POOL_SIZE = "async_job_thread_pool_size";

/**
* Instantiate the client config using a properties file
Expand Down Expand Up @@ -286,11 +289,19 @@ private void setProperties(Properties properties) {
this.setMaxBootstrapRetries(props.getInt(MAX_BOOTSTRAP_RETRIES));

if(props.containsKey(CLIENT_CONTEXT_NAME)) {
this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME, null));
this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME));
}

if(props.containsKey(ASYNC_CHECK_METADATA_INTERVAL)) {
this.setAsyncCheckMetadataInterval(props.getLong(ASYNC_CHECK_METADATA_INTERVAL, 5000));
this.setAsyncCheckMetadataInterval(props.getLong(ASYNC_CHECK_METADATA_INTERVAL));
}

if(props.containsKey(CLIENT_REGISTRY_REFRESH_INTERVAL)) {
this.setClientRegistryRefreshInterval(props.getInt(CLIENT_REGISTRY_REFRESH_INTERVAL));
}

if(props.containsKey(ASYNC_JOB_THREAD_POOL_SIZE)) {
this.setClientRegistryRefreshInterval(props.getInt(ASYNC_JOB_THREAD_POOL_SIZE));
}
}

Expand Down Expand Up @@ -712,6 +723,11 @@ public String getClientContextName() {
return clientContextName;
}

/**
* Set the client context name
*
* @param clientContextName The name of client context
*/
public ClientConfig setClientContextName(String clientContextName) {
this.clientContextName = clientContextName;
return this;
Expand All @@ -721,6 +737,11 @@ public long getAsyncCheckMetadataInterval() {
return asyncCheckMetadataInterval;
}

/**
* Set the interval on which client checks for metadata change on servers
*
* @param asyncCheckMetadataInterval The metadata change interval
*/
public ClientConfig setAsyncCheckMetadataInterval(long asyncCheckMetadataInterval) {
this.asyncCheckMetadataInterval = asyncCheckMetadataInterval;
return this;
Expand All @@ -730,8 +751,28 @@ public int getClientRegistryRefreshInterval() {
return this.clientRegistryRefreshInterval;
}

/**
* Set the interval on which client refreshes its corresponding entry of the
* client registry on the servers
*
* @param clientRegistryRefreshInterval The refresh interval in seconds
*/
public ClientConfig setClientRegistryRefreshInterval(int clientRegistryRefrshInterval) {
this.clientRegistryRefreshInterval = clientRegistryRefrshInterval;
return this;
}

public int getAsyncJobThreadPoolSize() {
return asyncJobThreadPoolSize;
}

/**
* Set the # of threads for the async. job thread pool
*
* @param asyncJobThreadPoolSize The max # of threads in the async job
*/
public ClientConfig setAsyncJobThreadPoolSize(int asyncJobThreadPoolSize) {
this.asyncJobThreadPoolSize = asyncJobThreadPoolSize;
return this;
}
}
21 changes: 21 additions & 0 deletions src/java/voldemort/client/ClientInfo.java
@@ -1,3 +1,19 @@
/*
* Copyright 2008-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.client;

import java.io.File;
Expand All @@ -8,6 +24,11 @@

import org.apache.log4j.Logger;

/**
* A collection of voldemort client side information what will be populated into
* the voldemort cluster when a client is connected to a voldemort cluster
*
*/
public class ClientInfo implements Serializable {

/**
Expand Down
16 changes: 7 additions & 9 deletions src/java/voldemort/client/DefaultStoreClient.java
Expand Up @@ -21,9 +21,10 @@
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -69,9 +70,6 @@
@JmxManaged(description = "A voldemort client")
public class DefaultStoreClient<K, V> implements StoreClient<K, V> {

private static final int ASYNC_THREADS_COUNT = 2;
private static final boolean ALLOW_INTERRUPT_ASYNC = true;

private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
private final StoreClientFactory storeFactory;

Expand Down Expand Up @@ -114,9 +112,9 @@ public DefaultStoreClient(String storeName,
this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo);
this.config = config;
this.sysRepository = new SystemStoreRepository();
this.scheduler = new SchedulerService(ASYNC_THREADS_COUNT,
this.scheduler = new SchedulerService(config.getAsyncJobThreadPoolSize(),
SystemTime.INSTANCE,
ALLOW_INTERRUPT_ASYNC);
true);
// Registering self to be able to bootstrap client dynamically via JMX
JmxUtils.registerMbean(this,
JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()),
Expand Down Expand Up @@ -152,15 +150,15 @@ private void registerClient(String jobId, int interval) {
scheduler.schedule(jobId + refresher.getClass().getName(),
refresher,
cal.getTime(),
interval * 1000);
logger.info("Client registry refresher thread started, refresh frequency: "
TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS));
logger.info("Client registry refresher thread started, refresh interval: "
+ interval + " seconds");
} catch(Exception e) {
logger.warn("Unable to register with the cluster due to the following error:", e);
}
} else {
logger.warn(SystemStoreConstants.SystemStoreName.voldsys$_client_registry.name()
+ "not found. Unable to registry with voldemort cluster.");
+ " not found. Unable to registry with voldemort cluster.");
}
}

Expand Down
15 changes: 0 additions & 15 deletions src/java/voldemort/client/StoreClientFactory.java
Expand Up @@ -87,13 +87,6 @@ <K, V, T> Store<K, V, T> getRawStore(String storeName,
UUID clientId);

/**
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> Adding System store functionality
=======
>>>>>>> leigao/client-registry
* Get the underlying store, not the public StoreClient interface
*
* @param storeName The name of the store
Expand All @@ -109,14 +102,6 @@ <K, V, T> Store<K, V, T> getRawStore(String storeName,
String clusterXmlString);

/**
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> add clientId for voldemort client
=======
>>>>>>> Adding System store functionality
=======
>>>>>>> leigao/client-registry
* Close the store client
*/
public void close();
Expand Down
42 changes: 41 additions & 1 deletion src/java/voldemort/client/scheduler/ClientRegistryRefresher.java
@@ -1,12 +1,34 @@
/*
* Copyright 2008-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.client.scheduler;

import org.apache.log4j.Logger;

import voldemort.client.ClientInfo;
import voldemort.client.SystemStore;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

/**
* An async. job that keeps client registry refreshed while the client is
* connected to the cluster
*
*/
public class ClientRegistryRefresher implements Runnable {

private final Logger logger = Logger.getLogger(this.getClass());
Expand All @@ -15,6 +37,7 @@ public class ClientRegistryRefresher implements Runnable {
private ClientInfo clientInfo;
private final String clientId;
private Version lastVersion;
private boolean hadConflict;

public ClientRegistryRefresher(SystemStore<String, ClientInfo> clientRegistry,
String clientId,
Expand All @@ -24,19 +47,36 @@ public ClientRegistryRefresher(SystemStore<String, ClientInfo> clientRegistry,
this.clientInfo = clientInfo;
this.clientId = clientId;
this.lastVersion = version;
this.hadConflict = false;
logger.info("Initial version obtained from client registry: " + version);
}

public void run() {

if(hadConflict) {
// if we previously had a conflict during update, we will try to get
// a newer version before update this time. This case shall not
// happen under regular circumstances. But it is just avoid update
// keeping failing when strange situations occur.
lastVersion = clientRegistry.getSysStore(clientId).getVersion();
hadConflict = false;
}
clientInfo.setUpdateTime(System.currentTimeMillis());
logger.info("updating client registry with the following info for client: " + clientId
+ "\n" + clientInfo);
try {
lastVersion = clientRegistry.putSysStore(clientId,
new Versioned<ClientInfo>(clientInfo,
lastVersion));
} catch(ObsoleteVersionException e) {
Versioned<ClientInfo> existingValue = clientRegistry.getSysStore(clientId);
logger.warn("Multiple clients are updating the same client registry entry");
logger.warn(" current value: " + clientInfo + " " + lastVersion);
logger.warn(" existing value: " + existingValue.getValue() + " "
+ existingValue.getVersion());
hadConflict = true;
} catch(Exception e) {
logger.warn("encounted the following error while trying to update client registry: "
logger.warn("encountered the following error while trying to update client registry: "
+ e);
}
}
Expand Down
24 changes: 19 additions & 5 deletions src/java/voldemort/store/system/SystemStoreConstants.java
@@ -1,3 +1,19 @@
/*
* Copyright 2008-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.store.system;

import java.io.StringReader;
Expand All @@ -8,7 +24,9 @@
import voldemort.xml.StoreDefinitionsMapper;

/**
* The various system stores
* A file that defines all constants for system stores, including the store
* definitions
*
*/
public class SystemStoreConstants {

Expand Down Expand Up @@ -39,10 +57,6 @@ public static enum SystemStoreName {
+ " <schema-info version=\"0\">utf8</schema-info>"
+ " </key-serializer>"
+ " <value-serializer>"
// +
// " <type>avro-specific</type>"
// +
// " <schema-info version=\"0\">java=voldemort.client.ClientInfo</schema-info>"
+ " <type>java-serialization</type>"
+ " </value-serializer>"
+ " <retention-days>7</retention-days>"
Expand Down

0 comments on commit 05f23ec

Please sign in to comment.