Skip to content

Commit

Permalink
Added Server.getConnectionAsync and supporting implementation in Defa…
Browse files Browse the repository at this point in the history
…ultServer and DefaultConnectionPool

The connection pool implements it with a single threaded executor service and handles wait queue size and timeout enforcement

  JAVA-1200
  • Loading branch information
jyemin committed Nov 25, 2014
1 parent 4cd0b87 commit 3b544aa
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.connection;

import com.mongodb.async.SingleResultCallback;

import java.io.Closeable;
import java.util.concurrent.TimeUnit;

Expand All @@ -25,6 +27,8 @@ interface ConnectionPool extends Closeable {

InternalConnection get(long timeout, TimeUnit timeUnit);

void getAsync(SingleResultCallback<InternalConnection> callback);

void invalidate();

void close();
Expand Down
144 changes: 119 additions & 25 deletions driver-core/src/main/com/mongodb/connection/DefaultConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.mongodb.MongoInternalException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.MongoWaitQueueFullException;
import com.mongodb.async.MongoFuture;
import com.mongodb.async.SingleResultCallback;
Expand Down Expand Up @@ -53,6 +54,7 @@ class DefaultConnectionPool implements ConnectionPool {
private final AtomicInteger waitQueueSize = new AtomicInteger(0);
private final AtomicInteger generation = new AtomicInteger(0);
private final ExecutorService sizeMaintenanceTimer;
private ExecutorService asyncGetter;
private final Runnable maintenanceTask;
private final ConnectionPoolListener connectionPoolListener;
private final ServerId serverId;
Expand Down Expand Up @@ -81,39 +83,96 @@ public InternalConnection get() {
public InternalConnection get(final long timeout, final TimeUnit timeUnit) {
try {
if (waitQueueSize.incrementAndGet() > settings.getMaxWaitQueueSize()) {
throw new MongoWaitQueueFullException(format("Too many threads are already waiting for a connection. "
+ "Max number of threads (maxWaitQueueSize) of %d has been exceeded.",
settings.getMaxWaitQueueSize()));
throw createWaitQueueFullException();
}
connectionPoolListener.waitQueueEntered(new ConnectionPoolWaitQueueEvent(serverId, currentThread().getId()));
UsageTrackingInternalConnection internalConnection = pool.get(timeout, timeUnit);
while (shouldPrune(internalConnection)) {
pool.release(internalConnection, true);
internalConnection = pool.get(timeout, timeUnit);
}
if (!internalConnection.opened()) {
try {
internalConnection.open();
} catch (Throwable t) {
pool.release(internalConnection, true);
if (t instanceof MongoException) {
throw (MongoException) t;
} else {
throw new MongoInternalException(t.toString(), t);
}
}
}
connectionPoolListener.connectionCheckedOut(new ConnectionEvent(getId(internalConnection)));
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Checked out connection [%s] to server %s", getId(internalConnection), serverId.getAddress()));
}
return new PooledConnection(internalConnection);
return getPooledConnection(timeout, timeUnit);
} finally {
waitQueueSize.decrementAndGet();
connectionPoolListener.waitQueueExited(new ConnectionPoolWaitQueueEvent(serverId, currentThread().getId()));
}
}

@Override
public void getAsync(final SingleResultCallback<InternalConnection> callback) {
InternalConnection connection = null;

try {
connection = getPooledConnection(0, MILLISECONDS);
} catch (MongoTimeoutException e) {
// fall through
}

if (connection != null) {
callCallback(connection, callback);
} else if (waitQueueSize.incrementAndGet() > settings.getMaxWaitQueueSize()) {
waitQueueSize.decrementAndGet();
callback.onResult(null, createWaitQueueFullException());
} else {
final long startTimeMillis = System.currentTimeMillis();
connectionPoolListener.waitQueueEntered(new ConnectionPoolWaitQueueEvent(serverId, currentThread().getId()));
getAsyncGetter().submit(new Runnable() {
@Override
public void run() {
try {
if (getRemainingWaitTime() <= 0) {
callCallback(createTimeoutException(), callback);
} else {
callCallback(getPooledConnection(getRemainingWaitTime(), MILLISECONDS), callback);
}
} catch (Throwable t) {
callCallback(t, callback);
} finally {
waitQueueSize.decrementAndGet();
connectionPoolListener.waitQueueExited(new ConnectionPoolWaitQueueEvent(serverId, currentThread().getId()));
}
}

private long getRemainingWaitTime() {
return startTimeMillis + settings.getMaxWaitTime(MILLISECONDS) - System.currentTimeMillis();
}
});
}
}

private synchronized ExecutorService getAsyncGetter() {
if (asyncGetter == null) {
asyncGetter = Executors.newSingleThreadExecutor();
}
return asyncGetter;
}

private synchronized void shutdownAsyncGetter() {
if (asyncGetter != null) {
asyncGetter.shutdownNow();
}
}

private void callCallback(final InternalConnection connection, final SingleResultCallback<InternalConnection> callback) {
try {
callback.onResult(connection, null);
} catch (Exception e) {
// swallow any exception thrown by the callback. there is nothing we can do with it
}
}

private void callCallback(final Throwable throwable, final SingleResultCallback<InternalConnection> callback) {
try {
callback.onResult(null, wrapThrowable(throwable));
} catch (Exception e) {
// swallow any exception thrown by the callback. there is nothing we can do with it
}
}

// TODO: ditch this once callback takes a Throwable
private MongoException wrapThrowable(final Throwable t) {
if (t instanceof MongoException) {
return (MongoException) t;
} else {
return new MongoInternalException("Internal exception", t);
}
}

@Override
public void invalidate() {
generation.incrementAndGet();
Expand All @@ -126,6 +185,7 @@ public void close() {
if (sizeMaintenanceTimer != null) {
sizeMaintenanceTimer.shutdownNow();
}
shutdownAsyncGetter();
closed = true;
connectionPoolListener.connectionPoolClosed(new ConnectionPoolEvent(serverId));
}
Expand All @@ -140,6 +200,40 @@ public void doMaintenance() {
}
}

private InternalConnection getPooledConnection(final long timeout, final TimeUnit timeUnit) {
UsageTrackingInternalConnection internalConnection = pool.get(timeout, timeUnit);
while (shouldPrune(internalConnection)) {
pool.release(internalConnection, true);
internalConnection = pool.get(timeout, timeUnit);
}
if (!internalConnection.opened()) {
try {
internalConnection.open();
} catch (Throwable t) {
pool.release(internalConnection, true);
if (t instanceof MongoException) {
throw (MongoException) t;
} else {
throw new MongoInternalException(t.toString(), t);
}
}
}
connectionPoolListener.connectionCheckedOut(new ConnectionEvent(internalConnection.getDescription().getConnectionId()));
LOGGER.trace(format("Checked out connection [%s] to server %s", getId(internalConnection), serverId.getAddress()));
return new PooledConnection(internalConnection);
}

private MongoTimeoutException createTimeoutException() {
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s.",
settings.getMaxWaitTime(MILLISECONDS), serverId.getAddress()));
}

private MongoWaitQueueFullException createWaitQueueFullException() {
return new MongoWaitQueueFullException(format("Too many threads are already waiting for a connection. "
+ "Max number of threads (maxWaitQueueSize) of %d has been exceeded.",
settings.getMaxWaitQueueSize()));
}

ConcurrentPool<UsageTrackingInternalConnection> getPool() {
return pool;
}
Expand Down
18 changes: 18 additions & 0 deletions driver-core/src/main/com/mongodb/connection/DefaultServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ public Connection getConnection() {
}
}

@Override
public void getConnectionAsync(final SingleResultCallback<Connection> callback) {
isTrue("open", !isClosed());
connectionPool.getAsync(new SingleResultCallback<InternalConnection>() {
@Override
public void onResult(final InternalConnection result, final MongoException e) {
if (e instanceof MongoSecurityException) {
invalidate();
}
if (e != null) {
callback.onResult(null, e);
} else {
callback.onResult(connectionFactory.create(result, new DefaultServerProtocolExecutor()), null);
}
}
});
}

@Override
public ServerDescription getDescription() {
isTrue("open", !isClosed());
Expand Down
17 changes: 14 additions & 3 deletions driver-core/src/main/com/mongodb/connection/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.connection;

import com.mongodb.annotations.ThreadSafe;
import com.mongodb.async.SingleResultCallback;

/**
* A logical connection to a MongoDB server.
Expand All @@ -34,14 +35,24 @@ public interface Server {
ServerDescription getDescription();

/**
* <p>Gets a connection to this server. The connection should be closed after the caller is done with it.</p>
*
* <p>Gets a connection to this server. The connection should be released after the caller is done with it.</p>
*
* <p> Implementations of this method are allowed to block while waiting for a free connection from a pool of available connection.</p>
*
*
* <p> Implementations of this method will likely pool the underlying connection, so the effect of closing the returned connection will
* be to return the connection to the pool. </p>
*
* @return a connection this server
*/
Connection getConnection();

/**
* <p>Gets a connection to this server asynchronously. The connection should be released after the caller is done with it.</p>
*
* <p> Implementations of this method will likely pool the underlying connection, so the effect of closing the returned connection will
* be to return the connection to the pool. </p>
*
* @param callback the callback to execute when the connection is available or an error occurs
*/
void getConnectionAsync(SingleResultCallback<Connection> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
package com.mongodb.connection

import category.Slow
import com.mongodb.MongoException
import com.mongodb.MongoSocketWriteException
import com.mongodb.MongoTimeoutException
import com.mongodb.MongoWaitQueueFullException
import com.mongodb.ServerAddress
import com.mongodb.event.ConnectionPoolListener
import org.bson.ByteBuf
import org.junit.experimental.categories.Category
import spock.lang.Specification
import spock.lang.Subject

import java.util.concurrent.CountDownLatch

import static java.util.concurrent.TimeUnit.MILLISECONDS
import static java.util.concurrent.TimeUnit.MINUTES

Expand Down Expand Up @@ -302,4 +306,101 @@ class DefaultPooledConnectionProviderSpecification extends Specification {
0 * listener.connectionCheckedIn { it.connectionId.serverId == SERVER_ID && it.clusterId == SERVER_ID.clusterId }
0 * listener.connectionRemoved { it.connectionId.serverId == SERVER_ID && it.clusterId == SERVER_ID.clusterId }
}

def 'should select connection asynchronously if one is immediately available'() {
given:
provider = new DefaultConnectionPool(SERVER_ID,
connectionFactory,
ConnectionPoolSettings.builder().maxSize(1).maxWaitQueueSize(1).build(),
new NoOpConnectionPoolListener())

expect:
selectConnectionAsyncAndGet(provider)
}

def 'should select connection asynchronously if one is not immediately available'() {
given:
provider = new DefaultConnectionPool(SERVER_ID,
connectionFactory,
ConnectionPoolSettings.builder().maxSize(1).maxWaitQueueSize(1).build(),
new NoOpConnectionPoolListener())

when:
def connection = provider.get()
def connectionLatch = selectConnectionAsync(provider)
connection.close()

then:
connectionLatch.get()
}

def 'when getting a connection asynchronously should send MongoTimeoutException to callback after timeout period'() {
given:
provider = new DefaultConnectionPool(SERVER_ID,
connectionFactory,
ConnectionPoolSettings.builder().maxSize(1).maxWaitQueueSize(2).maxWaitTime(5, MILLISECONDS)
.build(),
new NoOpConnectionPoolListener())

provider.get()
def firstConnectionLatch = selectConnectionAsync(provider)
def secondConnectionLatch = selectConnectionAsync(provider)

when:
firstConnectionLatch.get()

then:
thrown(MongoTimeoutException)

when:
secondConnectionLatch.get()

then:
thrown(MongoTimeoutException)
}

def 'when getting a connection asynchronously should send MongoWaitQueueFullException to callback if there are too many waiters'() {
given:
provider = new DefaultConnectionPool(SERVER_ID,
connectionFactory,
ConnectionPoolSettings.builder().maxSize(1).maxWaitQueueSize(1).build(),
new NoOpConnectionPoolListener())

when:
provider.get()
selectConnectionAsync(provider)
selectConnectionAsyncAndGet(provider)

then:
thrown(MongoWaitQueueFullException)
}

def selectConnectionAsyncAndGet(DefaultConnectionPool pool) {
selectConnectionAsync(pool).get()
}

def selectConnectionAsync(DefaultConnectionPool pool) {
def serverLatch = new ConnectionLatch()
pool.getAsync { InternalConnection result, MongoException e ->
serverLatch.connection = result
serverLatch.throwable = e
serverLatch.latch.countDown()
}
serverLatch
}

class ConnectionLatch {
CountDownLatch latch = new CountDownLatch(1)
InternalConnection connection
Throwable throwable

def get() {
latch.await()
if (throwable != null) {
throw throwable
}
connection
}
}

}
Loading

0 comments on commit 3b544aa

Please sign in to comment.