Skip to content

Commit

Permalink
Only log changes in read replica topology, not the whole thing
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Needham committed Mar 15, 2017
1 parent 34c899a commit 893f2f4
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 13 deletions.
Expand Up @@ -69,11 +69,10 @@ public Optional<CoreServerInfo> find( MemberId memberId )
@Override @Override
public String toString() public String toString()
{ {
return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), coreMembers );
coreMembers );
} }


TopologyDifference difference( CoreTopology other ) CoreTopologyDifference difference( CoreTopology other )
{ {
Set<MemberId> members = coreMembers.keySet(); Set<MemberId> members = coreMembers.keySet();
Set<MemberId> otherMembers = other.coreMembers.keySet(); Set<MemberId> otherMembers = other.coreMembers.keySet();
Expand All @@ -84,7 +83,7 @@ TopologyDifference difference( CoreTopology other )
Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) ) Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> asDifference( CoreTopology.this, memberId ) ).collect( toSet() ); .map( memberId -> asDifference( CoreTopology.this, memberId ) ).collect( toSet() );


return new TopologyDifference( added, removed ); return new CoreTopologyDifference( added, removed );
} }


private Difference asDifference( CoreTopology topology, MemberId memberId ) private Difference asDifference( CoreTopology topology, MemberId memberId )
Expand All @@ -97,12 +96,12 @@ public Optional<MemberId> anyCoreMemberId()
return coreMembers.keySet().stream().findAny(); return coreMembers.keySet().stream().findAny();
} }


class TopologyDifference class CoreTopologyDifference
{ {
private Set<Difference> added; private Set<Difference> added;
private Set<Difference> removed; private Set<Difference> removed;


TopologyDifference( Set<Difference> added, Set<Difference> removed ) CoreTopologyDifference( Set<Difference> added, Set<Difference> removed )
{ {
this.added = added; this.added = added;
this.removed = removed; this.removed = removed;
Expand Down
Expand Up @@ -253,7 +253,7 @@ private void refreshCoreTopology()
{ {
CoreTopology newCoreTopology = getCoreTopology( hazelcastInstance, config, log ); CoreTopology newCoreTopology = getCoreTopology( hazelcastInstance, config, log );


CoreTopology.TopologyDifference difference = coreTopology.difference( newCoreTopology ); CoreTopology.CoreTopologyDifference difference = coreTopology.difference( newCoreTopology );
if ( difference.hasChanges() ) if ( difference.hasChanges() )
{ {
log.info( "Core topology changed %s", difference ); log.info( "Core topology changed %s", difference );
Expand All @@ -265,8 +265,15 @@ private void refreshCoreTopology()


private void refreshReadReplicaTopology() private void refreshReadReplicaTopology()
{ {
readReplicaTopology = getReadReplicaTopology( hazelcastInstance, log ); ReadReplicaTopology newReadReplicaTopology = getReadReplicaTopology( hazelcastInstance, log );
log.info( "Current read replica topology is %s", readReplicaTopology );
ReadReplicaTopology.ReadReplicaTopologyDifference difference = readReplicaTopology.difference( newReadReplicaTopology );
if ( difference.hasChanges() )
{
log.info( "Read replica topology changed %s", difference );
}

this.readReplicaTopology = newReadReplicaTopology;
} }


private class OurMembershipListener implements MembershipListener private class OurMembershipListener implements MembershipListener
Expand Down
Expand Up @@ -22,10 +22,12 @@
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toSet;




public class ReadReplicaTopology public class ReadReplicaTopology
Expand Down Expand Up @@ -70,6 +72,75 @@ public Optional<MemberId> anyReadReplicaMemberId()
{ {
return readReplicaMembers.keySet().stream().findAny(); return readReplicaMembers.keySet().stream().findAny();
} }
}

ReadReplicaTopologyDifference difference( ReadReplicaTopology other )
{
Set<MemberId> members = readReplicaMembers.keySet();
Set<MemberId> otherMembers = other.readReplicaMembers.keySet();

Set<ReadReplicaTopology.Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> asDifference( other, memberId ) ).collect( toSet() );

Set<ReadReplicaTopology.Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> asDifference( ReadReplicaTopology.this, memberId ) ).collect( toSet() );

return new ReadReplicaTopologyDifference( added, removed );
}

private ReadReplicaTopology.Difference asDifference( ReadReplicaTopology topology, MemberId memberId )
{
return new ReadReplicaTopology.Difference( memberId, topology.find( memberId ).orElse( null ) );
}

class ReadReplicaTopologyDifference
{
private Set<ReadReplicaTopology.Difference> added;
private Set<ReadReplicaTopology.Difference> removed;

ReadReplicaTopologyDifference( Set<ReadReplicaTopology.Difference> added, Set<ReadReplicaTopology.Difference> removed )
{
this.added = added;
this.removed = removed;
}

Set<ReadReplicaTopology.Difference> added()
{
return added;
}


Set<ReadReplicaTopology.Difference> removed()
{
return removed;
}

boolean hasChanges()
{
return added.size() > 0 || removed.size() > 0;
}

@Override
public String toString()
{
return String.format( "{added=%s, removed=%s}", added, removed );
}
}

private class Difference
{
private MemberId memberId;
private ReadReplicaInfo readReplicaInfo;

Difference( MemberId memberId, ReadReplicaInfo readReplicaInfo )
{
this.memberId = memberId;
this.readReplicaInfo = readReplicaInfo;
}

@Override
public String toString()
{
return String.format( "{memberId=%s, readReplicaInfo=%s}", memberId, readReplicaInfo );
}
} }
} }
Expand Up @@ -48,7 +48,7 @@ public void identicalTopologiesShouldHaveNoDifference() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers );


// when // when
CoreTopology.TopologyDifference diff = topology.difference(topology); CoreTopology.CoreTopologyDifference diff = topology.difference(topology);


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
Expand All @@ -74,7 +74,7 @@ public void shouldDetectAddedMembers() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );


// when // when
CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); CoreTopology.CoreTopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 1 ) ); assertThat( diff.added().size(), Matchers.equalTo( 1 ) );
Expand All @@ -98,7 +98,7 @@ public void shouldDetectRemovedMembers() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );


// when // when
CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); CoreTopology.CoreTopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
Expand All @@ -121,7 +121,7 @@ public void shouldDetectAddedAndRemovedMembers() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );


// when // when
CoreTopology.TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); CoreTopology.CoreTopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 2 ) ); assertThat( diff.added().size(), Matchers.equalTo( 2 ) );
Expand Down
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.hamcrest.Matchers;
import org.junit.Test;

import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

public class ReadReplicaTopologyTest
{
@Test
public void identicalTopologiesShouldHaveNoDifference() throws Exception
{
// given
UUID one = UUID.randomUUID();
UUID two = UUID.randomUUID();

Map<MemberId,ReadReplicaInfo> readReplicaMembers = new HashMap<>();
readReplicaMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) );
readReplicaMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) );

ReadReplicaTopology topology = new ReadReplicaTopology( readReplicaMembers );

// when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(topology);

// then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 0 ) );
}

@Test
public void shouldDetectAddedMembers() throws Exception
{
// given
UUID one = UUID.randomUUID();
UUID two = UUID.randomUUID();

Map<MemberId,ReadReplicaInfo> initialMembers = new HashMap<>();
initialMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) );
initialMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) );

Map<MemberId,ReadReplicaInfo> newMembers = new HashMap<>();
newMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) );
newMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) );
newMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) );

ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers );

// when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers ));

// then
assertThat( diff.added().size(), Matchers.equalTo( 1 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 0 ) );
}

@Test
public void shouldDetectRemovedMembers() throws Exception
{
// given
UUID one = UUID.randomUUID();
UUID two = UUID.randomUUID();

Map<MemberId,ReadReplicaInfo> initialMembers = new HashMap<>();
initialMembers.put( new MemberId( one ), mock(ReadReplicaInfo.class) );
initialMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) );

Map<MemberId,ReadReplicaInfo> newMembers = new HashMap<>();
newMembers.put( new MemberId( two ), mock(ReadReplicaInfo.class) );

ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers );

// when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers ));

// then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 1 ) );
}

@Test
public void shouldDetectAddedAndRemovedMembers() throws Exception
{
// given

Map<MemberId,ReadReplicaInfo> initialMembers = new HashMap<>();
initialMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) );
initialMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) );

Map<MemberId,ReadReplicaInfo> newMembers = new HashMap<>();
newMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) );
newMembers.put( new MemberId( UUID.randomUUID() ), mock(ReadReplicaInfo.class) );

ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers );

// when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers ));

// then
assertThat( diff.added().size(), Matchers.equalTo( 2 ) );
assertThat( diff.removed().size(), Matchers.equalTo( 2 ) );
}
}

0 comments on commit 893f2f4

Please sign in to comment.