Skip to content

Commit

Permalink
Refactor network files and fix tx marshalling in RaftV1
Browse files Browse the repository at this point in the history
The Netty dependent Channels that is used in causal clustering now
implements WritableChannel. Before they implemented FlushableChannel
for no apparent reason. They have also been renamed.

Marshalling for RaftV1 broke throughout development. By using the now
introduced ByteBufBacked interface, the tx marhalling for RaftV1 can be
optimized, writing directly to ioBuffer.
  • Loading branch information
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent 2d01553 commit a18a4bc
Show file tree
Hide file tree
Showing 26 changed files with 208 additions and 126 deletions.
Expand Up @@ -26,13 +26,13 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.NetworkWritableChannel;


public class FileChunkEncoder extends MessageToByteEncoder<FileChunk> public class FileChunkEncoder extends MessageToByteEncoder<FileChunk>
{ {
@Override @Override
protected void encode( ChannelHandlerContext ctx, FileChunk chunk, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, FileChunk chunk, ByteBuf out ) throws Exception
{ {
FileChunk.marshal().marshal( chunk, new NetworkFlushableByteBuf( out ) ); FileChunk.marshal().marshal( chunk, new NetworkWritableChannel( out ) );
} }
} }
Expand Up @@ -34,7 +34,7 @@
import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.EndOfStreamException; import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.NetworkWritableChannel;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;
Expand Down Expand Up @@ -102,7 +102,7 @@ public static class Encoder extends MessageToByteEncoder<GetIndexFilesRequest>
@Override @Override
protected void encode( ChannelHandlerContext ctx, GetIndexFilesRequest msg, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, GetIndexFilesRequest msg, ByteBuf out ) throws Exception
{ {
new IndexSnapshotRequestMarshall().marshal( msg, new NetworkFlushableByteBuf( out ) ); new IndexSnapshotRequestMarshall().marshal( msg, new NetworkWritableChannel( out ) );
} }
} }


Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.EndOfStreamException; import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.NetworkWritableChannel;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;
Expand Down Expand Up @@ -108,7 +108,7 @@ public static class Encoder extends MessageToByteEncoder<GetStoreFileRequest>
@Override @Override
protected void encode( ChannelHandlerContext ctx, GetStoreFileRequest msg, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, GetStoreFileRequest msg, ByteBuf out ) throws Exception
{ {
new GetStoreFileRequest.StoreFileRequestMarshall().marshal( msg, new NetworkFlushableByteBuf( out ) ); new GetStoreFileRequest.StoreFileRequestMarshall().marshal( msg, new NetworkWritableChannel( out ) );
} }
} }


Expand Down
Expand Up @@ -28,14 +28,14 @@
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.NetworkWritableChannel;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;


public class GetStoreIdResponseEncoder extends MessageToByteEncoder<StoreId> public class GetStoreIdResponseEncoder extends MessageToByteEncoder<StoreId>
{ {
@Override @Override
protected void encode( ChannelHandlerContext ctx, StoreId storeId, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, StoreId storeId, ByteBuf out ) throws Exception
{ {
StoreIdMarshal.INSTANCE.marshal( storeId, new NetworkFlushableByteBuf( out ) ); StoreIdMarshal.INSTANCE.marshal( storeId, new NetworkWritableChannel( out ) );
} }
} }
Expand Up @@ -26,14 +26,14 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;


public class PrepareStoreCopyRequestEncoder extends MessageToByteEncoder<PrepareStoreCopyRequest> public class PrepareStoreCopyRequestEncoder extends MessageToByteEncoder<PrepareStoreCopyRequest>
{ {
@Override @Override
protected void encode( ChannelHandlerContext channelHandlerContext, PrepareStoreCopyRequest prepareStoreCopyRequest, ByteBuf byteBuf ) throws Exception protected void encode( ChannelHandlerContext channelHandlerContext, PrepareStoreCopyRequest prepareStoreCopyRequest, ByteBuf byteBuf ) throws Exception
{ {
StoreIdMarshal.INSTANCE.marshal( prepareStoreCopyRequest.getStoreId(), new NetworkFlushableChannelNetty4( byteBuf ) ); StoreIdMarshal.INSTANCE.marshal( prepareStoreCopyRequest.getStoreId(), new BoundedNetworkWritableChannel( byteBuf ) );
} }
} }
Expand Up @@ -39,7 +39,7 @@
import java.util.Objects; import java.util.Objects;


import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;
Expand Down Expand Up @@ -216,7 +216,7 @@ public static class Encoder extends MessageToByteEncoder<PrepareStoreCopyRespons
protected void encode( ChannelHandlerContext channelHandlerContext, PrepareStoreCopyResponse prepareStoreCopyResponse, ByteBuf byteBuf ) protected void encode( ChannelHandlerContext channelHandlerContext, PrepareStoreCopyResponse prepareStoreCopyResponse, ByteBuf byteBuf )
throws Exception throws Exception
{ {
new PrepareStoreCopyResponse.StoreListingMarshal().marshal( prepareStoreCopyResponse, new NetworkFlushableChannelNetty4( byteBuf ) ); new PrepareStoreCopyResponse.StoreListingMarshal().marshal( prepareStoreCopyResponse, new BoundedNetworkWritableChannel( byteBuf ) );
} }
} }


Expand Down
Expand Up @@ -26,7 +26,7 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;


public class TxPullRequestEncoder extends MessageToByteEncoder<TxPullRequest> public class TxPullRequestEncoder extends MessageToByteEncoder<TxPullRequest>
Expand All @@ -35,6 +35,6 @@ public class TxPullRequestEncoder extends MessageToByteEncoder<TxPullRequest>
protected void encode( ChannelHandlerContext ctx, TxPullRequest request, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, TxPullRequest request, ByteBuf out ) throws Exception
{ {
out.writeLong( request.previousTxId() ); out.writeLong( request.previousTxId() );
StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) ); StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new BoundedNetworkWritableChannel( out ) );
} }
} }
Expand Up @@ -26,7 +26,7 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.NetworkWritableChannel;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;


Expand All @@ -35,7 +35,7 @@ public class TxPullResponseEncoder extends MessageToByteEncoder<TxPullResponse>
@Override @Override
protected void encode( ChannelHandlerContext ctx, TxPullResponse response, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, TxPullResponse response, ByteBuf out ) throws Exception
{ {
NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); NetworkWritableChannel channel = new NetworkWritableChannel( out );
StoreIdMarshal.INSTANCE.marshal( response.storeId(), channel ); StoreIdMarshal.INSTANCE.marshal( response.storeId(), channel );
new LogEntryWriter( channel ).serialize( response.tx() ); new LogEntryWriter( channel ).serialize( response.tx() );
} }
Expand Down
Expand Up @@ -31,7 +31,7 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;


import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.BoundedNetworkWritableChannel;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.StringMarshal; import org.neo4j.causalclustering.messaging.marshalling.StringMarshal;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
Expand Down Expand Up @@ -97,7 +97,7 @@ public static ReplicatedTokenRequest unmarshal( ByteBuf buffer )
public static byte[] commandBytes( Collection<StorageCommand> commands ) public static byte[] commandBytes( Collection<StorageCommand> commands )
{ {
ByteBuf commandBuffer = Unpooled.buffer(); ByteBuf commandBuffer = Unpooled.buffer();
NetworkFlushableChannelNetty4 channel = new NetworkFlushableChannelNetty4( commandBuffer ); BoundedNetworkWritableChannel channel = new BoundedNetworkWritableChannel( commandBuffer );


try try
{ {
Expand Down
@@ -1,3 +1,25 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.core.state.machines.tx; package org.neo4j.causalclustering.core.state.machines.tx;


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
Expand All @@ -10,14 +32,14 @@
import java.util.Queue; import java.util.Queue;


import org.neo4j.causalclustering.helper.ErrorHandler; import org.neo4j.causalclustering.helper.ErrorHandler;
import org.neo4j.causalclustering.messaging.BoundedNetworkChannel; import org.neo4j.causalclustering.messaging.ChunkingNetworkChannel;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;


class ChunkedTransaction implements ChunkedInput<ByteBuf> class ChunkedTransaction implements ChunkedInput<ByteBuf>
{ {
private static final int CHUNK_SIZE = 32 * 1024; private static final int CHUNK_SIZE = 32 * 1024;
private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter; private final ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter;
private BoundedNetworkChannel channel; private ChunkingNetworkChannel channel;
private Queue<ByteBuf> chunks = new LinkedList<>(); private Queue<ByteBuf> chunks = new LinkedList<>();


ChunkedTransaction( TransactionRepresentation tx ) ChunkedTransaction( TransactionRepresentation tx )
Expand Down Expand Up @@ -80,7 +102,7 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
if ( channel == null ) if ( channel == null )
{ {
// Ensure that the written buffers does not overflow the allocators chunk size. // Ensure that the written buffers does not overflow the allocators chunk size.
channel = new BoundedNetworkChannel( allocator, CHUNK_SIZE, chunks ); channel = new ChunkingNetworkChannel( allocator, CHUNK_SIZE, chunks );
/* /*
Unknown length. The reason for sending this int is to avoid conflicts with Raft V1. Unknown length. The reason for sending this int is to avoid conflicts with Raft V1.
This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1. This way, the serialized result of this object is identical to a serialized byte array. Which is the only type in Raft V1.
Expand Down
Expand Up @@ -28,8 +28,10 @@
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;


import org.neo4j.causalclustering.messaging.ByteBufBacked;
import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder; import org.neo4j.causalclustering.messaging.marshalling.ByteArrayChunkedEncoder;
import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel; import org.neo4j.causalclustering.messaging.marshalling.OutputStreamWritableChannel;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;


Expand Down Expand Up @@ -70,20 +72,43 @@ public static void marshal( WritableChannel writableChannel, ByteArrayReplicated


public static void marshal( WritableChannel writableChannel, TransactionRepresentationReplicatedTransaction replicatedTransaction ) throws IOException public static void marshal( WritableChannel writableChannel, TransactionRepresentationReplicatedTransaction replicatedTransaction ) throws IOException
{ {
/* if ( writableChannel instanceof ByteBufBacked )
Unknown length. This method will never be used in production. When a ReplicatedTransaction is serialized it has already passed over the network {
and a more efficient marshalling is used in ByteArrayReplicatedTransaction. /*
*/ * Marshals more efficiently if Channel is going over the network. In practice, this means maintaining support for
ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter = transactionalRepresentationWriter( replicatedTransaction.tx() ); * RaftV1 without loosing performance
ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 1024 ); */
OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( outputStream ); ByteBuf buffer = ((ByteBufBacked) writableChannel).byteBuf();
int metaDataIndex = buffer.writerIndex();
int txStartIndex = metaDataIndex + Integer.BYTES;
// leave room for length to be set later.
buffer.writerIndex( txStartIndex );
writeTx( writableChannel, replicatedTransaction.tx() );
int txLength = buffer.writerIndex() - txStartIndex;
buffer.setInt( metaDataIndex, txLength );
}
else
{
/*
* Unknown length. This should only be reached in tests. When a ReplicatedTransaction is marshaled to file it has already passed over the network
* and is of a different type. More efficient marshalling is used in ByteArrayReplicatedTransaction.
*/
ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 1024 );
OutputStreamWritableChannel outputStreamWritableChannel = new OutputStreamWritableChannel( outputStream );
writeTx( outputStreamWritableChannel, replicatedTransaction.tx() );
int length = outputStream.size();
writableChannel.putInt( length );
writableChannel.put( outputStream.toByteArray(), length );
}
}

private static void writeTx( WritableChannel writableChannel, TransactionRepresentation tx ) throws IOException
{
ReplicatedTransactionFactory.TransactionRepresentationWriter txWriter = transactionalRepresentationWriter( tx );
while ( txWriter.canWrite() ) while ( txWriter.canWrite() )
{ {
txWriter.write( outputStreamWritableChannel ); txWriter.write( writableChannel );
} }
int length = outputStream.size();
writableChannel.putInt( length );
writableChannel.put( outputStream.toByteArray(), length );
} }


public static ChunkedInput<ByteBuf> encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction ) public static ChunkedInput<ByteBuf> encode( TransactionRepresentationReplicatedTransaction representationReplicatedTransaction )
Expand Down
Expand Up @@ -26,13 +26,13 @@
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;


import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.NetworkWritableChannel;


public class CoreSnapshotEncoder extends MessageToByteEncoder<CoreSnapshot> public class CoreSnapshotEncoder extends MessageToByteEncoder<CoreSnapshot>
{ {
@Override @Override
protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, ByteBuf out ) throws Exception protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, ByteBuf out ) throws Exception
{ {
new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkFlushableByteBuf( out ) ); new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkWritableChannel( out ) );
} }
} }

0 comments on commit a18a4bc

Please sign in to comment.