Skip to content

Commit

Permalink
AdminClientPool, to easily pool AdminClient
Browse files Browse the repository at this point in the history
AdminClientPool is added for managing the pools of AdminClient.

AdminClient unlike StoreClient can't be used across Cluster
modifications. So previously AdminClient needs to be created
every time. this was costly as the connections need to be re-established
every time.

AdminClientPool solves this problem by discarding AdminClient if cluster
is modified.

AdminClientPool still does not solve the problem of failing operation
during cluster modification. But it will work correctly after the
cluster is modified.
  • Loading branch information
arunthirupathi committed Jun 28, 2016
1 parent 807ca49 commit 79a0aa3
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 2 deletions.
142 changes: 142 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClientPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package voldemort.client.protocol.admin;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import voldemort.client.ClientConfig;


/**
* AdminClientPool utility class for caching AdminClient.
* Checkout is non-blocking and if AdminClient does not exist in the cache
* a new AdminClient will be created and returned.
* CheckIn is non-blocking as well, when max number of clients are already
* cached, further CheckIn of AdminClient will be closed and discarded.
*/
public class AdminClientPool {
private final AdminClientConfig adminConfig;
private final ClientConfig clientConfig;
private final BlockingQueue<AdminClient> clientCache;
private final AtomicBoolean isClosed;

/**
* Create AdminClientPool
* @param maxClients number of clients to cache, due to concurrent checkout,
* more number of AdminClient may be created. But on CheckIn, additional
* ones are discarded.
* @param adminConfig AdminClientConfig used for creation of AdminClient
* @param clientConfig ClientConfig used for creation of AdminClient.
*/

public AdminClientPool(int maxClients, AdminClientConfig adminConfig, ClientConfig clientConfig) {
if (maxClients <= 0) {
throw new IllegalArgumentException("maxClients should be positive");
}

if (adminConfig == null) {
throw new IllegalArgumentException("AdminClientConfig is null");
}

if (clientConfig == null) {
throw new IllegalArgumentException("ClientConfig is null");
}

String[] bootstrapUrls = clientConfig.getBootstrapUrls();
if (bootstrapUrls == null || bootstrapUrls.length == 0) {
throw new IllegalArgumentException("ClientConfig has no bootstrap Urls specified");
}

this.adminConfig = adminConfig;
this.clientConfig = clientConfig;
this.clientCache = new ArrayBlockingQueue<AdminClient>(maxClients);
this.isClosed = new AtomicBoolean(false);
}

private AdminClient createAdminClient() {
return new AdminClient(adminConfig, clientConfig);
}

/**
* number of AdminClient in the cache
* @return number of AdminClient in the cache.
*/
public int size() {
if (isClosed.get()) {
throw new IllegalStateException("Pool is closing");
}

return clientCache.size();
}

/**
* get an AdminClient from the cache if exists, if not create new one
* and return it. This method is non-blocking.
*
* All AdminClient returned from checkout, once after the completion of
* usage must be returned to the pool by calling checkin. If not,
* there will be leak of AdminClients (connections, threads and file handles).
*
* @return AdminClient
*/
public AdminClient checkout() {
if (isClosed.get()) {
throw new IllegalStateException("Pool is closing");
}

AdminClient client;

// Try to get one from the Cache.
while ((client = clientCache.poll()) != null) {
if (!client.isClusterModified()) {
return client;
} else {
// Cluster is Modified, after the AdminClient is created. Close it
client.close();
}
}

// None is available, create new one.
return createAdminClient();
}

/**
* submit the adminClient after usage is completed.
* Behavior is undefined, if checkin is called with objects not retrieved
* from checkout.
*
* @param client AdminClient retrieved from checkout
*/
public void checkin(AdminClient client) {
if (isClosed.get()) {
throw new IllegalStateException("Pool is closing");
}

if (client == null) {
throw new IllegalArgumentException("client is null");
}

boolean isCheckedIn = clientCache.offer(client);

if (!isCheckedIn) {
// Cache is already full, close this AdminClient
client.close();
}
}

/**
* close the AdminPool, if no long required.
* After closed, all public methods will throw IllegalStateException
*/
public void close() {
boolean isPreviouslyClosed = isClosed.getAndSet(true);
if (isPreviouslyClosed) {
return;
}

AdminClient client;
while ((client = clientCache.poll()) != null) {
client.close();
}
}
}
108 changes: 108 additions & 0 deletions test/unit/voldemort/client/AdminClientPoolTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package voldemort.client;

import java.io.IOException;
import java.util.Properties;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import voldemort.ServerTestUtils;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.admin.AdminClientPool;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.server.VoldemortServer;

public class AdminClientPoolTest {
private final static String storesXmlfile = "test/common/voldemort/config/stores.xml";

private final static int MAX_CLIENTS = 5;
private final static int NUM_SERVERS = 1;
private VoldemortServer[] servers;
private Cluster cluster;
private AdminClientPool pool;
private final ClientConfig clientConfig;

public AdminClientPoolTest() throws IOException {
servers = new VoldemortServer[NUM_SERVERS];
int partitionMap[][] = { { 0, 1, 2, 3 } };
cluster = ServerTestUtils.startVoldemortCluster(servers, partitionMap, new Properties(), storesXmlfile);

Node node = cluster.getNodeById(0);
String bootstrapUrl = "tcp://" + node.getHost() + ":" + node.getSocketPort();
clientConfig = new ClientConfig().setBootstrapUrls(bootstrapUrl);
pool = new AdminClientPool(MAX_CLIENTS, new AdminClientConfig(), clientConfig);
}

@After
public void tearDown() throws IOException {
pool.close();
for (VoldemortServer server : servers) {
ServerTestUtils.stopVoldemortServer(server);
}
}

@Test
public void testCreate() {
AdminClient client1 = pool.checkout();
Assert.assertEquals("Nothing should exist in cache", 0, pool.size());

pool.checkin(client1);

Assert.assertEquals("Size should be 1", 1, pool.size());
AdminClient client2 = pool.checkout();

Assert.assertSame("CheckOut after checkin is not returning the same", client1, client2);
Assert.assertEquals("Size should be 0 again", 0, pool.size());
}

@Test
public void testMulitCreate() {
final int SIZE = MAX_CLIENTS + 2;
AdminClient[] clients = new AdminClient[SIZE];

for (int i = 0; i < SIZE; i++) {
clients[i] = pool.checkout();
Assert.assertEquals("Nothing should exist in cache", 0, pool.size());
}

for (int i = 0; i < MAX_CLIENTS; i++) {
pool.checkin(clients[i]);
Assert.assertEquals("Different size of pool", i + 1, pool.size());
}

for (int i = MAX_CLIENTS; i < SIZE; i++) {
pool.checkin(clients[i]);
Assert.assertEquals("No more additional elements", MAX_CLIENTS, pool.size());
}
}

@Test
public void testClose() {
AdminClient client = pool.checkout();

pool.close();
try {
pool.checkout();
Assert.fail(" checkout should have failed");
} catch (IllegalStateException ex) {

}

try {
pool.checkin(client);
Assert.fail(" checkin should have failed");
} catch (IllegalStateException ex) {

}

try {
pool.size();
Assert.fail(" size should have failed ");
} catch (IllegalStateException ex) {

}
}
}
3 changes: 1 addition & 2 deletions test/unit/voldemort/client/VerifyOrAddStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import java.util.List;
import java.util.Properties;

import junit.framework.Assert;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down

0 comments on commit 79a0aa3

Please sign in to comment.