Skip to content

Commit

Permalink
Throw if Raft version configured but not known. Log ignored versions.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkerr9000 committed Mar 12, 2018
1 parent b831b5c commit 71cd12f
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 33 deletions.
Expand Up @@ -239,7 +239,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();

SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config );
SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider );
ApplicationSupportedProtocols supportedApplicationProtocol = supportedProtocolCreator.createSupportedRaftProtocol();
Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();

Expand Down
Expand Up @@ -19,31 +19,56 @@
*/
package org.neo4j.causalclustering.core;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.stream.Streams;

import static org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocolCategory.RAFT;
import static org.neo4j.causalclustering.protocol.Protocol.ModifierProtocolCategory.COMPRESSION;

public class SupportedProtocolCreator
{
private final Config config;
private final Log log;

public SupportedProtocolCreator( Config config )
public SupportedProtocolCreator( Config config, LogProvider logProvider )
{
this.config = config;
this.log = logProvider.getLog( getClass() );
}

public ApplicationSupportedProtocols createSupportedRaftProtocol()
{
List<Integer> raftVersions = config.get( CausalClusteringSettings.raft_versions );
return new ApplicationSupportedProtocols( Protocol.ApplicationProtocolCategory.RAFT, raftVersions );
List<Integer> configVersions = config.get( CausalClusteringSettings.raft_versions );
if ( configVersions.isEmpty() )
{
return new ApplicationSupportedProtocols( RAFT, Collections.emptyList() );
}
else
{
List<Integer> knownVersions =
protocolsForConfig( RAFT, configVersions, version -> Protocol.ApplicationProtocols.find( RAFT, version ) );
if ( knownVersions.isEmpty() )
{
throw new IllegalArgumentException( String.format( "None of configured Raft implementations %s are known", configVersions ) );
}
else
{
return new ApplicationSupportedProtocols( RAFT, knownVersions );
}
}
}

public List<ModifierSupportedProtocols> createSupportedModifierProtocols()
Expand All @@ -57,20 +82,30 @@ public List<ModifierSupportedProtocols> createSupportedModifierProtocols()

private ModifierSupportedProtocols compressionProtocolVersions()
{
return modifierProtocolVersions( CausalClusteringSettings.compression_versions, Protocol.ModifierProtocolCategory.COMPRESSION );
List<String> implementations = protocolsForConfig( COMPRESSION, config.get( CausalClusteringSettings.compression_versions ),
implementation -> Protocol.ModifierProtocols.find( COMPRESSION, implementation ) );

return new ModifierSupportedProtocols( COMPRESSION, implementations );
}

private ModifierSupportedProtocols modifierProtocolVersions(
Setting<List<String>> compressionVersions, Protocol.ModifierProtocolCategory identifier )
private <IMPL extends Comparable<IMPL>, T extends Protocol<IMPL>> List<IMPL> protocolsForConfig( Protocol.Category<T> category, List<IMPL> implementations,
Function<IMPL,Optional<T>> finder )
{
List<String> compressionAlgorithms = config.get( compressionVersions );
List<String> versions = compressionAlgorithms.stream()
.map( Protocol.ModifierProtocols::fromFriendlyName )
return implementations.stream()
.map( impl -> Pair.of( impl, finder.apply( impl ) ) )
.peek( protocolWithImplementation -> logUnknownProtocol( category, protocolWithImplementation ) )
.map( Pair::other )
.flatMap( Streams::ofOptional )
.filter( protocol -> Objects.equals( protocol.category(), identifier.canonicalName() ) )
.map( Protocol.ModifierProtocols::implementation )
.map( Protocol::implementation )
.collect( Collectors.toList() );
}

return new ModifierSupportedProtocols( identifier, versions );
private <IMPL extends Comparable<IMPL>, T extends Protocol<IMPL>> void logUnknownProtocol( Protocol.Category<T> category,
Pair<IMPL,Optional<T>> protocolWithImplementation )
{
if ( !protocolWithImplementation.other().isPresent() )
{
log.warn( "Configured %s protocol implementation %s unknown. Ignoring.", category, protocolWithImplementation.first() );
}
}
}
Expand Up @@ -21,6 +21,7 @@

import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

public interface Protocol<IMPL extends Comparable<IMPL>>
Expand All @@ -29,6 +30,15 @@ public interface Protocol<IMPL extends Comparable<IMPL>>

IMPL implementation();

static <IMPL extends Comparable<IMPL>, T extends Protocol<IMPL>> Optional<T> find(
T[] values, Category<T> category, IMPL implementation, Function<IMPL,IMPL> normalise )
{
return Stream.of( values )
.filter( protocol -> Objects.equals( protocol.category(), category.canonicalName() ) )
.filter( protocol -> Objects.equals( normalise.apply( protocol.implementation() ), normalise.apply( implementation ) ) )
.findFirst();
}

interface Category<T extends Protocol>
{
String canonicalName();
Expand Down Expand Up @@ -75,6 +85,11 @@ public Integer implementation()
{
return version;
}

public static Optional<ApplicationProtocol> find( ApplicationProtocolCategory category, Integer version )
{
return Protocol.find( ApplicationProtocols.values(), category, version, Function.identity() );
}
}

interface ModifierProtocol extends Protocol<String>
Expand Down Expand Up @@ -126,11 +141,9 @@ public String category()
return identifier.canonicalName();
}

public static Optional<ModifierProtocols> fromFriendlyName( String friendlyName )
public static Optional<ModifierProtocol> find( ModifierProtocolCategory category, String friendlyName )
{
return Stream.of( ModifierProtocols.values() )
.filter( protocol -> Objects.equals( protocol.friendlyName.toLowerCase(), friendlyName.toLowerCase() ) )
.findFirst();
return Protocol.find( ModifierProtocols.values(), category, friendlyName, String::toLowerCase );
}

public static class Implementations
Expand Down
Expand Up @@ -30,22 +30,27 @@
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.SupportedProtocols;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.NullLogProvider;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.neo4j.causalclustering.protocol.Protocol.ModifierProtocols.COMPRESSION_SNAPPY;

public class SupportedProtocolCreatorTest
{

private NullLogProvider log = NullLogProvider.getInstance();

@Test
public void shouldReturnRAftProtocol() throws Throwable
public void shouldReturnRaftProtocol() throws Throwable
{
// given
Config config = Config.defaults();

// when
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config ).createSupportedRaftProtocol();
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config, log ).createSupportedRaftProtocol();

// then
assertThat( supportedRaftProtocol.identifier(), equalTo( Protocol.ApplicationProtocolCategory.RAFT ) );
Expand All @@ -58,23 +63,49 @@ public void shouldReturnEmptyVersionSupportedRaftProtocolIfNoVersionsConfigured(
Config config = Config.defaults();

// when
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config ).createSupportedRaftProtocol();
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config, log ).createSupportedRaftProtocol();

// then
assertThat( supportedRaftProtocol.versions(), empty() );
}

@Test
public void shouldFilterUnknownRaftImplementations() throws Throwable
{
// given
Config config = Config.defaults( CausalClusteringSettings.raft_versions, "1, 2, 3" );

// when
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config, log ).createSupportedRaftProtocol();

// then
assertThat( supportedRaftProtocol.versions(), contains( 1 ) );

}

@Test
public void shouldReturnConfiguredRaftProtocolVersions() throws Throwable
{
// given
Config config = Config.defaults( CausalClusteringSettings.raft_versions, "2,3,1" );
Config config = Config.defaults( CausalClusteringSettings.raft_versions, "1" );

// when
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config ).createSupportedRaftProtocol();
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config, log ).createSupportedRaftProtocol();

// then
assertThat( supportedRaftProtocol.versions(), contains( 2,3,1 ) );
assertThat( supportedRaftProtocol.versions(), contains( 1 ) );
}

@Test( expected = IllegalArgumentException.class )
public void shouldThrowIfVersionsSpecifiedButAllUnknown() throws Throwable
{
// given
Config config = Config.defaults( CausalClusteringSettings.raft_versions, String.valueOf( Integer.MAX_VALUE ) );

// when
ApplicationSupportedProtocols supportedRaftProtocol = new SupportedProtocolCreator( config, log ).createSupportedRaftProtocol();

// then throw
}

@Test
Expand All @@ -85,39 +116,54 @@ public void shouldNotReturnModifiersIfNoVersionsSpecified() throws Throwable

// when
List<ModifierSupportedProtocols> supportedModifierProtocols =
new SupportedProtocolCreator( config ).createSupportedModifierProtocols();
new SupportedProtocolCreator( config, log ).createSupportedModifierProtocols();

// then
assertThat( supportedModifierProtocols, empty() );
}

@Test
public void shouldReturnCompressionIfVersionsSpecified() throws Throwable
public void shouldReturnACompressionModifierIfCompressionVersionsSpecified() throws Throwable
{
// given
Config config = Config.defaults( CausalClusteringSettings.compression_versions, "snappy" );
Config config = Config.defaults( CausalClusteringSettings.compression_versions, COMPRESSION_SNAPPY.implementation() );

// when
List<ModifierSupportedProtocols> supportedModifierProtocols =
new SupportedProtocolCreator( config ).createSupportedModifierProtocols();
new SupportedProtocolCreator( config, log ).createSupportedModifierProtocols();

// then
Stream<Protocol.Category<Protocol.ModifierProtocol>> identifiers = supportedModifierProtocols.stream().map( SupportedProtocols::identifier );
assertThat( identifiers, StreamMatchers.contains( Protocol.ModifierProtocolCategory.COMPRESSION ) );
}

@Test
public void shouldReturnCompressionVersionsSpecified() throws Throwable
public void shouldReturnCompressionWithVersionsSpecified() throws Throwable
{
// given
Config config = Config.defaults( CausalClusteringSettings.compression_versions, COMPRESSION_SNAPPY.implementation() );

// when
List<ModifierSupportedProtocols> supportedModifierProtocols =
new SupportedProtocolCreator( config, log ).createSupportedModifierProtocols();

// then
List<String> versions = supportedModifierProtocols.get( 0 ).versions();
assertThat( versions, contains( COMPRESSION_SNAPPY.implementation() ) );
}

@Test
public void shouldReturnCompressionWithVersionsSpecifiedCaseInsensitive() throws Throwable
{
// given
Config config = Config.defaults( CausalClusteringSettings.compression_versions, "snappy" );
Config config = Config.defaults( CausalClusteringSettings.compression_versions, COMPRESSION_SNAPPY.implementation().toLowerCase() );

// when
List<ModifierSupportedProtocols> supportedModifierProtocols =
new SupportedProtocolCreator( config ).createSupportedModifierProtocols();
new SupportedProtocolCreator( config, log ).createSupportedModifierProtocols();

// then
List<String> versions = supportedModifierProtocols.get( 0 ).versions();
assertThat( versions, contains( Protocol.ModifierProtocols.COMPRESSION_SNAPPY.implementation() ) );
assertThat( versions, contains( COMPRESSION_SNAPPY.implementation() ) );
}
}

0 comments on commit 71cd12f

Please sign in to comment.