Skip to content

Commit

Permalink
Consolidating the Core and Replica topology differences into one happ…
Browse files Browse the repository at this point in the history
…y shared difference
  • Loading branch information
Mark Needham committed Mar 15, 2017
1 parent 893f2f4 commit bec8770
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 130 deletions.
Expand Up @@ -72,78 +72,23 @@ public String toString()
return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), coreMembers ); return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(), coreMembers );
} }


CoreTopologyDifference difference( CoreTopology other ) TopologyDifference difference( CoreTopology other )
{ {
Set<MemberId> members = coreMembers.keySet(); Set<MemberId> members = coreMembers.keySet();
Set<MemberId> otherMembers = other.coreMembers.keySet(); Set<MemberId> otherMembers = other.coreMembers.keySet();


Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) ) Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> asDifference( other, memberId ) ).collect( toSet() ); .map( memberId -> Difference.asDifference( other, memberId ) ).collect( toSet() );


Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) ) Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> asDifference( CoreTopology.this, memberId ) ).collect( toSet() ); .map( memberId -> Difference.asDifference( CoreTopology.this, memberId ) ).collect( toSet() );


return new CoreTopologyDifference( added, removed ); return new TopologyDifference( added, removed );
}

private Difference asDifference( CoreTopology topology, MemberId memberId )
{
return new Difference( memberId, topology.find( memberId ).orElse( null ) );
} }


public Optional<MemberId> anyCoreMemberId() public Optional<MemberId> anyCoreMemberId()
{ {
return coreMembers.keySet().stream().findAny(); return coreMembers.keySet().stream().findAny();
} }


class CoreTopologyDifference
{
private Set<Difference> added;
private Set<Difference> removed;

CoreTopologyDifference( Set<Difference> added, Set<Difference> removed )
{
this.added = added;
this.removed = removed;
}

Set<Difference> added()
{
return added;
}

Set<Difference> removed()
{
return removed;
}

boolean hasChanges()
{
return added.size() > 0 || removed.size() > 0;
}

@Override
public String toString()
{
return String.format( "{added=%s, removed=%s}", added, removed );
}
}

private class Difference
{
private MemberId memberId;
private CoreServerInfo coreServerInfo;

Difference( MemberId memberId, CoreServerInfo coreServerInfo )
{
this.memberId = memberId;
this.coreServerInfo = coreServerInfo;
}

@Override
public String toString()
{
return String.format( "{memberId=%s, coreServerInfo=%s}", memberId, coreServerInfo );
}
}
} }
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import org.neo4j.causalclustering.identity.MemberId;

class Difference
{
private MemberId memberId;
private CatchupServerAddress server;

private Difference( MemberId memberId, CatchupServerAddress server )
{
this.memberId = memberId;
this.server = server;
}

static Difference asDifference( CoreTopology topology, MemberId memberId )
{
return new Difference( memberId, topology.find( memberId ).orElse( null ) );
}

static Difference asDifference( ReadReplicaTopology topology, MemberId memberId )
{
return new Difference( memberId, topology.find( memberId ).orElse( null ) );
}

@Override
public String toString()
{
return String.format( "{memberId=%s, info=%s}", memberId, server );
}
}
Expand Up @@ -253,7 +253,7 @@ private void refreshCoreTopology()
{ {
CoreTopology newCoreTopology = getCoreTopology( hazelcastInstance, config, log ); CoreTopology newCoreTopology = getCoreTopology( hazelcastInstance, config, log );


CoreTopology.CoreTopologyDifference difference = coreTopology.difference( newCoreTopology ); TopologyDifference difference = coreTopology.difference( newCoreTopology );
if ( difference.hasChanges() ) if ( difference.hasChanges() )
{ {
log.info( "Core topology changed %s", difference ); log.info( "Core topology changed %s", difference );
Expand All @@ -267,7 +267,7 @@ private void refreshReadReplicaTopology()
{ {
ReadReplicaTopology newReadReplicaTopology = getReadReplicaTopology( hazelcastInstance, log ); ReadReplicaTopology newReadReplicaTopology = getReadReplicaTopology( hazelcastInstance, log );


ReadReplicaTopology.ReadReplicaTopologyDifference difference = readReplicaTopology.difference( newReadReplicaTopology ); TopologyDifference difference = readReplicaTopology.difference( newReadReplicaTopology );
if ( difference.hasChanges() ) if ( difference.hasChanges() )
{ {
log.info( "Read replica topology changed %s", difference ); log.info( "Read replica topology changed %s", difference );
Expand Down
Expand Up @@ -29,6 +29,8 @@
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;


import static org.neo4j.causalclustering.discovery.Difference.asDifference;



public class ReadReplicaTopology public class ReadReplicaTopology
{ {
Expand Down Expand Up @@ -74,73 +76,17 @@ public Optional<MemberId> anyReadReplicaMemberId()
} }
} }


ReadReplicaTopologyDifference difference( ReadReplicaTopology other ) TopologyDifference difference( ReadReplicaTopology other )
{ {
Set<MemberId> members = readReplicaMembers.keySet(); Set<MemberId> members = readReplicaMembers.keySet();
Set<MemberId> otherMembers = other.readReplicaMembers.keySet(); Set<MemberId> otherMembers = other.readReplicaMembers.keySet();


Set<ReadReplicaTopology.Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) ) Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> asDifference( other, memberId ) ).collect( toSet() ); .map( memberId -> asDifference( other, memberId ) ).collect( toSet() );


Set<ReadReplicaTopology.Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) ) Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> asDifference( ReadReplicaTopology.this, memberId ) ).collect( toSet() ); .map( memberId -> asDifference( ReadReplicaTopology.this, memberId ) ).collect( toSet() );


return new ReadReplicaTopologyDifference( added, removed ); return new TopologyDifference( added, removed );
}

private ReadReplicaTopology.Difference asDifference( ReadReplicaTopology topology, MemberId memberId )
{
return new ReadReplicaTopology.Difference( memberId, topology.find( memberId ).orElse( null ) );
}

class ReadReplicaTopologyDifference
{
private Set<ReadReplicaTopology.Difference> added;
private Set<ReadReplicaTopology.Difference> removed;

ReadReplicaTopologyDifference( Set<ReadReplicaTopology.Difference> added, Set<ReadReplicaTopology.Difference> removed )
{
this.added = added;
this.removed = removed;
}

Set<ReadReplicaTopology.Difference> added()
{
return added;
}

Set<ReadReplicaTopology.Difference> removed()
{
return removed;
}

boolean hasChanges()
{
return added.size() > 0 || removed.size() > 0;
}

@Override
public String toString()
{
return String.format( "{added=%s, removed=%s}", added, removed );
}
}

private class Difference
{
private MemberId memberId;
private ReadReplicaInfo readReplicaInfo;

Difference( MemberId memberId, ReadReplicaInfo readReplicaInfo )
{
this.memberId = memberId;
this.readReplicaInfo = readReplicaInfo;
}

@Override
public String toString()
{
return String.format( "{memberId=%s, readReplicaInfo=%s}", memberId, readReplicaInfo );
}
} }
} }
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.util.Set;

class TopologyDifference
{
private Set<Difference> added;
private Set<Difference> removed;

TopologyDifference( Set<Difference> added, Set<Difference> removed )
{
this.added = added;
this.removed = removed;
}

Set<Difference> added()
{
return added;
}

Set<Difference> removed()
{
return removed;
}

boolean hasChanges()
{
return added.size() > 0 || removed.size() > 0;
}

@Override
public String toString()
{
return String.format( "{added=%s, removed=%s}", added, removed );
}
}
Expand Up @@ -48,7 +48,7 @@ public void identicalTopologiesShouldHaveNoDifference() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, coreMembers );


// when // when
CoreTopology.CoreTopologyDifference diff = topology.difference(topology); TopologyDifference diff = topology.difference(topology);


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
Expand All @@ -74,7 +74,7 @@ public void shouldDetectAddedMembers() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );


// when // when
CoreTopology.CoreTopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 1 ) ); assertThat( diff.added().size(), Matchers.equalTo( 1 ) );
Expand All @@ -98,7 +98,7 @@ public void shouldDetectRemovedMembers() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );


// when // when
CoreTopology.CoreTopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
Expand All @@ -121,7 +121,7 @@ public void shouldDetectAddedAndRemovedMembers() throws Exception
CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers ); CoreTopology topology = new CoreTopology( new ClusterId( UUID.randomUUID() ), true, initialMembers );


// when // when
CoreTopology.CoreTopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers )); TopologyDifference diff = topology.difference(new CoreTopology( new ClusterId( UUID.randomUUID() ), true, newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 2 ) ); assertThat( diff.added().size(), Matchers.equalTo( 2 ) );
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Test; import org.junit.Test;


import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -48,7 +47,7 @@ public void identicalTopologiesShouldHaveNoDifference() throws Exception
ReadReplicaTopology topology = new ReadReplicaTopology( readReplicaMembers ); ReadReplicaTopology topology = new ReadReplicaTopology( readReplicaMembers );


// when // when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(topology); TopologyDifference diff = topology.difference(topology);


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
Expand All @@ -74,7 +73,7 @@ public void shouldDetectAddedMembers() throws Exception
ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers ); ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers );


// when // when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers )); TopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 1 ) ); assertThat( diff.added().size(), Matchers.equalTo( 1 ) );
Expand All @@ -98,7 +97,7 @@ public void shouldDetectRemovedMembers() throws Exception
ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers ); ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers );


// when // when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers )); TopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 0 ) ); assertThat( diff.added().size(), Matchers.equalTo( 0 ) );
Expand All @@ -121,7 +120,7 @@ public void shouldDetectAddedAndRemovedMembers() throws Exception
ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers ); ReadReplicaTopology topology = new ReadReplicaTopology( initialMembers );


// when // when
ReadReplicaTopology.ReadReplicaTopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers )); TopologyDifference diff = topology.difference(new ReadReplicaTopology( newMembers ));


// then // then
assertThat( diff.added().size(), Matchers.equalTo( 2 ) ); assertThat( diff.added().size(), Matchers.equalTo( 2 ) );
Expand Down

0 comments on commit bec8770

Please sign in to comment.