Skip to content

Commit

Permalink
Fixes label index update race
Browse files Browse the repository at this point in the history
Label index and schema index updates are generated during application of
each transaction and added to a queue, to be applied using WorkSync at the
end of the whole batch of transactions. Concurrently applying
transaction batches may also apply their changes in the same "session",
with the help of WorkSync.

The order in which transactions are applied is coordinated using the high-level
locks, which are released per transaction after it has been fully applied.
The exception to this is node creation, which doesn't acquire a high-level lock.

The saving grace has been, perhaps unknowingly, the lower-level
LockService, which is used only during commit and was introduced to
allow safe property reading in the face of concurrent property updates.
Some time back that purpose was taken over by the safe-zone approach,
but the LockService still remains for background index population, since index
population cannot be tied to a specific transaction id, which is what the
safe-zone approach revolves around.
The LockService guarded against e.g. node CREATE followed by DELETE being
reordered to DELETE followed by CREATE. Almost.
It turns out that those locks were released before applying label index
and schema index updates. This allowed a race where transactions could
queue updates to the WorkSync in the wrong order, allowing the unexpected
reordering mentioned above.

The fix here is to hold the locks slightly longer so that closing of
appliers happens before releasing them. This requires the LockGroup
to be instantiated before the appliers, which means the locks will be
held throughout a whole transaction batch, as opposed to being held only
for a single transaction in a batch. The only effect this has is to
potentially delay an index population job that is currently running.
Note that embedded mode never applies batches of size
larger than 1, but clustered setups do.
  • Loading branch information
tinwelint committed Jan 8, 2019
1 parent 93d179d commit df2ac32
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 17 deletions.
Expand Up @@ -318,18 +318,16 @@ public void apply( CommandsToApply batch, TransactionApplicationMode mode ) thro
{ {
// Have these command appliers as separate try-with-resource to have better control over // Have these command appliers as separate try-with-resource to have better control over
// point between closing this and the locks above // point between closing this and the locks above
try ( BatchTransactionApplier batchApplier = applier( mode ) ) try ( LockGroup locks = new LockGroup();
BatchTransactionApplier batchApplier = applier( mode ) )
{ {
while ( batch != null ) while ( batch != null )
{ {
try ( LockGroup locks = new LockGroup() ) try ( TransactionApplier txApplier = batchApplier.startTx( batch, locks ) )
{ {
try ( TransactionApplier txApplier = batchApplier.startTx( batch, locks ) ) batch.accept( txApplier );
{
batch.accept( txApplier );
}
batch = batch.next();
} }
batch = batch.next();
} }
} }
catch ( Throwable cause ) catch ( Throwable cause )
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/ */
package org.neo4j.kernel.impl.transaction.command; package org.neo4j.kernel.impl.transaction.command;


import java.io.IOException;

import org.neo4j.kernel.impl.api.TransactionApplier; import org.neo4j.kernel.impl.api.TransactionApplier;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor; import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.LockGroup;
Expand Down Expand Up @@ -62,12 +60,6 @@ public NeoStoreTransactionApplier( Version version, NeoStores neoStores, CacheAc
this.cacheAccess = cacheAccess; this.cacheAccess = cacheAccess;
} }


@Override
public void close()
{
lockGroup.close();
}

@Override @Override
public boolean visitNodeCommand( Command.NodeCommand command ) public boolean visitNodeCommand( Command.NodeCommand command )
{ {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.junit.Test; import org.junit.Test;
import org.junit.rules.RuleChain; import org.junit.rules.RuleChain;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
Expand All @@ -32,10 +33,12 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.DelegatingPageCache; import org.neo4j.io.pagecache.DelegatingPageCache;
Expand All @@ -45,15 +48,22 @@
import org.neo4j.kernel.impl.api.BatchTransactionApplier; import org.neo4j.kernel.impl.api.BatchTransactionApplier;
import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade; import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade;
import org.neo4j.kernel.impl.api.CountsAccessor; import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.api.TransactionApplier;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.StoreType; import org.neo4j.kernel.impl.store.StoreType;
import org.neo4j.kernel.impl.store.UnderlyingStorageException; import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.counts.CountsTracker; import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.FakeCommitment; import org.neo4j.kernel.impl.transaction.log.FakeCommitment;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.storageengine.api.CommandsToApply; import org.neo4j.storageengine.api.CommandsToApply;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StoreFileMetadata; import org.neo4j.storageengine.api.StoreFileMetadata;
import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.test.rule.PageCacheRule; import org.neo4j.test.rule.PageCacheRule;
Expand All @@ -69,7 +79,9 @@
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;


Expand Down Expand Up @@ -199,13 +211,53 @@ public void shouldListAllStoreFiles()
assertEquals( expectedStoreTypes, actualStoreTypes ); assertEquals( expectedStoreTypes, actualStoreTypes );
} }


/**
 * Applies a single node command through the engine and verifies, in order, that the node
 * write lock is acquired, that the batch applier facade's {@code close()} is invoked, and
 * that only then the node lock is released.
 */
@Test
public void shouldCloseLockGroupAfterAppliers() throws Exception
{
    // given
    long nodeId = 5;
    LockService lockService = mock( LockService.class );
    Lock nodeLock = mock( Lock.class );
    when( lockService.acquireNodeLock( nodeId, LockService.LockType.WRITE_LOCK ) ).thenReturn( nodeLock );

    // The Consumer is mocked only so that the applier's close() can take part in the InOrder verification below.
    // mock( Consumer.class ) can only yield the raw type, hence the narrowly scoped suppression.
    @SuppressWarnings( "unchecked" )
    Consumer<Boolean> applierCloseCall = mock( Consumer.class );
    CapturingBatchTransactionApplierFacade applier = new CapturingBatchTransactionApplierFacade( applierCloseCall );
    RecordStorageEngine engine = recordStorageEngineBuilder()
            .lockService( lockService )
            .transactionApplierTransformer( applier::wrapAroundActualApplier )
            .build();

    CommandsToApply commandsToApply = mock( CommandsToApply.class );
    when( commandsToApply.accept( any() ) ).thenAnswer( invocationOnMock ->
    {
        // Visit one node command: the node goes from a fresh (default) record to an in-use record
        Visitor<StorageCommand,IOException> visitor = invocationOnMock.getArgument( 0 );
        NodeRecord after = new NodeRecord( nodeId );
        after.setInUse( true );
        visitor.visit( new Command.NodeCommand( new NodeRecord( nodeId ), after ) );
        return null;
    } );

    // when
    engine.apply( commandsToApply, TransactionApplicationMode.INTERNAL );

    // then the lock must have been taken first, the applier closed next and the lock released last
    InOrder inOrder = inOrder( lockService, applierCloseCall, nodeLock );
    inOrder.verify( lockService ).acquireNodeLock( nodeId, LockService.LockType.WRITE_LOCK );
    inOrder.verify( applierCloseCall ).accept( true );
    inOrder.verify( nodeLock, times( 1 ) ).release();
    inOrder.verifyNoMoreInteractions();
}

private RecordStorageEngine buildRecordStorageEngine() private RecordStorageEngine buildRecordStorageEngine()
{
return recordStorageEngineBuilder().build();
}

private RecordStorageEngineRule.Builder recordStorageEngineBuilder()
{ {
return storageEngineRule return storageEngineRule
.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ) .getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) )
.storeDirectory( storeDir ) .storeDirectory( storeDir )
.databaseHealth( databaseHealth ) .databaseHealth( databaseHealth );
.build();
} }


private Exception executeFailingTransaction( RecordStorageEngine engine ) throws IOException private Exception executeFailingTransaction( RecordStorageEngine engine ) throws IOException
Expand Down Expand Up @@ -256,4 +308,39 @@ public void close() throws Exception
} }
} }


private class CapturingBatchTransactionApplierFacade extends BatchTransactionApplierFacade
{
private final Consumer<Boolean> applierCloseCall;
private BatchTransactionApplierFacade actual;

CapturingBatchTransactionApplierFacade( Consumer<Boolean> applierCloseCall )
{
this.applierCloseCall = applierCloseCall;
}

CapturingBatchTransactionApplierFacade wrapAroundActualApplier( BatchTransactionApplierFacade actual )
{
this.actual = actual;
return this;
}

@Override
public TransactionApplier startTx( CommandsToApply transaction ) throws IOException
{
return actual.startTx( transaction );
}

@Override
public TransactionApplier startTx( CommandsToApply transaction, LockGroup lockGroup ) throws IOException
{
return actual.startTx( transaction, lockGroup );
}

@Override
public void close() throws Exception
{
applierCloseCall.accept( true );
actual.close();
}
}
} }

0 comments on commit df2ac32

Please sign in to comment.