Skip to content

Commit

Permalink
add close method for shutting down client's SchedulerService in Abstr…
Browse files Browse the repository at this point in the history
…actStoreClientFactory
  • Loading branch information
Lei Gao committed Jul 25, 2012
1 parent fe46419 commit fd1f9fb
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 38 deletions.
24 changes: 23 additions & 1 deletion src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -20,6 +20,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -33,6 +35,7 @@
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.common.service.SchedulerService;
import voldemort.serialization.ByteArraySerializer;
import voldemort.serialization.IdentitySerializer;
import voldemort.serialization.SerializationException;
Expand All @@ -58,6 +61,7 @@
import voldemort.store.versioned.InconsistencyResolvingStore;
import voldemort.utils.ByteArray;
import voldemort.utils.JmxUtils;
import voldemort.utils.SystemTime;
import voldemort.versioning.ChainedResolver;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.TimeBasedInconsistencyResolver;
Expand Down Expand Up @@ -101,6 +105,7 @@ public abstract class AbstractStoreClientFactory implements StoreClientFactory {
private final int clientZoneId;
private final String clientContextName;
private final AtomicInteger sequencer;
private final HashSet<SchedulerService> clientAsyncServiceRepo;

public AbstractStoreClientFactory(ClientConfig config) {
this.config = config;
Expand All @@ -122,6 +127,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
config.getTimeoutConfig());

this.sequencer = new AtomicInteger(0);
this.clientAsyncServiceRepo = new HashSet<SchedulerService>();

if(this.isJmxEnabled) {
JmxUtils.registerMbean(threadPool,
Expand Down Expand Up @@ -151,13 +157,19 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {

public <K, V> StoreClient<K, V> getStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
SchedulerService service = new SchedulerService(config.getAsyncJobThreadPoolSize(),
SystemTime.INSTANCE,
true);
clientAsyncServiceRepo.add(service);

return new DefaultStoreClient<K, V>(storeName,
resolver,
this,
3,
clientContextName,
sequencer.getAndIncrement(),
config);
config,
service);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -457,6 +469,16 @@ public void close() {

if(failureDetector != null)
failureDetector.destroy();

stopClientAsyncSchedulers();
}

private void stopClientAsyncSchedulers() {
Iterator<SchedulerService> it = clientAsyncServiceRepo.iterator();
while(it.hasNext()) {
it.next().stop();
}
clientAsyncServiceRepo.clear();
}

/* Give a unique id to avoid jmx clashes */
Expand Down
48 changes: 25 additions & 23 deletions src/java/voldemort/client/DefaultStoreClient.java
Expand Up @@ -46,7 +46,6 @@
import voldemort.store.system.SystemStoreConstants;
import voldemort.utils.JmxUtils;
import voldemort.utils.ManifestFileReader;
import voldemort.utils.SystemTime;
import voldemort.utils.Utils;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.InconsistentDataException;
Expand Down Expand Up @@ -89,7 +88,7 @@ public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
StoreClientFactory storeFactory,
int maxMetadataRefreshAttempts) {
this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null);
this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null, null);
}

public DefaultStoreClient(String storeName,
Expand All @@ -98,7 +97,8 @@ public DefaultStoreClient(String storeName,
int maxMetadataRefreshAttempts,
String clientContext,
int clientSequence,
ClientConfig config) {
ClientConfig config,
SchedulerService scheduler) {

this.storeName = Utils.notNull(storeName);
this.resolver = resolver;
Expand All @@ -112,9 +112,7 @@ public DefaultStoreClient(String storeName,
this.clientId = AbstractStoreClientFactory.generateClientId(clientInfo);
this.config = config;
this.sysRepository = new SystemStoreRepository();
this.scheduler = new SchedulerService(config.getAsyncJobThreadPoolSize(),
SystemTime.INSTANCE,
true);
this.scheduler = scheduler;
// Registering self to be able to bootstrap client dynamically via JMX
JmxUtils.registerMbean(this,
JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()),
Expand Down Expand Up @@ -147,12 +145,17 @@ private void registerClient(String jobId, int interval) {
version);
GregorianCalendar cal = new GregorianCalendar();
cal.add(Calendar.SECOND, interval);
scheduler.schedule(jobId + refresher.getClass().getName(),
refresher,
cal.getTime(),
TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS));
logger.info("Client registry refresher thread started, refresh interval: "
+ interval + " seconds");

if(scheduler != null) {
scheduler.schedule(jobId + refresher.getClass().getName(),
refresher,
cal.getTime(),
TimeUnit.MILLISECONDS.convert(interval, TimeUnit.SECONDS));
logger.info("Client registry refresher thread started, refresh interval: "
+ interval + " seconds");
} else {
logger.warn("Client registry won't run because scheduler service is not configured");
}
} catch(Exception e) {
logger.warn("Unable to register with the cluster due to the following error:", e);
}
Expand Down Expand Up @@ -183,13 +186,16 @@ public Void call() throws Exception {

// schedule the job to run every 'checkInterval' period, starting
// now
scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(),
asyncCheckMetadata,
new Date(),
interval);
logger.info("Metadata version check thread started. Frequency = Every " + interval
+ " ms");

if(scheduler != null) {
scheduler.schedule(jobId + asyncCheckMetadata.getClass().getName(),
asyncCheckMetadata,
new Date(),
interval);
logger.info("Metadata version check thread started. Frequency = Every " + interval
+ " ms");
} else {
logger.warn("Metadata version check thread won't start because the scheduler service is not configured.");
}
}
return asyncCheckMetadata;
}
Expand Down Expand Up @@ -224,10 +230,6 @@ public void bootStrap() {
}
}

public void close() {
scheduler.stopInner();
}

public boolean delete(K key) {
Versioned<V> versioned = get(key);
if(versioned == null)
Expand Down
26 changes: 12 additions & 14 deletions test/unit/voldemort/client/ClientRegistryTest.java
Expand Up @@ -30,7 +30,7 @@

import com.google.common.collect.Lists;

@SuppressWarnings({ "unchecked" })
@SuppressWarnings( { "unchecked" })
public class ClientRegistryTest {

public static final String SERVER_LOCAL_URL = "tcp://localhost:";
Expand Down Expand Up @@ -143,7 +143,7 @@ public void testHappyPath() {
infoList = getClientRegistryContent(it);
assertTrue("Client registry not updated.",
infoList.get(0).getBootstrapTime() < infoList.get(0).getUpdateTime());
((DefaultStoreClient<String, String>) client1).close();
socketFactory.close();
}

@Test
Expand Down Expand Up @@ -227,8 +227,7 @@ public void testTwoClients() {
assertTrue("Client registry not updated.",
infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime());

((DefaultStoreClient<String, String>) client1).close();
((DefaultStoreClient<String, String>) client2).close();
socketFactory.close();
}

@Test
Expand Down Expand Up @@ -329,8 +328,7 @@ public void testTwoStores() {
assertTrue("Client registry not updated.",
infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime());

((DefaultStoreClient<String, String>) client1).close();
((DefaultStoreClient<String, String>) client2).close();
socketFactory.close();
}

@Test
Expand Down Expand Up @@ -462,8 +460,8 @@ public void testTwoFactories() {
assertTrue("Client registry not updated.",
infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime());

((DefaultStoreClient<String, String>) client1).close();
((DefaultStoreClient<String, String>) client2).close();
socketFactory1.close();
socketFactory2.close();
}

@Test
Expand Down Expand Up @@ -556,8 +554,8 @@ public void testOneServerFailre() {
assertTrue("Client registry not updated.",
infoList.get(1).getBootstrapTime() < infoList.get(1).getUpdateTime());

((DefaultStoreClient<String, String>) client1).close();
((DefaultStoreClient<String, String>) client2).close();
socketFactory1.close();
socketFactory2.close();
}

@Test
Expand Down Expand Up @@ -592,8 +590,6 @@ public void testRepeatRegistrationSameFactory() {
client1.put("k1", "v1");
client2.put("k2", "v2");

((DefaultStoreClient<String, String>) client1).close();
((DefaultStoreClient<String, String>) client2).close();
}

Iterator<Pair<ByteArray, Versioned<byte[]>>> it = adminClient.fetchEntries(1,
Expand All @@ -604,6 +600,8 @@ public void testRepeatRegistrationSameFactory() {
ArrayList<ClientInfo> infoList = getClientRegistryContent(it);
assertEquals("Incrrect # of entries created in client registry", 6, infoList.size());

socketFactory1.close();
socketFactory2.close();
}

@Test
Expand Down Expand Up @@ -709,8 +707,8 @@ public void testRepeatRegistrationDifferentFactories() {
client1LastBootstrapTime = infoList.get(0).getBootstrapTime();
client2LastBootstrapTime = infoList.get(0).getBootstrapTime();

((DefaultStoreClient<String, String>) client1).close();
((DefaultStoreClient<String, String>) client2).close();
socketFactory1.close();
socketFactory2.close();
}
}

Expand Down

0 comments on commit fd1f9fb

Please sign in to comment.