Skip to content

Commit

Permalink
more noodling
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Sep 28, 2011
1 parent 1b6715c commit 60a85b6
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 73 deletions.
172 changes: 172 additions & 0 deletions src/main/java/org/nodex/java/core/ConnectionPool2.java
@@ -0,0 +1,172 @@
/*
* Copyright 2011 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.nodex.java.core;

import org.nodex.java.core.internal.NodexInternal;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>A simple, non-blocking pool implementation</p>
*
* <p>TODO the implementation can be improved. Currently it uses synchronization which may cause
* contention issues under high load. Consider replacing with lock-free algorithm.</p>
*
* <p>TODO update to use Futures</T></p>
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class ConnectionPool2<T> {

private final Queue<T> available = new ConcurrentLinkedQueue<>();
private int maxPoolSize = 1;
private final AtomicInteger connectionCount = new AtomicInteger(0);
private final Queue<Waiter> waiters = new ConcurrentLinkedQueue<>();

/**
* Set the maximum pool size to the value specified by {@code maxConnections}<p>
* The client will maintain up to {@code maxConnections} HTTP connections in an internal pool<p>
*/
public void setMaxPoolSize(int maxConnections) {
this.maxPoolSize = maxConnections;
}

/**
* Returns the maximum number of connections in the pool
*/
public int getMaxPoolSize() {
return maxPoolSize;
}

/**
* Get a connection from the pool. The connection is returned in the handler, some time in the future if a
* connection becomes available.
* @param handler The handler
* @param contextID The context id
*/
public void getConnection(Handler<T> handler, long contextID) {
boolean connect = false;
outer: synchronized (this) {
T conn = available.poll();
if (conn != null) {
handler.handle(conn);
} else {
if (connectionCount.get() < maxPoolSize) {
if (connectionCount.incrementAndGet() <= maxPoolSize) {
//Create new connection
connect = true;
break outer;
} else {
connectionCount.decrementAndGet();
}
}
// Add to waiters
waiters.add(new Waiter(handler, contextID));
}
}
if (connect) {
connect(handler, contextID);
}
}

/**
* Inform the pool that the connection has been closed externally.
*/
public void connectionClosed() {

Waiter waiter;

synchronized (this) {

if (connectionCount.decrementAndGet() < maxPoolSize) {
//Now the connection count has come down, maybe there is another waiter that can
//create a new connection
waiter = waiters.poll();
if (waiter != null) {
//getConnection(waiter.handler, waiter.contextID);
connectionCount.incrementAndGet();
}
} else {
waiter = null;
}
}

if (waiter != null) {
connect(waiter.handler, waiter.contextID);
}
}

/**
* Return a connection to the pool so it can be used by others.
*/
public void returnConnection(final T conn) {

Waiter waiter;

synchronized (this) {

//Return it to the pool
waiter = waiters.poll();

if (waiter != null) {
// NodexInternal.instance.executeOnContext(waiter.contextID, new Runnable() {
// public void run() {
// NodexInternal.instance.setContextID(waiter.contextID);
// waiter.handler.handle(conn);
// }
// });
} else {
available.add(conn);
}
}

if (waiter != null) {
final Waiter w = waiter;
NodexInternal.instance.executeOnContext(w.contextID, new Runnable() {
public void run() {
NodexInternal.instance.setContextID(w.contextID);
w.handler.handle(conn);
}
});
}
}

/**
* Close the pool
*/
public void close() {
available.clear();
waiters.clear();
}

/**
* Implement this method in a sub-class to implement the actual connection creation for the specific type of connection
*/
protected abstract void connect(final Handler<T> connectHandler, final long contextID);

private class Waiter {
final Handler<T> handler;
final long contextID;

private Waiter(Handler<T> handler, long contextID) {
this.handler = handler;
this.contextID = contextID;
}
}
}
74 changes: 1 addition & 73 deletions src/main/java/org/nodex/java/core/DeferredAction.java
Expand Up @@ -27,58 +27,9 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class DeferredAction<T> implements Deferred<T> {
public abstract class DeferredAction<T> extends SimpleFuture<T> implements Deferred<T> {

private T result;
private Exception exception;
private CompletionHandler<T> completionHandler;
protected boolean executed;
protected boolean complete;

/**
* {@inheritDoc}
*/
public T result() {
return result;
}

/**
* {@inheritDoc}
*/
public Exception exception() {
return exception;
}

/**
* {@inheritDoc}
*/
public boolean complete() {
return complete;
}

/**
* {@inheritDoc}
*/
public boolean succeeded() {
return complete && exception == null;
}

/**
* {@inheritDoc}
*/
public boolean failed() {
return complete && exception != null;
}

/**
* {@inheritDoc}
*/
public void handler(CompletionHandler<T> completionHandler) {
this.completionHandler = completionHandler;
if (complete) {
callHandler();
}
}

/**
* {@inheritDoc}
Expand All @@ -91,33 +42,10 @@ public Deferred<T> execute() {
return this;
}

/**
* Call this method with the result of the action when it is complete.
*/
public void setResult(T result) {
complete = true;
this.result = result;
callHandler();
}

/**
* Call this method with an Exception if the action failed
*/
public void setException(Exception e) {
complete = true;
this.exception = e;
callHandler();
}

/**
* Override this method to implement the deferred operation.
* When the operation is complete be sure to call {@link #setResult} or {@link #setException}
*/
protected abstract void run();

private void callHandler() {
if (completionHandler != null) {
completionHandler.handle(this);
}
}
}
97 changes: 97 additions & 0 deletions src/main/java/org/nodex/java/core/SimpleFuture.java
@@ -0,0 +1,97 @@
/*
* Copyright 2011 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.nodex.java.core;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class SimpleFuture<T> implements Future<T> {

private T result;
private Exception exception;
private CompletionHandler<T> completionHandler;
protected boolean complete;

/**
* {@inheritDoc}
*/
public T result() {
return result;
}

/**
* {@inheritDoc}
*/
public Exception exception() {
return exception;
}

/**
* {@inheritDoc}
*/
public boolean complete() {
return complete;
}

/**
* {@inheritDoc}
*/
public boolean succeeded() {
return complete && exception == null;
}

/**
* {@inheritDoc}
*/
public boolean failed() {
return complete && exception != null;
}

/**
* {@inheritDoc}
*/
public void handler(CompletionHandler<T> completionHandler) {
this.completionHandler = completionHandler;
if (complete) {
callHandler();
}
}

/**
* Call this method with the result of the action when it is complete.
*/
public void setResult(T result) {
complete = true;
this.result = result;
callHandler();
}

/**
* Call this method with an Exception if the action failed
*/
public void setException(Exception e) {
complete = true;
this.exception = e;
callHandler();
}

private void callHandler() {
if (completionHandler != null) {
completionHandler.handle(this);
}
}
}

0 comments on commit 60a85b6

Please sign in to comment.