Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
refactorings for new CassandraHost and related configurator classes
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate authored and rantav committed Apr 2, 2010
1 parent 8690dbe commit 2687509
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
private final String url;
private final int port;

public CassandraClientFactory(CassandraClientPool pools, String url, int port,
public CassandraClientFactory(CassandraClientPool pools, CassandraHost cassandraHost,
CassandraClientMonitor clientMonitor) {
this.pool = pools;
this.url = url;
this.port = port;
timeout = getTimeout();
this.url = cassandraHost.getUrl();
this.port = cassandraHost.getPort();
timeout = getTimeout(cassandraHost);
this.clientMonitor = clientMonitor;
}

Expand All @@ -57,7 +57,7 @@ public CassandraClientFactory(String url, int port) {
this.pool = new CassandraClientPoolImpl(this.clientMonitor);
this.url = url;
this.port = port;
timeout = getTimeout();
timeout = getTimeout(null);
}

public CassandraClient create() throws TTransportException, TException, UnknownHostException {
Expand All @@ -84,22 +84,28 @@ private Cassandra.Client createThriftClient(String url, int port)
}

/**
* Gets an environment variable CASSANDRA_THRIFT_SOCKET_TIMEOUT value.
* If CassandraHost was not null we use {@link CassandraHost#getCassandraThriftSocketTimeout()}
* if it was greater than zero. Otherwise look for an environment
* variable name CASSANDRA_THRIFT_SOCKET_TIMEOUT value.
* If doesn't exist, returns 0.
* @param cassandraHost
*/
private int getTimeout() {
private int getTimeout(CassandraHost cassandraHost) {
int timeoutVar = 0;
if ( cassandraHost != null && cassandraHost.getCassandraThriftSocketTimeout() > 0 ) {
timeoutVar = cassandraHost.getCassandraThriftSocketTimeout();
} else {
String timeoutStr = System.getProperty(
SystemProperties.CASSANDRA_THRIFT_SOCKET_TIMEOUT.toString());
if (timeoutStr == null || timeoutStr.length() == 0) {
return 0;
} else {
if (timeoutStr != null && timeoutStr.length() > 0) {
try {
return Integer.valueOf(timeoutStr);
timeoutVar = Integer.valueOf(timeoutStr);
} catch (NumberFormatException e) {
log.error("Invalid value for CASSANDRA_THRIFT_SOCKET_TIMEOUT", e);
return 0;
}
}
}
return timeoutVar;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package me.prettyprint.cassandra.service;

import me.prettyprint.cassandra.service.CassandraClientPoolByHost.ExhaustedPolicy;
import java.util.Set;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import java.util.Set;

/**
* Defines the various JMX methods the CassandraClientMonitor exposes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@
*/
/*package*/ interface CassandraClientPoolByHost {

enum ExhaustedPolicy {
WHEN_EXHAUSTED_FAIL, WHEN_EXHAUSTED_GROW, WHEN_EXHAUSTED_BLOCK
}

public static final ExhaustedPolicy DEFAULT_EXHAUSTED_POLICY =
ExhaustedPolicy.WHEN_EXHAUSTED_BLOCK;

public static final int DEFAULT_MAX_ACTIVE = 50;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,26 @@
*/
private final Set<CassandraClient> liveClientsFromPool;

public CassandraClientPoolByHostImpl(String cassandraUrl, int cassandraPort, String name,
CassandraClientPool pools, CassandraClientMonitor clientMonitor) {
this(cassandraUrl, cassandraPort, name, pools, clientMonitor, DEFAULT_MAX_ACTIVE,
DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED,
DEFAULT_MAX_IDLE, DEFAULT_EXHAUSTED_POLICY);
}

public CassandraClientPoolByHostImpl(String cassandraUrl, int cassandraPort, String name,
CassandraClientPool pools, CassandraClientMonitor clientMonitor, int maxActive,
long maxWait, int maxIdle, ExhaustedPolicy exhaustedPolicy) {
this(cassandraUrl, cassandraPort, name, pools, maxActive, maxWait, maxIdle,
exhaustedPolicy, new CassandraClientFactory(pools, cassandraUrl, cassandraPort, clientMonitor));

}

public CassandraClientPoolByHostImpl(String cassandraUrl, int cassandraPort, String name,
CassandraClientPool pools, int maxActive,
long maxWait, int maxIdle, ExhaustedPolicy exhaustedPolicy,
CassandraClientFactory clientFactory) {
log.debug("Creating new connection pool for {}:{}", cassandraUrl, cassandraPort);
url = cassandraUrl;
port = cassandraPort;
this.name = name;
this.maxActive = maxActive;
this.maxIdle = maxIdle;
this.maxWaitTimeWhenExhausted = maxWait;
this.exhaustedPolicy = exhaustedPolicy;
this.clientFactory = clientFactory;
public CassandraClientPoolByHostImpl(CassandraHost cassandraHost,
CassandraClientPool pools,
CassandraClientMonitor cassandraClientMonitor) {
this(cassandraHost, pools, cassandraClientMonitor, new CassandraClientFactory(pools, cassandraHost, cassandraClientMonitor));
}

public CassandraClientPoolByHostImpl(CassandraHost cassandraHost,
CassandraClientPool pools,
CassandraClientMonitor cassandraClientMonitor,
CassandraClientFactory cassandraClientFactory) {
log.debug("Creating new connection pool for {}", cassandraHost.getUrlPort());
url = cassandraHost.getUrl();
port = cassandraHost.getPort();
this.name = cassandraHost.getName();
this.maxActive = cassandraHost.getMaxActive();
this.maxIdle = cassandraHost.getMaxIdle();
this.maxWaitTimeWhenExhausted = cassandraHost.getMaxWaitTimeWhenExhausted();
this.exhaustedPolicy = cassandraHost.getExhaustedPolicy();
this.clientFactory = cassandraClientFactory;

blockedThreadsCount = new AtomicInteger(0);
// Create a set implemented as a ConcurrentHashMap for performance and concurrency.
liveClientsFromPool =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public CassandraClientPool createNew() {
return pool;
}

public CassandraClientPool createNew(String[] urlPorts) {
CassandraClientPool pool = new CassandraClientPoolImpl(jmx.getCassandraMonitor(), urlPorts);
public CassandraClientPool createNew(CassandraHostConfigurator cassandraHostConfigurator) {
CassandraClientPool pool = new CassandraClientPoolImpl(jmx.getCassandraMonitor(), cassandraHostConfigurator.buildCassandraHosts());
jmx.addPool(pool);
return pool;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package me.prettyprint.cassandra.service;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -30,20 +28,20 @@
/**
* Mapping b/w the host identifier (url:port) and the pool used to store connections to it.
*/
private final Map<PoolKey, CassandraClientPoolByHost> pools;
private final Map<CassandraHost, CassandraClientPoolByHost> pools;

private final CassandraClientMonitor clientMonitor;

public CassandraClientPoolImpl(CassandraClientMonitor clientMonitor) {
pools = new HashMap<PoolKey, CassandraClientPoolByHost>();
pools = new HashMap<CassandraHost, CassandraClientPoolByHost>();
this.clientMonitor = clientMonitor;
}

public CassandraClientPoolImpl(CassandraClientMonitor clientMonitor, String[] cassandraHosts) {
public CassandraClientPoolImpl(CassandraClientMonitor clientMonitor, CassandraHost[] cassandraHosts) {
this(clientMonitor);
for (String urlPort : cassandraHosts) {
log.debug("Creating pool-by-host instance: {}", urlPort);
getPool(parseHostFromUrl(urlPort),parsePortFromUrl(urlPort));
for (CassandraHost cassandraHost : cassandraHosts) {
log.debug("Creating pool-by-host instance: {}", cassandraHost);
getPool(cassandraHost);
}
}

Expand All @@ -53,8 +51,8 @@ public CassandraClient borrowClient() throws IllegalStateException,
PoolExhaustedException, Exception {
String[] clients = new String[pools.size()];
int x = 0;
for(PoolKey poolKey : pools.keySet()) {
clients[x] = poolKey.getUrlPort();
for(CassandraHost cassandraHost : pools.keySet()) {
clients[x] = cassandraHost.getUrlPort();
x++;
}
return borrowClient(clients);
Expand All @@ -63,7 +61,7 @@ public CassandraClient borrowClient() throws IllegalStateException,
@Override
public CassandraClient borrowClient(String url, int port)
throws IllegalStateException, PoolExhaustedException, Exception {
return getPool(url, port).borrowClient();
return getPool(new CassandraHost(url, port)).borrowClient();
}

@Override
Expand Down Expand Up @@ -120,21 +118,21 @@ public int getNumPools() {
return pools.size();
}

public CassandraClientPoolByHost getPool(String url, int port) {
PoolKey key = new PoolKey(url, port);
CassandraClientPoolByHost pool = pools.get(key);
public CassandraClientPoolByHost getPool(CassandraHost cassandraHost) {
CassandraClientPoolByHost pool = pools.get(cassandraHost);
if (pool == null) {
synchronized (pools) {
pool = pools.get(key);
pool = pools.get(cassandraHost);
if (pool == null) {
pool = new CassandraClientPoolByHostImpl(url, port, key.name, this, clientMonitor);
pools.put(key, pool);
pool = new CassandraClientPoolByHostImpl(cassandraHost, this, clientMonitor);
pools.put(cassandraHost, pool);
}
}
}
return pool;
}


@Override
public Set<String> getPoolNames() {
Set<String> names = new HashSet<String>();
Expand Down Expand Up @@ -165,71 +163,6 @@ public Set<String> getKnownHosts() {
return hosts;
}

private class PoolKey {
private final String url, ip;
private final int port;
private final String name;

public PoolKey(String url2, int port) {
this.port = port;
StringBuilder b = new StringBuilder();
InetAddress address;
String turl, tip;
try {
address = InetAddress.getByName(url2);
tip = address.getHostAddress();
turl = isPerformNameResolution() ? address.getHostName() : tip;
} catch (UnknownHostException e) {
log.error("Unable to resolve host {}", url2);
turl = url2;
tip = url2;
}
url = turl;
ip = tip;
b.append(url);
if (isPerformNameResolution()) {
b.append("(");
b.append(ip);
b.append(")");
}
b.append(":");
b.append(port);
name = b.toString();
}

private String getUrlPort() {
return new StringBuilder(32).append(url).append(':').append(port).toString();
}

/**
* Checks whether name resolution should occur.
* @return
*/
private boolean isPerformNameResolution() {
String sysprop = System.getProperty(
SystemProperties.HECTOR_PERFORM_NAME_RESOLUTION.toString());
return sysprop != null && Boolean.valueOf(sysprop);

}
@Override
public String toString() {
return name;
}

@Override
public boolean equals(Object obj) {
if (! (obj instanceof PoolKey)) {
return false;
}
return ((PoolKey) obj).name.equals(name);
}

@Override
public int hashCode() {
return name.hashCode();
}

}

@Override
public void invalidateClient(CassandraClient client) {
Expand All @@ -241,7 +174,7 @@ void reportDestroyed(CassandraClient client) {
}

private CassandraClientPoolByHost getPool(CassandraClient c) {
return getPool(c.getUrl(), c.getPort());
return getPool(new CassandraHost(c.getUrl(), c.getPort()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import me.prettyprint.cassandra.service.CassandraClientPoolByHost.ExhaustedPolicy;
import me.prettyprint.cassandra.service.ExhaustedPolicy;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -28,9 +28,13 @@ public void setupTest() throws Exception {
when(factory.makeObject()).thenReturn(createdClient);
when(factory.validateObject(createdClient)).thenReturn(true);

pool = new CassandraClientPoolByHostImpl("url", 1111, "name", poolStore, 50 /*maxActive*/,
10000 /*maxWait*/, 5 /*maxIdle*/, ExhaustedPolicy.WHEN_EXHAUSTED_FAIL,
factory);
CassandraHost cassandraHost = new CassandraHost("url", 1111);
cassandraHost.setMaxActive(50);
cassandraHost.setMaxWaitTimeWhenExhausted(10000);
cassandraHost.setMaxIdle(5);
cassandraHost.setExhaustedPolicy(ExhaustedPolicy.WHEN_EXHAUSTED_FAIL);
pool = new CassandraClientPoolByHostImpl(cassandraHost, poolStore, new CassandraClientMonitor(), factory);

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void setupTest() {

@Test
public void testGetPool() {
CassandraClientPoolByHost pool = store.getPool("x", 1);
CassandraClientPoolByHost pool = store.getPool(new CassandraHost("x", 1));
assertNotNull(pool);
}

Expand Down Expand Up @@ -142,10 +142,11 @@ public void testInvalidateClient()
CassandraClient client3 = store.borrowClient("localhost", 9170);
assertNotNull(client3);
assertSame("client2 should have been reused", client2, client3);
CassandraHost cassandraHost = new CassandraHost("localhost", 9170);
assertFalse("client1 is not in the liveClients set anymore",
store.getPool("localhost", 9170).getLiveClients().contains(client1));
store.getPool(cassandraHost).getLiveClients().contains(client1));
assertTrue("client2 is in the liveClients set anymore",
store.getPool("localhost", 9170).getLiveClients().contains(client2));
store.getPool(cassandraHost).getLiveClients().contains(client2));
}

@Test
Expand All @@ -166,11 +167,12 @@ public void testInvalidateAll()
assertNotSame(client1, client3);
assertNotSame(client2, client3);

CassandraHost cassandraHost = new CassandraHost("localhost", 9170);
assertFalse("client1 should not be in the liveClients set anymore",
store.getPool("localhost", 9170).getLiveClients().contains(client1));
store.getPool(cassandraHost).getLiveClients().contains(client1));
assertFalse("client2 should not be in the liveClients set anymore",
store.getPool("localhost", 9170).getLiveClients().contains(client2));
store.getPool(cassandraHost).getLiveClients().contains(client2));
assertTrue("client3 should be in the liveClients set ",
store.getPool("localhost", 9170).getLiveClients().contains(client3));
store.getPool(cassandraHost).getLiveClients().contains(client3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void teardown() throws IOException {
public void setupCase() throws TTransportException, TException, UnknownHostException {
pools = mock(CassandraClientPool.class);
monitor = mock(CassandraClientMonitor.class);
client = new CassandraClientFactory(pools, "localhost", 9170, monitor).create();
client = new CassandraClientFactory(pools, new CassandraHost("localhost", 9170), monitor).create();
}

@Test
Expand Down
Loading

0 comments on commit 2687509

Please sign in to comment.