Skip to content

Commit

Permalink
Remove ByteBuf and ByteBuffer marshalling.
Browse files Browse the repository at this point in the history
In favor of ReadableChannel and WritableChannel.
  • Loading branch information
apcj authored and Max Sumrall committed Jun 16, 2016
1 parent 2e82349 commit d8dbe0c
Show file tree
Hide file tree
Showing 29 changed files with 285 additions and 823 deletions.
Expand Up @@ -27,7 +27,6 @@

public class FileHeaderEncoder extends MessageToMessageEncoder<FileHeader>
{

@Override
protected void encode( ChannelHandlerContext ctx, FileHeader msg, List<Object> out ) throws Exception
{
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.netty.handler.codec.MessageToMessageDecoder;

import org.neo4j.coreedge.catchup.CatchupClientProtocol;
import org.neo4j.coreedge.raft.net.NetworkReadableClosableChannelNetty4;
import org.neo4j.coreedge.raft.state.CoreSnapshot;

import static org.neo4j.coreedge.catchup.CatchupClientProtocol.NextMessage;
Expand All @@ -45,7 +46,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out
{
if ( protocol.isExpecting( NextMessage.CORE_SNAPSHOT ) )
{
out.add( new CoreSnapshot.Marshal().unmarshal( msg ) );
out.add( new CoreSnapshot.Marshal().unmarshal( new NetworkReadableClosableChannelNetty4( msg ) ) );
}
else
{
Expand Down
Expand Up @@ -33,7 +33,7 @@ public class CoreSnapshotEncoder extends MessageToMessageEncoder<CoreSnapshot>
protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, List<Object> out ) throws Exception
{
ByteBuf encoded = ctx.alloc().buffer();
new CoreSnapshot.Marshal().marshal( coreSnapshot, encoded );
new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkFlushableByteBuf( encoded ) );
out.add( encoded );
}
}
Expand Up @@ -29,7 +29,6 @@

public class StoreCopyFinishedResponseEncoder extends MessageToMessageEncoder<StoreCopyFinishedResponse>
{

@Override
protected void encode( ChannelHandlerContext ctx, StoreCopyFinishedResponse msg, List<Object> out ) throws Exception
{
Expand Down
Expand Up @@ -19,12 +19,12 @@
*/
package org.neo4j.coreedge.catchup.tx.core;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

import org.neo4j.com.CommittedTransactionSerializer;
import org.neo4j.coreedge.catchup.storecopy.core.NetworkFlushableByteBuf;
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponse;
Expand All @@ -36,8 +36,9 @@ public class TxPullResponseEncoder extends MessageToMessageEncoder<TxPullRespons
protected void encode( ChannelHandlerContext ctx, TxPullResponse response, List<Object> out ) throws Exception
{
ByteBuf encoded = ctx.alloc().buffer();
StoreIdMarshal.marshal( response.storeId(), encoded );
new CommittedTransactionSerializer( new NetworkFlushableByteBuf( encoded ) ).visit( response.tx() );
NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( encoded );
StoreIdMarshal.marshal( response.storeId(), channel );
new CommittedTransactionSerializer( channel ).visit( response.tx() );
out.add( encoded );
}
}

This file was deleted.

Expand Up @@ -19,14 +19,15 @@
*/
package org.neo4j.coreedge.catchup.tx.edge;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

import org.neo4j.coreedge.catchup.CatchupClientProtocol;
import org.neo4j.coreedge.raft.net.NetworkReadableClosableChannelNetty4;
import org.neo4j.coreedge.raft.replication.storeid.StoreIdMarshal;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.store.StoreId;
Expand All @@ -51,12 +52,11 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out
{
if ( protocol.isExpecting( NextMessage.TX_PULL_RESPONSE ) )
{
StoreId storeId = StoreIdMarshal.unmarshal( msg );

NetworkReadableClosableByteBuf logChannel = new NetworkReadableClosableByteBuf( msg );
LogEntryReader<NetworkReadableClosableByteBuf> reader = new VersionAwareLogEntryReader<>(
NetworkReadableClosableChannelNetty4 logChannel = new NetworkReadableClosableChannelNetty4( msg );
StoreId storeId = StoreIdMarshal.unmarshal( logChannel );
LogEntryReader<NetworkReadableClosableChannelNetty4> reader = new VersionAwareLogEntryReader<>(
new RecordStorageCommandReaderFactory() );
PhysicalTransactionCursor<NetworkReadableClosableByteBuf> transactionCursor =
PhysicalTransactionCursor<NetworkReadableClosableChannelNetty4> transactionCursor =
new PhysicalTransactionCursor<>( logChannel, reader );

transactionCursor.next();
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.coreedge.raft;

import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -33,14 +35,12 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.raft.net.codecs.RaftMessageDecoder;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.server.ByteBufMarshal;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.raft.net.codecs.RaftMessageDecoder;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
Expand All @@ -50,14 +50,14 @@ public class RaftServer<MEMBER> extends LifecycleAdapter implements Inbound<Raft
{
private final ListenSocketAddress listenAddress;
private final Log log;
private final ByteBufMarshal<ReplicatedContent> marshal;
private final ChannelMarshal<ReplicatedContent> marshal;
private MessageHandler<RaftMessages.RaftMessage<MEMBER>> messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );

public RaftServer( ByteBufMarshal<ReplicatedContent> marshal, ListenSocketAddress listenAddress, LogProvider logProvider )
public RaftServer( ChannelMarshal<ReplicatedContent> marshal, ListenSocketAddress listenAddress, LogProvider logProvider )
{
this.marshal = marshal;
this.listenAddress = listenAddress;
Expand Down

0 comments on commit d8dbe0c

Please sign in to comment.