Skip to content

Commit

Permalink
Merge branch 'master' of http://github.com/btoddb/scale7-pelops into …
Browse files Browse the repository at this point in the history
…btoddb-master

Conflicts:
	pom.xml
	src/main/java/org/scale7/cassandra/pelops/Bytes.java
	src/main/java/org/scale7/cassandra/pelops/KeyspaceManager.java
	src/main/java/org/scale7/cassandra/pelops/Mutator.java
	src/main/java/org/scale7/cassandra/pelops/Selector.java
  • Loading branch information
danwashusen committed Oct 19, 2010
1 parent 8c4bf9d commit 832f039
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 19 deletions.
4 changes: 4 additions & 0 deletions .cvsignore
@@ -0,0 +1,4 @@
.settings
.classpath
.project
target
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
CVS
.project
.classpath
.settings
Expand Down
213 changes: 194 additions & 19 deletions src/main/java/org/scale7/cassandra/pelops/CachePerNodePool.java
Expand Up @@ -12,9 +12,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
Expand All @@ -36,7 +37,7 @@
* @author dominicwilliams
*
*/
public class CachePerNodePool extends ThriftPoolBase {
public class CachePerNodePool extends ThriftPoolBase implements CachePerNodePoolMXBean {

private static final Logger logger = SystemProxy.getLoggerFromFactory(CachePerNodePool.class);

Expand All @@ -48,6 +49,28 @@ public class CachePerNodePool extends ThriftPoolBase {
private ExecutorService clusterWatcherExec = Executors.newSingleThreadExecutor();
private AtomicBoolean isShutdown = new AtomicBoolean(false);

private AtomicLong getConnCount = new AtomicLong();
private AtomicLong leastLoadedSelectedCount = new AtomicLong();
private AtomicLong leastLoadedNotSelectedCount = new AtomicLong();
private AtomicLong cacheConnNotSelectedCount = new AtomicLong();
private AtomicLong cacheConnSelectedCount = new AtomicLong();
private AtomicLong connSelectedAlreadyOpenCount = new AtomicLong();
private AtomicLong connSelectedNotAlreadyOpenCount = new AtomicLong();
private AtomicLong connCannotOpenCount = new AtomicLong();
private AtomicLong connClosedCount = new AtomicLong();
private AtomicLong connCreatedCount = new AtomicLong();
private AtomicLong connReleaseCalledCount = new AtomicLong();
private AtomicLong connAddToCacheCount = new AtomicLong();
private AtomicLong connCreateExceptionCount = new AtomicLong();
private AtomicLong connOpenedCount = new AtomicLong();
private AtomicLong connReturnedToCacheCount = new AtomicLong();
private AtomicLong deadConnCount = new AtomicLong();
private AtomicLong networkExceptionCount = new AtomicLong();
private AtomicLong purgeAllSessionConnsCount = new AtomicLong();
private AtomicLong refillBackoffCount = new AtomicLong();
private AtomicLong refillNeedConnCount = new AtomicLong();
private AtomicLong getConnBackoffCount = new AtomicLong();

/**
* Get a Cassandra connection to the least loaded node represented in the connection pool.
* @return A connection to Cassandra
Expand All @@ -60,13 +83,14 @@ public IConnection getConnection() throws Exception {
/**
* Constructs a pool instance.
* Note: unless you are performing management options the keyspace should be provided.
* @param defaultPort the port to content the nodes on (9160)
* @param cluster the cluster info to connect to
* @param keyspace the keyspace to use (note: as of 0.7.0 this is basically a required parameter)
* @param generalPolicy the general pelops policy
* @param poolPolicy the pool policy
*/
public CachePerNodePool(Cluster cluster, String keyspace, OperandPolicy generalPolicy, Policy poolPolicy) {
this.cluster = cluster;
JmxMBeanManager.getInstance().registerMBean(this, JMX_MBEAN_OBJ_NAME+"-"+keyspace);
this.cluster = cluster;
this.generalPolicy = generalPolicy;
pool = new MultiNodePool();
this.keyspace = keyspace;
Expand All @@ -90,6 +114,8 @@ public CachePerNodePool(Cluster cluster, String keyspace, OperandPolicy generalP
*/
@Override
public IConnection getConnectionExcept(String notNode) throws Exception {
getConnCount.incrementAndGet();

// Create a list of nodes we have already tried, and therefore should avoid in preference
// to trying new nodes.
List<String> triedNodes = null;
Expand All @@ -109,13 +135,17 @@ public IConnection getConnectionExcept(String notNode) throws Exception {
for (NodeContext nodeContext : nodeContexts) {
if (nodeContext.isAvailable()) {
if (triedNodes == null || !triedNodes.contains(nodeContext.node))
if (leastLoaded == null || leastLoaded.getNodeLoadIndex() >= nodeContext.getNodeLoadIndex())
if (leastLoaded == null || leastLoaded.getNodeLoadIndex() >= nodeContext.getNodeLoadIndex()) {
leastLoaded = nodeContext;
leastLoadedSelectedCount.incrementAndGet();
}
}
}
// If we could not find an available untried node then break out and try any node
if (leastLoaded == null)
if (leastLoaded == null) {
leastLoadedNotSelectedCount.incrementAndGet();
break;
}
// otherwise, try to return a connection from this least loaded untried node
IConnection conn = leastLoaded.getConnection();
if (conn != null)
Expand All @@ -138,6 +168,9 @@ public IConnection getConnectionExcept(String notNode) throws Exception {
if (conn != null)
return conn;
}

getConnBackoffCount.incrementAndGet();

// Nope, that didn't work so need to back off and try again in a moment
logger.warn("Unable to find a node to connect to. Backing off...");
failedAttempts++;
Expand Down Expand Up @@ -249,7 +282,9 @@ public class Connection implements IConnection {
int nodeSessionId = 0;

Connection(String node, int port, String keyspace, ConnectionReleaseHandler releaseHandler) throws SocketException, TException, InvalidRequestException {
this.node = node;
connCreatedCount.incrementAndGet();

this.node = node;
this.keyspace = keyspace;
this.releaseHandler = releaseHandler;
TSocket socket = new TSocket(node, port);
Expand Down Expand Up @@ -335,6 +370,7 @@ public int getSessionId() {
*/
@Override
public void close() {
connClosedCount.incrementAndGet();
transport.close();
}
}
Expand Down Expand Up @@ -395,15 +431,23 @@ IConnection getConnection() {
synchronized (connCacheLock) {
conn = connCache.poll();
}
if (conn == null)
if (conn == null) {
cacheConnNotSelectedCount.incrementAndGet();
return null;
else
}
else {
cacheConnSelectedCount.incrementAndGet();
countCached.decrementAndGet();
}

if (conn.isOpen()) {
countInUse.incrementAndGet();
connSelectedAlreadyOpenCount.incrementAndGet();
return conn;
}
else {
connSelectedNotAlreadyOpenCount.incrementAndGet();
}
}
} finally {
// Need to check whether pool refill needed
Expand All @@ -412,18 +456,35 @@ IConnection getConnection() {
}

void onConnectionRelease(Connection conn, boolean networkException) {
// This connection is no longer in use
countInUse.decrementAndGet();
connReleaseCalledCount.incrementAndGet();
// Is this connection still open/reusable?
if (!networkException) {
// Yes, we can keep this connection if we still want it!
if (conn.isOpen() && (countInUse.get() + countCached.get()) < poolPolicy.getTargetConnectionsPerNode()) {
connCache.add(conn);
countCached.incrementAndGet();
} else {
conn.close();
}
if ( conn.isOpen() &&
(countInUse.get() + countCached.get() <= poolPolicy.getTargetConnectionsPerNode() ||
countCached.get() < poolPolicy.getTargetConnectionsPerNode()) ) {
connReturnedToCacheCount.incrementAndGet();
connCache.add(conn);
countCached.incrementAndGet();
}
else {
conn.close();
}
// // Yes, we can keep this connection if we still want it!
// if (conn.isOpen() && (countInUse.get() + countCached.get()) <= poolPolicy.getTargetConnectionsPerNode()) {
// connReturnedToCacheCount.incrementAndGet();
// connCache.add(conn);
// countCached.incrementAndGet();
// } else {
// conn.close();
// }

// This connection is no longer in use
countInUse.decrementAndGet();
} else {
// This connection is no longer in use
countInUse.decrementAndGet();

networkExceptionCount.incrementAndGet();
// close connection
conn.close();
// kill all connections to this node?
Expand All @@ -446,17 +507,23 @@ public void release(Connection connection, boolean afterException) {
}
});
} catch (Exception e) {
connCreateExceptionCount.incrementAndGet();
logger.error(e.getMessage(), e);
return null;
}

if (conn.open(sessionId.get()))
if (conn.open(sessionId.get())) {
connOpenedCount.incrementAndGet();;
return conn;
}

connCannotOpenCount.incrementAndGet();

return null;
}

private void purgeConnsCreatedToSession(int nodeSessionId) {
purgeAllSessionConnsCount.incrementAndGet();
logger.warn("{} NodeContext killing all pooled connections for session {}", node, nodeSessionId);
int killedCount = 0;
synchronized (connCacheLock) {
Expand Down Expand Up @@ -498,6 +565,7 @@ public void run() {
int foundDead = 0;
for (IConnection conn : connCache)
if (!conn.isOpen()) {
deadConnCount.incrementAndGet();
countCached.decrementAndGet();
connCache.remove(conn);
foundDead++;
Expand All @@ -511,9 +579,11 @@ public void run() {
// Do we actually want to create any more connections?
while (countCached.get() < poolPolicy.getMinCachedConnectionsPerNode() ||
(countInUse.get() + countCached.get()) < poolPolicy.getTargetConnectionsPerNode()) {
refillNeedConnCount.incrementAndGet();
// Yup create new connection for cache
IConnection conn = createConnection();
if (conn == null) {
refillBackoffCount.incrementAndGet();
// Connection error occurred. Calculate back off delay
failureCount++;
backOffDelay = NetworkAlgorithms.getBinaryBackoffDelay(
Expand All @@ -524,6 +594,7 @@ public void run() {
}
// We managed to create new connection
failureCount = 0;
connAddToCacheCount.incrementAndGet();
// Add new connection to waiting cache
countCached.incrementAndGet();
connCache.add(conn);
Expand Down Expand Up @@ -659,4 +730,108 @@ public void setConnectionTimeout(Integer connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
}

@Override
public long getGetConnCount() {
return getConnCount.get();
}

@Override
public long getLeastLoadedSelectedCount() {
return leastLoadedSelectedCount.get();
}

@Override
public long getLeastLoadedNotSelectedCount() {
return leastLoadedNotSelectedCount.get();
}

@Override
public long getCacheConnNotSelectedCount() {
return cacheConnNotSelectedCount.get();
}

@Override
public long getCacheConnSelectedCount() {
return cacheConnSelectedCount.get();
}

@Override
public long getConnSelectedAlreadyOpenCount() {
return connSelectedAlreadyOpenCount.get();
}

@Override
public long getConnSelectedNotAlreadyOpenCount() {
return connSelectedNotAlreadyOpenCount.get();
}

@Override
public long getConnCannotOpenCount() {
return connCannotOpenCount.get();
}

@Override
public long getConnClosedCount() {
return connClosedCount.get();
}

@Override
public long getConnCreatedCount() {
return connCreatedCount.get();
}

@Override
public long getConnReleaseCalledCount() {
return connReleaseCalledCount.get();
}

@Override
public long getConnAddToCacheCount() {
return connAddToCacheCount.get();
}

@Override
public long getConnCreateExceptionCount() {
return connCreateExceptionCount.get();
}

@Override
public long getConnOpenedCount() {
return connOpenedCount.get();
}

@Override
public long getConnReturnedToCacheCount() {
return connReturnedToCacheCount.get();
}

@Override
public long getDeadConnCount() {
return deadConnCount.get();
}

@Override
public long getNetworkExceptionCount() {
return networkExceptionCount.get();
}

@Override
public long getPurgeAllSessionConnsCount() {
return purgeAllSessionConnsCount.get();
}

@Override
public long getRefillBackoffCount() {
return refillBackoffCount.get();
}

@Override
public long getRefillNeedConnCount() {
return refillNeedConnCount.get();
}

public long getGetConnBackoffCount() {
return getConnBackoffCount.get();
}
}

0 comments on commit 832f039

Please sign in to comment.