From 658a2db5cc721e3c6fcee7ee6646ad44373d9883 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 22 Nov 2018 15:50:02 +0100 Subject: [PATCH] Small cleanup around replicated tokens The most important change is adding a log point when token commands are ignored because they already exist. --- .../ReplicatedTokenRequestSerializer.java | 4 +- .../token/ReplicatedTokenStateMachine.java | 47 ++++++++----------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java index 67c5c30046bb3..e89955ce0eed2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenRequestSerializer.java @@ -102,7 +102,7 @@ public static byte[] commandBytes( Collection commands ) } catch ( IOException e ) { - e.printStackTrace(); // TODO: Handle or throw. + throw new RuntimeException( "This should never happen since the channel is backed by an in-memory buffer.", e ); } /* @@ -135,7 +135,7 @@ static Collection extractCommands( byte[] commandBytes ) } catch ( IOException e ) { - e.printStackTrace(); // TODO: Handle or throw. + throw new RuntimeException( "This should never happen since the channel is backed by an in-memory buffer.", e ); } return commands; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenStateMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenStateMachine.java index 4c36e20a8c3fe..d593aa65d51a8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenStateMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenStateMachine.java @@ -19,7 +19,6 @@ */ package org.neo4j.causalclustering.core.state.machines.token; -import java.io.IOException; import java.util.Collection; import java.util.function.Consumer; @@ -35,7 +34,6 @@ import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; -import org.neo4j.kernel.impl.util.collection.NoSuchEntryException; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.StorageCommand; @@ -44,6 +42,7 @@ import org.neo4j.storageengine.api.TransactionApplicationMode; import static java.lang.String.format; +import static org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequestSerializer.extractCommands; import static org.neo4j.causalclustering.core.state.machines.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader; public class ReplicatedTokenStateMachine implements StateMachine @@ -82,31 +81,27 @@ public synchronized void applyCommand( ReplicatedTokenRequest tokenRequest, long return; } - Integer tokenId = tokenRegistry.getId( tokenRequest.tokenName() ); + Collection commands = extractCommands( tokenRequest.commandBytes() ); + int newTokenId = extractTokenId( commands ); - if ( tokenId == null ) - { - try - { - Collection commands = - ReplicatedTokenRequestSerializer.extractCommands( tokenRequest.commandBytes() ); - tokenId = applyToStore( commands, commandIndex ); - } - catch ( NoSuchEntryException e ) - { - throw new IllegalStateException( "Commands did not contain token command" ); - } + Integer existingTokenId = tokenRegistry.getId( tokenRequest.tokenName() ); - tokenRegistry.addToken( tokenFactory.newToken( tokenRequest.tokenName(), tokenId ) ); + if ( existingTokenId == null ) + { + applyToStore( commands, commandIndex ); + tokenRegistry.addToken( tokenFactory.newToken( tokenRequest.tokenName(), newTokenId ) ); + callback.accept( Result.of( newTokenId ) ); + } + else + { + // This should be rare so a warning is in order. + log.warn( format( "Ignored %s (newTokenId=%d) since it already exists with existingTokenId=%d", tokenRequest, newTokenId, existingTokenId ) ); + callback.accept( Result.of( existingTokenId ) ); } - - callback.accept( Result.of( tokenId ) ); } - private int applyToStore( Collection commands, long logIndex ) throws NoSuchEntryException + private void applyToStore( Collection commands, long logIndex ) { - int tokenId = extractTokenId( commands ); - PhysicalTransactionRepresentation representation = new PhysicalTransactionRepresentation( commands ); representation.setHeader( encodeLogIndexAsTxHeader( logIndex ), 0, 0, 0, 0L, 0L, 0 ); @@ -119,11 +114,9 @@ private int applyToStore( Collection commands, long logIndex ) t { throw new RuntimeException( e ); } - - return tokenId; } - private int extractTokenId( Collection commands ) throws NoSuchEntryException + private int extractTokenId( Collection commands ) { for ( StorageCommand command : commands ) { @@ -132,11 +125,11 @@ private int extractTokenId( Collection commands ) throws NoSuchE return ((Command.TokenCommand) command).getAfter().getIntId(); } } - throw new NoSuchEntryException( "Expected command not found" ); + throw new IllegalStateException( "Commands did not contain token command" ); } @Override - public synchronized void flush() throws IOException + public synchronized void flush() { // already implicitly flushed to the store } @@ -146,7 +139,7 @@ public long lastAppliedIndex() { if ( commitProcess == null ) { - /** See {@link #installCommitProcess}. */ + /* See {@link #installCommitProcess}. */ throw new IllegalStateException( "Value has not been installed" ); } return lastCommittedIndex;