From df2ac32df281aaba0850db1c173d0fc8208c2a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Wed, 19 Dec 2018 18:15:03 +0100 Subject: [PATCH] Fixes label index update race Label index and schema index updates are generated during application of each transaction and added to a queue to be applied using WorkSync at the end of the whole batch of transactions. Also, concurrently applying transaction batches may apply their changes in the same "session", with the help of WorkSync. The order in which transactions apply is coordinated using the high-level locks, which are released per transaction after being fully applied. The exception to this is node creation, which doesn't acquire a high-level lock. The saving grace has been, perhaps unknowingly, the lower-level LockService, which is used only during commit and was introduced to allow safe property reading in the face of concurrent property updates. Since some time back that has been replaced by the safe-zone approach, but still remains for background index population, since index population cannot be tied to a specific transaction id, which safe-zone revolves around. The LockService guarded against e.g. node CREATE followed by DELETE being reordered to DELETE followed by CREATE. Almost. It turns out that those locks were released before applying label index and schema index updates. This allowed a race where transactions could queue updates to the WorkSync in the wrong order, allowing the unexpected reordering mentioned above. The fix here is to hold the locks slightly longer so that closing of appliers happens before releasing them. This necessitates the LockGroup to be instantiated before the appliers, which means the locks will be held throughout a whole transaction batch, as opposed to being held only for a single transaction in a batch. The only effect this has is to potentially delay some index population job currently running. 
A note here is that embedded mode never applies batches of sizes larger than 1, but clustered setups do. --- .../recordstorage/RecordStorageEngine.java | 12 +-- .../command/NeoStoreTransactionApplier.java | 8 -- .../RecordStorageEngineTest.java | 91 ++++++++++++++++++- 3 files changed, 94 insertions(+), 17 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java index 5751b1f567c7..44a2f9072959 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java @@ -318,18 +318,16 @@ public void apply( CommandsToApply batch, TransactionApplicationMode mode ) thro { // Have these command appliers as separate try-with-resource to have better control over // point between closing this and the locks above - try ( BatchTransactionApplier batchApplier = applier( mode ) ) + try ( LockGroup locks = new LockGroup(); + BatchTransactionApplier batchApplier = applier( mode ) ) { while ( batch != null ) { - try ( LockGroup locks = new LockGroup() ) + try ( TransactionApplier txApplier = batchApplier.startTx( batch, locks ) ) { - try ( TransactionApplier txApplier = batchApplier.startTx( batch, locks ) ) - { - batch.accept( txApplier ); - } - batch = batch.next(); + batch.accept( txApplier ); } + batch = batch.next(); } } catch ( Throwable cause ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/NeoStoreTransactionApplier.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/NeoStoreTransactionApplier.java index f54737bb9760..dae4735d6a29 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/NeoStoreTransactionApplier.java +++ 
b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/NeoStoreTransactionApplier.java @@ -19,8 +19,6 @@ */ package org.neo4j.kernel.impl.transaction.command; -import java.io.IOException; - import org.neo4j.kernel.impl.api.TransactionApplier; import org.neo4j.kernel.impl.core.CacheAccessBackDoor; import org.neo4j.kernel.impl.locking.LockGroup; @@ -62,12 +60,6 @@ public NeoStoreTransactionApplier( Version version, NeoStores neoStores, CacheAc this.cacheAccess = cacheAccess; } - @Override - public void close() - { - lockGroup.close(); - } - @Override public boolean visitNodeCommand( Command.NodeCommand command ) { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java index c7a9be692c71..5f9f96606346 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.junit.rules.RuleChain; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import java.io.File; import java.io.IOException; @@ -32,10 +33,12 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import org.neo4j.helpers.Exceptions; +import org.neo4j.helpers.collection.Visitor; import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.DelegatingPageCache; @@ -45,15 +48,22 @@ import org.neo4j.kernel.impl.api.BatchTransactionApplier; import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade; import 
org.neo4j.kernel.impl.api.CountsAccessor; +import org.neo4j.kernel.impl.api.TransactionApplier; import org.neo4j.kernel.impl.api.TransactionToApply; +import org.neo4j.kernel.impl.locking.Lock; +import org.neo4j.kernel.impl.locking.LockGroup; +import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.store.StoreType; import org.neo4j.kernel.impl.store.UnderlyingStorageException; import org.neo4j.kernel.impl.store.counts.CountsTracker; +import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.log.FakeCommitment; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.storageengine.api.CommandsToApply; +import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StoreFileMetadata; import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.test.rule.PageCacheRule; @@ -69,7 +79,9 @@ import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -199,13 +211,53 @@ public void shouldListAllStoreFiles() assertEquals( expectedStoreTypes, actualStoreTypes ); } + @Test + public void shouldCloseLockGroupAfterAppliers() throws Exception + { + // given + long nodeId = 5; + LockService lockService = mock( LockService.class ); + Lock nodeLock = mock( Lock.class ); + when( lockService.acquireNodeLock( nodeId, LockService.LockType.WRITE_LOCK ) ).thenReturn( nodeLock ); + Consumer applierCloseCall = mock( Consumer.class ); // <-- simply so that we can use InOrder mockito construct + 
CapturingBatchTransactionApplierFacade applier = new CapturingBatchTransactionApplierFacade( applierCloseCall ); + RecordStorageEngine engine = recordStorageEngineBuilder() + .lockService( lockService ) + .transactionApplierTransformer( applier::wrapAroundActualApplier ) + .build(); + CommandsToApply commandsToApply = mock( CommandsToApply.class ); + when( commandsToApply.accept( any() ) ).thenAnswer( invocationOnMock -> + { + // Visit one node command + Visitor visitor = invocationOnMock.getArgument( 0 ); + NodeRecord after = new NodeRecord( nodeId ); + after.setInUse( true ); + visitor.visit( new Command.NodeCommand( new NodeRecord( nodeId ), after ) ); + return null; + } ); + + // when + engine.apply( commandsToApply, TransactionApplicationMode.INTERNAL ); + + // then + InOrder inOrder = inOrder( lockService, applierCloseCall, nodeLock ); + inOrder.verify( lockService ).acquireNodeLock( nodeId, LockService.LockType.WRITE_LOCK ); + inOrder.verify( applierCloseCall ).accept( true ); + inOrder.verify( nodeLock, times( 1 ) ).release(); + inOrder.verifyNoMoreInteractions(); + } + private RecordStorageEngine buildRecordStorageEngine() + { + return recordStorageEngineBuilder().build(); + } + + private RecordStorageEngineRule.Builder recordStorageEngineBuilder() { return storageEngineRule .getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ) .storeDirectory( storeDir ) - .databaseHealth( databaseHealth ) - .build(); + .databaseHealth( databaseHealth ); } private Exception executeFailingTransaction( RecordStorageEngine engine ) throws IOException @@ -256,4 +308,39 @@ public void close() throws Exception } } + private class CapturingBatchTransactionApplierFacade extends BatchTransactionApplierFacade + { + private final Consumer applierCloseCall; + private BatchTransactionApplierFacade actual; + + CapturingBatchTransactionApplierFacade( Consumer applierCloseCall ) + { + this.applierCloseCall = applierCloseCall; + } + + CapturingBatchTransactionApplierFacade 
wrapAroundActualApplier( BatchTransactionApplierFacade actual ) + { + this.actual = actual; + return this; + } + + @Override + public TransactionApplier startTx( CommandsToApply transaction ) throws IOException + { + return actual.startTx( transaction ); + } + + @Override + public TransactionApplier startTx( CommandsToApply transaction, LockGroup lockGroup ) throws IOException + { + return actual.startTx( transaction, lockGroup ); + } + + @Override + public void close() throws Exception + { + applierCloseCall.accept( true ); + actual.close(); + } + } }