From 9b5c42ac3ee2d72d0444c0ad2534c44da657050a Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 23 Nov 2017 15:45:43 +0100 Subject: [PATCH] Simplified state in connection queue Commit replaces two state variables with a single one and adds couple unit tests for state transitions. --- .../BlockingPooledConnectionQueue.java | 26 ++--- .../BlockingPooledConnectionQueueTest.java | 96 ++++++++++++++++++- 2 files changed, 110 insertions(+), 12 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index f6f8e09bdf..8fe5d4014e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.logging.DelegatingLogger; import org.neo4j.driver.internal.net.BoltServerAddress; @@ -40,12 +40,15 @@ public class BlockingPooledConnectionQueue { public static final String LOG_NAME = "ConnectionQueue"; + private static final int ACTIVE = 1; + private static final int INACTIVE = 2; + private static final int TERMINATED = 3; + /** The backing queue, keeps track of connections currently in queue */ private final BlockingQueue queue; private final Logger logger; - private final AtomicBoolean isDeactivated = new AtomicBoolean( false ); - private final AtomicBoolean isTerminating = new AtomicBoolean( false ); + private final AtomicInteger state = new AtomicInteger( ACTIVE ); /** Keeps track of acquired connections */ private final Set acquiredConnections = @@ -72,7 +75,7 @@ public boolean offer( PooledConnection pooledConnection ) { disposeSafely( pooledConnection ); } - if ( isDeactivated.get() || isTerminating.get() ) + if ( state.get() != ACTIVE ) { terminateIdleConnections(); } @@ -93,12 +96,13 @@ public PooledConnection acquire( Supplier supplier ) } acquiredConnections.add( connection ); - if ( isDeactivated.get() || isTerminating.get() ) + int poolState = state.get(); + if ( poolState != ACTIVE ) { acquiredConnections.remove( connection ); disposeSafely( connection ); - throw new IllegalStateException( "Pool is " + (isDeactivated.get() ? "deactivated" : "terminated") + ", " + - "new connections can't be acquired" ); + throw new IllegalStateException( "Pool is " + (poolState == INACTIVE ? "deactivated" : "terminated") + + ", new connections can't be acquired" ); } else { @@ -129,12 +133,12 @@ public boolean contains( PooledConnection pooledConnection ) public void activate() { - isDeactivated.compareAndSet( true, false ); + state.compareAndSet( INACTIVE, ACTIVE ); } public void deactivate() { - if ( isDeactivated.compareAndSet( false, true ) ) + if ( state.compareAndSet( ACTIVE, INACTIVE ) ) { terminateIdleConnections(); } @@ -142,7 +146,7 @@ public void deactivate() public boolean isActive() { - return !isDeactivated.get(); + return state.get() == ACTIVE; } /** @@ -153,7 +157,7 @@ public boolean isActive() */ public void terminate() { - if ( isTerminating.compareAndSet( false, true ) ) + if ( state.getAndSet( TERMINATED ) != TERMINATED ) { terminateIdleConnections(); terminateAcquiredConnections(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java index 45675750c3..2d94b6b25d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -333,11 +334,104 @@ public void shouldTerminateOfferedConnectionWhenDeactivated() } @Test - public void shouldReportWhenActive() + public void shouldBeActiveWhenNotDeactivatedAndNotTerminated() { BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); assertTrue( queue.isActive() ); + } + + @Test + public void shouldNotBeActiveWhenDeactivated() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); + assertTrue( queue.isActive() ); + queue.deactivate(); + assertFalse( queue.isActive() ); + } + + @Test + public void shouldNotBeActiveWhenTerminated() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); + assertTrue( queue.isActive() ); + queue.terminate(); + assertFalse( queue.isActive() ); + } + + @Test + public void shouldBeActiveAfterDeactivationAndActivation() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); + assertTrue( queue.isActive() ); + queue.deactivate(); + assertFalse( queue.isActive() ); + queue.activate(); + assertTrue( queue.isActive() ); + } + + @Test + public void shouldNotBeActiveAfterTerminationAndActivation() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); + assertTrue( queue.isActive() ); + queue.terminate(); + assertFalse( queue.isActive() ); + queue.activate(); + assertFalse( queue.isActive() ); + } + + @Test + public void shouldBePossibleToAcquireFromActivatedQueue() + { + Supplier connectionSupplier = connectionSupplierMock(); + when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); + BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); queue.deactivate(); + + try + { + queue.acquire( connectionSupplier ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e.getMessage(), startsWith( "Pool is deactivated" ) ); + } + + queue.activate(); + + assertNotNull( queue.acquire( connectionSupplier ) ); + } + + @Test + public void shouldNotBePossibleToActivateTerminatedQueue() + { + Supplier connectionSupplier = connectionSupplierMock(); + when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); + BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); + queue.terminate(); + + try + { + queue.acquire( connectionSupplier ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e.getMessage(), startsWith( "Pool is terminated" ) ); + } + + queue.activate(); + + try + { + queue.acquire( connectionSupplier ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e.getMessage(), startsWith( "Pool is terminated" ) ); + } assertFalse( queue.isActive() ); }