Skip to content

Commit

Permalink
Dynamically select operation parts: guarded or non guarded chain base…
Browse files Browse the repository at this point in the history
…d on transaction timeout value.
  • Loading branch information
MishaDemianenko committed Sep 7, 2016
1 parent 9515758 commit 8f8b593
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 60 deletions.
Expand Up @@ -59,8 +59,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
val kernelTransaction = mock[KernelTransactionImplementation]
when(kernelTransaction.mode()).thenReturn(AccessMode.Static.FULL)
val storeStatement = mock[StorageStatement]
val operations = mock[StatementOperationParts](RETURNS_DEEP_STUBS)
statement = new KernelStatement(kernelTransaction, null, operations, storeStatement, new Procedures())
statement = new KernelStatement(kernelTransaction, null, storeStatement, new Procedures())
statement.acquire()
}

Expand Down
Expand Up @@ -63,6 +63,7 @@
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.api.StackingQueryRegistrationOperations;
import org.neo4j.kernel.impl.api.StateHandlingStatementOperations;
import org.neo4j.kernel.impl.api.StatementOperationContainer;
import org.neo4j.kernel.impl.api.StatementOperationParts;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHooks;
Expand Down Expand Up @@ -163,7 +164,6 @@
import org.neo4j.logging.Logger;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.time.Clocks;

import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;

Expand Down Expand Up @@ -788,13 +788,13 @@ private KernelModule buildKernel( TransactionAppender appender,
LegacyIndexStore legacyIndexStore = new LegacyIndexStore( config,
indexConfigStore, kernelProvider, legacyIndexProviderLookup );

StatementOperationParts statementOperations = dependencies.satisfyDependency( buildStatementOperations(
storeLayer, autoIndexing, constraintIndexCreator, updateableSchemaState, guard,
legacyIndexStore ) );
StatementOperationContainer statementOperationContainer = dependencies.satisfyDependency(
buildStatementOperations( storeLayer, autoIndexing,
constraintIndexCreator, updateableSchemaState, guard, legacyIndexStore ) );

TransactionHooks hooks = new TransactionHooks();
KernelTransactions kernelTransactions = life.add( new KernelTransactions( statementLocksFactory,
constraintIndexCreator, statementOperations, schemaWriteGuard, transactionHeaderInformationFactory,
constraintIndexCreator, statementOperationContainer, schemaWriteGuard, transactionHeaderInformationFactory,
transactionCommitProcess, indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor, life,
tracers, storageEngine, procedures, transactionIdStore, clock ) );

Expand Down Expand Up @@ -972,7 +972,7 @@ public DependencyResolver getDependencyResolver()
return dependencies;
}

private StatementOperationParts buildStatementOperations(
private StatementOperationContainer buildStatementOperations(
StoreReadLayer storeReadLayer, AutoIndexing autoIndexing,
ConstraintIndexCreator constraintIndexCreator, UpdateableSchemaState updateableSchemaState,
Guard guard, LegacyIndexStore legacyIndexStore )
Expand All @@ -985,7 +985,7 @@ private StatementOperationParts buildStatementOperations(
legacyIndexStore );

QueryRegistrationOperations queryRegistrationOperations =
new StackingQueryRegistrationOperations( StackingQueryRegistrationOperations.LAST_QUERY_ID, Clocks.systemClock() );
new StackingQueryRegistrationOperations( StackingQueryRegistrationOperations.LAST_QUERY_ID, clock );

StatementOperationParts parts = new StatementOperationParts( stateHandlingContext, stateHandlingContext,
stateHandlingContext, stateHandlingContext, stateHandlingContext, stateHandlingContext,
Expand All @@ -1010,10 +1010,11 @@ private StatementOperationParts buildStatementOperations(
// + Guard
GuardingStatementOperations guardingOperations = new GuardingStatementOperations(
parts.entityWriteOperations(), parts.entityReadOperations(), guard );
parts = parts.override( null, null, guardingOperations, guardingOperations, null, null, null, null,
StatementOperationParts guardedParts = parts.override( null, null, guardingOperations,
guardingOperations, null, null, null, null,
null, null, null, null );

return parts;
return new StatementOperationContainer( guardedParts, parts );
}

@Override
Expand Down
Expand Up @@ -45,14 +45,14 @@
* <ol>
* <li>Construct {@link KernelStatement} when {@link KernelTransactionImplementation} is constructed</li>
* <li>For every transaction...</li>
* <li>Call {@link #initialize(StatementLocks)} which makes this instance
* <li>Call {@link #initialize(StatementLocks, StatementOperationParts)} which makes this instance
* full available and ready to use. Call when the {@link KernelTransactionImplementation} is initialized.</li>
* <li>Alternate {@link #acquire()} / {@link #close()} when acquiring / closing a statement for the transaction...
* Temporarily asymmetric number of calls to {@link #acquire()} / {@link #close()} is supported, although in
* the end an equal number of calls must have been issued.</li>
* <li>To be safe call {@link #forceClose()} at the end of a transaction to force a close of the statement,
* even if there are more than one current call to {@link #acquire()}. This instance is now again ready
* to be {@link #initialize(StatementLocks) initialized} and used for the transaction
* to be {@link #initialize(StatementLocks, StatementOperationParts) initialized} and used for the transaction
* instance again, when it's initialized.</li>
* </ol>
*/
Expand All @@ -66,18 +66,13 @@ public class KernelStatement implements TxStateHolder, Statement
private int referenceCount;
private volatile ExecutingQueryList executingQueryList;

public KernelStatement(
KernelTransactionImplementation transaction,
TxStateHolder txStateHolder,
StatementOperationParts operations,
StorageStatement storeStatement,
Procedures procedures
)
public KernelStatement( KernelTransactionImplementation transaction,
TxStateHolder txStateHolder, StorageStatement storeStatement, Procedures procedures )
{
this.transaction = transaction;
this.txStateHolder = txStateHolder;
this.storeStatement = storeStatement;
this.facade = new OperationsFacade( transaction, this, operations, procedures );
this.facade = new OperationsFacade( transaction, this, procedures );
this.executingQueryList = ExecutingQueryList.EMPTY;
}

Expand Down Expand Up @@ -173,9 +168,10 @@ void assertOpen()
}
}

void initialize( StatementLocks statementLocks )
void initialize( StatementLocks statementLocks, StatementOperationParts operationParts )
{
this.statementLocks = statementLocks;
facade.initialize( operationParts );
}

public StatementLocks locks()
Expand Down
Expand Up @@ -127,7 +127,7 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private final SchemaWriteGuard schemaWriteGuard;
private final TransactionHooks hooks;
private final ConstraintIndexCreator constraintIndexCreator;
private final StatementOperationParts operations;
private final StatementOperationContainer operationContainer;
private final StorageEngine storageEngine;
private final TransactionTracer tracer;
private final Pool<KernelTransactionImplementation> pool;
Expand All @@ -146,6 +146,7 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private LegacyIndexTransactionState legacyIndexTransactionState;
private TransactionWriteState writeState;
private TransactionHooks.TransactionHooksState hooksState;
private StatementOperationParts currentTransactionOperations;
private final KernelStatement currentStatement;
private final StorageStatement storageStatement;
private final List<CloseListener> closeListeners = new ArrayList<>( 2 );
Expand Down Expand Up @@ -174,7 +175,7 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
*/
private final Lock terminationReleaseLock = new ReentrantLock();

public KernelTransactionImplementation( StatementOperationParts operations,
public KernelTransactionImplementation( StatementOperationContainer operationContainer,
SchemaWriteGuard schemaWriteGuard,
TransactionHooks hooks,
ConstraintIndexCreator constraintIndexCreator,
Expand All @@ -188,7 +189,7 @@ public KernelTransactionImplementation( StatementOperationParts operations,
TransactionTracer tracer,
StorageEngine storageEngine )
{
this.operations = operations;
this.operationContainer = operationContainer;
this.schemaWriteGuard = schemaWriteGuard;
this.hooks = hooks;
this.constraintIndexCreator = constraintIndexCreator;
Expand All @@ -202,7 +203,7 @@ public KernelTransactionImplementation( StatementOperationParts operations,
this.clock = clock;
this.tracer = tracer;
this.storageStatement = storeLayer.newStatement();
this.currentStatement = new KernelStatement( this, this, operations, storageStatement, procedures );
this.currentStatement = new KernelStatement( this, this, storageStatement, procedures );
}

/**
Expand All @@ -226,7 +227,8 @@ public KernelTransactionImplementation initialize(
this.accessMode = accessMode;
this.transactionId = NOT_COMMITTED_TRANSACTION_ID;
this.commitTime = NOT_COMMITTED_TRANSACTION_COMMIT_TIME;
this.currentStatement.initialize( statementLocks );
this.currentTransactionOperations = timeoutMillis > 0 ? operationContainer.guardedParts() : operationContainer.nonGuarderParts();
this.currentStatement.initialize( statementLocks, currentTransactionOperations );
return this;
}

Expand Down Expand Up @@ -605,7 +607,7 @@ private long commit() throws TransactionFailureException
catch ( ConstraintValidationKernelException | CreateConstraintFailureException e )
{
throw new ConstraintViolationTransactionFailureException(
e.getUserMessage( new KeyReadTokenNameLookup( operations.keyReadOperations() ) ), e );
e.getUserMessage( new KeyReadTokenNameLookup( currentTransactionOperations.keyReadOperations() ) ), e );
}
finally
{
Expand Down Expand Up @@ -718,6 +720,7 @@ private void release()
legacyIndexTransactionState = null;
txState = null;
hooksState = null;
currentTransactionOperations = null;
closeListeners.clear();
reuseCount++;
pool.release( this );
Expand Down
Expand Up @@ -68,7 +68,7 @@ public class KernelTransactions extends LifecycleAdapter

private final StatementLocksFactory statementLocksFactory;
private final ConstraintIndexCreator constraintIndexCreator;
private final StatementOperationParts statementOperations;
private final StatementOperationContainer statementOperations;
private final SchemaWriteGuard schemaWriteGuard;
private final TransactionHeaderInformationFactory transactionHeaderInformationFactory;
private final TransactionCommitProcess transactionCommitProcess;
Expand Down Expand Up @@ -100,7 +100,7 @@ public class KernelTransactions extends LifecycleAdapter

public KernelTransactions( StatementLocksFactory statementLocksFactory,
ConstraintIndexCreator constraintIndexCreator,
StatementOperationParts statementOperations,
StatementOperationContainer statementOperationContainer,
SchemaWriteGuard schemaWriteGuard,
TransactionHeaderInformationFactory txHeaderFactory,
TransactionCommitProcess transactionCommitProcess,
Expand All @@ -117,7 +117,7 @@ public KernelTransactions( StatementLocksFactory statementLocksFactory,
{
this.statementLocksFactory = statementLocksFactory;
this.constraintIndexCreator = constraintIndexCreator;
this.statementOperations = statementOperations;
this.statementOperations = statementOperationContainer;
this.schemaWriteGuard = schemaWriteGuard;
this.transactionHeaderInformationFactory = txHeaderFactory;
this.transactionCommitProcess = transactionCommitProcess;
Expand Down
Expand Up @@ -101,18 +101,22 @@ public class OperationsFacade
{
private final KernelTransaction tx;
private final KernelStatement statement;
private final StatementOperationParts operations;
private final Procedures procedures;
private StatementOperationParts operations;

OperationsFacade( KernelTransaction tx, KernelStatement statement,
StatementOperationParts operations, Procedures procedures )
Procedures procedures )
{
this.tx = tx;
this.statement = statement;
this.operations = operations;
this.procedures = procedures;
}

public void initialize( StatementOperationParts operationParts )
{
this.operations = operationParts;
}

final KeyReadOperations tokenRead()
{
return operations.keyReadOperations();
Expand Down
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

/**
* Container for different version of operation parts.
* To allow dynamic parts switching depending of transaction nature.
* <ul>
* Currently supported only 2 different kinds of parts:
* <li>full chain of operation without guarding layer</li>
* <li>full chain of operation with guarding layer</li>
* </ul>
*/
public class StatementOperationContainer
{
private StatementOperationParts guardedParts;
private StatementOperationParts nonGuarderParts;

public StatementOperationContainer( StatementOperationParts guardedParts,
StatementOperationParts nonGuarderParts )
{
this.guardedParts = guardedParts;
this.nonGuarderParts = nonGuarderParts;
}

public StatementOperationParts guardedParts()
{
return guardedParts;
}

public StatementOperationParts nonGuarderParts()
{
return nonGuarderParts;
}
}
Expand Up @@ -19,14 +19,13 @@
*/
package org.neo4j.kernel.api;

import java.time.Clock;
import java.util.function.Supplier;

import org.neo4j.collection.pool.Pool;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.api.StatementOperationParts;
import org.neo4j.kernel.impl.api.StatementOperationContainer;
import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
Expand Down Expand Up @@ -78,7 +77,7 @@ static Instances kernelTransactionWithInternals( AccessMode accessMode )
when( storageEngine.storeReadLayer() ).thenReturn( storeReadLayer );

KernelTransactionImplementation transaction = new KernelTransactionImplementation(
mock( StatementOperationParts.class ),
mock( StatementOperationContainer.class ),
mock( SchemaWriteGuard.class ),
new TransactionHooks(),
mock( ConstraintIndexCreator.class ), new Procedures(), headerInformationFactory,
Expand Down
Expand Up @@ -103,6 +103,6 @@ public void shouldAlwaysReturnFalseFromNodeHasLabelForNoSuchLabelConstant() thro

private OperationsFacade stubStatement()
{
return new OperationsFacade( mock(KernelTransaction.class), mock( KernelStatement.class ), null, new Procedures() );
return new OperationsFacade( mock(KernelTransaction.class), mock( KernelStatement.class ), new Procedures() );
}
}
Expand Up @@ -40,7 +40,7 @@ public void shouldThrowTerminateExceptionWhenTransactionTerminated() throws Exce
when( transaction.getReasonIfTerminated() ).thenReturn( Status.Transaction.Terminated );
when( transaction.mode() ).thenReturn( AccessMode.Static.FULL );

KernelStatement statement = new KernelStatement( transaction, null, null, mock( StorageStatement.class ), null );
KernelStatement statement = new KernelStatement( transaction, null, mock( StorageStatement.class ), null );
statement.acquire();

statement.readOperations().nodeExists( 0 );
Expand All @@ -52,7 +52,7 @@ public void shouldReleaseStorageStatementWhenForceClosed() throws Exception
// given
StorageStatement storeStatement = mock( StorageStatement.class );
KernelStatement statement = new KernelStatement( mock( KernelTransactionImplementation.class ),
null, null, storeStatement, new Procedures() );
null, storeStatement, new Procedures() );
statement.acquire();

// when
Expand Down
Expand Up @@ -101,6 +101,26 @@ public static Collection<Object[]> parameters()
);
}

@Test
public void useGuardedPartsWhenTransactionHaveTimeout() throws Exception
{
try ( KernelTransaction transaction = newTransaction( 10L ) )
{
transaction.success();
}
verify( operationContainer ).guardedParts();
}

@Test
public void useNonGuardedPartsWhenTransactionHaveTimeout() throws Exception
{
try ( KernelTransaction transaction = newTransaction( accessMode() ) )
{
transaction.success();
}
verify( operationContainer ).nonGuarderParts();
}

@Test
public void shouldCommitSuccessfulTransaction() throws Exception
{
Expand Down
Expand Up @@ -357,7 +357,7 @@ private static class TestKernelTransaction extends KernelTransactionImplementati
@SuppressWarnings( "unchecked" )
TestKernelTransaction( CommitTrackingMonitor monitor )
{
super( mock( StatementOperationParts.class ), mock( SchemaWriteGuard.class ), new TransactionHooks(),
super( mock( StatementOperationContainer.class ), mock( SchemaWriteGuard.class ), new TransactionHooks(),
mock( ConstraintIndexCreator.class ), new Procedures(), TransactionHeaderInformationFactory.DEFAULT,
mock( TransactionCommitProcess.class ), monitor, () -> mock( LegacyIndexTransactionState.class ),
mock( Pool.class ), Clocks.fakeClock(), TransactionTracer.NULL,
Expand Down

0 comments on commit 8f8b593

Please sign in to comment.