Skip to content

Commit

Permalink
Enhance set of checks in kernel transactions for starting new transac…
Browse files Browse the repository at this point in the history
…tion.

Enhance range of checks in kernel transaction to cover scenario when
availability guard is non available; kernel transactions component is stopped.
This changes will allow us to avoid blocking of threads that try to start transaction during:
- store copy (availability guard raised to non available);
- different sorts of instance restarts when new kernel transactions component created.
  • Loading branch information
MishaDemianenko committed Mar 8, 2017
1 parent aae7470 commit 5e0433f
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 6 deletions.
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

class DatabaseUnavailableException extends RuntimeException
{
DatabaseUnavailableException()
{
super( "This database is unavailable." );
}

}
Expand Up @@ -103,6 +103,13 @@ public class KernelTransactions extends LifecycleAdapter implements Supplier<Ker
// Pool of unused transactions.
private final MarshlandPool<KernelTransactionImplementation> localTxPool = new MarshlandPool<>( globalTxPool );

/**
* Kernel transactions component status. True when stopped, false when started.
* Will not allow to start new transaction by stopped instance of kernel transactions.
* Should simplify tracking of stopped component usage by up the stack components.
*/
private volatile boolean stopped = true;

public KernelTransactions( StatementLocksFactory statementLocksFactory,
ConstraintIndexCreator constraintIndexCreator,
StatementOperationContainer statementOperationContainer,
Expand Down Expand Up @@ -148,11 +155,11 @@ public KernelTransaction newInstance( KernelTransaction.Type type, SecurityConte
{
while ( !newTransactionsLock.readLock().tryLock( 1, TimeUnit.SECONDS ) )
{
assertDatabaseIsRunning();
assertRunning();
}
try
{
assertDatabaseIsRunning();
assertRunning();
TransactionId lastCommittedTransaction = transactionIdStore.getLastCommittedTransaction();
KernelTransactionImplementation tx = localTxPool.acquire();
StatementLocks statementLocks = statementLocksFactory.newInstance();
Expand Down Expand Up @@ -219,13 +226,15 @@ public boolean haveClosingTransaction()
@Override
public void start() throws Throwable
{
stopped = false;
unblockNewTransactions();
}

@Override
public void stop() throws Throwable
{
blockNewTransactions();
stopped = true;
}

@Override
Expand Down Expand Up @@ -290,12 +299,20 @@ Set<KernelTransactionImplementation> getAllTransactions()
return allTransactions;
}

private void assertDatabaseIsRunning()
private void assertRunning()
{
if ( availabilityGuard.isShutdown() )
{
throw new DatabaseShutdownException();
}
if ( !availabilityGuard.isAvailable() )
{
throw new DatabaseUnavailableException();
}
if ( stopped )
{
throw new IllegalStateException( "Can't start new transaction with stopped " + getClass() );
}
}

private void assertCurrentThreadIsNotBlockingNewTransactions()
Expand Down
Expand Up @@ -69,6 +69,7 @@
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.storageengine.api.lock.ResourceLocker;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.Race;
import org.neo4j.test.rule.concurrent.OtherThreadRule;

Expand Down Expand Up @@ -413,12 +414,62 @@ public void exceptionWhenStartingNewTransactionOnShutdownInstance() throws Throw
SecurityContext securityContext = mock( SecurityContext.class );

availabilityGuard.shutdown();
Executors.newSingleThreadExecutor().submit( () -> stopKernelTransactions( kernelTransactions ) ).get();
t2.execute( (OtherThreadExecutor.WorkerCommand<Void,Void>) state ->
{
stopKernelTransactions( kernelTransactions );
return null;
} );

expectedException.expect( DatabaseShutdownException.class );
kernelTransactions.newInstance( KernelTransaction.Type.explicit, securityContext, 0L );
}

@Test
public void exceptionWhenStartingNewTransactionOnNonAvailableInstance() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
SecurityContext securityContext = mock( SecurityContext.class );

availabilityGuard.require( AvailabilityGuard.availabilityRequirement( "Perform store copy." ) );

t2.execute( (OtherThreadExecutor.WorkerCommand<Void,Void>) state ->
{
stopKernelTransactions( kernelTransactions );
return null;
} );

expectedException.expect( DatabaseUnavailableException.class );
kernelTransactions.newInstance( KernelTransaction.Type.explicit, securityContext, 0L );
}

@Test
public void exceptionWhenStartingNewTransactionOnStoppedKernelTransactions() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
SecurityContext securityContext = mock( SecurityContext.class );

t2.execute( (OtherThreadExecutor.WorkerCommand<Void,Void>) state ->
{
stopKernelTransactions( kernelTransactions );
return null;
} );

expectedException.expect( IllegalStateException.class );
kernelTransactions.newInstance( KernelTransaction.Type.explicit, securityContext, 0L );
}

@Test
public void startNewTransactionOnRestartedKErnelTransactions() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
SecurityContext securityContext = mock( SecurityContext.class );

kernelTransactions.stop();
kernelTransactions.start();
assertNotNull( "New transaction created by restarted kernel transactions component.",
kernelTransactions.newInstance( KernelTransaction.Type.explicit, securityContext, 0L ) );
}

private void stopKernelTransactions( KernelTransactions kernelTransactions )
{
try
Expand Down
Expand Up @@ -73,11 +73,11 @@
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.time.Clocks;
import org.neo4j.time.SystemNanoClock;

import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import static org.neo4j.helpers.Exceptions.launderedException;

public class NeoStoreDataSourceRule extends ExternalResource
Expand Down Expand Up @@ -126,6 +126,7 @@ public NeoStoreDataSource getDataSource( File storeDir, FileSystemAbstraction fs
Monitors monitors = new Monitors();
LabelScanStoreProvider labelScanStoreProvider =
nativeLabelScanStoreProvider( storeDir, fs, pageCache, config, logService );
SystemNanoClock clock = Clocks.nanoClock();
dataSource = new NeoStoreDataSource( storeDir, config, idGeneratorFactory, IdReuseEligibility.ALWAYS,
idConfigurationProvider,
logService, mock( JobScheduler.class, RETURNS_MOCKS ), mock( TokenNameLookup.class ),
Expand All @@ -140,7 +141,7 @@ fs, mock( TransactionMonitor.class ), databaseHealth,
new Tracers( "null", NullLog.getInstance(), monitors, jobScheduler ),
mock( Procedures.class ),
IOLimiter.unlimited(),
mock( AvailabilityGuard.class ), Clocks.nanoClock(),
new AvailabilityGuard( clock, NullLog.getInstance() ), clock,
new CanWrite(), new StoreCopyCheckPointMutex() );

return dataSource;
Expand Down

0 comments on commit 5e0433f

Please sign in to comment.