Skip to content

Commit

Permalink
Use Marshal in naming
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent 9f52d5d commit 7f035ca
Show file tree
Hide file tree
Showing 19 changed files with 56 additions and 58 deletions.
Expand Up @@ -52,7 +52,7 @@
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
Expand Down Expand Up @@ -92,7 +92,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,

LogProvider logProvider = logging.getInternalLogProvider();

final CoreReplicatedContentSerializer marshal = new CoreReplicatedContentSerializer();
final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal();

RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider,
platformModule.jobScheduler );
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.causalclustering.core.consensus.log.EntryRecord;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.cursor.IOCursor;
import org.neo4j.helpers.Args;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
Expand Down Expand Up @@ -101,7 +101,7 @@ public static void main( String[] args )

try ( DefaultFileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction() )
{
new DumpSegmentedRaftLog( fileSystem, new CoreReplicatedContentSerializer() )
new DumpSegmentedRaftLog( fileSystem, new CoreReplicatedContentMarshal() )
.dump( fileAsString, printer.getFor( fileAsString ) );
}
catch ( IOException | DisposedException | DamagedLogStorageException e )
Expand Down
Expand Up @@ -29,7 +29,7 @@
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.v1.RaftMessageEncoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void install( Channel channel ) throws Exception
clientPipelineBuilderFactory.client( channel, log )
.modify( modifiers )
.addFraming()
.add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentSerializer() ) )
.add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) )
.install();
}

Expand Down
Expand Up @@ -31,7 +31,7 @@
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.v1.RaftMessageDecoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -73,7 +73,7 @@ public void install( Channel channel ) throws Exception
pipelineBuilderFactory.server( channel, log )
.modify( modifiers )
.addFraming()
.add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentSerializer(), Clock.systemUTC() ) )
.add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) )
.add( "raft_handler", raftMessageHandler )
.install();
}
Expand Down
Expand Up @@ -29,7 +29,7 @@
import java.util.List;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageContentEncoder;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void install( Channel channel ) throws Exception
.add( "raft_content_type_encoder", new ContentTypeEncoder() )
.add( "raft_chunked_writer", new ChunkedWriteHandler( ) )
.add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() )
.add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) )
.add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentMarshal() ) )
.install();
}

Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.causalclustering.core.replication.session.LocalOperationId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.marshalling.Serializer;
import org.neo4j.causalclustering.messaging.marshalling.ByteBufAwareMarshal;
import org.neo4j.causalclustering.messaging.marshalling.ContentBuilder;
import org.neo4j.storageengine.api.ReadableChannel;

Expand Down Expand Up @@ -82,9 +82,9 @@ public long size()
*
* @return Consumer with instructions for writing to channel.
*/
public Serializer serialize()
public ByteBufAwareMarshal serialize()
{
return Serializer.simple( channel1 ->
return ByteBufAwareMarshal.simple( channel1 ->
{
channel1.putLong( globalSession().sessionId().getMostSignificantBits() );
channel1.putLong( globalSession().sessionId().getLeastSignificantBits() );
Expand Down
Expand Up @@ -26,14 +26,12 @@
import java.util.Arrays;
import java.util.function.Consumer;

import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.state.CommandDispatcher;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.core.state.machines.tx.CoreReplicatedContent;
import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.ByteArraySerializer;
import org.neo4j.causalclustering.messaging.marshalling.ContentBuilder;
import org.neo4j.causalclustering.messaging.marshalling.Serializer;
import org.neo4j.causalclustering.messaging.marshalling.ByteArrayByteBufAwareMarshal;
import org.neo4j.causalclustering.messaging.marshalling.ByteBufAwareMarshal;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

Expand Down Expand Up @@ -69,15 +67,15 @@ public void dispatch( CommandDispatcher commandDispatcher, long commandIndex, Co
commandDispatcher.dispatch( this, commandIndex, callback );
}

public Serializer serializer()
public ByteBufAwareMarshal serializer()
{
if ( data != null )
{
return new ByteArraySerializer( data );
return new ByteArrayByteBufAwareMarshal( data );
}
else
{
return Serializer.simple( channel -> channel.putInt( 0 ) );
return ByteBufAwareMarshal.simple( channel -> channel.putInt( 0 ) );
}
}

Expand Down
Expand Up @@ -26,8 +26,8 @@

import java.io.IOException;

import org.neo4j.causalclustering.messaging.marshalling.ByteArraySerializer;
import org.neo4j.causalclustering.messaging.marshalling.Serializer;
import org.neo4j.causalclustering.messaging.marshalling.ByteArrayByteBufAwareMarshal;
import org.neo4j.causalclustering.messaging.marshalling.ByteBufAwareMarshal;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

Expand Down Expand Up @@ -69,8 +69,8 @@ public static ReplicatedTransaction unmarshal( ByteBuf buffer )
return new ReplicatedTransaction( txBytes );
}

public static Serializer serializer( ReplicatedTransaction replicatedTransaction )
public static ByteBufAwareMarshal serializer( ReplicatedTransaction replicatedTransaction )
{
return new ByteArraySerializer( replicatedTransaction.getTxBytes() );
return new ByteArrayByteBufAwareMarshal( replicatedTransaction.getTxBytes() );
}
}
Expand Up @@ -29,12 +29,12 @@

import org.neo4j.storageengine.api.WritableChannel;

public class ByteArraySerializer implements Serializer
public class ByteArrayByteBufAwareMarshal implements ByteBufAwareMarshal
{
private final byte[] content;
private final ByteArrayInputStream inputStream;

public ByteArraySerializer( byte[] content )
public ByteArrayByteBufAwareMarshal( byte[] content )
{
inputStream = new ByteArrayInputStream( content );
this.content = content;
Expand Down
Expand Up @@ -30,15 +30,15 @@
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.storageengine.api.WritableChannel;

public interface Serializer extends Marshal
public interface ByteBufAwareMarshal extends Marshal
{
/** May override buffer allocation size.
* @param channelConsumer used by both encode and marshal to serialize the object.
* @return a simple serializer that encodes all the content at once.
*/
static Serializer simple( ThrowingConsumer<WritableChannel,IOException> channelConsumer )
static ByteBufAwareMarshal simple( ThrowingConsumer<WritableChannel,IOException> channelConsumer )
{
return new Serializer()
return new ByteBufAwareMarshal()
{
private boolean consumed;

Expand Down
Expand Up @@ -36,14 +36,14 @@ public class ChunkedReplicatedContent implements Marshal, ChunkedInput<ByteBuf>

private static final int DEFAULT_CHUNK_SIZE = 8192;
private final byte contentType;
private final Serializer serializer;
private final ByteBufAwareMarshal byteBufAwareMarshal;
private final int chunkSize;
private boolean lastByteWasWritten;
private int progress;

public ChunkedReplicatedContent( byte contentType, Serializer serializer, int chunkSize )
public ChunkedReplicatedContent( byte contentType, ByteBufAwareMarshal byteBufAwareMarshal, int chunkSize )
{
this.serializer = serializer;
this.byteBufAwareMarshal = byteBufAwareMarshal;
this.chunkSize = chunkSize;
if ( chunkSize < 7 )
{
Expand All @@ -52,16 +52,16 @@ public ChunkedReplicatedContent( byte contentType, Serializer serializer, int ch
this.contentType = contentType;
}

public ChunkedReplicatedContent( byte contentType, Serializer serializer )
public ChunkedReplicatedContent( byte contentType, ByteBufAwareMarshal byteBufAwareMarshal )
{
this( contentType, serializer, DEFAULT_CHUNK_SIZE );
this( contentType, byteBufAwareMarshal, DEFAULT_CHUNK_SIZE );
}

@Override
public void marshal( WritableChannel channel ) throws IOException
{
channel.put( contentType );
serializer.marshal( channel );
byteBufAwareMarshal.marshal( channel );
}

@Override
Expand Down Expand Up @@ -98,9 +98,9 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws IOException
{
// extra metadata on first chunk
buffer.writeByte( contentType );
buffer.writeInt( serializer.length() );
buffer.writeInt( byteBufAwareMarshal.length() );
}
if ( !serializer.encode( buffer ) )
if ( !byteBufAwareMarshal.encode( buffer ) )
{
lastByteWasWritten = true;
}
Expand Down
Expand Up @@ -46,9 +46,9 @@
import org.neo4j.storageengine.api.WritableChannel;

import static java.util.Collections.singleton;
import static org.neo4j.causalclustering.messaging.marshalling.Serializer.simple;
import static org.neo4j.causalclustering.messaging.marshalling.ByteBufAwareMarshal.simple;

public class CoreReplicatedContentSerializer extends SafeChannelMarshal<ReplicatedContent>
public class CoreReplicatedContentMarshal extends SafeChannelMarshal<ReplicatedContent>
{
private static final byte TX_CONTENT_TYPE = 0;
private static final byte RAFT_MEMBER_SET_TYPE = 1;
Expand Down
Expand Up @@ -30,12 +30,12 @@
import java.util.List;

import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;

public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder implements AutoCloseable
{
private UnfinishedChunk unfinishedChunk;
private final CoreReplicatedContentSerializer coreReplicatedContentSerializer = new CoreReplicatedContentSerializer();
private final CoreReplicatedContentMarshal coreReplicatedContentMarshal = new CoreReplicatedContentMarshal();
private boolean closed;

@Override
Expand All @@ -55,7 +55,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
int allocationSize = in.readInt();
if ( isLast )
{
out.add( coreReplicatedContentSerializer.read( contentType,
out.add( coreReplicatedContentMarshal.read( contentType,
new NetworkReadableClosableChannelNetty4( in.readSlice( in.readableBytes() ) ) ) );
}
else
Expand All @@ -79,7 +79,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )

if ( isLast )
{
out.add( coreReplicatedContentSerializer.read( unfinishedChunk.contentType,
out.add( coreReplicatedContentMarshal.read( unfinishedChunk.contentType,
new NetworkReadableClosableChannelNetty4( unfinishedChunk.content() ) ) );
unfinishedChunk.release();
unfinishedChunk = null;
Expand Down
Expand Up @@ -32,17 +32,17 @@
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType;

import static org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder.serializable;

public class RaftMessageContentEncoder extends MessageToMessageEncoder<RaftMessages.ClusterIdAwareMessage>
{

private final CoreReplicatedContentSerializer serializer;
private final CoreReplicatedContentMarshal serializer;

public RaftMessageContentEncoder( CoreReplicatedContentSerializer serializer )
public RaftMessageContentEncoder( CoreReplicatedContentMarshal serializer )
{
this.serializer = serializer;
}
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction;
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionFactory;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.helpers.Args;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -71,7 +71,7 @@ public static void main( String[] args ) throws IOException
CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( config.get( raft_log_pruning_strategy ), logProvider ).newInstance();
SegmentedRaftLog log = new SegmentedRaftLog( fileSystem, logDirectory, config.get( raft_log_rotation_size ),
new CoreReplicatedContentSerializer(), logProvider, config.get( raft_log_reader_pool_size ),
new CoreReplicatedContentMarshal(), logProvider, config.get( raft_log_reader_pool_size ),
Clocks.systemClock(), new OnDemandJobScheduler(), pruningStrategy );

long totalCommittedEntries = log.appendIndex(); // Not really, but we need to have a way to pass in the commit index
Expand Down
Expand Up @@ -39,7 +39,7 @@
import org.neo4j.causalclustering.core.state.machines.token.TokenType;
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.id.IdType;
Expand Down Expand Up @@ -79,7 +79,7 @@ private SegmentedRaftLog createRaftLog( long rotateAtSize )
LogProvider logProvider = getInstance();
CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( "100 entries", logProvider ).newInstance();
return new SegmentedRaftLog( fsRule.get(), logDirectory, rotateAtSize, new CoreReplicatedContentSerializer(),
return new SegmentedRaftLog( fsRule.get(), logDirectory, rotateAtSize, new CoreReplicatedContentMarshal(),
logProvider, 8, Clocks.fakeClock(), new OnDemandJobScheduler(), pruningStrategy );
}

Expand All @@ -88,7 +88,7 @@ private RecoveryProtocol createRecoveryProtocol()
FileNames fileNames = new FileNames( logDirectory );
return new RecoveryProtocol( fsRule.get(), fileNames,
new ReaderPool( 8, getInstance(), fileNames, fsRule.get(), Clocks.fakeClock() ),
new CoreReplicatedContentSerializer(), getInstance() );
new CoreReplicatedContentMarshal(), getInstance() );
}

@Test
Expand Down
Expand Up @@ -42,7 +42,7 @@
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentMarshal;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.store.record.LabelTokenRecord;
import org.neo4j.kernel.impl.transaction.command.Command;
Expand All @@ -55,7 +55,7 @@

public class CoreReplicatedContentMarshalTest
{
private final ChannelMarshal<ReplicatedContent> marshal = new CoreReplicatedContentSerializer();
private final ChannelMarshal<ReplicatedContent> marshal = new CoreReplicatedContentMarshal();

@Test
public void shouldMarshalTransactionReference() throws Exception
Expand Down

0 comments on commit 7f035ca

Please sign in to comment.