Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to driver #244

Merged
merged 4 commits into from
Oct 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
import org.neo4j.driver.v1.types.TypeSystem;

import static org.neo4j.driver.v1.Values.value;
Expand Down Expand Up @@ -296,9 +297,9 @@ private void ensureConnectionIsOpen()
{
if ( !connection.isOpen() )
{
throw new ClientException( "The current session cannot be reused as the underlying connection with the " +
"server has been closed due to unrecoverable errors. " +
"Please close this session and retry your statement in another new session." );
throw new ConnectionFailureException( "The current session cannot be reused as the underlying connection with the " +
"server has been closed due to unrecoverable errors. " +
"Please close this session and retry your statement in another new session." );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public BoltServerAddress apply( Value value )
//must be called from a synchronized method
private boolean call( BoltServerAddress address, String procedureName, Consumer<Record> recorder )
{
Connection acquire = null;
Connection acquire;
Session session = null;
try
{
Expand Down Expand Up @@ -271,11 +271,6 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
{
session.close();
}
if ( acquire != null )
{
acquire.close();
}

}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public PooledConnection( Connection delegate, Consumer<PooledConnection> release
this.lastUsed = clock.millis();
}

public void updateUsageTimestamp()
public void updateTimestamp()
{
lastUsed = clock.millis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -39,6 +41,8 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.util.Collections.emptyList;

/**
* The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first
* try to return the session into the session pool, however if we failed to return it back, either because the pool
Expand Down Expand Up @@ -115,7 +119,7 @@ public Connection acquire( BoltServerAddress address )
conn = new PooledConnection( connect( address ), new
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
}
conn.updateUsageTimestamp();
conn.updateTimestamp();
return conn;
}

Expand Down Expand Up @@ -184,4 +188,19 @@ public void close()
pools.clear();
}

//for testing
public List<PooledConnection> connectionsForAddress(BoltServerAddress address)
{
LinkedBlockingQueue<PooledConnection> pooledConnections =
(LinkedBlockingQueue<PooledConnection>) pools.get( address );
if (pooledConnections == null)
{
return emptyList();
}
else
{
return new ArrayList<>( pooledConnections );
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ public void doneFailure( Neo4jException error )
throw new ClientException(
"Invalid server response message `FAILURE` received for client message `ACK_FAILURE`.", error );
}

@Override
public void doneIgnored()
{
throw new ClientException(
"Invalid server response message `IGNORED` received for client message `ACK_FAILURE`." );
}
};

class InitCollector extends NoOperationCollector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;

import static junit.framework.Assert.fail;
import static junit.framework.TestCase.assertNotNull;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void shouldNotAllowMoreStatementsInSessionWhileConnectionClosed() throws
when( mock.isOpen() ).thenReturn( false );

// Expect
exception.expect( ClientException.class );
exception.expect( ConnectionFailureException.class );

// When
sess.run( "whatever" );
Expand All @@ -134,7 +135,7 @@ public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throw
when( mock.isOpen() ).thenReturn( false );

// Expect
exception.expect( ClientException.class );
exception.expect( ConnectionFailureException.class );

// When
sess.beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import org.neo4j.driver.internal.logging.ConsoleLogging;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.pooling.PooledConnection;
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Config;
Expand Down Expand Up @@ -86,6 +88,26 @@ public void shouldDiscoverServers() throws IOException, InterruptedException, St
assertThat( server.exitStatus(), equalTo( 0 ) );
}

@Test
public void shouldOnlyPutConnectionInPoolOnce() throws IOException, InterruptedException, StubServer.ForceKilled
{
// Given
StubServer server = StubServer.start( "discover_servers.script", 9001 );
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );

// When
try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) )
{
// Then
SocketConnectionPool pool = (SocketConnectionPool) driver.connectionPool();
List<PooledConnection> pooledConnections = pool.connectionsForAddress( address( 9001 ) );
assertThat(pooledConnections, hasSize( 1 ));
}

// Finally
assertThat( server.exitStatus(), equalTo( 0 ) );
}

@Test
public void shouldDiscoverNewServers() throws IOException, InterruptedException, StubServer.ForceKilled
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.util.BiFunction;
import org.neo4j.driver.v1.util.Function;

import static java.util.Arrays.asList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void dispose()
assertThat( flags[0], equalTo( false ) );
}


@Test
public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable
{
Expand Down