Skip to content

Commit

Permalink
KernelTransactionImplementation doesn't reference Locks.Client
Browse files Browse the repository at this point in the history
since those instances are hard wired to a specific Locks instance. Other
similar references are always kept on the service level, like
CommitProcess, where there's a proxy in between which switches when role
switches.

This commit undos parts of #5817 because of the issue described in #5996
and because this solution where Locks is referenced conforms with
other types of references and is a simpler fix to the original problem.
  • Loading branch information
tinwelint authored and lutovich committed Dec 10, 2015
1 parent c565fd9 commit 1eb32a0
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 41 deletions.
Expand Up @@ -63,16 +63,16 @@
import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.state.NeoStoreTransactionContext;
import org.neo4j.kernel.impl.transaction.state.TransactionRecordState; import org.neo4j.kernel.impl.transaction.state.TransactionRecordState;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionEvent; import org.neo4j.kernel.impl.transaction.tracing.TransactionEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.collection.ArrayCollection; import org.neo4j.kernel.impl.util.collection.ArrayCollection;


import static org.neo4j.kernel.impl.api.TransactionApplicationMode.INTERNAL;

import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL; import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL;
import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE; import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE;
import static org.neo4j.kernel.impl.api.TransactionApplicationMode.INTERNAL;


/** /**
* This class should replace the {@link org.neo4j.kernel.api.KernelTransaction} interface, and take its name, as soon * This class should replace the {@link org.neo4j.kernel.api.KernelTransaction} interface, and take its name, as soon
Expand Down Expand Up @@ -144,6 +144,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private final Clock clock; private final Clock clock;
private final TransactionToRecordStateVisitor txStateToRecordStateVisitor = new TransactionToRecordStateVisitor(); private final TransactionToRecordStateVisitor txStateToRecordStateVisitor = new TransactionToRecordStateVisitor();
private final Collection<Command> extractedCommands = new ArrayCollection<>( 32 ); private final Collection<Command> extractedCommands = new ArrayCollection<>( 32 );
private final Locks locksManager;
private TransactionState txState; private TransactionState txState;
private LegacyIndexTransactionState legacyIndexTransactionState; private LegacyIndexTransactionState legacyIndexTransactionState;
private TransactionType transactionType = TransactionType.ANY; private TransactionType transactionType = TransactionType.ANY;
Expand All @@ -162,6 +163,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private final TransactionTracer tracer; private final TransactionTracer tracer;
private TransactionEvent transactionEvent; private TransactionEvent transactionEvent;
private CloseListener closeListener; private CloseListener closeListener;
private final NeoStoreTransactionContext context;


public KernelTransactionImplementation( StatementOperationParts operations, public KernelTransactionImplementation( StatementOperationParts operations,
SchemaWriteGuard schemaWriteGuard, LabelScanStore labelScanStore, SchemaWriteGuard schemaWriteGuard, LabelScanStore labelScanStore,
Expand All @@ -170,7 +172,7 @@ public KernelTransactionImplementation( StatementOperationParts operations,
TransactionRecordState recordState, TransactionRecordState recordState,
RecordStateForCacheAccessor recordStateForCache, RecordStateForCacheAccessor recordStateForCache,
SchemaIndexProviderMap providerMap, NeoStore neoStore, SchemaIndexProviderMap providerMap, NeoStore neoStore,
Locks.Client locks, TransactionHooks hooks, Locks locks, TransactionHooks hooks,
ConstraintIndexCreator constraintIndexCreator, ConstraintIndexCreator constraintIndexCreator,
TransactionHeaderInformationFactory headerInformationFactory, TransactionHeaderInformationFactory headerInformationFactory,
TransactionCommitProcess commitProcess, TransactionCommitProcess commitProcess,
Expand All @@ -180,7 +182,8 @@ public KernelTransactionImplementation( StatementOperationParts operations,
LegacyIndexTransactionState legacyIndexTransactionState, LegacyIndexTransactionState legacyIndexTransactionState,
Pool<KernelTransactionImplementation> pool, Pool<KernelTransactionImplementation> pool,
Clock clock, Clock clock,
TransactionTracer tracer ) TransactionTracer tracer,
NeoStoreTransactionContext context )
{ {
this.operations = operations; this.operations = operations;
this.schemaWriteGuard = schemaWriteGuard; this.schemaWriteGuard = schemaWriteGuard;
Expand All @@ -190,14 +193,15 @@ public KernelTransactionImplementation( StatementOperationParts operations,
this.recordStateForCache = recordStateForCache; this.recordStateForCache = recordStateForCache;
this.providerMap = providerMap; this.providerMap = providerMap;
this.schemaState = schemaState; this.schemaState = schemaState;
this.locksManager = locks;
this.hooks = hooks; this.hooks = hooks;
this.locks = locks;
this.constraintIndexCreator = constraintIndexCreator; this.constraintIndexCreator = constraintIndexCreator;
this.headerInformationFactory = headerInformationFactory; this.headerInformationFactory = headerInformationFactory;
this.commitProcess = commitProcess; this.commitProcess = commitProcess;
this.transactionMonitor = transactionMonitor; this.transactionMonitor = transactionMonitor;
this.persistenceCache = persistenceCache; this.persistenceCache = persistenceCache;
this.storeLayer = storeLayer; this.storeLayer = storeLayer;
this.context = context;
this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState ); this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState );
this.pool = pool; this.pool = pool;
this.clock = clock; this.clock = clock;
Expand All @@ -208,8 +212,9 @@ public KernelTransactionImplementation( StatementOperationParts operations,
/** Reset this transaction to a vanilla state, turning it into a logically new transaction. */ /** Reset this transaction to a vanilla state, turning it into a logically new transaction. */
public KernelTransactionImplementation initialize( long lastCommittedTx ) public KernelTransactionImplementation initialize( long lastCommittedTx )
{ {
assert locks != null : "This transaction has been disposed off, it should not be used."; this.locks = locksManager.newClient();
this.closing = closed = failure = success = false; this.context.bind( locks );
this.closing = closed = failure = success = terminated = false;
this.transactionType = TransactionType.ANY; this.transactionType = TransactionType.ANY;
this.beforeHookInvoked = false; this.beforeHookInvoked = false;
this.recordState.initialize( lastCommittedTx ); this.recordState.initialize( lastCommittedTx );
Expand Down Expand Up @@ -622,18 +627,9 @@ private void afterRollback()
/** Release resources held up by this transaction & return it to the transaction pool. */ /** Release resources held up by this transaction & return it to the transaction pool. */
private void release() private void release()
{ {
locks.releaseAll(); locks.close();
if ( terminated ) locks = null;
{ pool.release( this );
// This transaction has been externally marked for termination.
// Just dispose of this transaction and don't return it to the pool.
dispose();
}
else
{
// Return this instance to the pool so that another transaction may use it.
pool.release( this );
}
} }


private class TransactionToRecordStateVisitor extends TxStateVisitor.Adapter private class TransactionToRecordStateVisitor extends TxStateVisitor.Adapter
Expand Down
Expand Up @@ -150,8 +150,6 @@ public KernelTransactions( NeoStoreTransactionContextSupplier neoStoreTransactio
public KernelTransactionImplementation newInstance() public KernelTransactionImplementation newInstance()
{ {
NeoStoreTransactionContext context = neoStoreTransactionContextSupplier.acquire(); NeoStoreTransactionContext context = neoStoreTransactionContextSupplier.acquire();
Locks.Client locksClient = locks.newClient();
context.bind( locksClient );
TransactionRecordState recordState = new TransactionRecordState( TransactionRecordState recordState = new TransactionRecordState(
neoStore, integrityValidator, context ); neoStore, integrityValidator, context );
LegacyIndexTransactionState legacyIndexTransactionState = LegacyIndexTransactionState legacyIndexTransactionState =
Expand All @@ -161,9 +159,10 @@ public KernelTransactionImplementation newInstance()
KernelTransactionImplementation tx = new KernelTransactionImplementation( KernelTransactionImplementation tx = new KernelTransactionImplementation(
statementOperations, schemaWriteGuard, statementOperations, schemaWriteGuard,
labelScanStore, indexingService, updateableSchemaState, recordState, recordStateForCache, providerMap, labelScanStore, indexingService, updateableSchemaState, recordState, recordStateForCache, providerMap,
neoStore, locksClient, hooks, constraintIndexCreator, transactionHeaderInformationFactory, neoStore, locks, hooks, constraintIndexCreator, transactionHeaderInformationFactory,
transactionCommitProcess, transactionMonitor, persistenceCache, storeLayer, transactionCommitProcess, transactionMonitor, persistenceCache, storeLayer,
legacyIndexTransactionState, localTxPool, Clock.SYSTEM_CLOCK, tracers.transactionTracer ); legacyIndexTransactionState, localTxPool, Clock.SYSTEM_CLOCK, tracers.transactionTracer,
context );


allTransactions.add( tx ); allTransactions.add( tx );


Expand Down Expand Up @@ -224,9 +223,7 @@ public void disposeAll()
{ {
for ( KernelTransactionImplementation tx : allTransactions ) for ( KernelTransactionImplementation tx : allTransactions )
{ {
// we mark all transactions for termination since we want to make sure these transactions // We mark all transactions for termination since we want to be on the safe side here.
// won't be reused, ever. Each transaction has, among other things, a Locks.Client and we
// certainly want to keep that from being reused from this point.
tx.markForTermination(); tx.markForTermination();
} }
localTxPool.disposeAll(); localTxPool.disposeAll();
Expand All @@ -250,6 +247,4 @@ private void assertDatabaseIsRunning()
throw new DatabaseShutdownException(); throw new DatabaseShutdownException();
} }
} }


} }
Expand Up @@ -32,12 +32,14 @@
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.store.PersistenceCache; import org.neo4j.kernel.impl.api.store.PersistenceCache;
import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreReadLayer;
import org.neo4j.kernel.impl.locking.NoOpClient; import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.NoOpLocks;
import org.neo4j.kernel.impl.store.NeoStore; import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.state.NeoStoreTransactionContext;
import org.neo4j.kernel.impl.transaction.state.TransactionRecordState; import org.neo4j.kernel.impl.transaction.state.TransactionRecordState;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;


import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
Expand All @@ -54,14 +56,16 @@ static KernelTransaction kernelTransaction()
mock( SchemaWriteGuard.class ), null, null, mock( SchemaWriteGuard.class ), null, null,
null, mock( TransactionRecordState.class ), null, mock( TransactionRecordState.class ),
mock( RecordStateForCacheAccessor.class ), mock( RecordStateForCacheAccessor.class ),
null, mock( NeoStore.class ), new NoOpClient(), new TransactionHooks(), null, mock( NeoStore.class ), new NoOpLocks(),
new TransactionHooks(),
mock( ConstraintIndexCreator.class ), headerInformationFactory, mock( ConstraintIndexCreator.class ), headerInformationFactory,
mock( TransactionRepresentationCommitProcess.class ), mock( TransactionMonitor.class ), mock( TransactionRepresentationCommitProcess.class ), mock( TransactionMonitor.class ),
mock( PersistenceCache.class ), mock( PersistenceCache.class ),
mock( StoreReadLayer.class ), mock( StoreReadLayer.class ),
mock( LegacyIndexTransactionState.class ), mock( LegacyIndexTransactionState.class ),
mock(Pool.class), mock(Pool.class),
Clock.SYSTEM_CLOCK, Clock.SYSTEM_CLOCK,
TransactionTracer.NULL ); TransactionTracer.NULL,
mock( NeoStoreTransactionContext.class ) );
} }
} }
Expand Up @@ -36,23 +36,33 @@
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHeaderInformation; import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
import org.neo4j.kernel.impl.api.TransactionHooks; import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.store.PersistenceCache;
import org.neo4j.kernel.impl.api.store.StoreReadLayer;
import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.NoOpClient; import org.neo4j.kernel.impl.locking.NoOpClient;
import org.neo4j.kernel.impl.locking.NoOpLocks;
import org.neo4j.kernel.impl.locking.community.CommunityLockManger;
import org.neo4j.kernel.impl.store.NeoStore; import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.state.NeoStoreTransactionContext;
import org.neo4j.kernel.impl.transaction.state.TransactionRecordState; import org.neo4j.kernel.impl.transaction.state.TransactionRecordState;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.test.DoubleLatch; import org.neo4j.test.DoubleLatch;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -356,7 +366,7 @@ public Void answer( InvocationOnMock invocationOnMock ) throws Throwable
} }


@Test @Test
public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws Exception public void shouldStillReturnTransactionInstanceWithTerminationMarkToPool() throws Exception
{ {
// GIVEN // GIVEN
KernelTransactionImplementation transaction = newTransaction(); KernelTransactionImplementation transaction = newTransaction();
Expand All @@ -366,7 +376,43 @@ public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws
transaction.close(); transaction.close();


// THEN // THEN
verifyZeroInteractions( pool ); verify( pool ).release( transaction );
}

@Test
public void shouldBeAbleToReuseTerminatedTransaction() throws Exception
{
// GIVEN
KernelTransactionImplementation transaction = newTransaction();
transaction.close();
transaction.markForTermination();

// WHEN
transaction.initialize( 10L );
transaction.txState().nodeDoCreate( 11L );
transaction.success();
transaction.close();

// THEN
verify( commitProcess ).commit( any( TransactionRepresentation.class ), any( LockGroup.class ),
any( CommitEvent.class ), any( TransactionApplicationMode.class ) );
}

@Test
public void shouldAcquireNewLocksClientEveryTimeTransactionIsReused() throws Exception
{
// GIVEN
KernelTransactionImplementation transaction = newTransaction();
transaction.close();
verify( locks ).newClient();
reset( locks );

// WHEN
transaction.initialize( 10L );
transaction.close();

// THEN
verify( locks ).newClient();
} }


private final NeoStore neoStore = mock( NeoStore.class ); private final NeoStore neoStore = mock( NeoStore.class );
Expand All @@ -375,12 +421,15 @@ public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws
private final RecordStateForCacheAccessor recordStateAccessor = mock( RecordStateForCacheAccessor.class ); private final RecordStateForCacheAccessor recordStateAccessor = mock( RecordStateForCacheAccessor.class );
private final LegacyIndexTransactionState legacyIndexState = mock( LegacyIndexTransactionState.class ); private final LegacyIndexTransactionState legacyIndexState = mock( LegacyIndexTransactionState.class );
private final TransactionMonitor transactionMonitor = mock( TransactionMonitor.class ); private final TransactionMonitor transactionMonitor = mock( TransactionMonitor.class );
private final CapturingCommitProcess commitProcess = new CapturingCommitProcess(); private final CapturingCommitProcess commitProcess = spy( new CapturingCommitProcess() );
private final TransactionHeaderInformation headerInformation = mock( TransactionHeaderInformation.class ); private final TransactionHeaderInformation headerInformation = mock( TransactionHeaderInformation.class );
private final TransactionHeaderInformationFactory headerInformationFactory = private final TransactionHeaderInformationFactory headerInformationFactory =
mock( TransactionHeaderInformationFactory.class ); mock( TransactionHeaderInformationFactory.class );
private final FakeClock clock = new FakeClock(); private final FakeClock clock = new FakeClock();
private final Pool pool = mock( Pool.class ); private final Pool pool = mock( Pool.class );
private final Locks locks = spy( new NoOpLocks() );
private final PersistenceCache persistenceCache = mock( PersistenceCache.class );
private final StoreReadLayer storeReadLayer = mock( StoreReadLayer.class );


@Before @Before
public void before() public void before()
Expand All @@ -392,9 +441,10 @@ public void before()
private KernelTransactionImplementation newTransaction() private KernelTransactionImplementation newTransaction()
{ {
KernelTransactionImplementation transaction = new KernelTransactionImplementation( KernelTransactionImplementation transaction = new KernelTransactionImplementation(
null, null, null, null, null, recordState, recordStateAccessor, null, neoStore, new NoOpClient(), null, null, null, null, null, recordState, recordStateAccessor, null, neoStore,
hooks, null, headerInformationFactory, commitProcess, transactionMonitor, null, null, locks, hooks, null, headerInformationFactory, commitProcess, transactionMonitor,
legacyIndexState, pool, clock, TransactionTracer.NULL ); persistenceCache, storeReadLayer,
legacyIndexState, pool, clock, TransactionTracer.NULL, mock( NeoStoreTransactionContext.class ) );
transaction.initialize( 0 ); transaction.initialize( 0 );
return transaction; return transaction;
} }
Expand Down
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.locking;

import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class NoOpLocks extends LifecycleAdapter implements Locks
{
private boolean closed;

@Override
public void shutdown() throws Throwable
{
closed = true;
}

@Override
public Client newClient()
{
if ( closed )
{
throw new IllegalStateException();
}
return new NoOpClient();
}

@Override
public void accept( Visitor visitor )
{
}
}
Expand Up @@ -84,19 +84,19 @@ public void shouldNotHaveTransactionsRunningThroughRoleSwitchProduceInconsistenc
{ {
// Duration of this test. If the timeout is hit in the middle of a round, the round will be completed // Duration of this test. If the timeout is hit in the middle of a round, the round will be completed
// and exit after that. // and exit after that.
ManagedCluster cluster = clusterRule.startCluster();
long duration = parseTimeMillis.apply( System.getProperty( getClass().getName() + ".duration", "30s" ) ); long duration = parseTimeMillis.apply( System.getProperty( getClass().getName() + ".duration", "30s" ) );
long endTime = currentTimeMillis() + duration; long endTime = currentTimeMillis() + duration;
while ( currentTimeMillis() < endTime ) while ( currentTimeMillis() < endTime )
{ {
oneRound(); oneRound( cluster );
} }
} }


private void oneRound() throws Throwable private void oneRound( ManagedCluster cluster ) throws Throwable
{ {
// GIVEN a cluster and a node // GIVEN a cluster and a node
final String key = "key"; final String key = "key";
ManagedCluster cluster = clusterRule.startCluster();
final GraphDatabaseService master = cluster.getMaster(); final GraphDatabaseService master = cluster.getMaster();
final long nodeId = createNode( master ); final long nodeId = createNode( master );
cluster.sync(); cluster.sync();
Expand Down

0 comments on commit 1eb32a0

Please sign in to comment.