Skip to content

Commit

Permalink
add SchedulerService in voldemort client
Browse files Browse the repository at this point in the history
  • Loading branch information
Lei Gao committed Jul 3, 2012
1 parent 9a582b4 commit 20b47b7
Show file tree
Hide file tree
Showing 30 changed files with 238 additions and 281 deletions.
10 changes: 6 additions & 4 deletions src/java/voldemort/client/ClientConfig.java
Expand Up @@ -79,9 +79,11 @@ public class ClientConfig {

private volatile int maxBootstrapRetries = 2;
private volatile String clientContextName = "default";

/* 5 second check interval, in ms */
private volatile long asyncCheckMetadataInterval = 5000;
/* 12 hr refresh internval */
private volatile long clientRegistryRefreshInterval = 3600 * 1000 * 12;
/* 12 hr refresh internval, in seconds */
private volatile int clientRegistryRefreshInterval = 3600 * 12;

public ClientConfig() {}

Expand Down Expand Up @@ -724,11 +726,11 @@ public ClientConfig setAsyncCheckMetadataInterval(long asyncCheckMetadataInterva
return this;
}

public long getClientRegistryRefreshInterval() {
public int getClientRegistryRefreshInterval() {
return this.clientRegistryRefreshInterval;
}

public ClientConfig setClientRegistryRefreshInterval(long clientRegistryRefrshInterval) {
public ClientConfig setClientRegistryRefreshInterval(int clientRegistryRefrshInterval) {
this.clientRegistryRefreshInterval = clientRegistryRefrshInterval;
return this;
}
Expand Down
59 changes: 48 additions & 11 deletions src/java/voldemort/client/DefaultStoreClient.java
Expand Up @@ -16,10 +16,13 @@

package voldemort.client;

import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;

import org.apache.log4j.Logger;
Expand All @@ -29,7 +32,10 @@
import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxManaged;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.client.scheduler.AsyncMetadataVersionManager;
import voldemort.client.scheduler.ClientRegistryRefresher;
import voldemort.cluster.Node;
import voldemort.common.service.SchedulerService;
import voldemort.routing.RoutingStrategy;
import voldemort.serialization.Serializer;
import voldemort.store.InvalidMetadataException;
Expand All @@ -39,6 +45,7 @@
import voldemort.store.system.SystemStoreConstants;
import voldemort.utils.JmxUtils;
import voldemort.utils.ManifestFileReader;
import voldemort.utils.SystemTime;
import voldemort.utils.Utils;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.InconsistentDataException;
Expand All @@ -62,6 +69,9 @@
@JmxManaged(description = "A voldemort client")
public class DefaultStoreClient<K, V> implements StoreClient<K, V> {

private static final int ASYNC_THREADS_COUNT = 2;
private static final boolean ALLOW_INTERRUPT_ASYNC = true;

private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
private final StoreClientFactory storeFactory;

Expand All @@ -72,9 +82,10 @@ public class DefaultStoreClient<K, V> implements StoreClient<K, V> {
private final SystemStoreRepository sysRepository;
private final UUID clientId;
private volatile Store<K, V, Object> store;
private AsyncMetadataVersionManager asyncCheckMetadata = null;
private final SchedulerService scheduler;
private ClientInfo clientInfo;
private String clusterXml;
private AsyncMetadataVersionManager asyncCheckMetadata = null;

public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
Expand Down Expand Up @@ -103,7 +114,9 @@ public DefaultStoreClient(String storeName,
this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo);
this.config = config;
this.sysRepository = new SystemStoreRepository();

this.scheduler = new SchedulerService(ASYNC_THREADS_COUNT,
SystemTime.INSTANCE,
ALLOW_INTERRUPT_ASYNC);
// Registering self to be able to bootstrap client dynamically via JMX
JmxUtils.registerMbean(this,
JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()),
Expand All @@ -117,18 +130,31 @@ public DefaultStoreClient(String storeName,

// Initialize the background thread for checking metadata version
if(config != null) {
asyncCheckMetadata = createMetadataChecker();
asyncCheckMetadata = scheduleMetadataChecker(clientId.toString(),
config.getAsyncCheckMetadataInterval());
}

registerClient();
registerClient(clientId.toString(), config.getClientRegistryRefreshInterval());
logger.info("Voldemort client created: " + clientId.toString() + "\n" + clientInfo);
}

private void registerClient() {
private void registerClient(String jobId, int interval) {
SystemStore<String, ClientInfo> clientRegistry = this.sysRepository.getClientRegistryStore();
if(null != clientRegistry) {
try {
clientRegistry.putSysStore(clientId.toString(), clientInfo);
Version version = clientRegistry.putSysStore(clientId.toString(), clientInfo);
ClientRegistryRefresher refresher = new ClientRegistryRefresher(clientRegistry,
clientId.toString(),
clientInfo,
version);
GregorianCalendar cal = new GregorianCalendar();
cal.add(Calendar.SECOND, interval);
scheduler.schedule(jobId + refresher.getClass().getName(),
refresher,
cal.getTime(),
interval * 1000);
logger.info("Client registry refresher thread started, refresh frequency: "
+ interval + " seconds");
} catch(Exception e) {
logger.warn("Unable to register with the cluster due to the following error:", e);
}
Expand All @@ -138,7 +164,7 @@ private void registerClient() {
}
}

private AsyncMetadataVersionManager createMetadataChecker() {
private AsyncMetadataVersionManager scheduleMetadataChecker(String jobId, long interval) {
AsyncMetadataVersionManager asyncCheckMetadata = null;
SystemStore<String, Long> versionStore = this.sysRepository.getVersionStore();
if(versionStore == null)
Expand All @@ -155,10 +181,17 @@ public Void call() throws Exception {
};

asyncCheckMetadata = new AsyncMetadataVersionManager(this.sysRepository,
config,
rebootstrapCallback);
logger.info("Metadata version check thread started. Frequency = Every "
+ config.getAsyncCheckMetadataInterval() + " ms");

// schedule the job to run every 'checkInterval' period, starting
// now
scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(),
asyncCheckMetadata,
new Date(),
interval);
logger.info("Metadata version check thread started. Frequency = Every " + interval
+ " ms");

}
return asyncCheckMetadata;
}
Expand Down Expand Up @@ -193,6 +226,10 @@ public void bootStrap() {
}
}

public void close() {
scheduler.stopInner();
}

public boolean delete(K key) {
Versioned<V> versioned = get(key);
if(versioned == null)
Expand Down
11 changes: 9 additions & 2 deletions src/java/voldemort/client/SystemStore.java
Expand Up @@ -10,6 +10,7 @@
import voldemort.store.system.SystemStoreConstants;
import voldemort.versioning.InconsistentDataException;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

public class SystemStore<K, V> {
Expand Down Expand Up @@ -43,7 +44,8 @@ public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID, S
this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml);
}

public void putSysStore(K key, V value) {
public Version putSysStore(K key, V value) {
Version version = null;
try {
logger.debug("Invoking Put for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = getSysStore(key);
Expand All @@ -52,20 +54,25 @@ public void putSysStore(K key, V value) {
else
versioned.setObject(value);
this.sysStore.put(key, versioned, null);
version = versioned.getVersion();
} catch(Exception e) {
logger.info("Exception caught during putSysStore:");
e.printStackTrace();
}
return version;
}

public void putSysStore(K key, Versioned<V> value) {
public Version putSysStore(K key, Versioned<V> value) {
Version version = null;
try {
logger.debug("Invoking Put for key : " + key + " on store name : " + this.storeName);
this.sysStore.put(key, value, null);
version = value.getVersion();
} catch(Exception e) {
logger.info("Exception caught during putSysStore:");
e.printStackTrace();
}
return version;
}

public Versioned<V> getSysStore(K key) {
Expand Down
14 changes: 0 additions & 14 deletions src/java/voldemort/client/VoldemortOperation.java

This file was deleted.

@@ -1,4 +1,4 @@
package voldemort.client;
package voldemort.client.scheduler;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Random;
Expand All @@ -7,6 +7,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.client.SystemStoreRepository;
import voldemort.versioning.Versioned;

/*
Expand All @@ -28,18 +29,14 @@ public class AsyncMetadataVersionManager implements Runnable {
private final Logger logger = Logger.getLogger(this.getClass());
private Versioned<Long> currentStoreVersion;
private Versioned<Long> currentClusterVersion;
private volatile boolean isRunning;
private final Callable<Void> storeClientThunk;
private long asyncMetadataCheckInterval;
private final SystemStoreRepository sysRepository;
private final ClientConfig clientConfig;

// Random delta generator
private final int DELTA_MAX = 2000;
private final Random randomGenerator = new Random(System.currentTimeMillis());

public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
ClientConfig config,
Callable<Void> storeClientThunk) {
this.sysRepository = sysRepository;

Expand All @@ -64,9 +61,6 @@ public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
if(currentClusterVersion == null)
currentClusterVersion = new Versioned<Long>((long) 0);

// Initialize and start the background check thread
isRunning = true;

Thread checkVersionThread = new Thread(this, "AsyncVersionCheckThread");
checkVersionThread.setDaemon(true);
checkVersionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
Expand All @@ -78,14 +72,6 @@ public void uncaughtException(Thread t, Throwable e) {
});

this.storeClientThunk = storeClientThunk;
this.asyncMetadataCheckInterval = config.getAsyncCheckMetadataInterval();
this.clientConfig = config;
checkVersionThread.start();

}

public void destroy() {
isRunning = false;
}

/*
Expand Down Expand Up @@ -120,23 +106,12 @@ public Versioned<Long> fetchNewVersion(String versionKey, Versioned<Long> curVer
}

public void run() {
Versioned<Long> newStoresVersion, newClusterVersion;
while(!Thread.currentThread().isInterrupted() && isRunning) {
newStoresVersion = newClusterVersion = null;

try {
Thread.sleep(asyncMetadataCheckInterval);
} catch(InterruptedException e) {
break;
}

newStoresVersion = fetchNewVersion(STORES_VERSION_KEY, currentStoreVersion);
newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY, currentClusterVersion);
Versioned<Long> newStoresVersion = fetchNewVersion(STORES_VERSION_KEY, currentStoreVersion);
Versioned<Long> newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY,
currentClusterVersion);

// If nothing has been updated, continue
if(newStoresVersion == null && newClusterVersion == null) {
continue;
}
// If nothing has been updated, continue
if((newStoresVersion != null) || (newClusterVersion != null)) {

logger.info("Metadata version mismatch detected.");

Expand All @@ -147,7 +122,7 @@ public void run() {
logger.info("Sleeping for delta : " + delta + " (ms) before re-bootstrapping.");
Thread.sleep(delta);
} catch(InterruptedException e) {
break;
// do nothing, continue.
}

try {
Expand All @@ -164,8 +139,8 @@ public void run() {
e.printStackTrace();
logger.info(e.getMessage());
}

}

}

public Versioned<Long> getStoreMetadataVersion() {
Expand Down
43 changes: 43 additions & 0 deletions src/java/voldemort/client/scheduler/ClientRegistryRefresher.java
@@ -0,0 +1,43 @@
package voldemort.client.scheduler;

import org.apache.log4j.Logger;

import voldemort.client.ClientInfo;
import voldemort.client.SystemStore;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

public class ClientRegistryRefresher implements Runnable {

private final Logger logger = Logger.getLogger(this.getClass());

private final SystemStore<String, ClientInfo> clientRegistry;
private ClientInfo clientInfo;
private final String clientId;
private Version lastVersion;

public ClientRegistryRefresher(SystemStore<String, ClientInfo> clientRegistry,
String clientId,
ClientInfo clientInfo,
Version version) {
this.clientRegistry = clientRegistry;
this.clientInfo = clientInfo;
this.clientId = clientId;
this.lastVersion = version;
logger.info("Initial version obtained from client registry: " + version);
}

public void run() {
clientInfo.setUpdateTime(System.currentTimeMillis());
logger.info("updating client registry with the following info for client: " + clientId
+ "\n" + clientInfo);
try {
lastVersion = clientRegistry.putSysStore(clientId,
new Versioned<ClientInfo>(clientInfo,
lastVersion));
} catch(Exception e) {
logger.warn("encounted the following error while trying to update client registry: "
+ e);
}
}
}
Expand Up @@ -14,7 +14,7 @@
* the License.
*/

package voldemort.server;
package voldemort.common.service;

import java.util.concurrent.atomic.AtomicBoolean;

Expand Down

0 comments on commit 20b47b7

Please sign in to comment.