Skip to content

Commit

Permalink
Additional logging when commands ignored by state machines.
Browse files Browse the repository at this point in the history
Having this information in messages.log allows to debug and reason
about the replay process.
  • Loading branch information
apcj committed Jan 28, 2016
1 parent 6cdc110 commit 3a4eecb
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 140 deletions.
Expand Up @@ -44,11 +44,11 @@ public interface RaftLog extends ReadableRaftLog

interface Listener
{
void onAppended( ReplicatedContent content, long index );
void onAppended( ReplicatedContent content, long logIndex );

void onCommitted( ReplicatedContent content, long index );
void onCommitted( ReplicatedContent content, long logIndex );

void onTruncated( long fromIndex );
void onTruncated( long fromLogIndex );
}

/**
Expand Down
Expand Up @@ -79,38 +79,54 @@ public RaftMembershipManager( Replicator replicator, RaftGroup.Builder<MEMBER> m
}

@Override
public void onAppended( ReplicatedContent content, long index )
public void onAppended( ReplicatedContent content, long logIndex )
{
if ( content instanceof RaftGroup && index > raftMembershipState.logIndex() )
if ( content instanceof RaftGroup)
{
assert uncommittedMemberChanges >= 0;
if ( logIndex > raftMembershipState.logIndex() )
{
assert uncommittedMemberChanges >= 0;

uncommittedMemberChanges++;
uncommittedMemberChanges++;

RaftGroup<MEMBER> raftGroup = (RaftGroup) content;
raftMembershipState.setVotingMembers( raftGroup.getMembers() );
RaftGroup<MEMBER> raftGroup = (RaftGroup) content;
raftMembershipState.setVotingMembers( raftGroup.getMembers() );
}
else
{
log.info( "Ignoring content at index %d, since already appended up to %d",
logIndex, raftMembershipState.logIndex() );
}
}
}

@Override
public void onCommitted( ReplicatedContent content, long index )
public void onCommitted( ReplicatedContent content, long logIndex )
{
if ( content instanceof RaftGroup && index > raftMembershipState.logIndex() )
if ( content instanceof RaftGroup )
{
assert uncommittedMemberChanges > 0;
if ( logIndex > raftMembershipState.logIndex() )
{
assert uncommittedMemberChanges > 0;

uncommittedMemberChanges--;
uncommittedMemberChanges--;

if ( uncommittedMemberChanges == 0 )
if ( uncommittedMemberChanges == 0 )
{
membershipStateMachine.onRaftGroupCommitted();
}
raftMembershipState.logIndex( logIndex );
}
else
{
membershipStateMachine.onRaftGroupCommitted();
log.info( "Ignoring content at index %d, since already committed up to %d",
logIndex, raftMembershipState.logIndex() );
}
raftMembershipState.logIndex( index );
}
}

@Override
public void onTruncated( long fromIndex )
public void onTruncated( long fromLogIndex )
{
try
{
Expand Down
Expand Up @@ -77,21 +77,21 @@ public void unsubscribe( ReplicatedContentListener listener )
}

@Override
public void onAppended( ReplicatedContent content, long index )
public void onAppended( ReplicatedContent content, long logIndex )
{
}

@Override
public void onCommitted( ReplicatedContent content, long index )
public void onCommitted( ReplicatedContent content, long logIndex )
{
for ( ReplicatedContentListener listener : listeners )
{
listener.onReplicated( content, index );
listener.onReplicated( content, logIndex );
}
}

@Override
public void onTruncated( long fromIndex )
public void onTruncated( long fromLogIndex )
{
}
}
Expand Up @@ -24,6 +24,8 @@
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.impl.store.id.IdRange;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.EMPTY_LONG_ARRAY;

Expand All @@ -46,11 +48,14 @@ public class ReplicatedIdAllocationStateMachine implements Replicator.Replicated
{
private final CoreMember me;
private final IdAllocationState idAllocationState;
private final Log log;

public ReplicatedIdAllocationStateMachine( CoreMember me, IdAllocationState idAllocationState )
public ReplicatedIdAllocationStateMachine( CoreMember me, IdAllocationState idAllocationState,
LogProvider logProvider )
{
this.me = me;
this.idAllocationState = idAllocationState;
this.log = logProvider.getLog( getClass() );
}

public synchronized long getFirstNotAllocated( IdType idType )
Expand Down Expand Up @@ -87,27 +92,35 @@ private void updateFirstNotAllocated( IdType idType, long idRangeEnd )
@Override
public synchronized void onReplicated( ReplicatedContent content, long logIndex )
{
if ( content instanceof ReplicatedIdAllocationRequest && logIndex > idAllocationState.logIndex() )
if ( content instanceof ReplicatedIdAllocationRequest )
{
ReplicatedIdAllocationRequest request = (ReplicatedIdAllocationRequest) content;
if ( logIndex > idAllocationState.logIndex() )
{
ReplicatedIdAllocationRequest request = (ReplicatedIdAllocationRequest) content;

IdType idType = request.idType();
IdType idType = request.idType();

if ( request.idRangeStart() == idAllocationState.firstUnallocated( idType ) )
{
if ( request.owner().equals( me ) )
if ( request.idRangeStart() == idAllocationState.firstUnallocated( idType ) )
{
idAllocationState.lastIdRangeStart( idType, request.idRangeStart() );
idAllocationState.lastIdRangeLength( idType, request.idRangeLength() );
}
updateFirstNotAllocated( idType, request.idRangeStart() + request.idRangeLength() );
if ( request.owner().equals( me ) )
{
idAllocationState.lastIdRangeStart( idType, request.idRangeStart() );
idAllocationState.lastIdRangeLength( idType, request.idRangeLength() );
}
updateFirstNotAllocated( idType, request.idRangeStart() + request.idRangeLength() );

}
/*
* We update regardless of whether this content was meant for us or not. Even if it isn't content we
* care about, any content before it has already been applied so it is safe to ignore.
*/
idAllocationState.logIndex( logIndex );
}
else
{
log.info( "Ignoring content at index %d, since already applied up to %d",
logIndex, idAllocationState.logIndex() );
}
/*
* We update regardless of whether this content was meant for us or not. Even if it isn't content we
* care about, any content before it has already been applied so it is safe to ignore.
*/
idAllocationState.logIndex( logIndex );
}
}

Expand Down
Expand Up @@ -30,14 +30,16 @@
import org.neo4j.kernel.impl.transaction.state.Loaders;
import org.neo4j.kernel.impl.transaction.state.RecordAccess;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.Token;

public class ReplicatedLabelTokenHolder extends ReplicatedTokenHolder<Token,LabelTokenRecord> implements LabelTokenHolder
{
public ReplicatedLabelTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies,
long timeoutMillis)
public ReplicatedLabelTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory,
Dependencies dependencies, long timeoutMillis, LogProvider logProvider )
{
super( replicator, idGeneratorFactory, IdType.LABEL_TOKEN, dependencies, new Token.Factory(), TokenType.LABEL, timeoutMillis );
super( replicator, idGeneratorFactory, IdType.LABEL_TOKEN,
dependencies, new Token.Factory(), TokenType.LABEL, timeoutMillis, logProvider );
}

@Override
Expand Down
Expand Up @@ -30,13 +30,16 @@
import org.neo4j.kernel.impl.transaction.state.Loaders;
import org.neo4j.kernel.impl.transaction.state.RecordAccess;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.Token;

public class ReplicatedPropertyKeyTokenHolder extends ReplicatedTokenHolder<Token,PropertyKeyTokenRecord> implements PropertyKeyTokenHolder
{
public ReplicatedPropertyKeyTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies, long timeoutMillis )
public ReplicatedPropertyKeyTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory,
Dependencies dependencies, long timeoutMillis, LogProvider logProvider )
{
super( replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, dependencies, new Token.Factory(), TokenType.PROPERTY, timeoutMillis );
super( replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN,
dependencies, new Token.Factory(), TokenType.PROPERTY, timeoutMillis, logProvider );
}

@Override
Expand Down
Expand Up @@ -31,12 +31,15 @@
import org.neo4j.kernel.impl.transaction.state.Loaders;
import org.neo4j.kernel.impl.transaction.state.RecordAccess;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.logging.LogProvider;

public class ReplicatedRelationshipTypeTokenHolder extends ReplicatedTokenHolder<RelationshipTypeToken,RelationshipTypeTokenRecord> implements RelationshipTypeTokenHolder
{
public ReplicatedRelationshipTypeTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies, long timeoutMillis )
public ReplicatedRelationshipTypeTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory,
Dependencies dependencies, long timeoutMillis, LogProvider logProvider )
{
super( replicator, idGeneratorFactory, IdType.RELATIONSHIP_TYPE_TOKEN, dependencies, new RelationshipTypeToken.Factory(), TokenType.RELATIONSHIP, timeoutMillis );
super( replicator, idGeneratorFactory, IdType.RELATIONSHIP_TYPE_TOKEN, dependencies,
new RelationshipTypeToken.Factory(), TokenType.RELATIONSHIP, timeoutMillis, logProvider );
}

@Override
Expand Down
Expand Up @@ -52,6 +52,8 @@
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.collection.NoSuchEntryException;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.Token;
Expand All @@ -75,13 +77,14 @@ public abstract class ReplicatedTokenHolder<TOKEN extends Token, RECORD extends
private final long timeoutMillis;

private final TokenFutures tokenFutures = new TokenFutures();
private final Log log;
private long lastCommittedIndex = Long.MAX_VALUE;

// TODO: Clean up all the resolving, which now happens every time with special selection strategies.

public ReplicatedTokenHolder( Replicator replicator, IdGeneratorFactory idGeneratorFactory, IdType tokenIdType,
Dependencies dependencies, TokenFactory<TOKEN> tokenFactory, TokenType type,
long timeoutMillis)
Dependencies dependencies, TokenFactory<TOKEN> tokenFactory, TokenType type,
long timeoutMillis, LogProvider logProvider )
{
this.replicator = replicator;
this.idGeneratorFactory = idGeneratorFactory;
Expand All @@ -91,6 +94,7 @@ public ReplicatedTokenHolder( Replicator replicator, IdGeneratorFactory idGenera
this.type = type;
this.timeoutMillis = timeoutMillis;
this.tokenCache = new InMemoryTokenCache<>( this.getClass() );
this.log = logProvider.getLog( getClass() );
}

@Override
Expand Down Expand Up @@ -209,32 +213,36 @@ public Iterable<TOKEN> getAllTokens()
@Override
public void onReplicated( ReplicatedContent content, long logIndex )
{
if ( logIndex <= lastCommittedIndex )
{
return;
}
if ( content instanceof ReplicatedTokenRequest && ((ReplicatedTokenRequest) content).type().equals( type ) )
{
ReplicatedTokenRequest tokenRequest = (ReplicatedTokenRequest) content;
if ( logIndex > lastCommittedIndex )
{
ReplicatedTokenRequest tokenRequest = (ReplicatedTokenRequest) content;

Integer tokenId = tokenCache.getId( tokenRequest.tokenName() );
Integer tokenId = tokenCache.getId( tokenRequest.tokenName() );

if ( tokenId == null )
{
try
if ( tokenId == null )
{
Collection<StorageCommand> commands = ReplicatedTokenRequestSerializer.extractCommands( tokenRequest.commandBytes() );
tokenId = applyToStore( commands, logIndex );
}
catch ( NoSuchEntryException e )
{
throw new IllegalStateException( "Commands did not contain token command" );
try
{
Collection<StorageCommand> commands = ReplicatedTokenRequestSerializer.extractCommands( tokenRequest.commandBytes() );
tokenId = applyToStore( commands, logIndex );
}
catch ( NoSuchEntryException e )
{
throw new IllegalStateException( "Commands did not contain token command" );
}

tokenCache.put( tokenFactory.newToken( tokenRequest.tokenName(), tokenId ) );
}

tokenCache.put( tokenFactory.newToken( tokenRequest.tokenName(), tokenId ) );
tokenFutures.complete( tokenRequest.tokenName(), tokenId );
}
else
{
log.info( "Ignoring content at index %d, since already applied up to %d",
logIndex, lastCommittedIndex );
}

tokenFutures.complete( tokenRequest.tokenName(), tokenId );
}
}

Expand Down
Expand Up @@ -97,7 +97,11 @@ private void handleTransaction( ReplicatedTransaction<MEMBER> replicatedTx, long
* may still need to persist the session state (as we may crashed in between), which happens outside this
* if check.
*/
if ( !txAlreadyCommitted( logIndex ) )
if ( logIndex <= lastCommittedIndex )
{
log.info( "Ignoring transaction at log index %d since already committed up to %d", logIndex, lastCommittedIndex );
}
else
{
TransactionRepresentation tx;
try
Expand Down Expand Up @@ -167,11 +171,6 @@ private boolean operationValid( ReplicatedTransaction<MEMBER> replicatedTx )
return sessionTracker.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() );
}

private boolean txAlreadyCommitted( long logIndex )
{
return logIndex <= lastCommittedIndex;
}

public void setLastCommittedIndex( long lastCommittedIndex )
{
this.lastCommittedIndex = lastCommittedIndex;
Expand Down
Expand Up @@ -293,8 +293,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
throw new RuntimeException( e );
}

ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( myself,
idAllocationState );
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine(
myself, idAllocationState, logProvider );

replicator.subscribe( idAllocationStateMachine );

Expand All @@ -316,11 +316,11 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

Long tokenCreationTimeout = config.get( CoreEdgeClusterSettings.token_creation_timeout );
ReplicatedRelationshipTypeTokenHolder relationshipTypeTokenHolder = new ReplicatedRelationshipTypeTokenHolder(
replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout );
replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout, logProvider );
ReplicatedPropertyKeyTokenHolder propertyKeyTokenHolder = new ReplicatedPropertyKeyTokenHolder(
replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout );
replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout, logProvider );
ReplicatedLabelTokenHolder labelTokenHolder = new ReplicatedLabelTokenHolder(
replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout );
replicator, this.idGeneratorFactory, dependencies, tokenCreationTimeout, logProvider );

LifeSupport tokenLife = new LifeSupport();
this.relationshipTypeTokenHolder = tokenLife.add( relationshipTypeTokenHolder );
Expand Down

0 comments on commit 3a4eecb

Please sign in to comment.