Skip to content

Commit

Permalink
Make sure connection is only called once.
Browse files Browse the repository at this point in the history
Instead of guarding in `PooledConnection` as well as in `NetworkSession` we
make sure we only close connection once in `RoutingDriver`
  • Loading branch information
pontusmelke committed Oct 6, 2016
1 parent 5540e80 commit 3004db0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public BoltServerAddress apply( Value value )
//must be called from a synchronized method
private boolean call( BoltServerAddress address, String procedureName, Consumer<Record> recorder )
{
Connection acquire = null;
Connection acquire;
Session session = null;
try
{
Expand Down Expand Up @@ -271,11 +271,6 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
{
session.close();
}
if ( acquire != null )
{
acquire.close();
}

}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.neo4j.driver.internal.net.pooling;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Collector;
Expand Down Expand Up @@ -59,7 +58,6 @@ public class PooledConnection implements Connection
private Runnable onError = null;
private final Clock clock;
private long lastUsed;
private final AtomicBoolean released = new AtomicBoolean( false );

public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
{
Expand All @@ -69,9 +67,8 @@ public PooledConnection( Connection delegate, Consumer<PooledConnection> release
this.lastUsed = clock.millis();
}

public void setInUse()
public void updateTimestamp()
{
released.set(false);
lastUsed = clock.millis();
}

Expand Down Expand Up @@ -200,10 +197,7 @@ public void receiveOne()
*/
public void close()
{
if ( released.compareAndSet( false, true ))
{
release.accept( this );
}
release.accept( this );
// put the full logic of deciding whether to dispose the connection or to put it back to
// the pool into the release object
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -39,6 +41,8 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.util.Collections.emptyList;

/**
* The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first
* try to return the session into the session pool, however if we failed to return it back, either because the pool
Expand Down Expand Up @@ -115,7 +119,7 @@ public Connection acquire( BoltServerAddress address )
conn = new PooledConnection( connect( address ), new
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
}
conn.setInUse();
conn.updateTimestamp();
return conn;
}

Expand Down Expand Up @@ -184,4 +188,19 @@ public void close()
pools.clear();
}

//for testing
public List<PooledConnection> connectionsForAddress(BoltServerAddress address)
{
LinkedBlockingQueue<PooledConnection> pooledConnections =
(LinkedBlockingQueue<PooledConnection>) pools.get( address );
if (pooledConnections == null)
{
return emptyList();
}
else
{
return new ArrayList<>( pooledConnections );
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import org.neo4j.driver.internal.logging.ConsoleLogging;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.pooling.PooledConnection;
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Config;
Expand Down Expand Up @@ -86,6 +88,26 @@ public void shouldDiscoverServers() throws IOException, InterruptedException, St
assertThat( server.exitStatus(), equalTo( 0 ) );
}

@Test
public void shouldOnlyPutConnectionInPoolOnce() throws IOException, InterruptedException, StubServer.ForceKilled
{
// Given
StubServer server = StubServer.start( "discover_servers.script", 9001 );
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );

// When
try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) )
{
// Then
SocketConnectionPool pool = (SocketConnectionPool) driver.connectionPool();
List<PooledConnection> pooledConnections = pool.connectionsForAddress( address( 9001 ) );
assertThat(pooledConnections, hasSize( 1 ));
}

// Finally
assertThat( server.exitStatus(), equalTo( 0 ) );
}

@Test
public void shouldDiscoverNewServers() throws IOException, InterruptedException, StubServer.ForceKilled
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.util.BiFunction;
import org.neo4j.driver.v1.util.Function;

import static java.util.Arrays.asList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,39 +120,6 @@ public void dispose()
assertThat( flags[0], equalTo( false ) );
}

@Test
public void shouldOnlyReturnOnceEventhougCloseIsBeingCalledMultipleTimes() throws Throwable
{
// Given
final BlockingQueue<PooledConnection> pool = new LinkedBlockingQueue<>(2);

final boolean[] flags = {false};

Connection conn = mock( Connection.class );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
new AtomicBoolean( false ), VALID_CONNECTION );

PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
{
@Override
public void dispose()
{
flags[0] = true;
}
};

// When
pooledConnection.close();
pooledConnection.close();
pooledConnection.close();
pooledConnection.close();
pooledConnection.close();

// Then
assertThat( pool, hasItem(pooledConnection) );
assertThat( pool.size(), equalTo( 1 ) );
assertThat( flags[0], equalTo( false ) );
}

@Test
public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable
Expand Down

0 comments on commit 3004db0

Please sign in to comment.