diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/IndexBatchTransactionApplier.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/IndexBatchTransactionApplier.java index 4eb55205eeedd..13eaddbc6e747 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/IndexBatchTransactionApplier.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/command/IndexBatchTransactionApplier.java @@ -85,8 +85,22 @@ public TransactionApplier startTx( CommandsToApply transaction ) return transactionApplier; } - private void applyIndexUpdates() throws IOException + private void applyPendingLabelAndIndexUpdates() throws IOException { + if ( labelUpdates != null ) + { + // Updates are sorted according to node id here, an artifact of node commands being sorted + // by node id when extracting from TransactionRecordState. + try + { + labelScanStoreSync.apply( new LabelUpdateWork( labelUpdates ) ); + } + catch ( ExecutionException e ) + { + throw new IOException( "Failed to flush label updates", e ); + } + labelUpdates = null; + } if ( indexUpdates != null && indexUpdates.hasUpdates() ) { try @@ -95,7 +109,7 @@ private void applyIndexUpdates() throws IOException } catch ( ExecutionException e ) { - throw new IOException( "Failed to flush index updates prior to applying schema change", e ); + throw new IOException( "Failed to flush index updates", e ); } indexUpdates = null; } @@ -104,16 +118,7 @@ private void applyIndexUpdates() throws IOException @Override public void close() throws Exception { - // Apply all the label updates within this whole batch of transactions. - if ( labelUpdates != null ) - { - // Updates are sorted according to node id here, an artifact of node commands being sorted - // by node id when extracting from TransactionRecordState. 
- labelScanStoreSync.apply( new LabelUpdateWork( labelUpdates ) ); - } - - // Apply all the index updates within this whole batch of transactions. - applyIndexUpdates(); + applyPendingLabelAndIndexUpdates(); } /** @@ -218,7 +223,7 @@ public boolean visitSchemaRuleCommand( Command.SchemaRuleCommand command ) throw // In that scenario the index would be created, populated and then fed the [this time duplicate] // update for the node created before the index. The most straight forward solution is to // apply pending index updates up to this point in this batch before index schema changes occur. - applyIndexUpdates(); + applyPendingLabelAndIndexUpdates(); switch ( command.getMode() ) { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/scan/InMemoryLabelScanStore.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/scan/InMemoryLabelScanStore.java index 5af70d0515214..03eca084c41d1 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/scan/InMemoryLabelScanStore.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/scan/InMemoryLabelScanStore.java @@ -220,7 +220,7 @@ public AllEntriesLabelScanReader allNodeLabelRanges() @Override public long maxCount() { - return 0; + return nodesToLabels.size(); } @Override diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/command/CommandExtractor.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/command/CommandExtractor.java new file mode 100644 index 0000000000000..09cc77430fa09 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/command/CommandExtractor.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.kernel.impl.transaction.command; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.neo4j.helpers.collection.Visitor; +import org.neo4j.storageengine.api.StorageCommand; + +public class CommandExtractor implements Visitor +{ + private final List commands = new ArrayList<>(); + + @Override + public boolean visit( StorageCommand element ) throws IOException + { + commands.add( element ); + return false; + } + + public List getCommands() + { + return commands; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/command/LabelAndIndexUpdateBatchingIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/command/LabelAndIndexUpdateBatchingIT.java new file mode 100644 index 0000000000000..cee80f315e447 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/command/LabelAndIndexUpdateBatchingIT.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.kernel.impl.transaction.command; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.impl.api.TransactionCommitProcess; +import org.neo4j.kernel.impl.api.TransactionToApply; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; +import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand; +import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; +import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException; +import org.neo4j.kernel.impl.transaction.log.TransactionCursor; +import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.storageengine.api.StorageCommand; +import org.neo4j.test.TestGraphDatabaseFactory; +import static org.junit.Assert.assertNotNull; + +import static java.util.stream.Collectors.toList; + +import static org.neo4j.helpers.collection.Iterators.singleOrNull; +import static org.neo4j.kernel.impl.transaction.tracing.CommitEvent.NULL; +import static org.neo4j.storageengine.api.TransactionApplicationMode.EXTERNAL; + +/** + * This test is for an issue with transaction batching where 
there would be a batch of transactions + * to be applied in the same batch; the batch containing a creation of node N with label L and property P. + * Later in that batch there would be a uniqueness constraint created for label L and property P. + * The number of nodes matching this constraint would be few and so the label scan store would be selected + * to drive the population of the index. Problem is that the label update for N would still sit in + * the batch state, to be applied at the end of the batch. Hence the node would be forgotten when the + * index was being built. + */ +public class LabelAndIndexUpdateBatchingIT +{ + private static final String PROPERTY_KEY = "key"; + private static final Label LABEL = Label.label( "label" ); + + @Test + public void indexShouldIncludeNodesCreatedPreviouslyInBatch() throws Exception + { + // GIVEN a transaction stream leading up to this issue + // perform the transactions from db-level and extract the transactions as commands + // so that they can be applied batch-wise the way we'd like to later. 
+ + // a bunch of nodes (to have the index population later on to decide to use label scan for population) + List transactions; + GraphDatabaseAPI db = (GraphDatabaseAPI) new TestGraphDatabaseFactory().newImpermanentDatabase(); + String nodeN = "our guy"; + String otherNode = "just to create the tokens"; + try + { + try ( Transaction tx = db.beginTx() ) + { + db.createNode( LABEL ).setProperty( PROPERTY_KEY, otherNode ); + for ( int i = 0; i < 10_000; i++ ) + { + db.createNode(); + } + tx.success(); + } + // node N + try ( Transaction tx = db.beginTx() ) + { + db.createNode( LABEL ).setProperty( PROPERTY_KEY, nodeN ); + tx.success(); + } + // uniqueness constraint affecting N + try ( Transaction tx = db.beginTx() ) + { + db.schema().constraintFor( LABEL ).assertPropertyIsUnique( PROPERTY_KEY ).create(); + tx.success(); + } + transactions = extractTransactions( db ); + } + finally + { + db.shutdown(); + } + + db = (GraphDatabaseAPI) new TestGraphDatabaseFactory().newImpermanentDatabase(); + TransactionCommitProcess commitProcess = + db.getDependencyResolver().resolveDependency( TransactionCommitProcess.class ); + try + { + int cutoffIndex = findCutoffIndex( transactions ); + commitProcess.commit( toApply( transactions.subList( 0, cutoffIndex ) ), NULL, EXTERNAL ); + + // WHEN applying the two transactions (node N and the constraint) in the same batch + commitProcess.commit( toApply( transactions.subList( cutoffIndex, transactions.size() ) ), NULL, EXTERNAL ); + + // THEN node N should've ended up in the index too + try ( Transaction tx = db.beginTx() ) + { + assertNotNull( "Verification node not found", + singleOrNull( db.findNodes( LABEL, PROPERTY_KEY, otherNode ) ) ); // just to verify + assertNotNull( "Node N not found", + singleOrNull( db.findNodes( LABEL, PROPERTY_KEY, nodeN ) ) ); + tx.success(); + } + } + finally + { + db.shutdown(); + } + + } + + private static int findCutoffIndex( Collection transactions ) throws IOException + { + Iterator iterator = 
transactions.iterator(); + for ( int i = 0; iterator.hasNext(); i++ ) + { + TransactionRepresentation tx = iterator.next(); + CommandExtractor extractor = new CommandExtractor(); + tx.accept( extractor ); + List commands = extractor.getCommands(); + List nodeCommands = commands.stream() + .filter( command -> command instanceof NodeCommand ).collect( toList() ); + if ( nodeCommands.size() == 1 ) + { + return i; + } + } + throw new AssertionError( "Couldn't find the transaction which would be the cut-off point" ); + } + + private static TransactionToApply toApply( Collection transactions ) + { + TransactionToApply first = null, last = null; + for ( TransactionRepresentation transactionRepresentation : transactions ) + { + TransactionToApply transaction = new TransactionToApply( transactionRepresentation ); + if ( first == null ) + { + first = last = transaction; + } + else + { + last.next( transaction ); + last = transaction; + } + } + return first; + } + + private static List extractTransactions( GraphDatabaseAPI db ) + throws NoSuchTransactionException, IOException + { + LogicalTransactionStore txStore = db.getDependencyResolver().resolveDependency( LogicalTransactionStore.class ); + List transactions = new ArrayList<>(); + try ( TransactionCursor cursor = txStore.getTransactions( TransactionIdStore.BASE_TX_ID + 1 ) ) + { + cursor.forAll( tx -> transactions.add( tx.getTransactionRepresentation() ) ); + } + return transactions; + } +}