Skip to content

Commit

Permalink
Flushes label updates before schema changes in batch applier
Browse files Browse the repository at this point in the history
When making a schema change, e.g. creating an index, the label scan store
may be selected to drive the index population. If so, all label updates
need to be in the label scan store by then; otherwise they will be
missed.

Previously only index updates were flushed before schema changes.
This commit also flushes label updates at that point, so that index
population will see all updates.
  • Loading branch information
tinwelint committed Dec 16, 2016
1 parent 86c6132 commit 7fe7a8d
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 14 deletions.
Expand Up @@ -85,8 +85,22 @@ public TransactionApplier startTx( CommandsToApply transaction )
return transactionApplier;
}

private void applyIndexUpdates() throws IOException
private void applyPendingLabelAndIndexUpdates() throws IOException
{
if ( labelUpdates != null )
{
// Updates are sorted according to node id here, an artifact of node commands being sorted
// by node id when extracting from TransactionRecordState.
try
{
labelScanStoreSync.apply( new LabelUpdateWork( labelUpdates ) );
}
catch ( ExecutionException e )
{
throw new IOException( "Failed to flush label updates", e );
}
labelUpdates = null;
}
if ( indexUpdates != null && indexUpdates.hasUpdates() )
{
try
Expand All @@ -95,7 +109,7 @@ private void applyIndexUpdates() throws IOException
}
catch ( ExecutionException e )
{
throw new IOException( "Failed to flush index updates prior to applying schema change", e );
throw new IOException( "Failed to flush index updates", e );
}
indexUpdates = null;
}
Expand All @@ -104,16 +118,7 @@ private void applyIndexUpdates() throws IOException
@Override
public void close() throws Exception
{
// Apply all the label updates within this whole batch of transactions.
if ( labelUpdates != null )
{
// Updates are sorted according to node id here, an artifact of node commands being sorted
// by node id when extracting from TransactionRecordState.
labelScanStoreSync.apply( new LabelUpdateWork( labelUpdates ) );
}

// Apply all the index updates within this whole batch of transactions.
applyIndexUpdates();
applyPendingLabelAndIndexUpdates();
}

/**
Expand Down Expand Up @@ -218,7 +223,7 @@ public boolean visitSchemaRuleCommand( Command.SchemaRuleCommand command ) throw
// In that scenario the index would be created, populated and then fed the [this time duplicate]
// update for the node created before the index. The most straight forward solution is to
// apply pending index updates up to this point in this batch before index schema changes occur.
applyIndexUpdates();
applyPendingLabelAndIndexUpdates();

switch ( command.getMode() )
{
Expand Down
Expand Up @@ -220,7 +220,7 @@ public AllEntriesLabelScanReader allNodeLabelRanges()
@Override
public long maxCount()
{
return 0;
return nodesToLabels.size();
}

@Override
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.command;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.storageengine.api.StorageCommand;

/**
 * A {@link Visitor} that simply remembers every {@link StorageCommand} it is shown,
 * in the order the commands were visited, so that they can be inspected afterwards.
 */
public class CommandExtractor implements Visitor<StorageCommand,IOException>
{
    /** Every command visited so far, in visiting order. */
    private final List<StorageCommand> collected = new ArrayList<>();

    /**
     * @return the live list of commands collected so far (not a copy).
     */
    public List<StorageCommand> getCommands()
    {
        return collected;
    }

    /**
     * Records the visited command.
     *
     * @param element the command being visited.
     * @return always {@code false}; presumably this tells the visited party to keep going,
     * verify against the {@link Visitor} contract.
     */
    @Override
    public boolean visit( StorageCommand element ) throws IOException
    {
        collected.add( element );
        return false;
    }
}
@@ -0,0 +1,183 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.command;

import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.test.TestGraphDatabaseFactory;
import static org.junit.Assert.assertNotNull;

import static java.util.stream.Collectors.toList;

import static org.neo4j.helpers.collection.Iterators.singleOrNull;
import static org.neo4j.kernel.impl.transaction.tracing.CommitEvent.NULL;
import static org.neo4j.storageengine.api.TransactionApplicationMode.EXTERNAL;

/**
* This test is for an issue with transaction batching where there would be a batch of transactions
* to be applied in the same batch; the batch containing a creation of node N with label L and property P.
* Later in that batch there would be a uniqueness constraint created for label L and property P.
* The number of nodes matching this constraint would be few and so the label scan store would be selected
* to drive the population of the index. Problem is that the label update for N would still sit in
* the batch state, to be applied at the end of the batch. Hence the node would be forgotten when the
* index was being built.
*/
public class LabelAndIndexUpdateBatchingIT
{
    private static final String PROPERTY_KEY = "key";
    private static final Label LABEL = Label.label( "label" );

    /**
     * Records a transaction stream from one database (many nodes, then node N, then a uniqueness
     * constraint covering N), replays it into a fresh database with node N and the constraint
     * in the same batch, and verifies that N can be found through the resulting index.
     */
    @Test
    public void indexShouldIncludeNodesCreatedPreviouslyInBatch() throws Exception
    {
        // GIVEN a transaction stream leading up to this issue
        // perform the transactions from db-level and extract the transactions as commands
        // so that they can be applied batch-wise the way we'd like to later.
        List<TransactionRepresentation> transactions;
        GraphDatabaseAPI db = (GraphDatabaseAPI) new TestGraphDatabaseFactory().newImpermanentDatabase();
        String nodeN = "our guy";
        String otherNode = "just to create the tokens";
        try
        {
            // a bunch of nodes (so that the index population later on decides to use label scan for population)
            try ( Transaction tx = db.beginTx() )
            {
                db.createNode( LABEL ).setProperty( PROPERTY_KEY, otherNode );
                for ( int i = 0; i < 10_000; i++ )
                {
                    db.createNode();
                }
                tx.success();
            }
            // node N
            try ( Transaction tx = db.beginTx() )
            {
                db.createNode( LABEL ).setProperty( PROPERTY_KEY, nodeN );
                tx.success();
            }
            // uniqueness constraint affecting N
            try ( Transaction tx = db.beginTx() )
            {
                db.schema().constraintFor( LABEL ).assertPropertyIsUnique( PROPERTY_KEY ).create();
                tx.success();
            }
            transactions = extractTransactions( db );
        }
        finally
        {
            db.shutdown();
        }

        db = (GraphDatabaseAPI) new TestGraphDatabaseFactory().newImpermanentDatabase();
        try
        {
            // Resolve the commit process inside the try so that the database is shut down
            // even if dependency resolution fails.
            TransactionCommitProcess commitProcess =
                    db.getDependencyResolver().resolveDependency( TransactionCommitProcess.class );

            // Everything up to (but excluding) the transaction creating node N goes in a first batch.
            int cutoffIndex = findCutoffIndex( transactions );
            commitProcess.commit( toApply( transactions.subList( 0, cutoffIndex ) ), NULL, EXTERNAL );

            // WHEN applying the two transactions (node N and the constraint) in the same batch
            commitProcess.commit( toApply( transactions.subList( cutoffIndex, transactions.size() ) ), NULL, EXTERNAL );

            // THEN node N should've ended up in the index too
            try ( Transaction tx = db.beginTx() )
            {
                assertNotNull( "Verification node not found",
                        singleOrNull( db.findNodes( LABEL, PROPERTY_KEY, otherNode ) ) ); // just to verify
                assertNotNull( "Node N not found",
                        singleOrNull( db.findNodes( LABEL, PROPERTY_KEY, nodeN ) ) );
                tx.success();
            }
        }
        finally
        {
            db.shutdown();
        }
    }

    /**
     * Finds the position of the first transaction that contains exactly one {@link NodeCommand}.
     * In the stream produced by the test above that is expected to be the transaction creating node N.
     *
     * @param transactions transactions to scan, in application order.
     * @return zero-based index of the first transaction with exactly one node command.
     * @throws IOException if visiting the commands of a transaction fails.
     * @throws AssertionError if no transaction has exactly one node command.
     */
    private static int findCutoffIndex( Collection<TransactionRepresentation> transactions ) throws IOException
    {
        Iterator<TransactionRepresentation> iterator = transactions.iterator();
        for ( int i = 0; iterator.hasNext(); i++ )
        {
            TransactionRepresentation tx = iterator.next();
            CommandExtractor extractor = new CommandExtractor();
            tx.accept( extractor );
            List<StorageCommand> nodeCommands = extractor.getCommands().stream()
                    .filter( command -> command instanceof NodeCommand )
                    .collect( toList() );
            if ( nodeCommands.size() == 1 )
            {
                return i;
            }
        }
        throw new AssertionError( "Couldn't find the transaction which would be the cut-off point" );
    }

    /**
     * Wraps each transaction in a {@link TransactionToApply} and links them together, in iteration
     * order, using {@link TransactionToApply#next(TransactionToApply)}.
     *
     * @param transactions transactions to link into one batch.
     * @return the first transaction of the linked batch, or {@code null} if {@code transactions} is empty.
     */
    private static TransactionToApply toApply( Collection<TransactionRepresentation> transactions )
    {
        TransactionToApply first = null;
        TransactionToApply last = null;
        for ( TransactionRepresentation transactionRepresentation : transactions )
        {
            TransactionToApply transaction = new TransactionToApply( transactionRepresentation );
            if ( first == null )
            {
                first = transaction;
            }
            else
            {
                // Append to the tail of the chain built so far.
                last.next( transaction );
            }
            last = transaction;
        }
        return first;
    }

    /**
     * Reads the transactions from the logical transaction store of the given database,
     * starting at {@code TransactionIdStore.BASE_TX_ID + 1}.
     *
     * @param db database to read the transactions from.
     * @return the transaction representations in the order the cursor delivered them.
     */
    private static List<TransactionRepresentation> extractTransactions( GraphDatabaseAPI db )
            throws NoSuchTransactionException, IOException
    {
        LogicalTransactionStore txStore = db.getDependencyResolver().resolveDependency( LogicalTransactionStore.class );
        List<TransactionRepresentation> transactions = new ArrayList<>();
        try ( TransactionCursor cursor = txStore.getTransactions( TransactionIdStore.BASE_TX_ID + 1 ) )
        {
            cursor.forAll( tx -> transactions.add( tx.getTransactionRepresentation() ) );
        }
        return transactions;
    }
}

0 comments on commit 7fe7a8d

Please sign in to comment.