Skip to content

Commit

Permalink
Couple of test, message & logging improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Mar 2, 2018
1 parent 405e81b commit f9b231a
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 140 deletions.
Expand Up @@ -217,17 +217,22 @@ private boolean processNextBatch( int batchCount )
@Override
public void handleSchedulingError( Throwable t )
{
String message;
Neo4jError error;
if ( ExceptionUtils.hasCause( t, RejectedExecutionException.class ) )
{
error = Neo4jError.from( Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description() );
message = String.format( "Unable to schedule bolt session '%s' for execution since there are no available threads to " +
"serve it at the moment. You can retry at a later time or consider increasing max pool / queue size for bolt connector(s).", id() );
}
else
{
error = Neo4jError.fatalFrom( t );
message = String.format( "Unexpected error during scheduling of bolt session '%s'.", id() );
}

userLog.error( String.format( "Unexpected error during scheduling of bolt session '%s'.", id() ), t );
log.error( message, t );
userLog.error( message );
machine.markFailed( error );
processNextBatch( 1 );
}
Expand Down
Expand Up @@ -20,15 +20,13 @@
package org.neo4j.bolt.transport;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionFactory;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl;
import org.neo4j.bolt.v2.messaging.Neo4jPackV2;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionFactory;
import org.neo4j.bolt.runtime.BoltConnectionReadLimiter;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

public class DefaultBoltProtocolHandlerFactory implements BoltProtocolHandlerFactory
{
Expand Down
Expand Up @@ -19,9 +19,6 @@
*/
package org.neo4j.bolt.runtime.integration;

import org.apache.commons.text.CharacterPredicates;
import org.apache.commons.text.RandomStringGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -30,32 +27,27 @@
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.neo4j.bolt.AbstractBoltTransportsTest;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket;
import org.neo4j.bolt.v1.transport.integration.TransportTestUtil;
import org.neo4j.bolt.v1.transport.socket.client.SecureSocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.SecureWebSocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.SocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.bolt.v1.transport.socket.client.WebSocketConnection;
import org.neo4j.cypher.result.QueryResult;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static org.hamcrest.CoreMatchers.any;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -66,45 +58,28 @@
import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgRecord;
import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess;
import static org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket.DEFAULT_CONNECTOR_KEY;
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.chunk;
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives;

@RunWith( Parameterized.class )
public class BoltSchedulerIT
public class BoltSchedulerIT extends AbstractBoltTransportsTest
{
private final int numberOfWriters = 20;
private final int numberOfReaders = 50;
private final int numberOfIterations = 10;
private AssertableLogProvider logProvider;
private final int numberOfIterations = 5;

private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
private Neo4jWithSocket server = new Neo4jWithSocket( getClass(), getTestGraphDatabaseFactory(), fsRule::get, getSettingsFunction() );
private Random rndSleep = new Random();

@Rule
public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server );

@Parameterized.Parameter
public Supplier<TransportConnection> connectionCreator;

private HostnamePort address;
private AtomicInteger idCounter = new AtomicInteger();

@Parameterized.Parameters
public static Collection<Supplier<TransportConnection>> transports()
{
return asList( () -> new SecureSocketConnection(), () -> new SocketConnection(), () -> new SecureWebSocketConnection(),
() -> new WebSocketConnection() );
}

protected TestGraphDatabaseFactory getTestGraphDatabaseFactory()
{
TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory();

logProvider = new AssertableLogProvider();

factory.setInternalLogProvider( logProvider );

return factory;
return new TestGraphDatabaseFactory();
}

protected Consumer<Map<String,String>> getSettingsFunction()
Expand All @@ -126,12 +101,6 @@ public void setup() throws Exception
address = server.lookupDefaultConnector();
}

@After
public void after() throws Exception
{

}

@Test
public void readerAndWritersShouldWork() throws Exception
{
Expand All @@ -141,12 +110,12 @@ public void readerAndWritersShouldWork() throws Exception
{
for ( int i = 0; i < numberOfReaders; i++ )
{
readers.add( performHandshake( connectionCreator.get() ) );
readers.add( performHandshake( connectionClass.newInstance() ) );
}

for ( int i = 0; i < numberOfWriters; i++ )
{
writers.add( performHandshake( connectionCreator.get() ) );
writers.add( performHandshake( connectionClass.newInstance() ) );
}

List<CompletableFuture<Void>> allRequests = new ArrayList<>();
Expand Down Expand Up @@ -180,11 +149,11 @@ public void readerAndWritersShouldWork() throws Exception

private TransportConnection performHandshake( TransportConnection connection ) throws Exception
{
connection.connect( address ).send( TransportTestUtil.acceptedVersions( 1, 0, 0, 0 ) ).send(
TransportTestUtil.chunk( init( "TestClient/1.1", emptyMap() ) ) );
connection.connect( address ).send( util.acceptedVersions( 1, 0, 0, 0 ) ).send(
util.chunk( init( "TestClient/1.1", emptyMap() ) ) );

assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

return connection;
}
Expand All @@ -198,25 +167,25 @@ private void performWriteQueries( TransportConnection connection, int numberOfIt
int id = idCounter.incrementAndGet();
String label = "LABEL";

connection.send( chunk( run( "BEGIN" ) ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( chunk( pullAll() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( run( "BEGIN" ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( pullAll() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

connection.send( chunk( run( String.format( "CREATE (n:%s { id: %d })", label, id ) ) ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( chunk( pullAll() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( run( String.format( "CREATE (n:%s { id: %d })", label, id ) ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( pullAll() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

connection.send( chunk( run( "COMMIT" ) ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( chunk( pullAll() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( run( "COMMIT" ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( pullAll() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

Thread.sleep( rndSleep.nextInt( 1000 ) );

connection.send( chunk( reset() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( reset() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
}
}
catch ( Exception ex )
Expand All @@ -233,25 +202,25 @@ private void performReadQueries( TransportConnection connection, int numberOfIte
{
String label = "LABEL";

connection.send( chunk( run( "BEGIN" ) ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( chunk( pullAll() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( run( "BEGIN" ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( pullAll() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

connection.send( chunk( run( String.format( "MATCH (n:%s) RETURN COUNT(n)", label ) ) ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( chunk( pullAll() ) );
assertThat( connection, eventuallyReceives( msgRecord( any( QueryResult.Record.class ) ), msgSuccess() ) );
connection.send( util.chunk( run( String.format( "MATCH (n:%s) RETURN COUNT(n)", label ) ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( pullAll() ) );
assertThat( connection, util.eventuallyReceives( msgRecord( any( QueryResult.Record.class ) ), msgSuccess() ) );

connection.send( chunk( run( "COMMIT" ) ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( chunk( pullAll() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( run( "COMMIT" ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( pullAll() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

Thread.sleep( rndSleep.nextInt( 1000 ) );

connection.send( chunk( reset() ) );
assertThat( connection, eventuallyReceives( msgSuccess() ) );
connection.send( util.chunk( reset() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

Thread.sleep( rndSleep.nextInt( 1000 ) );
}
Expand Down

0 comments on commit f9b231a

Please sign in to comment.