Skip to content

Commit

Permalink
Acquires shared SCHEMA lock after-the-fact on master accepting slave …
Browse files Browse the repository at this point in the history
…commit

This is because in HA the shared SCHEMA lock isn't acquired on the master,
only locally on each slave. Now that the master no longer holds the exclusive SCHEMA
lock during index population for a uniqueness constraint, there may be a race
between activating the constraint (after the index has been fully built) and concurrent
slave transactions potentially violating the constraint.
  • Loading branch information
tinwelint committed Feb 20, 2017
1 parent 4c1fd59 commit 5b762e8
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 21 deletions.
Expand Up @@ -103,9 +103,9 @@ public long transactionId()
} }


@Override @Override
public void accept( Visitor<StorageCommand,IOException> visitor ) throws IOException public boolean accept( Visitor<StorageCommand,IOException> visitor ) throws IOException
{ {
transactionRepresentation.accept( visitor ); return transactionRepresentation.accept( visitor );
} }


public TransactionRepresentation transactionRepresentation() public TransactionRepresentation transactionRepresentation()
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.neo4j.kernel.impl.api.BatchTransactionApplier; import org.neo4j.kernel.impl.api.BatchTransactionApplier;
import org.neo4j.kernel.impl.api.TransactionApplier; import org.neo4j.kernel.impl.api.TransactionApplier;
import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand; import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand; import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;
import org.neo4j.storageengine.api.CommandsToApply; import org.neo4j.storageengine.api.CommandsToApply;
Expand Down Expand Up @@ -70,14 +69,14 @@ public void close() throws Exception
public boolean visitNodeCommand( NodeCommand command ) throws IOException public boolean visitNodeCommand( NodeCommand command ) throws IOException
{ {
nodeCommandsById.put( command.getKey(), command ); nodeCommandsById.put( command.getKey(), command );
if ( !hasUpdates && hasLabelChanges( command ) ) if ( !hasUpdates && mayResultInIndexUpdates( command ) )
{ {
hasUpdates = true; hasUpdates = true;
} }
return false; return false;
} }


private boolean hasLabelChanges( NodeCommand command ) public static boolean mayResultInIndexUpdates( NodeCommand command )
{ {
long before = command.getBefore().getLabelField(); long before = command.getBefore().getLabelField();
long after = command.getAfter().getLabelField(); long after = command.getAfter().getLabelField();
Expand All @@ -88,11 +87,15 @@ private boolean hasLabelChanges( NodeCommand command )


} }


/**
 * Tells whether or not the given property command may result in index updates.
 * The check made here is whether the after-state of the command's property record is set to a node.
 *
 * @param command {@link PropertyCommand} to check.
 * @return {@code true} if the after-state record of {@code command} has its node set,
 * otherwise {@code false}.
 */
public static boolean mayResultInIndexUpdates( PropertyCommand command )
{
    boolean afterStateBelongsToNode = command.getAfter().isNodeSet();
    return afterStateBelongsToNode;
}

@Override @Override
public boolean visitPropertyCommand( PropertyCommand command ) throws IOException public boolean visitPropertyCommand( PropertyCommand command ) throws IOException
{ {
PropertyRecord record = command.getAfter(); if ( mayResultInIndexUpdates( command ) )
if ( record.isNodeSet() )
{ {
long nodeId = command.getAfter().getNodeId(); long nodeId = command.getAfter().getNodeId();
List<PropertyCommand> group = propertyCommandsByNodeIds.get( nodeId ); List<PropertyCommand> group = propertyCommandsByNodeIds.get( nodeId );
Expand Down
Expand Up @@ -62,15 +62,16 @@ public void setHeader( byte[] additionalHeader, int masterId, int authorId, long
} }


@Override @Override
public void accept( Visitor<StorageCommand,IOException> visitor ) throws IOException public boolean accept( Visitor<StorageCommand,IOException> visitor ) throws IOException
{ {
for ( StorageCommand command : commands ) for ( StorageCommand command : commands )
{ {
if ( visitor.visit( command ) ) if ( visitor.visit( command ) )
{ {
return; return true;
} }
} }
return false;
} }


@Override @Override
Expand Down
Expand Up @@ -32,7 +32,8 @@ public interface CommandStream
/** /**
* Accepts a visitor into the commands making up this transaction. * Accepts a visitor into the commands making up this transaction.
* @param visitor {@link Visitor} which will see the commands. * @param visitor {@link Visitor} which will see the commands.
* @return {@code true} if any {@link StorageCommand} visited returned {@code true}, otherwise {@code false}.
* @throws IOException if there were any problem reading the commands. * @throws IOException if there were any problem reading the commands.
*/ */
void accept( Visitor<StorageCommand,IOException> visitor ) throws IOException; boolean accept( Visitor<StorageCommand,IOException> visitor ) throws IOException;
} }
Expand Up @@ -19,46 +19,69 @@
*/ */
package org.neo4j.kernel.ha; package org.neo4j.kernel.ha;


import java.io.IOException;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.transaction.TransactionPropagator; import org.neo4j.kernel.ha.transaction.TransactionPropagator;
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator; import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.storageengine.api.TransactionApplicationMode;


import static org.neo4j.kernel.impl.api.index.NodePropertyCommandsExtractor.mayResultInIndexUpdates;
import static org.neo4j.kernel.impl.locking.ResourceTypes.SCHEMA;
import static org.neo4j.kernel.impl.locking.ResourceTypes.schemaResource;

/** /**
* Commit process on the master side in HA, where transactions either come in from slaves committing, * Commit process on the master side in HA, where transactions either come in from slaves committing,
* or get created and committed directly on the master. * or get created and committed directly on the master.
*/ */
public class MasterTransactionCommitProcess implements TransactionCommitProcess public class MasterTransactionCommitProcess implements TransactionCommitProcess
{ {
/**
 * Visitor which answers {@code true} for a command that may result in index updates, i.e. a
 * {@link NodeCommand} or {@link PropertyCommand} for which the matching {@code mayResultInIndexUpdates}
 * check says so. Used for detecting transactions which should acquire the shared schema lock before
 * being applied (and validated for application).
 */
private static final Visitor<StorageCommand,IOException> REQUIRES_SHARED_SCHEMA_LOCK = command ->
{
    if ( command instanceof NodeCommand && mayResultInIndexUpdates( (NodeCommand) command ) )
    {
        return true;
    }
    return command instanceof PropertyCommand && mayResultInIndexUpdates( (PropertyCommand) command );
};

private final TransactionCommitProcess inner; private final TransactionCommitProcess inner;
private final TransactionPropagator txPropagator; private final TransactionPropagator txPropagator;
private final IntegrityValidator validator; private final IntegrityValidator validator;
private final Monitor monitor; private final Monitor monitor;
private final Locks locks;


public interface Monitor public interface Monitor
{ {
void missedReplicas( int number ); void missedReplicas( int number );
} }


public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, TransactionPropagator txPropagator, public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, TransactionPropagator txPropagator,
IntegrityValidator validator, Monitor monitor ) IntegrityValidator validator, Monitor monitor, Locks locks )
{ {
this.inner = commitProcess; this.inner = commitProcess;
this.txPropagator = txPropagator; this.txPropagator = txPropagator;
this.validator = validator; this.validator = validator;
this.monitor = monitor; this.monitor = monitor;
this.locks = locks;
} }


@Override @Override
public long commit( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode ) public long commit( TransactionToApply batch, CommitEvent commitEvent, TransactionApplicationMode mode )
throws TransactionFailureException throws TransactionFailureException
{ {
validate( batch ); long result;

try ( Locks.Client locks = validate( batch ) )
long result = inner.commit( batch, commitEvent, mode ); {
result = inner.commit( batch, commitEvent, mode );
}


// Assuming all the transactions come from the same author // Assuming all the transactions come from the same author
int missedReplicas = txPropagator.committed( result, batch.transactionRepresentation().getAuthorId() ); int missedReplicas = txPropagator.committed( result, batch.transactionRepresentation().getAuthorId() );
Expand All @@ -71,13 +94,68 @@ public long commit( TransactionToApply batch, CommitEvent commitEvent, Transacti
return result; return result;
} }


private void validate( TransactionToApply batch ) throws TransactionFailureException private Locks.Client validate( TransactionToApply batch ) throws TransactionFailureException
{ {
while ( batch != null ) Locks.Client locks = null;
boolean success = false;
try
{
while ( batch != null )
{
locks = acquireSharedSchemaLockIfTransactionResultsInIndexUpdates( batch, locks );
validator.validateTransactionStartKnowledge(
batch.transactionRepresentation().getLatestCommittedTxWhenStarted() );
batch = batch.next();
}
success = true;
return locks;
}
finally
{
if ( !success )
{
locks.close();
locks = null;
}
}
}

/**
* Looks at the transaction coming from slave and decide whether or not the shared schema lock
* should be acquired before letting it apply.
* <p>
* In HA the shared schema lock isn't acquired on the master. This has been fine due to other
* factors and guards being in place. However this was introduced when releasing the exclusive schema lock
* during index population when creating a uniqueness constraint. This added locking guards for race
* between constraint creating transaction (on master) and concurrent slave transactions which may
* result in index updates for that constraint, and potentially break it.
*
* @param batch {@link TransactionToApply} to apply, only HEAD since linked list looping is done outside.
* @param locks potentially existing locks client, otherwise this method will create and return
* if there's a need to acquire a lock.
* @return either, if {@code locks} is non-null then the same instance, or if {@code locks} is null
* and some locking is required then a new locks instance.
* @throws TransactionFailureException on failure to read transaction.
*/
private Locks.Client acquireSharedSchemaLockIfTransactionResultsInIndexUpdates( TransactionToApply batch,
Locks.Client locks ) throws TransactionFailureException
{
try
{
if ( batch.accept( REQUIRES_SHARED_SCHEMA_LOCK ) )
{
if ( locks == null )
{
locks = this.locks.newClient();
}
locks.acquireShared( SCHEMA, schemaResource() );
}
return locks;
}
catch ( IOException e )
{ {
validator.validateTransactionStartKnowledge( throw new TransactionFailureException(
batch.transactionRepresentation().getLatestCommittedTxWhenStarted() ); "Weird error when trying to figure out whether or not to acquire shared schema lock", e );
batch = batch.next();
} }
} }
} }
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.kernel.ha.transaction.TransactionPropagator; import org.neo4j.kernel.ha.transaction.TransactionPropagator;
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator; import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
Expand All @@ -40,15 +41,18 @@ public class CommitProcessSwitcher extends AbstractComponentSwitcher<Transaction
private final RequestContextFactory requestContextFactory; private final RequestContextFactory requestContextFactory;
private final DependencyResolver dependencyResolver; private final DependencyResolver dependencyResolver;
private final MasterTransactionCommitProcess.Monitor monitor; private final MasterTransactionCommitProcess.Monitor monitor;
private final Locks locks;


public CommitProcessSwitcher( TransactionPropagator txPropagator, Master master, public CommitProcessSwitcher( TransactionPropagator txPropagator, Master master,
DelegateInvocationHandler<TransactionCommitProcess> delegate, RequestContextFactory requestContextFactory, DelegateInvocationHandler<TransactionCommitProcess> delegate, RequestContextFactory requestContextFactory,
Locks locks,
Monitors monitors, DependencyResolver dependencyResolver ) Monitors monitors, DependencyResolver dependencyResolver )
{ {
super( delegate ); super( delegate );
this.txPropagator = txPropagator; this.txPropagator = txPropagator;
this.master = master; this.master = master;
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.locks = locks;
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
this.monitor = monitors.newMonitor( MasterTransactionCommitProcess.Monitor.class ); this.monitor = monitors.newMonitor( MasterTransactionCommitProcess.Monitor.class );
} }
Expand All @@ -67,6 +71,6 @@ protected TransactionCommitProcess getMasterImpl()
dependencyResolver.resolveDependency( StorageEngine.class ) ); dependencyResolver.resolveDependency( StorageEngine.class ) );


IntegrityValidator validator = dependencyResolver.resolveDependency( IntegrityValidator.class ); IntegrityValidator validator = dependencyResolver.resolveDependency( IntegrityValidator.class );
return new MasterTransactionCommitProcess( commitProcess, txPropagator, validator, monitor ); return new MasterTransactionCommitProcess( commitProcess, txPropagator, validator, monitor, locks );
} }
} }
Expand Up @@ -606,7 +606,7 @@ private CommitProcessFactory createCommitProcessFactory( Dependencies dependenci
TransactionCommitProcess.class ); TransactionCommitProcess.class );


CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator, CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator,
master, commitProcessDelegate, requestContextFactory, monitors, dependencies ); master, commitProcessDelegate, requestContextFactory, lockManager, monitors, dependencies );
componentSwitcherContainer.add( commitProcessSwitcher ); componentSwitcherContainer.add( commitProcessSwitcher );


return new HighlyAvailableCommitProcessFactory( commitProcessDelegate ); return new HighlyAvailableCommitProcessFactory( commitProcessDelegate );
Expand Down

0 comments on commit 5b762e8

Please sign in to comment.