Ensures all lock clients closed during role switch
Observed issue:
After an instance switched role in HA, for some reason some transactions
just wouldn't respect the locks that other transactions held. This would
lead to some transactions making changes on top of stale data, effectively
overwriting changes of other transactions.

Cause(s):
Among other things, a new lock manager instance is created during role
switching. Transaction instances are pooled, and each is created with a locks
client instance which it keeps throughout its life. While there may be
transactions still executing at the time of switching role, all open
transactions are marked for termination and the transaction pool is cleared.
Transactions marked for termination cannot commit, but when closed they were
happily returned to the transaction pool and would be reused again and again.
These transactions would have their termination flag reset at the point of
being reused and would still contain a locks client for the previous
lock manager. These transaction instances would forever disrespect the
locks that all other transactions respected and would continue to do so until
the next role switch, where they at least would have a chance of being
properly disposed of. Although the next role switch would have a chance of
introducing new rogue transactions as well...
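
The failure mode described above can be sketched in a few lines. This is a deliberately simplified model, not the Neo4j code: `LockManager`, `PooledTx` and `reproduce()` are hypothetical names invented for illustration, and the "lock manager" is just a map from resource to owner.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model. None of these classes are the real Neo4j types.
final class StaleLockClientDemo
{
    /** Grants an exclusive lock on a resource to at most one owner at a time. */
    static final class LockManager
    {
        private final Map<String,Object> owners = new HashMap<>();

        boolean tryLock( String resource, Object owner )
        {
            Object current = owners.putIfAbsent( resource, owner );
            return current == null || current == owner;
        }
    }

    /** A pooled transaction that keeps the lock manager it was created with. */
    static final class PooledTx
    {
        final LockManager locks;
        boolean terminated;

        PooledTx( LockManager locks )
        {
            this.locks = locks;
        }

        void initialize()
        {
            terminated = false; // the flag reset on reuse, as described above
        }
    }

    /** Returns true if two transactions both obtained the "same" exclusive lock. */
    static boolean reproduce()
    {
        Deque<PooledTx> pool = new ArrayDeque<>();
        LockManager oldManager = new LockManager();
        PooledTx running = new PooledTx( oldManager );

        // Role switch: open transactions are marked, the pool is cleared,
        // and a new lock manager replaces the old one.
        running.terminated = true;
        pool.clear();
        LockManager newManager = new LockManager();

        // The running transaction closes after the switch and is pooled anyway.
        pool.push( running );

        // Later the stale instance is reused next to a freshly created one.
        PooledTx reused = pool.pop();
        reused.initialize();
        PooledTx fresh = new PooledTx( newManager );

        boolean first = fresh.locks.tryLock( "node:0", fresh );
        boolean second = reused.locks.tryLock( "node:0", reused );
        return first && second;
    }

    public static void main( String[] args )
    {
        System.out.println( "both hold the exclusive lock: " + reproduce() );
    }
}
```

Because the two transactions consult different lock managers, neither sees the other's lock, which is the mutual-exclusion breach the commit message describes.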

Solution(s):
Many small issues here and there caused this to happen. There are a number
of changes that will prevent this from happening. Some of them are
belt-and-suspenders additions, although at virtually no cost:

- Disposing kernel transactions as part of switching to pending.
  Previously they were disposed of as late as after creating the new ones,
  which increased the chance of transactions holding locks clients from
  the old lock manager being used and causing harm.
- Mark _all_ transactions, not just open transactions, for termination
  when switching role. No transactions referring to the previous lock manager
  can be around after the role switch.
- Do not return transaction (KernelTransactionImplementation) instances
  that are marked for termination to the pool.
- Have Locks instances know when they are closed and refuse to hand out new
  clients if closed.
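
Two of the changes listed above, not pooling terminated transactions and refusing new clients after shutdown, can be sketched in isolation as follows. This is a minimal illustration with hypothetical stand-in classes (`Locks`, `Tx`, a `Deque` as the pool); the actual changes are the ones in the diff of `KernelTransactionImplementation` and `CommunityLockManger`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-ins for illustration only, not the real Neo4j classes.
final class TerminatedTxDisposalSketch
{
    /** A lock manager that remembers it was shut down and refuses new clients. */
    static final class Locks
    {
        private volatile boolean closed;

        Object newClient()
        {
            if ( closed )
            {
                throw new IllegalStateException( this + " already closed" );
            }
            return new Object(); // placeholder for a real lock client
        }

        void shutdown()
        {
            closed = true;
        }
    }

    /** A pooled transaction that is only returned to the pool if not terminated. */
    static final class Tx
    {
        private final Deque<Tx> pool;
        private boolean terminated;
        private boolean disposed;

        Tx( Deque<Tx> pool )
        {
            this.pool = pool;
        }

        void markForTermination()
        {
            terminated = true;
        }

        boolean isDisposed()
        {
            return disposed;
        }

        void close()
        {
            if ( terminated )
            {
                disposed = true;   // drop the instance, never reuse it
            }
            else
            {
                pool.push( this ); // safe to hand out again
            }
        }
    }

    public static void main( String[] args )
    {
        Deque<Tx> pool = new ArrayDeque<>();
        Tx tx = new Tx( pool );
        tx.markForTermination();
        tx.close();
        System.out.println( "pooled after termination: " + !pool.isEmpty() );
    }
}
```

Either check alone would already stop a stale instance from being handed out again; having both is the belt-and-suspenders approach the message mentions.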

On top of those changes, with accompanying unit tests, there's an added
stress test `TransactionThroughMasterSwitchStressIT` which could quite
deterministically reproduce the problem. It can be set to run for a
longer period of time (default 30s in the main build) using
a system property, like:

  `-Dorg.neo4j.kernel.ha.transaction.TransactionThroughMasterSwitchStressIT.duration=10m`

Currently that stress test provokes only one role switch scenario:
master --> pending --> master, although more scenarios could be added
later on for an even broader net to catch these sorts of problems.
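
How the stress test reads that property is not shown in this part of the commit. As a rough sketch only, a test could turn a value such as `10m` into milliseconds like this; `parseMillis`, `durationMillis` and the supported suffixes are assumptions made for illustration, with the 30s default taken from the message above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration. The real test may parse its property differently.
final class StressDurationSketch
{
    /** Parses values such as "250ms", "30s" or "10m" into milliseconds. */
    static long parseMillis( String value )
    {
        String v = value.trim();
        if ( v.length() < 2 )
        {
            throw new IllegalArgumentException( "Missing amount or unit: '" + value + "'" );
        }
        if ( v.endsWith( "ms" ) )
        {
            return Long.parseLong( v.substring( 0, v.length() - 2 ) );
        }
        long amount = Long.parseLong( v.substring( 0, v.length() - 1 ) );
        switch ( v.charAt( v.length() - 1 ) )
        {
        case 's':
            return TimeUnit.SECONDS.toMillis( amount );
        case 'm':
            return TimeUnit.MINUTES.toMillis( amount );
        default:
            throw new IllegalArgumentException( "Unknown time unit in '" + value + "'" );
        }
    }

    /** Reads the given system property, falling back to the assumed 30s default. */
    static long durationMillis( String propertyName )
    {
        return parseMillis( System.getProperty( propertyName, "30s" ) );
    }

    public static void main( String[] args )
    {
        System.out.println( durationMillis( "hypothetical.stress.duration" ) + " ms" );
    }
}
```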
tinwelint committed Nov 5, 2015
1 parent 02290a8 commit d1a04b5
Showing 16 changed files with 523 additions and 65 deletions.
27 changes: 21 additions & 6 deletions community/io/src/test/java/org/neo4j/test/TargetDirectory.java
@@ -19,14 +19,15 @@
*/
package org.neo4j.test;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;

import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;

import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
@@ -57,9 +58,21 @@ public class TargetDirectory
public class TestDirectory implements TestRule
{
private File subdir = null;
private boolean keepDirectoryAfterSuccefulTest;

private TestDirectory() { }

/**
* Tell this {@link Rule} to keep the store directory, even after a successful test.
* It's just a useful debug mechanism to have for analyzing store after a test.
* by default directories aren't kept.
*/
public TestDirectory keepDirectoryAfterSuccefulTest()
{
keepDirectoryAfterSuccefulTest = true;
return this;
}

public String absolutePath()
{
return directory().getAbsolutePath();
@@ -88,7 +101,8 @@ public File directory( String name ) {
return dir;
}

public File graphDbDir() {
public File graphDbDir()
{
return directory( "graph-db" );
}

@@ -124,7 +138,7 @@ public String toString()

private void complete( boolean success )
{
if ( success && subdir != null )
if ( success && subdir != null && !keepDirectoryAfterSuccefulTest )
{
try
{
@@ -155,6 +169,7 @@ private void complete( boolean success )
* {@link org.neo4j.test.TargetDirectory} directly. The easiest way to do this is with
* {@link #testDirForTest(Class)}.
*/
@Deprecated
public static TargetDirectory forTest( Class<?> owningTest )
{
return new TargetDirectory( new DefaultFileSystemAbstraction(), owningTest );
@@ -1365,14 +1365,25 @@ public boolean unregisterIndexProvider( String name )
}

/**
* Hook that must be called whenever there is an HA mode switch (eg master/slave switch).
* Hook that must be called before there is an HA mode switch (eg master/slave switch),
* i.e. after state has changed to pending and before state is about to change to the new target state.
* This must only be called when the database is otherwise inaccessible.
*/
public void beforeModeSwitch()
{
// Get rid of all pooled transactions, as they will otherwise reference
// components that have been swapped out during the mode switch.
kernelModule.kernelTransactions().disposeAll();
}

/**
* Hook that must be called after an HA mode switch (eg master/slave switch) have completed.
* This must only be called when the database is otherwise inaccessible.
*/
public void afterModeSwitch()
{
loadSchemaCache();

// Stop all running transactions and get rid of all pooled transactions, as they will otherwise reference
// Get rid of all pooled transactions, as they will otherwise reference
// components that have been swapped out during the mode switch.
kernelModule.kernelTransactions().disposeAll();
}
@@ -69,9 +69,10 @@
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.util.collection.ArrayCollection;

import static org.neo4j.kernel.impl.api.TransactionApplicationMode.INTERNAL;

import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL;
import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE;
import static org.neo4j.kernel.impl.api.TransactionApplicationMode.INTERNAL;

/**
* This class should replace the {@link org.neo4j.kernel.api.KernelTransaction} interface, and take its name, as soon
@@ -208,7 +209,7 @@ public KernelTransactionImplementation( StatementOperationParts operations,
public KernelTransactionImplementation initialize( long lastCommittedTx )
{
assert locks != null : "This transaction has been disposed off, it should not be used.";
this.terminated = closing = closed = failure = success = false;
this.closing = closed = failure = success = false;
this.transactionType = TransactionType.ANY;
this.beforeHookInvoked = false;
this.recordState.initialize( lastCommittedTx );
@@ -335,11 +336,7 @@ private void closeTransaction()
{
assertTransactionOpen();
closed = true;
if ( currentStatement != null )
{
currentStatement.forceClose();
currentStatement = null;
}
closeCurrentStatementIfAny();
if ( closeListener != null )
{
closeListener.notify( success );
@@ -626,7 +623,17 @@ private void afterRollback()
private void release()
{
locks.releaseAll();
pool.release( this );
if ( terminated )
{
// This transaction has been externally marked for termination.
// Just dispose of this transaction and don't return it to the pool.
dispose();
}
else
{
// Return this instance to the pool so that another transaction may use it.
pool.release( this );
}
}

private class TransactionToRecordStateVisitor extends TxStateVisitor.Adapter
@@ -959,4 +966,10 @@ public void registerCloseListener( CloseListener listener )
assert closeListener == null;
closeListener = listener;
}

@Override
public String toString()
{
return "KernelTransaction[" + this.locks.getLockSessionId() + "]";
}
}
@@ -224,10 +224,10 @@ public void disposeAll()
{
for ( KernelTransactionImplementation tx : allTransactions )
{
if ( tx.isOpen() )
{
tx.markForTermination();
}
// we mark all transactions for termination since we want to make sure these transactions
// won't be reused, ever. Each transaction has, among other things, a Locks.Client and we
// certainly want to keep that from being reused from this point.
tx.markForTermination();
}
localTxPool.disposeAll();
globalTxPool.disposeAll();
@@ -118,6 +118,8 @@ interface Client extends AutoCloseable
/**
* A client is able to grab and release locks, and compete with other clients for them. This can be re-used until
* you call {@link Locks.Client#close()}.
*
* @throws IllegalStateException if this instance has been closed, i.e has had {@link #shutdown()} called.
*/
Client newClient();

@@ -25,10 +25,18 @@
public class CommunityLockManger extends LifecycleAdapter implements Locks
{
private final LockManagerImpl manager = new LockManagerImpl( new RagManager() );
private volatile boolean closed;

@Override
public Client newClient()
{
// We check this volatile closed flag here, which may seem like a contention overhead, but as the time
// of writing we apply pooling of transactions and in extension pooling of lock clients,
// so this method is called very rarely.
if ( closed )
{
throw new IllegalStateException( this + " already closed" );
}
return new CommunityLockClient( manager );
}

@@ -51,4 +59,10 @@ public boolean visit( RWLock element ) throws RuntimeException
}
} );
}

@Override
public void shutdown() throws Throwable
{
closed = true;
}
}
@@ -39,16 +39,15 @@
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.locking.NoOpClient;
import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.state.TransactionRecordState;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.test.DoubleLatch;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyListOf;
@@ -57,8 +56,11 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class KernelTransactionImplementationTest
{
@Test
@@ -353,6 +355,20 @@ public Void answer( InvocationOnMock invocationOnMock ) throws Throwable
assertEquals( startingTime+5, commitProcess.transaction.getTimeCommitted() );
}

@Test
public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws Exception
{
// GIVEN
KernelTransactionImplementation transaction = newTransaction();

// WHEN
transaction.markForTermination();
transaction.close();

// THEN
verifyZeroInteractions( pool );
}

private final NeoStore neoStore = mock( NeoStore.class );
private final TransactionHooks hooks = new TransactionHooks();
private final TransactionRecordState recordState = mock( TransactionRecordState.class );
@@ -364,6 +380,7 @@ public Void answer( InvocationOnMock invocationOnMock ) throws Throwable
private final TransactionHeaderInformationFactory headerInformationFactory =
mock( TransactionHeaderInformationFactory.class );
private final FakeClock clock = new FakeClock();
private final Pool pool = mock( Pool.class );

@Before
public void before()
@@ -377,7 +394,7 @@ private KernelTransactionImplementation newTransaction()
KernelTransactionImplementation transaction = new KernelTransactionImplementation(
null, null, null, null, null, recordState, recordStateAccessor, null, neoStore, new NoOpClient(),
hooks, null, headerInformationFactory, commitProcess, transactionMonitor, null, null,
legacyIndexState, mock( Pool.class ), clock, TransactionTracer.NULL );
legacyIndexState, pool, clock, TransactionTracer.NULL );
transaction.initialize( 0 );
return transaction;
}
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.locking;

import org.junit.Ignore;
import org.junit.Test;

import org.neo4j.kernel.impl.locking.Locks.Client;

import static org.junit.Assert.fail;

@Ignore("Not a test. This is a compatibility suite, run from LockingCompatibilityTestSuite.")
public class CloseCompatibility extends LockingCompatibilityTestSuite.Compatibility
{
public CloseCompatibility( LockingCompatibilityTestSuite suite )
{
super( suite );
}

@Test
public void shouldNotBeAbleToHandOutClientsIfShutDown() throws Throwable
{
// GIVEN a lock manager and working clients
try ( Client client = locks.newClient() )
{
client.acquireExclusive( ResourceTypes.NODE, 0 );
}

// WHEN
locks.stop();
locks.shutdown();

// THEN
try
{
locks.newClient();
fail( "Should fail" );
}
catch ( IllegalStateException e )
{
// Good
}
}
}
@@ -19,11 +19,9 @@
*/
package org.neo4j.kernel.impl.locking;

import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.fail;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import static org.neo4j.test.OtherThreadRule.isWaiting;
import org.junit.Rule;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

import java.util.HashMap;
import java.util.Map;
@@ -32,19 +30,24 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Rule;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.neo4j.kernel.api.index.ParameterizedSuiteRunner;
import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.test.OtherThreadRule;

import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.fail;
import static org.hamcrest.MatcherAssert.assertThat;

import static org.neo4j.test.OtherThreadRule.isWaiting;

/** Base for locking tests. */
@RunWith(ParameterizedSuiteRunner.class)
@Suite.SuiteClasses({
AcquireAndReleaseLocksCompatibility.class,
DeadlockCompatibility.class,
LockReentrancyCompatibility.class,
RWLockCompatibility.class
RWLockCompatibility.class,
CloseCompatibility.class
})
public abstract class LockingCompatibilityTestSuite
{