diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java index 77e9aa478e496..20b072a9a2504 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java @@ -27,10 +27,10 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule; -import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache; import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog; import org.neo4j.causalclustering.core.consensus.log.MonitoredRaftLog; import org.neo4j.causalclustering.core.consensus.log.RaftLog; +import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache; import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCacheFactory; import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategy; import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategyFactory; @@ -43,21 +43,23 @@ import org.neo4j.causalclustering.core.consensus.term.MonitoredTermStateStorage; import org.neo4j.causalclustering.core.consensus.term.TermState; import org.neo4j.causalclustering.core.consensus.vote.VoteState; +import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.core.replication.SendToMyself; import org.neo4j.causalclustering.core.state.storage.DurableStateStorage; import org.neo4j.causalclustering.core.state.storage.StateStorage; import org.neo4j.causalclustering.discovery.CoreTopologyService; import org.neo4j.causalclustering.discovery.RaftCoreTopologyConnector; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.Outbound; +import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.PlatformModule; import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.scheduler.JobScheduler; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler; import static org.neo4j.causalclustering.core.CausalClusteringSettings.catchup_batch_size; import static org.neo4j.causalclustering.core.CausalClusteringSettings.join_catch_up_timeout; @@ -90,7 +92,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, LogProvider logProvider = logging.getInternalLogProvider(); - final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal(); + final CoreReplicatedContentSerializer marshal = new CoreReplicatedContentSerializer(); RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider, platformModule.jobScheduler ); @@ -159,8 +161,8 @@ private LeaderAvailabilityTimers createElectionTiming( Config config, TimerServi return new LeaderAvailabilityTimers( electionTimeout, electionTimeout.dividedBy( 3 ), systemClock(), timerService, logProvider ); } - private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem, - File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider, + private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem, File clusterStateDirectory, + ChannelMarshal marshal, LogProvider logProvider, JobScheduler scheduler ) { EnterpriseCoreEditionModule.RaftLogImplementation raftLogImplementation = diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/DumpSegmentedRaftLog.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/DumpSegmentedRaftLog.java index abc417ca77da6..5cb8ac8f85e52 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/DumpSegmentedRaftLog.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/DumpSegmentedRaftLog.java @@ -29,8 +29,8 @@ import org.neo4j.causalclustering.core.consensus.log.EntryRecord; import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.cursor.IOCursor; import org.neo4j.helpers.Args; import org.neo4j.io.fs.DefaultFileSystemAbstraction; @@ -101,7 +101,7 @@ public static void main( String[] args ) try ( DefaultFileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction() ) { - new DumpSegmentedRaftLog( fileSystem, new CoreReplicatedContentMarshal() ) + new DumpSegmentedRaftLog( fileSystem, new CoreReplicatedContentSerializer() ) .dump( fileAsString, printer.getFor( fileAsString ) ); } catch ( IOException | DisposedException | DamagedLogStorageException e ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolClientInstaller.java index 26ea796bf7b78..b3a95f1d50a1f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolClientInstaller.java @@ -28,8 +28,8 @@ import java.util.List; import java.util.stream.Collectors; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.marshalling.v1.RaftMessageEncoder; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol; @@ -69,7 +69,7 @@ public void install( Channel channel ) throws Exception clientPipelineBuilderFactory.client( channel, log ) .modify( modifiers ) .addFraming() - .add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) ) + .add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentSerializer() ) ) .install(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolServerInstaller.java index c36c10d3e2610..a736ec4a33758 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolServerInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v1/RaftProtocolServerInstaller.java @@ -30,8 +30,8 @@ import java.util.List; import java.util.stream.Collectors; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.marshalling.v1.RaftMessageDecoder; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol; @@ -73,7 +73,7 @@ public void install( Channel channel ) throws Exception pipelineBuilderFactory.server( channel, log ) .modify( modifiers ) .addFraming() - .add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) ) + .add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentSerializer(), Clock.systemUTC() ) ) .add( "raft_handler", raftMessageHandler ) .install(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java index 46b0a3d15abd9..40c175f54d295 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolClientInstaller.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.stream.Collectors; -import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder; -import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; +import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ReplicatedContentChunkEncoder; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageContentEncoder; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java index f6026993224e5..166994dffe532 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstaller.java @@ -28,7 +28,7 @@ import java.util.stream.Collectors; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeDispatcher; -import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeProtocol; +import org.neo4j.causalclustering.messaging.marshalling.v2.ContentTypeProtocol; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.DecodingDispatcher; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.RaftMessageComposer; import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ReplicatedContentDecoder; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/DistributedOperation.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/DistributedOperation.java index 9ed0074e43ba2..25d50c1e8bde1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/DistributedOperation.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/DistributedOperation.java @@ -26,13 +26,13 @@ import java.util.Objects; import java.util.UUID; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.core.replication.session.GlobalSession; import org.neo4j.causalclustering.core.replication.session.LocalOperationId; -import org.neo4j.causalclustering.messaging.EndOfStreamException; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.messaging.EndOfStreamException; +import org.neo4j.causalclustering.messaging.marshalling.Serializer; +import org.neo4j.causalclustering.messaging.marshalling.ContentBuilder; import org.neo4j.storageengine.api.ReadableChannel; -import org.neo4j.storageengine.api.WritableChannel; /** * A uniquely identifiable operation. @@ -77,19 +77,25 @@ public long size() return content.size(); } - public void serialize( WritableChannel channel ) throws IOException + /** + * This this consumer ignores the content which is handles by its own serializer. + * + * @return Consumer with instructions for writing to channel. + */ + public Serializer serialize() { - channel.putLong( globalSession().sessionId().getMostSignificantBits() ); - channel.putLong( globalSession().sessionId().getLeastSignificantBits() ); - new MemberId.Marshal().marshal( globalSession().owner(), channel ); - - channel.putLong( operationId.localSessionId() ); - channel.putLong( operationId.sequenceNumber() ); + return Serializer.simple( channel1 -> + { + channel1.putLong( globalSession().sessionId().getMostSignificantBits() ); + channel1.putLong( globalSession().sessionId().getLeastSignificantBits() ); + new MemberId.Marshal().marshal( globalSession().owner(), channel1 ); - new CoreReplicatedContentMarshal().marshal( content, channel ); + channel1.putLong( operationId.localSessionId() ); + channel1.putLong( operationId.sequenceNumber() ); + } ); } - public static DistributedOperation deserialize( ReadableChannel channel ) throws IOException, EndOfStreamException + public static ContentBuilder deserialize( ReadableChannel channel ) throws IOException, EndOfStreamException { long mostSigBits = channel.getLong(); long leastSigBits = channel.getLong(); @@ -100,8 +106,7 @@ public static DistributedOperation deserialize( ReadableChannel channel ) throws long sequenceNumber = channel.getLong(); LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber ); - ReplicatedContent content = new CoreReplicatedContentMarshal().unmarshal( channel ); - return new DistributedOperation( content, globalSession, localOperationId ); + return ContentBuilder.unfinished( subContent -> new DistributedOperation( subContent, globalSession, localOperationId ) ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java index 8dcd1548b7af9..a48e31b8361e1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java @@ -28,7 +28,7 @@ import org.neo4j.storageengine.api.WritableChannel; -public class ChunkedReplicatedContent implements SerializableContent, ChunkedInput +public class ChunkedReplicatedContent implements Marshal, ChunkedInput { private static final int DEFAULT_CHUNK_SIZE = 8192; @@ -55,7 +55,7 @@ public ChunkedReplicatedContent( byte contentType, Serializer serializer ) } @Override - public void serialize( WritableChannel channel ) throws IOException + public void marshal( WritableChannel channel ) throws IOException { channel.put( contentType ); serializer.marshal( channel ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/ContentBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ContentBuilder.java similarity index 81% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/ContentBuilder.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ContentBuilder.java index 71c3ad8a4ed28..9e59c1a4ed306 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/ContentBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ContentBuilder.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.messaging.marshalling.v2; +package org.neo4j.causalclustering.messaging.marshalling; import java.util.function.Function; @@ -31,16 +31,20 @@ public static ContentBuilder emptyUnfinished() return new ContentBuilder<>( content -> content, false ); } - ContentBuilder( Function contentFunction, boolean isComplete ) + public static ContentBuilder unfinished( Function contentFunction ) { - this.contentFunction = contentFunction; - this.isComplete = isComplete; + return new ContentBuilder<>( contentFunction, false ); } - ContentBuilder( CONTENT replicatedContent ) + public static ContentBuilder finished( C content ) { - this.isComplete = true; - this.contentFunction = replicatedContent1 -> replicatedContent; + return new ContentBuilder<>( c1 -> content, true ); + } + + private ContentBuilder( Function contentFunction, boolean isComplete ) + { + this.contentFunction = contentFunction; + this.isComplete = isComplete; } public boolean isComplete() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/CoreReplicatedContentSerializer.java similarity index 70% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/CoreReplicatedContentSerializer.java index 17bd2b4dccd49..20b59cc5ec58b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentSerializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/CoreReplicatedContentSerializer.java @@ -17,20 +17,17 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.messaging.marshalling.v2; +package org.neo4j.causalclustering.messaging.marshalling; import java.io.IOException; import java.util.Collection; import java.util.LinkedList; -import java.util.UUID; import org.neo4j.causalclustering.core.consensus.NewLeaderBarrier; import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; import org.neo4j.causalclustering.core.consensus.membership.MemberIdSetSerializer; import org.neo4j.causalclustering.core.replication.DistributedOperation; import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.core.replication.session.GlobalSession; -import org.neo4j.causalclustering.core.replication.session.LocalOperationId; import org.neo4j.causalclustering.core.state.machines.dummy.DummyRequest; import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequest; import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequestSerializer; @@ -41,10 +38,7 @@ import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionSerializer; import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.messaging.marshalling.ChunkedReplicatedContent; -import org.neo4j.causalclustering.messaging.marshalling.SerializableContent; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -62,7 +56,7 @@ public class CoreReplicatedContentSerializer extends SafeChannelMarshal toSerializable( ReplicatedContent content ) + public Collection toSerializable( ReplicatedContent content ) { if ( content instanceof ReplicatedTransaction ) { @@ -96,16 +90,8 @@ else if ( content instanceof ReplicatedLockTokenRequest ) } else if ( content instanceof DistributedOperation ) { - LinkedList list = new LinkedList<>( toSerializable( ((DistributedOperation) content).content() ) ); - list.add( 0, new ChunkedReplicatedContent( DISTRIBUTED_OPERATION, simple( channel -> - { - channel.putLong( ((DistributedOperation) content).globalSession().sessionId().getMostSignificantBits() ); - channel.putLong( ((DistributedOperation) content).globalSession().sessionId().getLeastSignificantBits() ); - new MemberId.Marshal().marshal( ((DistributedOperation) content).globalSession().owner(), channel ); - - channel.putLong( ((DistributedOperation) content).operationId().localSessionId() ); - channel.putLong( ((DistributedOperation) content).operationId().sequenceNumber() ); - } ) ) ); + LinkedList list = new LinkedList<>( toSerializable( ((DistributedOperation) content).content() ) ); + list.add( 0, new ChunkedReplicatedContent( DISTRIBUTED_OPERATION, ((DistributedOperation) content).serialize() ) ); return list; } else if ( content instanceof DummyRequest ) @@ -124,32 +110,23 @@ public ContentBuilder read( byte contentType, ReadableChannel switch ( contentType ) { case TX_CONTENT_TYPE: - return new ContentBuilder<>( ReplicatedTransactionSerializer.unmarshal( channel ) ); + return ContentBuilder.finished( ReplicatedTransactionSerializer.unmarshal( channel ) ); case RAFT_MEMBER_SET_TYPE: - return new ContentBuilder<>( MemberIdSetSerializer.unmarshal( channel ) ); + return ContentBuilder.finished( MemberIdSetSerializer.unmarshal( channel ) ); case ID_RANGE_REQUEST_TYPE: - return new ContentBuilder<>( ReplicatedIdAllocationRequestSerializer.unmarshal( channel ) ); + return ContentBuilder.finished( ReplicatedIdAllocationRequestSerializer.unmarshal( channel ) ); case TOKEN_REQUEST_TYPE: - return new ContentBuilder<>( ReplicatedTokenRequestSerializer.unmarshal( channel ) ); + return ContentBuilder.finished( ReplicatedTokenRequestSerializer.unmarshal( channel ) ); case NEW_LEADER_BARRIER_TYPE: - return new ContentBuilder<>( new NewLeaderBarrier() ); + return ContentBuilder.finished( new NewLeaderBarrier() ); case LOCK_TOKEN_REQUEST: - return new ContentBuilder<>( ReplicatedLockTokenSerializer.unmarshal( channel ) ); + return ContentBuilder.finished( ReplicatedLockTokenSerializer.unmarshal( channel ) ); case DISTRIBUTED_OPERATION: { - long mostSigBits = channel.getLong(); - long leastSigBits = channel.getLong(); - MemberId owner = new MemberId.Marshal().unmarshal( channel ); - GlobalSession globalSession = new GlobalSession( new UUID( mostSigBits, leastSigBits ), owner ); - - long localSessionId = channel.getLong(); - long sequenceNumber = channel.getLong(); - LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber ); - - return new ContentBuilder<>( replicatedContent -> new DistributedOperation( replicatedContent, globalSession, localOperationId ), false ); + return DistributedOperation.deserialize( channel ); } case DUMMY_REQUEST: - return new ContentBuilder<>( DummyRequest.Marshal.INSTANCE.unmarshal( channel ) ); + return ContentBuilder.finished( DummyRequest.Marshal.INSTANCE.unmarshal( channel ) ); default: throw new IllegalStateException( "Not a recognized content type: " + contentType ); } @@ -158,9 +135,9 @@ public ContentBuilder read( byte contentType, ReadableChannel @Override public void marshal( ReplicatedContent coreReplicatedContent, WritableChannel channel ) throws IOException { - for ( SerializableContent serializableContent : toSerializable( coreReplicatedContent ) ) + for ( Marshal marshal : toSerializable( coreReplicatedContent ) ) { - serializableContent.serialize( channel ); + marshal.marshal( channel ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/SerializableContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Marshal.java similarity index 82% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/SerializableContent.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Marshal.java index 35c3f5a9f3d5d..c916621351625 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/SerializableContent.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Marshal.java @@ -23,7 +23,12 @@ import org.neo4j.storageengine.api.WritableChannel; -public interface SerializableContent +public interface Marshal { - void serialize( WritableChannel channel ) throws IOException; + /** + * Writes all content to the channel + * + * @param channel to where data is written. + */ + void marshal( WritableChannel channel ) throws IOException; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java index fea8c3c9e4c85..cecba611f44a2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/Serializer.java @@ -27,7 +27,7 @@ import org.neo4j.function.ThrowingConsumer; import org.neo4j.storageengine.api.WritableChannel; -public interface Serializer +public interface Serializer extends Marshal { /** May override buffer allocation size. * @param channelConsumer @@ -60,17 +60,10 @@ public void marshal( WritableChannel channel ) throws IOException } /** - * Writes to byteBuf until full. + * Writes to ByteBuf until there is no more left to write. Should write equal to or less the amount of writable bytes in the buffer. * * @param byteBuf where data will be written * @return false if there is no more data left to write after this call. */ boolean encode( ByteBuf byteBuf ) throws IOException; - - /** - * Writes all content to the channel - * - * @param channel to where data is written. - */ - void marshal( WritableChannel channel ) throws IOException; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/CoreReplicatedContentMarshal.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/CoreReplicatedContentMarshal.java deleted file mode 100644 index e8d22e7a4ba91..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v1/CoreReplicatedContentMarshal.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.messaging.marshalling.v1; - -import java.io.IOException; - -import org.neo4j.causalclustering.core.consensus.NewLeaderBarrier; -import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; -import org.neo4j.causalclustering.core.consensus.membership.MemberIdSetSerializer; -import org.neo4j.causalclustering.core.replication.DistributedOperation; -import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequest; -import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequestSerializer; -import org.neo4j.causalclustering.core.state.machines.locks.ReplicatedLockTokenRequest; -import org.neo4j.causalclustering.core.state.machines.locks.ReplicatedLockTokenSerializer; -import org.neo4j.causalclustering.core.state.machines.dummy.DummyRequest; -import org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequest; -import org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequestSerializer; -import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; -import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionSerializer; -import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; -import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.storageengine.api.ReadableChannel; -import org.neo4j.storageengine.api.WritableChannel; - -public class CoreReplicatedContentMarshal extends SafeChannelMarshal -{ - private static final byte TX_CONTENT_TYPE = 0; - private static final byte RAFT_MEMBER_SET_TYPE = 1; - private static final byte ID_RANGE_REQUEST_TYPE = 2; - private static final byte TOKEN_REQUEST_TYPE = 4; - private static final byte NEW_LEADER_BARRIER_TYPE = 5; - private static final byte LOCK_TOKEN_REQUEST = 6; - private static final byte DISTRIBUTED_OPERATION = 7; - private static final byte DUMMY_REQUEST = 8; - - @Override - public void marshal( ReplicatedContent content, WritableChannel channel ) throws IOException - { - if ( content instanceof ReplicatedTransaction ) - { - channel.put( TX_CONTENT_TYPE ); - ReplicatedTransactionSerializer.marshal( (ReplicatedTransaction) content, channel ); - } - else if ( content instanceof MemberIdSet ) - { - channel.put( RAFT_MEMBER_SET_TYPE ); - MemberIdSetSerializer.marshal( (MemberIdSet) content, channel ); - } - else if ( content instanceof ReplicatedIdAllocationRequest ) - { - channel.put( ID_RANGE_REQUEST_TYPE ); - ReplicatedIdAllocationRequestSerializer.marshal( (ReplicatedIdAllocationRequest) content, channel ); - } - else if ( content instanceof ReplicatedTokenRequest ) - { - channel.put( TOKEN_REQUEST_TYPE ); - ReplicatedTokenRequestSerializer.marshal( (ReplicatedTokenRequest) content, channel ); - } - else if ( content instanceof NewLeaderBarrier ) - { - channel.put( NEW_LEADER_BARRIER_TYPE ); - } - else if ( content instanceof ReplicatedLockTokenRequest ) - { - channel.put( LOCK_TOKEN_REQUEST ); - ReplicatedLockTokenSerializer.marshal( (ReplicatedLockTokenRequest) content, channel ); - } - else if ( content instanceof DistributedOperation ) - { - channel.put( DISTRIBUTED_OPERATION ); - ((DistributedOperation) content).serialize( channel ); - } - else if ( content instanceof DummyRequest ) - { - channel.put( DUMMY_REQUEST ); - DummyRequest.Marshal.INSTANCE.marshal( (DummyRequest) content, channel ); - } - else - { - throw new IllegalArgumentException( "Unknown content type " + content.getClass() ); - } - } - - @Override - public ReplicatedContent unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException - { - byte type = channel.get(); - final ReplicatedContent content; - switch ( type ) - { - case TX_CONTENT_TYPE: - content = ReplicatedTransactionSerializer.unmarshal( channel ); - break; - case RAFT_MEMBER_SET_TYPE: - content = MemberIdSetSerializer.unmarshal( channel ); - break; - case ID_RANGE_REQUEST_TYPE: - content = ReplicatedIdAllocationRequestSerializer.unmarshal( channel ); - break; - case TOKEN_REQUEST_TYPE: - content = ReplicatedTokenRequestSerializer.unmarshal( channel ); - break; - case NEW_LEADER_BARRIER_TYPE: - content = new NewLeaderBarrier(); - break; - case LOCK_TOKEN_REQUEST: - content = ReplicatedLockTokenSerializer.unmarshal( channel ); - break; - case DISTRIBUTED_OPERATION: - content = DistributedOperation.deserialize( channel ); - break; - case DUMMY_REQUEST: - content = DummyRequest.Marshal.INSTANCE.unmarshal( channel ); - break; - default: - throw new IllegalArgumentException( String.format( "Unknown content type 0x%x", type ) ); - } - return content; - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ContentTypeProtocol.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/ContentTypeProtocol.java similarity index 87% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ContentTypeProtocol.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/ContentTypeProtocol.java index 34273566a98ac..bd85a5b2856a8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ContentTypeProtocol.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/ContentTypeProtocol.java @@ -17,10 +17,9 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.messaging.marshalling.v2.decoding; +package org.neo4j.causalclustering.messaging.marshalling.v2; import org.neo4j.causalclustering.catchup.Protocol; -import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; public class ContentTypeProtocol extends Protocol { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java index bd719687341df..7fcaf7f64a662 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java @@ -28,7 +28,7 @@ import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; -import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java index bcd3045d9abfa..6d6caec3cd46a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentDecoder.java @@ -26,7 +26,7 @@ import org.neo4j.causalclustering.catchup.Protocol; import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.messaging.marshalling.v2.ContentBuilder; +import org.neo4j.causalclustering.messaging.marshalling.ContentBuilder; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; public class ReplicatedContentDecoder extends MessageToMessageDecoder> diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java index ac93eff03c122..c1916d4d9d514 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/RaftMessageContentEncoder.java @@ -29,7 +29,7 @@ import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; -import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; public class RaftMessageContentEncoder extends MessageToMessageEncoder { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/ReplicatedContentChunkEncoder.java similarity index 88% rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/ReplicatedContentChunkEncoder.java index 0f1d3eabfba56..1442ad24fd404 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ReplicatedContentChunkEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/encoding/ReplicatedContentChunkEncoder.java @@ -17,12 +17,14 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.causalclustering.messaging.marshalling; +package org.neo4j.causalclustering.messaging.marshalling.v2.encoding; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk; + public class ReplicatedContentChunkEncoder extends MessageToByteEncoder { @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/RaftContentByteBufferMarshalTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/RaftContentByteBufferMarshalTest.java deleted file mode 100644 index 2c95d400999f8..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/RaftContentByteBufferMarshalTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.core.consensus.log; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; - -import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; -import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; -import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; -import org.neo4j.causalclustering.core.replication.ReplicatedContent; -import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequest; -import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; -import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionFactory; -import org.neo4j.causalclustering.messaging.EndOfStreamException; -import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.kernel.impl.index.IndexCommand; -import org.neo4j.kernel.impl.store.id.IdType; -import org.neo4j.kernel.impl.transaction.TransactionRepresentation; -import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; -import org.neo4j.storageengine.api.StorageCommand; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.neo4j.helpers.collection.Iterators.asSet; - -public class RaftContentByteBufferMarshalTest -{ - private MemberId memberId = new MemberId( UUID.randomUUID() ); - - @Test - public void shouldSerializeMemberSet() throws Exception - { - // given - CoreReplicatedContentMarshal serializer = new CoreReplicatedContentMarshal(); - MemberIdSet in = new MemberIdSet( asSet( - new MemberId( UUID.randomUUID() ), - new MemberId( UUID.randomUUID() ) - ) ); - - // when - ByteBuf buf = Unpooled.buffer(); - assertMarshalingEquality( serializer, buf, in ); - } - - @Test - public void shouldSerializeTransactionRepresentation() throws Exception - { - // given - CoreReplicatedContentMarshal serializer = new CoreReplicatedContentMarshal(); - Collection commands = new ArrayList<>(); - - IndexCommand.AddNodeCommand addNodeCommand = new IndexCommand.AddNodeCommand(); - addNodeCommand.init( 0, 0, 0, 0 ); - - commands.add( addNodeCommand ); - - byte[] extraHeader = new byte[0]; - - PhysicalTransactionRepresentation txIn = new PhysicalTransactionRepresentation( commands ); - txIn.setHeader( extraHeader, -1, -1, 0, 0, 0, 0 ); - ReplicatedTransaction in = ReplicatedTransactionFactory.createImmutableReplicatedTransaction( txIn ); - - // when - ByteBuf buf = Unpooled.buffer(); - serializer.marshal( in, new NetworkFlushableByteBuf( buf ) ); - ReplicatedTransaction out = - (ReplicatedTransaction) serializer.unmarshal( new NetworkReadableClosableChannelNetty4( buf ) ); - - TransactionRepresentation txOut = ReplicatedTransactionFactory.extractTransactionRepresentation( out, - extraHeader ); - - // then - assertEquals( in, out ); - assertEquals( txIn, txOut ); - } - - @Test - public void txSerializationShouldNotResultInExcessZeroes() - { - /* - * This test ensures that the buffer used to serialize a transaction and then extract the byte array for - * sending over the wire is trimmed properly. Not doing so will result in sending too many trailing garbage - * (zeroes) that will be ignored from the other side, as zeros are interpreted as null entries from the - * LogEntryReader and stop the deserialization process. - * The test creates a single transaction which has just a header, no commands. That should amount to 40 bytes - * as ReplicatedTransactionFactory.TransactionSerializer.write() makes it out at the time of this writing. If - * that code changes, this test will break. - */ - byte[] extraHeader = new byte[0]; - - PhysicalTransactionRepresentation txIn = new PhysicalTransactionRepresentation( new ArrayList<>() ); - txIn.setHeader( extraHeader, -1, -1, 0, 0, 0, 0 ); - - // when - ReplicatedTransaction in = ReplicatedTransactionFactory.createImmutableReplicatedTransaction( txIn ); - - // then - assertEquals( 40, in.getTxBytes().length ); - } - - @Test - public void shouldSerializeIdRangeRequest() throws Exception - { - // given - CoreReplicatedContentMarshal serializer = new CoreReplicatedContentMarshal(); - ReplicatedContent in = new ReplicatedIdAllocationRequest( memberId, IdType.NODE, 100, 200 ); - - // when - ByteBuf buf = Unpooled.buffer(); - assertMarshalingEquality( serializer, buf, in ); - } - - private void assertMarshalingEquality( CoreReplicatedContentMarshal marshal, - ByteBuf buffer, - ReplicatedContent replicatedTx ) throws IOException, EndOfStreamException - { - marshal.marshal( replicatedTx, new NetworkFlushableByteBuf( buffer ) ); - - assertThat( marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) ), equalTo( replicatedTx ) ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/debug/ReplayRaftLog.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/debug/ReplayRaftLog.java index f5405ece8dce3..bbfaf01958dc6 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/debug/ReplayRaftLog.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/debug/ReplayRaftLog.java @@ -31,7 +31,7 @@ import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionFactory; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.helpers.Args; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; @@ -71,7 +71,7 @@ public static void main( String[] args ) throws IOException CoreLogPruningStrategy pruningStrategy = new CoreLogPruningStrategyFactory( config.get( raft_log_pruning_strategy ), logProvider ).newInstance(); SegmentedRaftLog log = new SegmentedRaftLog( fileSystem, logDirectory, config.get( raft_log_rotation_size ), - new CoreReplicatedContentMarshal(), logProvider, config.get( raft_log_reader_pool_size ), + new CoreReplicatedContentSerializer(), logProvider, config.get( raft_log_reader_pool_size ), Clocks.systemClock(), new OnDemandJobScheduler(), pruningStrategy ); long totalCommittedEntries = log.appendIndex(); // Not really, but we need to have a way to pass in the commit index diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLogPartialEntryRecoveryTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLogPartialEntryRecoveryTest.java index e5f1e72b83e24..fa2b1cb11c688 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLogPartialEntryRecoveryTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLogPartialEntryRecoveryTest.java @@ -39,7 +39,7 @@ import org.neo4j.causalclustering.core.state.machines.token.TokenType; import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.io.fs.OpenMode; import org.neo4j.io.fs.StoreChannel; import org.neo4j.kernel.impl.store.id.IdType; @@ -79,7 +79,7 @@ private SegmentedRaftLog createRaftLog( long rotateAtSize ) LogProvider logProvider = getInstance(); CoreLogPruningStrategy pruningStrategy = new CoreLogPruningStrategyFactory( "100 entries", logProvider ).newInstance(); - return new SegmentedRaftLog( fsRule.get(), logDirectory, rotateAtSize, new CoreReplicatedContentMarshal(), + return new SegmentedRaftLog( fsRule.get(), logDirectory, rotateAtSize, new CoreReplicatedContentSerializer(), logProvider, 8, Clocks.fakeClock(), new OnDemandJobScheduler(), pruningStrategy ); } @@ -88,7 +88,7 @@ private RecoveryProtocol createRecoveryProtocol() FileNames fileNames = new FileNames( logDirectory ); return new RecoveryProtocol( fsRule.get(), fileNames, new ReaderPool( 8, getInstance(), fileNames, fsRule.get(), Clocks.fakeClock() ), - new CoreReplicatedContentMarshal(), getInstance() ); + new CoreReplicatedContentSerializer(), getInstance() ); } @Test diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java index 38d83a05c3c5e..b92effc614aff 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/CoreReplicatedContentMarshalTest.java @@ -33,7 +33,6 @@ import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet; -import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequest; import org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequest; @@ -43,6 +42,7 @@ import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.causalclustering.messaging.EndOfStreamException; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import org.neo4j.kernel.impl.store.id.IdType; import org.neo4j.kernel.impl.store.record.LabelTokenRecord; import org.neo4j.kernel.impl.transaction.command.Command; @@ -55,7 +55,7 @@ public class CoreReplicatedContentMarshalTest { - private final ChannelMarshal marshal = new CoreReplicatedContentMarshal(); + private final ChannelMarshal marshal = new CoreReplicatedContentSerializer(); @Test public void shouldMarshalTransactionReference() throws Exception diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java index 5a910129403c6..bff5a3bc66c0a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/v2/CoreReplicatedContentMarshallingTest.java @@ -46,6 +46,7 @@ import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; +import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer; import static org.junit.Assert.assertEquals;