Skip to content

Commit

Permalink
Protocol defined by a category and an implementation.
Browse files Browse the repository at this point in the history
Implementation for Raft is Integer, for modifier String.
  • Loading branch information
andrewkerr9000 committed Mar 12, 2018
1 parent 71f7d07 commit 91a2a43
Show file tree
Hide file tree
Showing 47 changed files with 613 additions and 450 deletions.
Expand Up @@ -68,15 +68,14 @@
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocol;
import org.neo4j.causalclustering.protocol.Protocol.ModifierProtocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.causalclustering.protocol.handshake.SupportedProtocols;
import org.neo4j.com.storecopy.StoreUtil;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DependencyResolver;
Expand Down Expand Up @@ -241,8 +240,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();

SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config );
SupportedProtocols<ApplicationProtocol> supportedApplicationProtocol = supportedProtocolCreator.createSupportedRaftProtocol();
Collection<SupportedProtocols<ModifierProtocol>> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();
ApplicationSupportedProtocols supportedApplicationProtocol = supportedProtocolCreator.createSupportedRaftProtocol();
Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();

ApplicationProtocolRepository applicationProtocolRepository =
new ApplicationProtocolRepository( Protocol.ApplicationProtocols.values(), supportedApplicationProtocol );
Expand Down
Expand Up @@ -43,14 +43,13 @@
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocol;
import org.neo4j.causalclustering.protocol.Protocol.ModifierProtocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.SupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
Expand All @@ -62,19 +61,19 @@ class RaftServerModule
private final PlatformModule platformModule;
private final ConsensusModule consensusModule;
private final IdentityModule identityModule;
private final SupportedProtocols<ApplicationProtocol> supportedApplicationProtocol;
private final ApplicationSupportedProtocols supportedApplicationProtocol;
private final LocalDatabase localDatabase;
private final MessageLogger<MemberId> messageLogger;
private final LogProvider logProvider;
private final NettyPipelineBuilderFactory pipelineBuilderFactory;
private final TopologyService topologyService;
private final Collection<SupportedProtocols<ModifierProtocol>> supportedModifierProtocols;
private final Collection<ModifierSupportedProtocols> supportedModifierProtocols;
private final RaftServer raftServer;

private RaftServerModule( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule,
LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, MessageLogger<MemberId> messageLogger,
CoreTopologyService topologyService, SupportedProtocols<ApplicationProtocol> supportedApplicationProtocol,
Collection<SupportedProtocols<ModifierProtocol>> supportedModifierProtocols )
CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol,
Collection<ModifierSupportedProtocols> supportedModifierProtocols )
{
this.platformModule = platformModule;
this.consensusModule = consensusModule;
Expand All @@ -94,8 +93,8 @@ private RaftServerModule( PlatformModule platformModule, ConsensusModule consens

static RaftServerModule createAndStart( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule,
CoreServerModule coreServerModule, LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory,
MessageLogger<MemberId> messageLogger, CoreTopologyService topologyService, SupportedProtocols<ApplicationProtocol> supportedApplicationProtocol,
Collection<SupportedProtocols<ModifierProtocol>> supportedModifierProtocols )
MessageLogger<MemberId> messageLogger, CoreTopologyService topologyService, ApplicationSupportedProtocols supportedApplicationProtocol,
Collection<ModifierSupportedProtocols> supportedModifierProtocols )
{
return new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, pipelineBuilderFactory, messageLogger,
topologyService, supportedApplicationProtocol, supportedModifierProtocols );
Expand Down
Expand Up @@ -25,7 +25,8 @@
import java.util.stream.Stream;

import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.handshake.SupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.stream.Streams;
Expand All @@ -39,37 +40,37 @@ public SupportedProtocolCreator( Config config )
this.config = config;
}

public SupportedProtocols<Protocol.ApplicationProtocol> createSupportedRaftProtocol()
public ApplicationSupportedProtocols createSupportedRaftProtocol()
{
List<Integer> raftVersions = config.get( CausalClusteringSettings.raft_versions );
return new SupportedProtocols<>( Protocol.ApplicationProtocolIdentifier.RAFT, raftVersions );
return new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, raftVersions );
}

public List<SupportedProtocols<Protocol.ModifierProtocol>> createSupportedModifierProtocols()
public List<ModifierSupportedProtocols> createSupportedModifierProtocols()
{
SupportedProtocols<Protocol.ModifierProtocol> supportedCompression = compressionProtocolVersions();
ModifierSupportedProtocols supportedCompression = compressionProtocolVersions();

return Stream.of( supportedCompression )
.filter( supportedProtocols -> !supportedProtocols.versions().isEmpty() )
.collect( Collectors.toList() );
}

private SupportedProtocols<Protocol.ModifierProtocol> compressionProtocolVersions()
private ModifierSupportedProtocols compressionProtocolVersions()
{
return modifierProtocolVersions( CausalClusteringSettings.compression_versions, Protocol.ModifierProtocolIdentifier.COMPRESSION );
return modifierProtocolVersions( CausalClusteringSettings.compression_versions, Protocol.ModifierProtocolCategory.COMPRESSION );
}

private SupportedProtocols<Protocol.ModifierProtocol> modifierProtocolVersions(
Setting<List<String>> compressionVersions, Protocol.ModifierProtocolIdentifier identifier )
private ModifierSupportedProtocols modifierProtocolVersions(
Setting<List<String>> compressionVersions, Protocol.ModifierProtocolCategory identifier )
{
List<String> compressionAlgorithms = config.get( compressionVersions );
List<Integer> versions = compressionAlgorithms.stream()
List<String> versions = compressionAlgorithms.stream()
.map( Protocol.ModifierProtocols::fromFriendlyName )
.flatMap( Streams::ofOptional )
.filter( protocol -> Objects.equals( protocol.identifier(), identifier.canonicalName() ) )
.map( Protocol.ModifierProtocols::version )
.filter( protocol -> Objects.equals( protocol.category(), identifier.canonicalName() ) )
.map( Protocol.ModifierProtocols::implementation )
.collect( Collectors.toList() );

return new SupportedProtocols<>( identifier, versions );
return new ModifierSupportedProtocols( identifier, versions );
}
}
Expand Up @@ -93,8 +93,8 @@ private <T extends SocketAddress> Object[] buildRow( Pair<T,ProtocolStack> conne
{
orientation,
socketAddress.toString(),
protocolStack.applicationProtocol().identifier(),
(long) protocolStack.applicationProtocol().version(),
protocolStack.applicationProtocol().category(),
(long) protocolStack.applicationProtocol().implementation(),
modifierString( protocolStack )
};
}
Expand All @@ -104,7 +104,7 @@ private String modifierString( ProtocolStack protocolStack )
return protocolStack
.modifierProtocols()
.stream()
.map( Protocol.ModifierProtocol::friendlyName )
.map( Protocol.ModifierProtocol::implementation )
.collect( Collectors.joining( ",", "[", "]") );
}
}
Expand Up @@ -23,22 +23,22 @@
import java.util.Optional;
import java.util.stream.Stream;

public interface Protocol
public interface Protocol<IMPL extends Comparable<IMPL>>
{
String identifier();
String category();

int version();
IMPL implementation();

interface Identifier<T extends Protocol>
interface Category<T extends Protocol>
{
String canonicalName();
}

interface ApplicationProtocol extends Protocol
interface ApplicationProtocol extends Protocol<Integer>
{
}

enum ApplicationProtocolIdentifier implements Identifier<ApplicationProtocol>
enum ApplicationProtocolCategory implements Category<ApplicationProtocol>
{
RAFT,
CATCHUP;
Expand All @@ -52,43 +52,39 @@ public String canonicalName()

enum ApplicationProtocols implements ApplicationProtocol
{
RAFT_1( ApplicationProtocolIdentifier.RAFT, 1 ),
CATCHUP_1( ApplicationProtocolIdentifier.CATCHUP, 1 );
RAFT_1( ApplicationProtocolCategory.RAFT, 1 ),
CATCHUP_1( ApplicationProtocolCategory.CATCHUP, 1 );

private final int version;
private final ApplicationProtocolIdentifier identifier;
private final Integer version;
private final ApplicationProtocolCategory identifier;

ApplicationProtocols( ApplicationProtocolIdentifier identifier, int version )
ApplicationProtocols( ApplicationProtocolCategory identifier, int version )
{
this.identifier = identifier;
this.version = version;
}

@Override
public String identifier()
public String category()
{
return identifier.canonicalName();
}

@Override
public int version()
public Integer implementation()
{
return version;
}
}

interface ModifierProtocol extends Protocol
interface ModifierProtocol extends Protocol<String>
{
/**
* Should be human readable when in a comma separated list
*/
String friendlyName();
}

enum ModifierProtocolIdentifier implements Identifier<ModifierProtocol>
enum ModifierProtocolCategory implements Category<ModifierProtocol>
{
COMPRESSION,
// Need a second Identifier for testing purposes.
// Need a second Category for testing purposes.
GRATUITOUS_OBFUSCATION;

@Override
Expand All @@ -100,43 +96,36 @@ public String canonicalName()

enum ModifierProtocols implements ModifierProtocol
{
COMPRESSION_GZIP( ModifierProtocolIdentifier.COMPRESSION, 1, "Gzip" ),
COMPRESSION_SNAPPY( ModifierProtocolIdentifier.COMPRESSION, 2, "Snappy" ),
COMPRESSION_SNAPPY_VALIDATING( ModifierProtocolIdentifier.COMPRESSION, 3, "Snappy_validating" ),
COMPRESSION_LZ4( ModifierProtocolIdentifier.COMPRESSION, 4, "LZ4" ),
COMPRESSION_LZ4_HIGH_COMPRESSION( ModifierProtocolIdentifier.COMPRESSION, 5, "LZ4_high_compression" ),
COMPRESSION_LZ4_VALIDATING( ModifierProtocolIdentifier.COMPRESSION, 6, "LZ_validating" ),
COMPRESSION_LZ4_HIGH_COMPRESSION_VALIDATING( ModifierProtocolIdentifier.COMPRESSION, 7, "LZ4_high_compression_validating" );

private final int version;
private final ModifierProtocolIdentifier identifier;
COMPRESSION_GZIP( ModifierProtocolCategory.COMPRESSION, "Gzip" ),
COMPRESSION_SNAPPY( ModifierProtocolCategory.COMPRESSION, "Snappy" ),
COMPRESSION_SNAPPY_VALIDATING( ModifierProtocolCategory.COMPRESSION, "Snappy_validating" ),
COMPRESSION_LZ4( ModifierProtocolCategory.COMPRESSION, "LZ4" ),
COMPRESSION_LZ4_HIGH_COMPRESSION( ModifierProtocolCategory.COMPRESSION, "LZ4_high_compression" ),
COMPRESSION_LZ4_VALIDATING( ModifierProtocolCategory.COMPRESSION, "LZ_validating" ),
COMPRESSION_LZ4_HIGH_COMPRESSION_VALIDATING( ModifierProtocolCategory.COMPRESSION, "LZ4_high_compression_validating" );

// Should be human writable into a comma separated list
private final String friendlyName;
private final ModifierProtocolCategory identifier;

ModifierProtocols( ModifierProtocolIdentifier identifier, int version, String friendlyName )
ModifierProtocols( ModifierProtocolCategory identifier, String friendlyName )
{
this.version = version;
this.identifier = identifier;
this.friendlyName = friendlyName;
}

@Override
public int version()
public String implementation()
{
return version;
return friendlyName;
}

@Override
public String identifier()
public String category()
{
return identifier.canonicalName();
}

@Override
public String friendlyName()
{
return friendlyName;
}

public static Optional<ModifierProtocols> fromFriendlyName( String friendlyName )
{
return Stream.of( ModifierProtocols.values() )
Expand Down
Expand Up @@ -89,10 +89,10 @@ private void ensureNotDuplicate( List<ModifierProtocolInstaller<O>> modifierProt
boolean duplicateIdentifier = modifierProtocolInstallers
.stream()
.flatMap( modifier -> modifier.protocols().stream() )
.anyMatch( protocol -> protocol.identifier().equals( modifierProtocol.identifier() ) );
.anyMatch( protocol -> protocol.category().equals( modifierProtocol.category() ) );
if ( duplicateIdentifier )
{
throw new IllegalArgumentException( "Attempted to install multiple versions of " + modifierProtocol.identifier() );
throw new IllegalArgumentException( "Attempted to install multiple versions of " + modifierProtocol.category() );
}
}

Expand Down
Expand Up @@ -21,17 +21,17 @@

import org.neo4j.causalclustering.protocol.Protocol;

public class ApplicationProtocolRepository extends ProtocolRepository<Protocol.ApplicationProtocol>
public class ApplicationProtocolRepository extends ProtocolRepository<Integer,Protocol.ApplicationProtocol>
{
private final SupportedProtocols<Protocol.ApplicationProtocol> supportedProtocol;
private final ApplicationSupportedProtocols supportedProtocol;

public ApplicationProtocolRepository( Protocol.ApplicationProtocol[] protocols, SupportedProtocols<Protocol.ApplicationProtocol> supportedProtocol )
public ApplicationProtocolRepository( Protocol.ApplicationProtocol[] protocols, ApplicationSupportedProtocols supportedProtocol )
{
super( protocols, ignored -> versionNumberComparator() );
super( protocols, ignored -> versionNumberComparator(), ApplicationProtocolSelection::new );
this.supportedProtocol = supportedProtocol;
}

public SupportedProtocols<Protocol.ApplicationProtocol> supportedProtocol()
public ApplicationSupportedProtocols supportedProtocol()
{
return supportedProtocol;
}
Expand Down
Expand Up @@ -21,7 +21,7 @@

import java.util.Set;

public class ApplicationProtocolRequest extends BaseProtocolRequest implements ServerMessage
public class ApplicationProtocolRequest extends BaseProtocolRequest<Integer>
{
ApplicationProtocolRequest( String protocolName, Set<Integer> versions )
{
Expand Down
Expand Up @@ -21,7 +21,7 @@

import static org.neo4j.causalclustering.protocol.handshake.StatusCode.FAILURE;

public class ApplicationProtocolResponse extends BaseProtocolResponse implements ClientMessage
public class ApplicationProtocolResponse extends BaseProtocolResponse<Integer>
{
public static final ApplicationProtocolResponse NO_PROTOCOL = new ApplicationProtocolResponse( FAILURE, "", 0 );

Expand Down
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.protocol.handshake;

import java.util.Set;

import org.neo4j.causalclustering.protocol.Protocol;

public class ApplicationProtocolSelection extends ProtocolSelection<Integer,Protocol.ApplicationProtocol>
{
public ApplicationProtocolSelection( String identifier, Set<Integer> versions )
{
super( identifier, versions );
}
}

0 comments on commit 91a2a43

Please sign in to comment.