Skip to content

Commit

Permalink
Remove old replicated content marshal
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Jun 11, 2018
1 parent b888f22 commit 6f08d1e
Show file tree
Hide file tree
Showing 23 changed files with 87 additions and 390 deletions.
Expand Up @@ -27,10 +27,10 @@

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog;
import org.neo4j.causalclustering.core.consensus.log.MonitoredRaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCacheFactory;
import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategy;
import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategyFactory;
Expand All @@ -43,21 +43,23 @@
import org.neo4j.causalclustering.core.consensus.term.MonitoredTermStateStorage;
import org.neo4j.causalclustering.core.consensus.term.TermState;
import org.neo4j.causalclustering.core.consensus.vote.VoteState;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.replication.SendToMyself;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.core.state.storage.StateStorage;
import org.neo4j.causalclustering.discovery.CoreTopologyService;
import org.neo4j.causalclustering.discovery.RaftCoreTopologyConnector;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;

import static org.neo4j.causalclustering.core.CausalClusteringSettings.catchup_batch_size;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.join_catch_up_timeout;
Expand Down Expand Up @@ -90,7 +92,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,

LogProvider logProvider = logging.getInternalLogProvider();

final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal();
final CoreReplicatedContentSerializer marshal = new CoreReplicatedContentSerializer();

RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider,
platformModule.jobScheduler );
Expand Down Expand Up @@ -159,8 +161,8 @@ private LeaderAvailabilityTimers createElectionTiming( Config config, TimerServi
return new LeaderAvailabilityTimers( electionTimeout, electionTimeout.dividedBy( 3 ), systemClock(), timerService, logProvider );
}

private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem,
File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider,
private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem, File clusterStateDirectory,
ChannelMarshal<ReplicatedContent> marshal, LogProvider logProvider,
JobScheduler scheduler )
{
EnterpriseCoreEditionModule.RaftLogImplementation raftLogImplementation =
Expand Down
Expand Up @@ -29,8 +29,8 @@

import org.neo4j.causalclustering.core.consensus.log.EntryRecord;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.cursor.IOCursor;
import org.neo4j.helpers.Args;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
Expand Down Expand Up @@ -101,7 +101,7 @@ public static void main( String[] args )

try ( DefaultFileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction() )
{
new DumpSegmentedRaftLog( fileSystem, new CoreReplicatedContentMarshal() )
new DumpSegmentedRaftLog( fileSystem, new CoreReplicatedContentSerializer() )
.dump( fileAsString, printer.getFor( fileAsString ) );
}
catch ( IOException | DisposedException | DamagedLogStorageException e )
Expand Down
Expand Up @@ -28,8 +28,8 @@
import java.util.List;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.v1.RaftMessageEncoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void install( Channel channel ) throws Exception
clientPipelineBuilderFactory.client( channel, log )
.modify( modifiers )
.addFraming()
.add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) )
.add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentSerializer() ) )
.install();
}

Expand Down
Expand Up @@ -30,8 +30,8 @@
import java.util.List;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.v1.RaftMessageDecoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
Expand Down Expand Up @@ -73,7 +73,7 @@ public void install( Channel channel ) throws Exception
pipelineBuilderFactory.server( channel, log )
.modify( modifiers )
.addFraming()
.add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) )
.add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentSerializer(), Clock.systemUTC() ) )
.add( "raft_handler", raftMessageHandler )
.install();
}
Expand Down
Expand Up @@ -26,8 +26,8 @@
import java.util.List;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.ReplicatedContentChunkEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ReplicatedContentChunkEncoder;
import org.neo4j.causalclustering.messaging.marshalling.CoreReplicatedContentSerializer;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.ContentTypeEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftLogEntryTermEncoder;
import org.neo4j.causalclustering.messaging.marshalling.v2.encoding.RaftMessageContentEncoder;
Expand Down
Expand Up @@ -28,7 +28,7 @@
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeDispatcher;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeProtocol;
import org.neo4j.causalclustering.messaging.marshalling.v2.ContentTypeProtocol;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.DecodingDispatcher;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.RaftMessageComposer;
import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ReplicatedContentDecoder;
Expand Down
Expand Up @@ -26,13 +26,13 @@
import java.util.Objects;
import java.util.UUID;

import org.neo4j.causalclustering.messaging.marshalling.v1.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.LocalOperationId;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.marshalling.Serializer;
import org.neo4j.causalclustering.messaging.marshalling.ContentBuilder;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

/**
* A uniquely identifiable operation.
Expand Down Expand Up @@ -77,19 +77,25 @@ public long size()
return content.size();
}

public void serialize( WritableChannel channel ) throws IOException
/**
* This this consumer ignores the content which is handles by its own serializer.
*
* @return Consumer with instructions for writing to channel.
*/
public Serializer serialize()
{
channel.putLong( globalSession().sessionId().getMostSignificantBits() );
channel.putLong( globalSession().sessionId().getLeastSignificantBits() );
new MemberId.Marshal().marshal( globalSession().owner(), channel );

channel.putLong( operationId.localSessionId() );
channel.putLong( operationId.sequenceNumber() );
return Serializer.simple( channel1 ->
{
channel1.putLong( globalSession().sessionId().getMostSignificantBits() );
channel1.putLong( globalSession().sessionId().getLeastSignificantBits() );
new MemberId.Marshal().marshal( globalSession().owner(), channel1 );

new CoreReplicatedContentMarshal().marshal( content, channel );
channel1.putLong( operationId.localSessionId() );
channel1.putLong( operationId.sequenceNumber() );
} );
}

public static DistributedOperation deserialize( ReadableChannel channel ) throws IOException, EndOfStreamException
public static ContentBuilder<ReplicatedContent> deserialize( ReadableChannel channel ) throws IOException, EndOfStreamException
{
long mostSigBits = channel.getLong();
long leastSigBits = channel.getLong();
Expand All @@ -100,8 +106,7 @@ public static DistributedOperation deserialize( ReadableChannel channel ) throws
long sequenceNumber = channel.getLong();
LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber );

ReplicatedContent content = new CoreReplicatedContentMarshal().unmarshal( channel );
return new DistributedOperation( content, globalSession, localOperationId );
return ContentBuilder.unfinished( subContent -> new DistributedOperation( subContent, globalSession, localOperationId ) );
}

@Override
Expand Down
Expand Up @@ -28,7 +28,7 @@

import org.neo4j.storageengine.api.WritableChannel;

public class ChunkedReplicatedContent implements SerializableContent, ChunkedInput<ReplicatedContentChunk>
public class ChunkedReplicatedContent implements Marshal, ChunkedInput<ReplicatedContentChunk>
{

private static final int DEFAULT_CHUNK_SIZE = 8192;
Expand All @@ -55,7 +55,7 @@ public ChunkedReplicatedContent( byte contentType, Serializer serializer )
}

@Override
public void serialize( WritableChannel channel ) throws IOException
public void marshal( WritableChannel channel ) throws IOException
{
channel.put( contentType );
serializer.marshal( channel );
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging.marshalling.v2;
package org.neo4j.causalclustering.messaging.marshalling;

import java.util.function.Function;

Expand All @@ -31,16 +31,20 @@ public static <C> ContentBuilder<C> emptyUnfinished()
return new ContentBuilder<>( content -> content, false );
}

ContentBuilder( Function<CONTENT,CONTENT> contentFunction, boolean isComplete )
public static <C> ContentBuilder<C> unfinished( Function<C,C> contentFunction )
{
this.contentFunction = contentFunction;
this.isComplete = isComplete;
return new ContentBuilder<>( contentFunction, false );
}

ContentBuilder( CONTENT replicatedContent )
public static <C> ContentBuilder<C> finished( C content )
{
this.isComplete = true;
this.contentFunction = replicatedContent1 -> replicatedContent;
return new ContentBuilder<>( c1 -> content, true );
}

private ContentBuilder( Function<CONTENT,CONTENT> contentFunction, boolean isComplete )
{
this.contentFunction = contentFunction;
this.isComplete = isComplete;
}

public boolean isComplete()
Expand Down
Expand Up @@ -17,20 +17,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging.marshalling.v2;
package org.neo4j.causalclustering.messaging.marshalling;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.UUID;

import org.neo4j.causalclustering.core.consensus.NewLeaderBarrier;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSetSerializer;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.LocalOperationId;
import org.neo4j.causalclustering.core.state.machines.dummy.DummyRequest;
import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequest;
import org.neo4j.causalclustering.core.state.machines.id.ReplicatedIdAllocationRequestSerializer;
Expand All @@ -41,10 +38,7 @@
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction;
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransactionSerializer;
import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.EndOfStreamException;
import org.neo4j.causalclustering.messaging.marshalling.ChunkedReplicatedContent;
import org.neo4j.causalclustering.messaging.marshalling.SerializableContent;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;

Expand All @@ -62,7 +56,7 @@ public class CoreReplicatedContentSerializer extends SafeChannelMarshal<Replicat
private static final byte DISTRIBUTED_OPERATION = 7;
private static final byte DUMMY_REQUEST = 8;

public Collection<SerializableContent> toSerializable( ReplicatedContent content )
public Collection<Marshal> toSerializable( ReplicatedContent content )
{
if ( content instanceof ReplicatedTransaction )
{
Expand Down Expand Up @@ -96,16 +90,8 @@ else if ( content instanceof ReplicatedLockTokenRequest )
}
else if ( content instanceof DistributedOperation )
{
LinkedList<SerializableContent> list = new LinkedList<>( toSerializable( ((DistributedOperation) content).content() ) );
list.add( 0, new ChunkedReplicatedContent( DISTRIBUTED_OPERATION, simple( channel ->
{
channel.putLong( ((DistributedOperation) content).globalSession().sessionId().getMostSignificantBits() );
channel.putLong( ((DistributedOperation) content).globalSession().sessionId().getLeastSignificantBits() );
new MemberId.Marshal().marshal( ((DistributedOperation) content).globalSession().owner(), channel );

channel.putLong( ((DistributedOperation) content).operationId().localSessionId() );
channel.putLong( ((DistributedOperation) content).operationId().sequenceNumber() );
} ) ) );
LinkedList<Marshal> list = new LinkedList<>( toSerializable( ((DistributedOperation) content).content() ) );
list.add( 0, new ChunkedReplicatedContent( DISTRIBUTED_OPERATION, ((DistributedOperation) content).serialize() ) );
return list;
}
else if ( content instanceof DummyRequest )
Expand All @@ -124,32 +110,23 @@ public ContentBuilder<ReplicatedContent> read( byte contentType, ReadableChannel
switch ( contentType )
{
case TX_CONTENT_TYPE:
return new ContentBuilder<>( ReplicatedTransactionSerializer.unmarshal( channel ) );
return ContentBuilder.finished( ReplicatedTransactionSerializer.unmarshal( channel ) );
case RAFT_MEMBER_SET_TYPE:
return new ContentBuilder<>( MemberIdSetSerializer.unmarshal( channel ) );
return ContentBuilder.finished( MemberIdSetSerializer.unmarshal( channel ) );
case ID_RANGE_REQUEST_TYPE:
return new ContentBuilder<>( ReplicatedIdAllocationRequestSerializer.unmarshal( channel ) );
return ContentBuilder.finished( ReplicatedIdAllocationRequestSerializer.unmarshal( channel ) );
case TOKEN_REQUEST_TYPE:
return new ContentBuilder<>( ReplicatedTokenRequestSerializer.unmarshal( channel ) );
return ContentBuilder.finished( ReplicatedTokenRequestSerializer.unmarshal( channel ) );
case NEW_LEADER_BARRIER_TYPE:
return new ContentBuilder<>( new NewLeaderBarrier() );
return ContentBuilder.finished( new NewLeaderBarrier() );
case LOCK_TOKEN_REQUEST:
return new ContentBuilder<>( ReplicatedLockTokenSerializer.unmarshal( channel ) );
return ContentBuilder.finished( ReplicatedLockTokenSerializer.unmarshal( channel ) );
case DISTRIBUTED_OPERATION:
{
long mostSigBits = channel.getLong();
long leastSigBits = channel.getLong();
MemberId owner = new MemberId.Marshal().unmarshal( channel );
GlobalSession globalSession = new GlobalSession( new UUID( mostSigBits, leastSigBits ), owner );

long localSessionId = channel.getLong();
long sequenceNumber = channel.getLong();
LocalOperationId localOperationId = new LocalOperationId( localSessionId, sequenceNumber );

return new ContentBuilder<>( replicatedContent -> new DistributedOperation( replicatedContent, globalSession, localOperationId ), false );
return DistributedOperation.deserialize( channel );
}
case DUMMY_REQUEST:
return new ContentBuilder<>( DummyRequest.Marshal.INSTANCE.unmarshal( channel ) );
return ContentBuilder.finished( DummyRequest.Marshal.INSTANCE.unmarshal( channel ) );
default:
throw new IllegalStateException( "Not a recognized content type: " + contentType );
}
Expand All @@ -158,9 +135,9 @@ public ContentBuilder<ReplicatedContent> read( byte contentType, ReadableChannel
@Override
public void marshal( ReplicatedContent coreReplicatedContent, WritableChannel channel ) throws IOException
{
for ( SerializableContent serializableContent : toSerializable( coreReplicatedContent ) )
for ( Marshal marshal : toSerializable( coreReplicatedContent ) )
{
serializableContent.serialize( channel );
marshal.marshal( channel );
}
}

Expand Down
Expand Up @@ -23,7 +23,12 @@

import org.neo4j.storageengine.api.WritableChannel;

public interface SerializableContent
public interface Marshal
{
void serialize( WritableChannel channel ) throws IOException;
/**
* Writes all content to the channel
*
* @param channel to where data is written.
*/
void marshal( WritableChannel channel ) throws IOException;
}

0 comments on commit 6f08d1e

Please sign in to comment.