Commit

more logging to make it easier to see what's happening in membership land
Mark Needham committed Jan 25, 2017
1 parent fb5b3dc commit 800e17c
Showing 7 changed files with 282 additions and 10 deletions.
@@ -113,9 +113,15 @@ public void start() throws IOException

public void setTargetMembershipSet( Set<MemberId> targetMembers )
{
boolean targetMembershipChanged = !targetMembers.equals( this.targetMembers );

this.targetMembers = new HashSet<>( targetMembers );

log.info( "Target membership: " + targetMembers );
if ( targetMembershipChanged )
{
log.info( "Target membership: " + targetMembers );
}

membershipChanger.onTargetChanged( targetMembers );

checkForStartCondition();
@@ -175,7 +181,20 @@ void removeAdditionalReplicationMember( MemberId member )

private boolean isSafeToRemoveMember()
{
return votingMembers() != null && votingMembers().size() > expectedClusterSize;
Set<MemberId> votingMembers = votingMembers();
boolean safeToRemoveMember = votingMembers != null && votingMembers.size() > expectedClusterSize;

if ( !safeToRemoveMember )
{
Set<MemberId> membersToRemove = superfluousMembers();

log.info( "Not safe to remove %s %s because it would reduce the number of voting members below the expected " +
"cluster size of %d. Voting members: %s",
membersToRemove.size() > 1 ? "members" : "member",
membersToRemove, expectedClusterSize, votingMembers );
}

return safeToRemoveMember;
}

private Set<MemberId> superfluousMembers()
@@ -196,7 +215,7 @@ private void checkForStartCondition()
{
membershipChanger.onMissingMember( first( missingMembers() ) );
}
else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 )
else if ( superfluousMembers().size() > 0 && isSafeToRemoveMember() )
{
membershipChanger.onSuperfluousMember( first( superfluousMembers() ) );
}
@@ -209,6 +228,7 @@ else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 )
*/
void doConsensus( Set<MemberId> newVotingMemberSet )
{
log.info( "Getting consensus on new voting member set %s", newVotingMemberSet );
sendToMyself.replicate( memberSetBuilder.build( newVotingMemberSet ) );
}

@@ -288,12 +308,14 @@ public void append( long baseIndex, RaftLogEntry... entries ) throws IOException

if ( state.append( baseIndex, new HashSet<>( raftGroup.getMembers() ) ) )
{
log.info( "Appending new member set %s", state );
storage.persistStoreData( state );
updateMemberSets();
}
else
{
log.warn( "Appending member set was ignored. Current state: %s, Appended set: %s, Log index: %d%n", state, raftGroup, baseIndex );
log.warn( "Appending member set was ignored. Current state: %s, Appended set: %s, Log index: %d%n",
state, raftGroup, baseIndex );
}
}
baseIndex++;
@@ -22,12 +22,19 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.collection.Pair;

import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;

public class CoreTopology
{
@@ -72,7 +79,77 @@ public Optional<CoreAddresses> find( MemberId memberId )
@Override
public String toString()
{
return String.format( "{coreMembers=%s, bootstrappable=%s}", coreMembers, canBeBootstrapped() );
return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}",
clusterId, canBeBootstrapped(), coreMembers );
}

TopologyDifference difference( CoreTopology other )
{
Set<MemberId> members = coreMembers.keySet();
Set<MemberId> otherMembers = other.coreMembers.keySet();

Set<Difference> added = otherMembers.stream().filter( m -> !members.contains(m) )
.map( memberId -> asDifference( other, memberId ) ).collect( toSet() );

Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains(m) )
.map( memberId -> asDifference(CoreTopology.this, memberId ) ).collect( toSet() );

return new TopologyDifference( added, removed );
}

private Difference asDifference( CoreTopology topology, MemberId memberId )
{
return new Difference( memberId, topology.find( memberId ).orElse( null ) );
}

class TopologyDifference
{
private Set<Difference> added;
private Set<Difference> removed;

TopologyDifference( Set<Difference> added, Set<Difference> removed )
{
this.added = added;
this.removed = removed;
}

Set<Difference> added()
{
return added;
}

Set<Difference> removed()
{
return removed;
}

boolean hasChanges()
{
return added.size() > 0 || removed.size() > 0;
}

@Override
public String toString()
{
return String.format( "{added=%s, removed=%s}", added, removed );
}
}

private class Difference
{
private MemberId memberId;
private CoreAddresses coreAddresses;

Difference( MemberId memberId, CoreAddresses coreAddresses )
{
this.memberId = memberId;
this.coreAddresses = coreAddresses;
}

@Override
public String toString()
{
return String.format( "{memberId=%s, coreAddresses=%s}", memberId, coreAddresses );
}
}
}
@@ -232,8 +232,23 @@ public CoreTopology coreServers()
@Override
public void refreshCoreTopology()
{
latestCoreTopology = HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log );
log.info( "Current core topology is %s", coreServers() );
CoreTopology newCoreTopology = HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log );

if ( coreServers() != null )
{
CoreTopology.TopologyDifference difference = coreServers().difference( newCoreTopology );
if ( difference.hasChanges() )
{
log.info( "Core topology changed %s", difference );
}
}
else
{
log.info( "Initial Core topology %s", newCoreTopology );
}

latestCoreTopology = newCoreTopology;

listenerService.notifyListeners( coreServers() );
}

@@ -19,9 +19,12 @@
*/
package org.neo4j.causalclustering.discovery;

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.helpers.collection.Pair;

public class ReadReplicaTopology
{
@@ -37,9 +40,31 @@ public Set<ReadReplicaAddresses> members()
return readReplicaMembers;
}

public Set<ReadReplicaAddresses> difference( ReadReplicaTopology other )
{
Pair<Set<ReadReplicaAddresses>, Set<ReadReplicaAddresses>> split = split( readReplicaMembers, other.members() );
Set<ReadReplicaAddresses> big = split.first();
Set<ReadReplicaAddresses> small = split.other();

return big.stream().filter( n -> !small.contains( n ) ).collect( Collectors.toSet() );
}

private Pair<Set<ReadReplicaAddresses>, Set<ReadReplicaAddresses>> split(
Set<ReadReplicaAddresses> one, Set<ReadReplicaAddresses> two )
{
if ( one.size() > two.size() )
{
return Pair.pair( one, two );
}
else
{
return Pair.pair( two, one );
}
}

@Override
public String toString()
{
return String.format( "ReadReplicaTopology{readReplicas=%s}", readReplicaMembers );
return String.format( "{readReplicas=%s}", readReplicaMembers );
}
}
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.hamcrest.Matchers;
import org.junit.Test;

import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

public class CoreTopologyTest
{
@Test
public void identicalTopologiesShouldHaveNoDifference() throws Exception
{
// given
UUID one = UUID.randomUUID();
UUID two = UUID.randomUUID();

Map<MemberId, CoreAddresses> coreMembers = new HashMap<>();
coreMembers.put( new MemberId( one ), mock(CoreAddresses.class) );
coreMembers.put( new MemberId( two ), mock(CoreAddresses.class) );

CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers );

// when
CoreTopology.TopologyDifference diff = topology.difference(topology);

// then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 0 ) );
}

@Test
public void shouldDetectAddedMembers() throws Exception
{
// given
UUID one = UUID.randomUUID();
UUID two = UUID.randomUUID();

Map<MemberId, CoreAddresses> initialMembers = new HashMap<>();
initialMembers.put( new MemberId( one ), mock(CoreAddresses.class) );
initialMembers.put( new MemberId( two ), mock(CoreAddresses.class) );

Map<MemberId, CoreAddresses> newMembers = new HashMap<>();
newMembers.put( new MemberId( one ), mock(CoreAddresses.class) );
newMembers.put( new MemberId( two ), mock(CoreAddresses.class) );
newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) );

CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );

// when
CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));

// then
assertThat( diff.added().size(), Matchers.equalTo( 1 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 0 ) );
}

@Test
public void shouldDetectRemovedMembers() throws Exception
{
// given
UUID one = UUID.randomUUID();
UUID two = UUID.randomUUID();

Map<MemberId, CoreAddresses> initialMembers = new HashMap<>();
initialMembers.put( new MemberId( one ), mock(CoreAddresses.class) );
initialMembers.put( new MemberId( two ), mock(CoreAddresses.class) );

Map<MemberId, CoreAddresses> newMembers = new HashMap<>();
newMembers.put( new MemberId( two ), mock(CoreAddresses.class) );

CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );

// when
CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));

// then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 1 ) );
}

@Test
public void shouldDetectAddedAndRemovedMembers() throws Exception
{
// given

Map<MemberId, CoreAddresses> initialMembers = new HashMap<>();
initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) );
initialMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) );

Map<MemberId, CoreAddresses> newMembers = new HashMap<>();
newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) );
newMembers.put( new MemberId( UUID.randomUUID() ), mock(CoreAddresses.class) );

CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );

// when
CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));

// then
assertThat( diff.added().size(), Matchers.equalTo( 2 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 2 ) );
}
}
@@ -106,8 +106,7 @@ private ReadReplicaTopology readReplicaTopology()
lock.lock();
try
{
return new ReadReplicaTopology( unmodifiableSet( readReplicaAddresses )
);
return new ReadReplicaTopology( unmodifiableSet( readReplicaAddresses ) );
}
finally
{
