Skip to content

Commit

Permalink
Merge pull request #8213 from pontusmelke/3.1-delete-user-stresstest
Browse files Browse the repository at this point in the history
Don't close transaction from separate thread
  • Loading branch information
henriknyman committed Oct 21, 2016
2 parents 602a67a + 6f86762 commit 8273ee7
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 16 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.bolt.security.auth.AuthenticationException;
Expand Down Expand Up @@ -84,7 +85,11 @@ public StatementProcessor statementProcessor()

private void before( BoltResponseHandler handler ) throws BoltConnectionFatality
{
if ( ctx.interruptCounter.get() > 0 )
if ( ctx.isTerminated.get() )
{
close();
}
else if ( ctx.interruptCounter.get() > 0 )
{
state = state.interrupt( this );
}
Expand Down Expand Up @@ -274,6 +279,7 @@ public void close()
}
finally
{
spi.onTerminate( this );
ctx.closed = true;
//However a new transaction may have been created
//so we must always to reset
Expand All @@ -290,14 +296,20 @@ public String owner()
@Override
public void terminate()
{
close();
/*
* This is a side-channel call and we should not close anything directly.
* Just mark the transaction and set isTerminated to true and then the session
* thread will close down the connection eventually.
*/
ctx.isTerminated.set( true );
ctx.statementProcessor.markCurrentTransactionForTermination();
spi.onTerminate( this );
}

@Override
public boolean hasTerminated()
public boolean willTerminate()
{
return isClosed();
return ctx.isTerminated.get();
}

public enum State
Expand Down Expand Up @@ -675,6 +687,8 @@ static class MutableConnectionState implements BoltResponseHandler
*/
final AtomicInteger interruptCounter = new AtomicInteger();

final AtomicBoolean isTerminated = new AtomicBoolean( false );

StatementProcessor statementProcessor = NULL_STATEMENT_PROCESSOR;

String owner = null;
Expand Down
Expand Up @@ -22,6 +22,7 @@

import org.junit.Test;

import java.time.Clock;
import java.util.Collections;

import org.neo4j.bolt.testing.BoltResponseRecorder;
Expand All @@ -38,6 +39,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -71,10 +73,12 @@ public void allStateTransitionsShouldSendExactlyOneResponseToTheClient() throws
{
for ( BoltStateMachine.State initialState : BoltStateMachine.State.values() )
{
verifyOneResponse( initialState, ( machine, recorder ) -> machine.init( USER_AGENT, emptyMap(), recorder ) );
verifyOneResponse( initialState,
( machine, recorder ) -> machine.init( USER_AGENT, emptyMap(), recorder ) );
verifyOneResponse( initialState, BoltStateMachine::ackFailure );
verifyOneResponse( initialState, BoltStateMachine::reset );
verifyOneResponse( initialState, ( machine, recorder ) -> machine.run( "statement", emptyMap(), recorder ) );
verifyOneResponse( initialState,
( machine, recorder ) -> machine.run( "statement", emptyMap(), recorder ) );
verifyOneResponse( initialState, BoltStateMachine::discardAll );
verifyOneResponse( initialState, BoltStateMachine::pullAll );
}
Expand Down Expand Up @@ -437,7 +441,8 @@ public void shouldTerminateOnAuthExpiryDuringSTREAMING() throws Throwable
{
// Given
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );
doThrow( new AuthExpirationException( "Auth expired!" ) ).when( responseHandler ).onRecords( any(), anyBoolean() );
doThrow( new AuthExpirationException( "Auth expired!" ) ).when( responseHandler )
.onRecords( any(), anyBoolean() );
BoltStateMachine machine = newMachine( STREAMING );
// We assume the only implementation of statement processor is TransactionStateMachine
((TransactionStateMachine) machine.statementProcessor()).ctx.currentResult = BoltResult.EMPTY;
Expand All @@ -460,15 +465,29 @@ public void callResetEvenThoughAlreadyClosed() throws Throwable
// When we close
TransactionStateMachine statementProcessor = (TransactionStateMachine) machine.statementProcessor();
machine.close();
assertThat(statementProcessor.ctx.currentTransaction, nullValue());
assertTrue(machine.ctx.closed);
assertThat( statementProcessor.ctx.currentTransaction, nullValue() );
assertTrue( machine.ctx.closed );

//But someone runs a query and thus opens a new transaction
statementProcessor.run( "RETURN 1", Collections.emptyMap() );
assertThat(statementProcessor.ctx.currentTransaction, notNullValue());
assertThat( statementProcessor.ctx.currentTransaction, notNullValue() );

// Then, when we close again we should make sure the transaction is closed againg
machine.close();
assertThat(statementProcessor.ctx.currentTransaction, nullValue());
assertThat( statementProcessor.ctx.currentTransaction, nullValue() );
}

@Test
public void shouldCallOnTerminateWhenClosing() throws Throwable
{
// Given
BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class, RETURNS_MOCKS );
final BoltStateMachine machine = new BoltStateMachine( spi, null, Clock.systemUTC() );

// When
machine.close();

// Then
verify( spi ).onTerminate( machine );
}
}
Expand Up @@ -25,5 +25,5 @@ public interface ManagedBoltStateMachine

void terminate();

boolean hasTerminated();
boolean willTerminate();
}
Expand Up @@ -145,7 +145,7 @@ public Stream<ConnectionResult> listConnections()
boltConnectionTracker
.getActiveConnections()
.stream()
.filter( session -> !session.hasTerminated() )
.filter( session -> !session.willTerminate() )
.map( ManagedBoltStateMachine::owner )
);
}
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.server.security.enterprise.auth;

import org.apache.commons.lang.StringUtils;
import org.apache.directory.api.util.Strings;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -65,7 +64,6 @@
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -472,7 +470,7 @@ Map<String,Long> countBoltConnectionsByUsername()
boltConnectionTracker
.getActiveConnections()
.stream()
.filter( session -> !session.hasTerminated() )
.filter( session -> !session.willTerminate() )
.map( ManagedBoltStateMachine::owner )
).collect( Collectors.toMap( r -> r.username, r -> r.connectionCount ) );
}
Expand Down
142 changes: 142 additions & 0 deletions integrationtests/src/test/java/org/neo4j/bolt/DeleteUserStressIT.java
@@ -0,0 +1,142 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.File;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.harness.junit.Neo4jRule;
import org.neo4j.io.fs.FileUtils;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.neo4j.driver.v1.AuthTokens.basic;

public class DeleteUserStressIT
{
@Rule
public Neo4jRule db = new Neo4jRule().withConfig( GraphDatabaseSettings.auth_enabled, "true" );
private Driver adminDriver;
private final Set<Throwable> errors = ConcurrentHashMap.newKeySet();

@Before
public void setup() throws Exception
{
File knownHosts = new File( System.getProperty( "user.home" ) + "/.neo4j/known_hosts" );
FileUtils.deleteFile( knownHosts );
adminDriver = GraphDatabase.driver( db.boltURI(), basic( "neo4j", "neo4j" ) );
}

@Test
public void shouldRun() throws InterruptedException
{
ExecutorService service = Executors.newFixedThreadPool( 3 );
service.submit( createUserWork );
service.submit( deleteUserWork );
service.submit( transactionWork );

service.awaitTermination( 30, TimeUnit.SECONDS );

String msg = String.join( System.lineSeparator(),
errors.stream().map( Throwable::getMessage ).collect( Collectors.toList() ) );
assertThat( msg, errors, empty());
}

@SuppressWarnings( "InfiniteLoopStatement" )
private final Runnable transactionWork = () -> {

for (; ; )
{
try ( Driver driver = GraphDatabase.driver( db.boltURI(), basic( "pontus", "sutnop" ) ) )
{

try ( Session session = driver.session();
Transaction tx = session.beginTransaction() )
{
tx.run( "UNWIND range(1, 100000) AS n RETURN n" ).consume();
tx.success();
}
}
catch ( ClientException e )
{
if ( !e.getMessage().equals( "The client is unauthorized due to authentication failure." ) )
{
errors.add( e );
}
}
}
};

@SuppressWarnings( "InfiniteLoopStatement" )
private final Runnable deleteUserWork = () -> {

for (; ; )
{
try ( Session session = adminDriver.session();
Transaction tx = session.beginTransaction() )
{
tx.run( "CALL dbms.security.deleteUser('pontus')" ).consume();
tx.success();
}
catch ( ClientException e )
{
if ( !e.getMessage().equals( "User 'pontus' does not exist." ) )
{
errors.add( e );
}
}
}
};

@SuppressWarnings( "InfiniteLoopStatement" )
private final Runnable createUserWork = () -> {
for (; ; )
{
try ( Session session = adminDriver.session();
Transaction tx = session.beginTransaction() )
{
tx.run( "CALL dbms.security.createUser('pontus', 'sutnop', false)" ).consume();
tx.success();
}
catch ( ClientException e )
{
if ( !e.getMessage().equals( "The specified user 'pontus' already exists." ) )
{
errors.add( e );
}
}
}
};
}

0 comments on commit 8273ee7

Please sign in to comment.