Skip to content

Commit

Permalink
Chunk replicated transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent 47c5ad1 commit c66f8e6
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 71 deletions.
Expand Up @@ -67,7 +67,7 @@ public class ClusterSeedingIT
private DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule(); private DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule();


@Rule @Rule
public RuleChain rules = RuleChain.outerRule( fileSystemRule ).around( testDir ).around( suppressOutput ); public RuleChain rules = RuleChain.outerRule( fileSystemRule ).around( testDir );


private Cluster backupCluster; private Cluster backupCluster;
private Cluster cluster; private Cluster cluster;
Expand Down
Expand Up @@ -20,11 +20,13 @@
package org.neo4j.causalclustering.core.consensus.protocol.v2; package org.neo4j.causalclustering.core.consensus.protocol.v2;


import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.stream.ChunkedWriteHandler;


import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer; import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder; import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder;
Expand Down Expand Up @@ -72,6 +74,8 @@ public void install( Channel channel ) throws Exception
.addFraming() .addFraming()
.add( "raft_message_encoder", new RaftMessageEncoder() ) .add( "raft_message_encoder", new RaftMessageEncoder() )
.add( "raft_content_type_encoder", new ContentTypeEncoder() ) .add( "raft_content_type_encoder", new ContentTypeEncoder() )
.add( "raft_chunked_replicated_content", new ReplicatedContentChunkEncoder() )
.add( "raft_chunked_writer", new ChunkedWriteHandler( ) )
.add( "raft_content_encoder", new SerializableContentEncoder() ) .add( "raft_content_encoder", new SerializableContentEncoder() )
.add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() ) .add( "raft_log_entry_encoder", new RaftLogEntryTermEncoder() )
.add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) ) .add( "raft_message_content_encoder", new RaftMessageContentEncoder( new CoreReplicatedContentSerializer() ) )
Expand Down
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.machines.tx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunk;
import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent;
import org.neo4j.storageengine.api.WritableChannel;

public class ChunkedReplicatedTransactionInput implements SerializableContent, ChunkedInput<ReplicatedContentChunk>
{

private final ChunkedStream chunkedStream;
private final byte txContentType;
private final ReplicatedTransaction replicatedTransaction;

public ChunkedReplicatedTransactionInput( byte txContentType, ReplicatedTransaction replicatedTransaction, int chunkSize )
{
if ( chunkSize < 4 )
{
throw new IllegalArgumentException( "Chunk size must be at least 4 bytes" );
}
this.txContentType = txContentType;
this.replicatedTransaction = replicatedTransaction;
chunkedStream = new ChunkedStream( new ByteArrayInputStream( replicatedTransaction.getTxBytes() ), chunkSize - 3 );
}

public ChunkedReplicatedTransactionInput( byte txContentType, ReplicatedTransaction replicatedTransaction )
{
this.txContentType = txContentType;
this.replicatedTransaction = replicatedTransaction;
chunkedStream = new ChunkedStream( new ByteArrayInputStream( replicatedTransaction.getTxBytes() ) );
}

@Override
public void serialize( WritableChannel channel ) throws IOException
{
channel.put( txContentType );
ReplicatedTransactionSerializer.marshal( replicatedTransaction, channel );
}

@Override
public boolean isEndOfInput() throws Exception
{
return chunkedStream.isEndOfInput();
}

@Override
public void close() throws Exception
{
chunkedStream.close();
}

@Override
public ReplicatedContentChunk readChunk( ChannelHandlerContext ctx ) throws Exception
{
return readChunk( ctx.alloc() );
}

@Override
public ReplicatedContentChunk readChunk( ByteBufAllocator allocator ) throws Exception
{
boolean isFirst = progress() == 0;
ByteBuf byteBuf = chunkedStream.readChunk( allocator );
if ( byteBuf == null )
{
byteBuf = new EmptyByteBuf( allocator );
}
return new ReplicatedContentChunk( txContentType, isFirst, isEndOfInput(), byteBuf );
}

@Override
public long length()
{
return -1;
}

@Override
public long progress()
{
return chunkedStream.progress();
}
}
Expand Up @@ -26,6 +26,7 @@


import java.io.IOException; import java.io.IOException;


import org.neo4j.causalclustering.messaging.marshalling.v2.SerializableContent;
import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;


Expand Down
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging.marshalling;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;

public class ReplicatedContentChunk extends DefaultByteBufHolder
{
private final byte txContentType;
private final boolean isFirst;
private final boolean isLast;

public ReplicatedContentChunk( byte txContentType, boolean isFirst, boolean isLast, ByteBuf data )
{
super( data );
this.txContentType = txContentType;
this.isFirst = isFirst;
this.isLast = isLast;
}

public boolean isFirst()
{
return isFirst;
}

public boolean isLast()
{
return isLast;
}

private byte txContentType()
{
return txContentType;
}

public void serialize( ByteBuf out )
{
out.writeByte( txContentType() );
out.writeByte( isFirstByte() );
out.writeByte( isLastByte() );
out.writeBytes( content() );
}

public static ReplicatedContentChunk deSerialize( ByteBuf in )
{
byte txContentType = in.readByte();
boolean isFirst = in.readByte() == (byte) 1;
boolean isLast = in.readByte() == (byte) 1;
return new ReplicatedContentChunk( txContentType, isFirst, isLast, in.readRetainedSlice( in.readableBytes() ) );
}

private byte isLastByte()
{
return isLast ? (byte) 1 : (byte) 0;
}

private byte isFirstByte()
{
return isFirst ? (byte) 1 : (byte) 0;
}
}
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging.marshalling;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class ReplicatedContentChunkEncoder extends MessageToByteEncoder<ReplicatedContentChunk>
{
@Override
protected void encode( ChannelHandlerContext ctx, ReplicatedContentChunk msg, ByteBuf out )
{
msg.serialize( out );
}
}
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging.marshalling.v2;

import java.util.function.Function;

public class ContentBuilder<CONTENT>
{
private boolean isComplete;
private Function<CONTENT,CONTENT> contentFunction;

public static <C> ContentBuilder<C> emptyUnfinished()
{
return new ContentBuilder<>( content -> content, false );
}

ContentBuilder( Function<CONTENT,CONTENT> contentFunction, boolean isComplete )
{
this.contentFunction = contentFunction;
this.isComplete = isComplete;
}

ContentBuilder( CONTENT replicatedContent )
{
this.isComplete = true;
this.contentFunction = replicatedContent1 -> replicatedContent;
}

public boolean isComplete()
{
return isComplete;
}

public ContentBuilder<CONTENT> combine( ContentBuilder<CONTENT> replicatedContentBuilder )
{
if ( isComplete )
{
throw new IllegalStateException( "This content builder has already completed and cannot be combined." );
}
contentFunction = contentFunction.compose( replicatedContentBuilder.contentFunction );
isComplete = replicatedContentBuilder.isComplete;
return this;
}

public CONTENT build()
{
if ( !isComplete )
{
throw new IllegalStateException( "Cannot build unfinished content" );
}
return contentFunction.apply( null );
}
}

0 comments on commit c66f8e6

Please sign in to comment.