Skip to content

Commit

Permalink
Small cleanup around replicated tokens
Browse files Browse the repository at this point in the history
The most important change is adding a log point when token commands
are ignored because they already exist.
  • Loading branch information
martinfurmanski committed Dec 6, 2018
1 parent 77b8a73 commit 658a2db
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 29 deletions.
Expand Up @@ -102,7 +102,7 @@ public static byte[] commandBytes( Collection<StorageCommand> commands )
} }
catch ( IOException e ) catch ( IOException e )
{ {
e.printStackTrace(); // TODO: Handle or throw. throw new RuntimeException( "This should never happen since the channel is backed by an in-memory buffer.", e );
} }


/* /*
Expand Down Expand Up @@ -135,7 +135,7 @@ static Collection<StorageCommand> extractCommands( byte[] commandBytes )
} }
catch ( IOException e ) catch ( IOException e )
{ {
e.printStackTrace(); // TODO: Handle or throw. throw new RuntimeException( "This should never happen since the channel is backed by an in-memory buffer.", e );
} }


return commands; return commands;
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.causalclustering.core.state.machines.token; package org.neo4j.causalclustering.core.state.machines.token;


import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.function.Consumer; import java.util.function.Consumer;


Expand All @@ -35,7 +34,6 @@
import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.util.collection.NoSuchEntryException;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageCommand;
Expand All @@ -44,6 +42,7 @@
import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.storageengine.api.TransactionApplicationMode;


import static java.lang.String.format; import static java.lang.String.format;
import static org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequestSerializer.extractCommands;
import static org.neo4j.causalclustering.core.state.machines.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader; import static org.neo4j.causalclustering.core.state.machines.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader;


public class ReplicatedTokenStateMachine<TOKEN extends Token> implements StateMachine<ReplicatedTokenRequest> public class ReplicatedTokenStateMachine<TOKEN extends Token> implements StateMachine<ReplicatedTokenRequest>
Expand Down Expand Up @@ -82,31 +81,27 @@ public synchronized void applyCommand( ReplicatedTokenRequest tokenRequest, long
return; return;
} }


Integer tokenId = tokenRegistry.getId( tokenRequest.tokenName() ); Collection<StorageCommand> commands = extractCommands( tokenRequest.commandBytes() );
int newTokenId = extractTokenId( commands );


if ( tokenId == null ) Integer existingTokenId = tokenRegistry.getId( tokenRequest.tokenName() );
{
try
{
Collection<StorageCommand> commands =
ReplicatedTokenRequestSerializer.extractCommands( tokenRequest.commandBytes() );
tokenId = applyToStore( commands, commandIndex );
}
catch ( NoSuchEntryException e )
{
throw new IllegalStateException( "Commands did not contain token command" );
}


tokenRegistry.addToken( tokenFactory.newToken( tokenRequest.tokenName(), tokenId ) ); if ( existingTokenId == null )
{
applyToStore( commands, commandIndex );
tokenRegistry.addToken( tokenFactory.newToken( tokenRequest.tokenName(), newTokenId ) );
callback.accept( Result.of( newTokenId ) );
}
else
{
// This should be rare so a warning is in order.
log.warn( format( "Ignored %s (newTokenId=%d) since it already exists with existingTokenId=%d", tokenRequest, newTokenId, existingTokenId ) );
callback.accept( Result.of( existingTokenId ) );
} }

callback.accept( Result.of( tokenId ) );
} }


private int applyToStore( Collection<StorageCommand> commands, long logIndex ) throws NoSuchEntryException private void applyToStore( Collection<StorageCommand> commands, long logIndex )
{ {
int tokenId = extractTokenId( commands );

PhysicalTransactionRepresentation representation = new PhysicalTransactionRepresentation( commands ); PhysicalTransactionRepresentation representation = new PhysicalTransactionRepresentation( commands );
representation.setHeader( encodeLogIndexAsTxHeader( logIndex ), 0, 0, 0, 0L, 0L, 0 ); representation.setHeader( encodeLogIndexAsTxHeader( logIndex ), 0, 0, 0, 0L, 0L, 0 );


Expand All @@ -119,11 +114,9 @@ private int applyToStore( Collection<StorageCommand> commands, long logIndex ) t
{ {
throw new RuntimeException( e ); throw new RuntimeException( e );
} }

return tokenId;
} }


private int extractTokenId( Collection<StorageCommand> commands ) throws NoSuchEntryException private int extractTokenId( Collection<StorageCommand> commands )
{ {
for ( StorageCommand command : commands ) for ( StorageCommand command : commands )
{ {
Expand All @@ -132,11 +125,11 @@ private int extractTokenId( Collection<StorageCommand> commands ) throws NoSuchE
return ((Command.TokenCommand<? extends TokenRecord>) command).getAfter().getIntId(); return ((Command.TokenCommand<? extends TokenRecord>) command).getAfter().getIntId();
} }
} }
throw new NoSuchEntryException( "Expected command not found" ); throw new IllegalStateException( "Commands did not contain token command" );
} }


@Override @Override
public synchronized void flush() throws IOException public synchronized void flush()
{ {
// already implicitly flushed to the store // already implicitly flushed to the store
} }
Expand All @@ -146,7 +139,7 @@ public long lastAppliedIndex()
{ {
if ( commitProcess == null ) if ( commitProcess == null )
{ {
/** See {@link #installCommitProcess}. */ /* See {@link #installCommitProcess}. */
throw new IllegalStateException( "Value has not been installed" ); throw new IllegalStateException( "Value has not been installed" );
} }
return lastCommittedIndex; return lastCommittedIndex;
Expand Down

0 comments on commit 658a2db

Please sign in to comment.