Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Implement load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
rantav committed Mar 1, 2010
1 parent c9189d8 commit 58111e4
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 11 deletions.
2 changes: 0 additions & 2 deletions README
Expand Up @@ -11,8 +11,6 @@ This client provides:
o failover behavior on the client side
o connection pooling for improved performance and scalability
o JMX conters for monitoring and management

TODO:
o load balancing

The work was initially inspired by http://code.google.com/p/cassandra-java-client/ but has taken off to different directions since.
Expand Down
5 changes: 3 additions & 2 deletions build.xml
Expand Up @@ -9,7 +9,8 @@
<!-- ====================================================================== -->
<!-- Build environment properties -->
<!-- ====================================================================== -->
<property name="final-name" value="hector-0.5.0-5"/>
<property name="version" value="0.5.0-6"/>
<property name="final-name" value="hector-${version}"/>
<property name="build.dir" value="target"/>
<property name="project.build.directory" value="${build.outputDir}"/>
<property name="test.reports" value="${build.dir}/test-reports"/>
Expand Down Expand Up @@ -182,7 +183,7 @@
<!-- Package target -->
<!-- ====================================================================== -->

<target name="package" depends="compile,test" description="Package the application">
<target name="package" depends="compile" description="Package the application">
<mkdir dir="${build.dir}"/>
<jar jarfile="${build.dir}/${final-name}.jar"
compress="true"
Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Expand Up @@ -5,7 +5,7 @@
<artifactId>hector</artifactId>
<packaging>jar</packaging>
<!-- The version follows Cassandra's major version changes, e.g. 0.5.0 goes with the 0.5.0 cassandra release-->
<version>0.5.0-5</version>
<version>0.5.0-6</version>
<name>hector</name>
<description>Cassandra Java Client Library</description>
<url>http://github.com/rantav/hector</url>
Expand Down Expand Up @@ -158,6 +158,11 @@
<version>1.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
</dependency>
</dependencies>

<reporting>
Expand Down
Expand Up @@ -32,6 +32,8 @@ public enum Counter {
READ_SUCCESS,
READ_FAIL,
POOL_EXHAUSTED,
/** Load balance connection errors */
RECOVERABLE_LB_CONNECT_ERRORS,
}

public CassandraClientMonitor() {
Expand Down Expand Up @@ -173,10 +175,15 @@ public long getRecoverableTransportExceptionCount() {
@Override
public long getRecoverableErrorCount() {
return getRecoverableTimedOutCount() + getRecoverableTransportExceptionCount() +
getRecoverableUnavailableCount();
getRecoverableUnavailableCount() + getRecoverableLoadBalancedConnectErrors();
}

public void addPool(CassandraClientPool pool) {
pools.add(pool);
}

@Override
public long getRecoverableLoadBalancedConnectErrors() {
return counters.get(Counter.RECOVERABLE_LB_CONNECT_ERRORS).longValue();
}
}
Expand Up @@ -90,6 +90,8 @@ public interface CassandraClientMonitorMBean {
*/
int getNumExhaustedPools();

long getRecoverableLoadBalancedConnectErrors();

/**
* List of exhausted pools.
* @return
Expand Down
Expand Up @@ -40,6 +40,29 @@ public interface CassandraClientPool {
CassandraClient borrowClient(String url, int port)
throws IllegalStateException, PoolExhaustedException, Exception;

/**
* Borrows a client, similar to {@link #borrowClient(String, int)}, but expects the url:port
* string format
* @param urlPort a string of the format url:port
*/
CassandraClient borrowClient(String urlPort)
throws IllegalStateException, PoolExhaustedException, Exception;

/**
* Borrows a load-balanced client, a random client from the array of given client addresses.
*
* This method is typically used to allow load balancing b/w the list of given client URLs. The
* method will return a random client from the array of the given url:port pairs.
* The method will try connecting each host in the list and will only stop when there's one
* successful connection, so in that sense it's also useful for failover.
*
* @param clientUrls An array of "url:port" cassandra client addresses.
*
* @return A randomly chosen client from the array of clientUrls.
* @throws Exception
*/
CassandraClient borrowClient(String[] clientUrls) throws Exception;

/**
* Releases a client from the pool it belongs to.
*/
Expand Down
Expand Up @@ -2,11 +2,16 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import me.prettyprint.cassandra.service.CassandraClientMonitor.Counter;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,12 +29,12 @@
/**
* Mapping b/w the host identifier (url:port) and the pool used to store connections to it.
*/
private final Map<String, CassandraClientPoolByHost> pools;
private final Map<PoolKey, CassandraClientPoolByHost> pools;

private final CassandraClientMonitor clientMonitor;

public CassandraClientPoolImpl(CassandraClientMonitor clientMonitor) {
pools = new HashMap<String, CassandraClientPoolByHost>();
pools = new HashMap<PoolKey, CassandraClientPoolByHost>();
this.clientMonitor = clientMonitor;
}

Expand Down Expand Up @@ -95,13 +100,13 @@ public int getNumPools() {

public CassandraClientPoolByHost getPool(String url, int port) {
PoolKey key = new PoolKey(url, port);
CassandraClientPoolByHost pool = pools.get(key.ip);
CassandraClientPoolByHost pool = pools.get(key);
if (pool == null) {
synchronized (pools) {
pool = pools.get(key.ip);
pool = pools.get(key);
if (pool == null) {
pool = new CassandraClientPoolByHostImpl(url, port, key.name, this, clientMonitor);
pools.put(key.ip, pool);
pools.put(key, pool);
}
}
}
Expand Down Expand Up @@ -168,6 +173,25 @@ public PoolKey(String url, int port) {
b.append(port);
name = b.toString();
}

@Override
public String toString() {
return name;
}

@Override
public boolean equals(Object obj) {
if (! (obj instanceof PoolKey)) {
return false;
}
return ((PoolKey) obj).name.equals(name);
}

@Override
public int hashCode() {
return name.hashCode();
}

}

@Override
Expand All @@ -188,4 +212,36 @@ private CassandraClientPoolByHost getPool(CassandraClient c) {
public void releaseKeyspace(Keyspace k) throws Exception {
releaseClient(k.getClient());
}

@Override
public CassandraClient borrowClient(String urlPort) throws IllegalStateException,
PoolExhaustedException, Exception {
int delim = urlPort.lastIndexOf(':');
String url = urlPort.substring(0, delim);
String strPort = urlPort.substring(delim + 1, urlPort.length());
int port = Integer.valueOf(strPort);
return borrowClient(url, port);
}

@Override
public CassandraClient borrowClient(String[] clientUrls) throws Exception {
List<String> clients = new ArrayList<String>(Arrays.asList(clientUrls));
while(!clients.isEmpty()) {
int rand = (int) (Math.random() * clients.size());
try {
return borrowClient(clients.get(rand));
} catch (Exception e) {
if (clients.size() > 1) {
log.warn("Unable to obtain client " + clients.get(rand) + " will try the next client", e);
clientMonitor.incCounter(Counter.RECOVERABLE_LB_CONNECT_ERRORS);
clients.remove(rand);
} else {
throw e;
}
}
}
// Method should never get here; an exception must have been thrown before, I'm only writing
// this to make the compiler happy.
return null;
}
}
Expand Up @@ -6,6 +6,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import java.io.IOException;

Expand Down Expand Up @@ -64,6 +65,40 @@ public void testBorrowClient() throws IllegalStateException, PoolExhaustedExcept
assertEquals(9170, client.getPort());
}

/**
* test the url:port format
*/
@Test
public void testBorrowClient1() throws IllegalStateException, PoolExhaustedException, Exception {
CassandraClient client = store.borrowClient("localhost:9170");
assertNotNull(client);
assertEquals("localhost", client.getUrl());
assertEquals(9170, client.getPort());
}

/**
* test the url:port array format (load balanced)
*/
@Test
public void testBorrowLbClient() throws IllegalStateException, PoolExhaustedException, Exception {
CassandraClient client = store.borrowClient(new String[] {"localhost:9170"});
assertNotNull(client);
assertEquals("localhost", client.getUrl());
assertEquals(9170, client.getPort());

client = store.borrowClient(new String[] {"localhost:9170", "localhost:9171", "localhost:9172"});
assertNotNull(client);
assertEquals("localhost", client.getUrl());
assertEquals(9170, client.getPort());

try {
client = store.borrowClient(new String[] {"localhost:9171", "localhost:9172"});
fail("Should not have boon able to obtain a client");
} catch (Exception e) {
// ok
}
}

@Test
public void testReleaseClient() throws IllegalStateException, PoolExhaustedException, Exception {
CassandraClient client = store.borrowClient("localhost", 9170);
Expand Down

0 comments on commit 58111e4

Please sign in to comment.