Skip to content

Commit

Permalink
Cluster compression IT and minor other things
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Mar 15, 2018
1 parent 1099902 commit 7301615
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
Expand Up @@ -502,6 +502,8 @@ public Map<String, String> validate( Map<String, String> rawConfig, Consumer<Str


@Description( "Network compression algorithms that this instance will allow in negotiation as a comma-separated list." + @Description( "Network compression algorithms that this instance will allow in negotiation as a comma-separated list." +
" Listed in descending order of preference for incoming connections. An empty list implies no compression." + " Listed in descending order of preference for incoming connections. An empty list implies no compression." +
" For outgoing connections this merely specifies the allowed set of algorithms and the preference of the " +
" remote peer will be used for making the decision." +
" Allowable values: [" + GZIP + "," + SNAPPY + "," + SNAPPY_VALIDATING + "," + " Allowable values: [" + GZIP + "," + SNAPPY + "," + SNAPPY_VALIDATING + "," +
LZ4 + "," + LZ4_HIGH_COMPRESSION + "," + LZ_VALIDATING + "," + LZ4_HIGH_COMPRESSION_VALIDATING + "]" ) LZ4 + "," + LZ4_HIGH_COMPRESSION + "," + LZ_VALIDATING + "," + LZ4_HIGH_COMPRESSION_VALIDATING + "]" )
public static final Setting<List<String>> compression_implementations = public static final Setting<List<String>> compression_implementations =
Expand Down
Expand Up @@ -26,15 +26,23 @@


import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;


/**
* Keeps track of protocols which are supported by this instance. This is later used when
* matching for mutually supported versions during a protocol negotiation.
*
* @param <U> Comparable version type.
* @param <T> Protocol type.
*/
public abstract class SupportedProtocols<U extends Comparable<U>,T extends Protocol<U>> public abstract class SupportedProtocols<U extends Comparable<U>,T extends Protocol<U>>
{ {
private final Protocol.Category<T> category; private final Protocol.Category<T> category;
private final List<U> versions; private final List<U> versions;


/** /**
* @param versions Empty means support everything * @param category The protocol category.
* @param versions List of supported versions. An empty list means that every version is supported.
*/ */
public SupportedProtocols( Protocol.Category<T> category, List<U> versions ) SupportedProtocols( Protocol.Category<T> category, List<U> versions )
{ {
this.category = category; this.category = category;
this.versions = Collections.unmodifiableList( versions ); this.versions = Collections.unmodifiableList( versions );
Expand Down
Expand Up @@ -38,15 +38,15 @@ private DataCreator()
} }


public static CoreClusterMember createLabelledNodesWithProperty( Cluster cluster, int numberOfNodes, public static CoreClusterMember createLabelledNodesWithProperty( Cluster cluster, int numberOfNodes,
Label label, Supplier<Pair<String,Object>> supplier ) throws Exception Label label, Supplier<Pair<String,Object>> propertyPair ) throws Exception
{ {
CoreClusterMember last = null; CoreClusterMember last = null;
for ( int i = 0; i < numberOfNodes; i++ ) for ( int i = 0; i < numberOfNodes; i++ )
{ {
last = cluster.coreTx( ( db, tx ) -> last = cluster.coreTx( ( db, tx ) ->
{ {
Node node = db.createNode( label ); Node node = db.createNode( label );
node.setProperty( supplier.get().first(), supplier.get().other() ); node.setProperty( propertyPair.get().first(), propertyPair.get().other() );
tx.success(); tx.success();
} ); } );
} }
Expand Down
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.scenarios;

import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.protocol.Protocol.ModifierProtocols;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.test.causalclustering.ClusterRule;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.compression_implementations;
import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually;
import static org.neo4j.causalclustering.helpers.DataCreator.countNodes;
import static org.neo4j.causalclustering.helpers.DataCreator.createLabelledNodesWithProperty;
import static org.neo4j.graphdb.Label.label;

@RunWith( Parameterized.class )
public class ClusterCompressionIT
{
@Parameterized.Parameter
public ModifierProtocols modifierProtocol;

@Parameterized.Parameters( name = "{0}" )
public static Collection<Object[]> params()
{
return Arrays.stream( ModifierProtocols.values() ).map( mp -> new Object[]{mp} ).collect( Collectors.toList() );
}

@Rule
public final ClusterRule clusterRule =
new ClusterRule()
.withNumberOfCoreMembers( 3 )
.withNumberOfReadReplicas( 3 )
.withTimeout( 1000, SECONDS );

@Test
public void shouldReplicateWithCompression() throws Exception
{
// given
clusterRule
.withSharedCoreParam( compression_implementations, modifierProtocol.implementation() )
.withSharedReadReplicaParam( compression_implementations, modifierProtocol.implementation() );

Cluster cluster = clusterRule.startCluster();

// when
int numberOfNodes = 10;
CoreClusterMember leader = createLabelledNodesWithProperty( cluster, numberOfNodes, label( "Foo" ),
() -> Pair.of( "foobar", format( "baz_bat%s", UUID.randomUUID() ) ) );

// then
assertEquals( numberOfNodes, countNodes( leader ) );
dataMatchesEventually( leader, cluster.coreMembers() );
}
}

0 comments on commit 7301615

Please sign in to comment.