Skip to content

Commit

Permalink
server tags integration test and refresh
Browse files Browse the repository at this point in the history
An integration test which proves that tags are updated
appropriately on start and restart, using the cluster
overview procedure.
  • Loading branch information
martinfurmanski committed Feb 28, 2017
1 parent c867aff commit 279bfe8
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 22 deletions.
Expand Up @@ -21,7 +21,6 @@

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.MultiMap;

import java.util.List;
import java.util.function.Function;
Expand All @@ -39,9 +38,9 @@
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_BOLT_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_MEMBER_ID_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getCoreTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.getReadReplicaTopology;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags;

class HazelcastClient extends LifecycleAdapter implements TopologyService
{
Expand Down Expand Up @@ -140,8 +139,7 @@ private Void addReadReplica( HazelcastInstance hazelcastInstance )
hazelcastInstance.getMap( READ_REPLICA_MEMBER_ID_MAP_NAME )
.put( uuid, myself.getUuid().toString(), readReplicaTimeToLiveTimeout, MILLISECONDS );

MultiMap<String,String> tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME );
tags.forEach( tag -> tagsMap.put( uuid, tag ) );
refreshTags( hazelcastInstance, uuid, tags );

return null; // return value not used.
}
Expand Down
Expand Up @@ -26,10 +26,13 @@
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiMap;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.identity.ClusterId;
Expand All @@ -56,9 +59,8 @@ class HazelcastClusterTopology
// cluster-wide attributes
private static final String CLUSTER_UUID = "cluster_uuid";
static final String SERVER_TAGS_MULTIMAP_NAME = "tags";
static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read_replicas";
// hz client uuid string -> boltAddress string
static final String READ_REPLICA_TRANSACTION_SERVER_ADDRESS_MAP_NAME = "read-replica-transaction-servers";
static final String READ_REPLICA_BOLT_ADDRESS_MAP_NAME = "read_replicas"; // hz client uuid string -> boltAddress string
static final String READ_REPLICA_MEMBER_ID_MAP_NAME = "read-replica-member-ids";

static ReadReplicaTopology getReadReplicaTopology( HazelcastInstance hazelcastInstance, Log log )
Expand Down Expand Up @@ -196,6 +198,18 @@ static Map<MemberId,CoreServerInfo> toCoreMemberMap( Set<Member> members, Log lo
return coreMembers;
}

/**
 * Reconciles the cluster-wide server-tags multimap with this member's
 * currently configured tags: tags that are configured but not yet published
 * are added, and previously published tags that are no longer configured
 * are removed. Called both on initial registration and on refresh, so a
 * restarted member with new configuration converges to the right tag set.
 *
 * NOTE(review): assumes MultiMap.get returns a snapshot copy of the values
 * (Hazelcast documents this), so mutating the multimap below does not
 * invalidate the diff computed from it — TODO confirm for the client proxy.
 */
static void refreshTags( HazelcastInstance hazelcastInstance, String memberId, List<String> tags )
{
    MultiMap<String,String> tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME );
    Collection<String> published = tagsMap.get( memberId );

    // Snapshot both differences before touching the multimap.
    Set<String> toAdd = tags.stream()
            .filter( tag -> !published.contains( tag ) )
            .collect( Collectors.toSet() );
    Set<String> toRemove = published.stream()
            .filter( tag -> !tags.contains( tag ) )
            .collect( Collectors.toSet() );

    for ( String tag : toAdd )
    {
        tagsMap.put( memberId, tag );
    }
    for ( String tag : toRemove )
    {
        tagsMap.remove( memberId, tag );
    }
}

static MemberAttributeConfig buildMemberAttributesForCore( MemberId myself, Config config )
{
MemberAttributeConfig memberAttributeConfig = new MemberAttributeConfig();
Expand Down
Expand Up @@ -55,6 +55,7 @@
import static com.hazelcast.spi.properties.GroupProperty.WAIT_SECONDS_BEFORE_JOIN;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.SERVER_TAGS_MULTIMAP_NAME;
import static org.neo4j.causalclustering.discovery.HazelcastClusterTopology.refreshTags;
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopologyService
Expand Down Expand Up @@ -99,7 +100,7 @@ public boolean setClusterId( ClusterId clusterId )
}

@Override
public void start()
public void start() throws Throwable
{
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );
Expand All @@ -108,18 +109,9 @@ public void start()
refreshReadReplicaTopology();
listenerService.notifyListeners( coreServers() );

try
{
scheduler.start();
}
catch ( Throwable throwable )
{
log.debug( "Failed to start job scheduler." );
return;
}

JobScheduler.Group group = new JobScheduler.Group( "Scheduler", POOLED );
jobHandle = this.scheduler.scheduleRecurring( group, () ->
scheduler.start();
JobScheduler.Group group = new JobScheduler.Group( "TopologyRefresh", POOLED );
jobHandle = scheduler.scheduleRecurring( group, () ->
{
refreshCoreTopology();
refreshReadReplicaTopology();
Expand Down Expand Up @@ -211,9 +203,7 @@ private HazelcastInstance createHazelcastInstance()
}

List<String> tags = config.get( CausalClusteringSettings.server_tags );

MultiMap<String,String> tagsMap = hazelcastInstance.getMultiMap( SERVER_TAGS_MULTIMAP_NAME );
tags.forEach( tag -> tagsMap.put( myself.getUuid().toString(), tag ) );
refreshTags( hazelcastInstance, myself.getUuid().toString(), tags );

return hazelcastInstance;
}
Expand Down
Expand Up @@ -44,6 +44,11 @@ public Collection<ReadReplicaInfo> allMemberInfo()
return readReplicaMembers.values();
}

/**
 * Returns the mapping from member id to read-replica info for this topology
 * snapshot.
 *
 * NOTE(review): this exposes the internal {@code readReplicaMembers} map
 * directly — a caller could mutate the topology's state through it. Consider
 * returning {@code Collections.unmodifiableMap( readReplicaMembers )};
 * TODO confirm no existing caller relies on mutating the returned map.
 */
public Map<MemberId,ReadReplicaInfo> replicaMembers()
{
return readReplicaMembers;
}

Optional<ReadReplicaInfo> find( MemberId memberId )
{
return Optional.ofNullable( readReplicaMembers.get( memberId ) );
Expand Down
Expand Up @@ -298,10 +298,12 @@ public void shouldRegisterReadReplicaInTopology() throws Throwable
when( clientService.getConnectedClients() ).thenReturn( asSet( client ) );

HazelcastMap hazelcastMap = new HazelcastMap();
HazelcastMultiMap hazelcastMultiMap = new HazelcastMultiMap();

HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class );
when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) );
when( hazelcastInstance.getMap( anyString() ) ).thenReturn( hazelcastMap );
when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( hazelcastMultiMap );
when( hazelcastInstance.getLocalEndpoint() ).thenReturn( endpoint );
when( hazelcastInstance.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() );
when( hazelcastInstance.getCluster() ).thenReturn( cluster );
Expand Down Expand Up @@ -346,6 +348,7 @@ public void shouldRemoveReadReplicasOnGracefulShutdown() throws Throwable
HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class );
when( hazelcastInstance.getAtomicReference( anyString() ) ).thenReturn( mock( IAtomicReference.class ) );
when( hazelcastInstance.getMap( anyString() ) ).thenReturn( hazelcastMap );
when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() );
when( hazelcastInstance.getLocalEndpoint() ).thenReturn( endpoint );
when( hazelcastInstance.getExecutorService( anyString() ) ).thenReturn( new StubExecutorService() );
when( hazelcastInstance.getCluster() ).thenReturn( cluster );
Expand Down Expand Up @@ -379,6 +382,7 @@ public void shouldSwallowNPEFromHazelcast() throws Throwable
HazelcastInstance hazelcastInstance = mock( HazelcastInstance.class );
when( hazelcastInstance.getLocalEndpoint() ).thenReturn( endpoint );
when( hazelcastInstance.getMap( anyString() ) ).thenReturn( new HazelcastMap() );
when( hazelcastInstance.getMultiMap( anyString() ) ).thenReturn( new HazelcastMultiMap() );
doThrow( new NullPointerException( "boom!!!" ) ).when( hazelcastInstance ).shutdown();

HazelcastConnector connector = mock( HazelcastConnector.class );
Expand Down
@@ -0,0 +1,205 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.scenarios;

import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntFunction;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.enterprise.api.security.EnterpriseSecurityContext;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.store.format.standard.Standard;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;

import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.neo4j.test.assertion.Assert.assertEventually;

/**
 * Integration test proving that server tags are published when a cluster
 * member starts, and refreshed (not merely appended) when a member restarts
 * with different configuration. Tags are observed through the
 * {@code dbms.cluster.overview} procedure on every core member.
 */
public class ServerTagsIT
{
@Rule
public TestDirectory testDir = TestDirectory.testDirectory();
@Rule
public DefaultFileSystemRule fsRule = new DefaultFileSystemRule();

// Shut down in after(); null until the test starts it.
private Cluster cluster;

@After
public void after() throws Exception
{
if ( cluster != null )
{
cluster.shutdown();
}
}

@Test
public void shouldUpdateTagsOnStart() throws Exception
{
// The suffix is baked into each member's configured tags via the
// IntFunction params below; swapping it later simulates a member
// restarting with changed tag configuration.
AtomicReference<String> suffix = new AtomicReference<>( "before" );
List<List<String>> expected;

// Per-instance config: each core/replica gets tags derived from its server id.
Map<String,IntFunction<String>> instanceCoreParams = new HashMap<>();
instanceCoreParams.put( CausalClusteringSettings.server_tags.name(), ( id ) -> String.join( ", ", makeCoreTags( suffix.get(), id ) ) );

Map<String,IntFunction<String>> instanceReplicaParams = new HashMap<>();
instanceReplicaParams.put( CausalClusteringSettings.server_tags.name(), ( id ) -> String.join( ", ", makeReplicaTags( suffix.get(), id ) ) );

int nServers = 3;
cluster = new Cluster( testDir.directory( "cluster" ), nServers, nServers,
new HazelcastDiscoveryServiceFactory(), emptyMap(), instanceCoreParams,
emptyMap(), instanceReplicaParams, Standard.LATEST_NAME );

// when
cluster.start();

// then
// Core and replica counts are equal (nServers each), so core server ids
// 0..n-1 double as the replica ids when building the expected tag lists.
expected = new ArrayList<>();
for ( CoreClusterMember core : cluster.coreMembers() )
{
expected.add( makeCoreTags( suffix.get(), core.serverId() ) );
expected.add( makeReplicaTags( suffix.get(), core.serverId() ) );
}

// Every core's view of the cluster must eventually show the full tag set.
for ( CoreClusterMember core : cluster.coreMembers() )
{
assertEventually( core + " should have tags", () -> getServerTags( core.database() ),
new TagsMatcher( expected ), 30, SECONDS );
}

// when
// Restart one core and one replica with a changed suffix: their old tags
// must disappear from the overview and the new ones must appear.
expected.remove( makeCoreTags( suffix.get(), 1 ) );
expected.remove( makeReplicaTags( suffix.get(), 2 ) );
cluster.getCoreMemberById( 1 ).shutdown();
cluster.getReadReplicaById( 2 ).shutdown();

suffix.set( "after" ); // should update tags of restarted servers
cluster.addCoreMemberWithId( 1 ).start();
cluster.addReadReplicaWithId( 2 ).start();
expected.add( makeCoreTags( suffix.get(), 1 ) );
expected.add( makeReplicaTags( suffix.get(), 2 ) );

// then
for ( CoreClusterMember core : cluster.coreMembers() )
{
assertEventually( core + " should have tags", () -> getServerTags( core.database() ),
new TagsMatcher( expected ), 30, SECONDS );
}
}

/**
 * Matches when actual and expected have the same number of tag lists and
 * every actual list is equal (as a set) to some expected list. Order of
 * lists and of tags within a list is ignored.
 *
 * NOTE(review): each expected entry may be matched more than once, so
 * duplicate actual lists could mask a missing expected one — acceptable
 * here since the generated tag lists are unique per server.
 */
class TagsMatcher extends TypeSafeMatcher<List<List<String>>>
{
private final List<List<String>> expected;

TagsMatcher( List<List<String>> expected )
{
this.expected = expected;
}

@Override
protected boolean matchesSafely( List<List<String>> actual )
{
if ( actual.size() != expected.size() )
{
return false;
}

for ( List<String> actualTags : actual )
{
boolean matched = false;
for ( List<String> expectedTags : expected )
{
if ( actualTags.size() != expectedTags.size() )
{
continue;
}

if ( !actualTags.containsAll( expectedTags ) )
{
continue;
}

matched = true;
break;
}

if ( !matched )
{
return false;
}
}

return true;
}

@Override
public void describeTo( Description description )
{
description.appendText( expected.toString() );
}
}

// Tags configured for core server `id`: one unique per-server tag plus a shared "core" tag.
private List<String> makeCoreTags( String suffix, int id )
{
return asList( format( "core-%d-%s", id, suffix ), "core" );
}

// Tags configured for read replica `id`: one unique per-server tag plus a shared "replica" tag.
private List<String> makeReplicaTags( String suffix, int id )
{
return asList( format( "replica-%d-%s", id, suffix ), "replica" );
}

/**
 * Collects the "tags" column for every row of {@code dbms.cluster.overview}
 * as seen by the given database — i.e. that member's view of all servers' tags.
 */
private List<List<String>> getServerTags( CoreGraphDatabase db )
{
List<List<String>> serverTags = new ArrayList<>();
try ( InternalTransaction tx = db.beginTransaction( KernelTransaction.Type.explicit, EnterpriseSecurityContext.AUTH_DISABLED ) )
{
try ( Result result = db.execute( tx, "CALL dbms.cluster.overview", emptyMap() ) )
{
while ( result.hasNext() )
{
@SuppressWarnings( "unchecked" )
List<String> tags = (List<String>) result.next().get( "tags" );
serverTags.add( tags );
}
}
}
return serverTags;
}
}

0 comments on commit 279bfe8

Please sign in to comment.