Skip to content

Commit

Permalink
Fixing the Shared Discovery Service IT.
Browse files Browse the repository at this point in the history
MemberId is now the primary identifier for cores and RRs across whole codebase.
Retrofitted some unit level testing against RR-to-RR strategies.
IDE reformatted some code sensibly.
  • Loading branch information
jimwebber committed Feb 8, 2017
1 parent 9df16a4 commit 9c448c5
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 111 deletions.
Expand Up @@ -435,8 +435,6 @@ public void start()
}
catch ( Throwable e )
{
System.out.println("problems --> ");
e.printStackTrace();
currentStatus = changedStatus( instance, currentStatus, LifecycleStatus.STOPPED );
if( e instanceof LifecycleException )
{
Expand Down
Expand Up @@ -22,29 +22,25 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.collection.Pair;

import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;

public class CoreTopology
{
public static CoreTopology EMPTY = new CoreTopology( null, false, Collections.emptyMap() );
static CoreTopology EMPTY = new CoreTopology( null, false, Collections.emptyMap() );

private final ClusterId clusterId;
private final boolean canBeBootstrapped;
private final Map<MemberId, CoreAddresses> coreMembers;
private final Map<MemberId,CoreAddresses> coreMembers;

public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberId, CoreAddresses> coreMembers )
public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberId,CoreAddresses> coreMembers )
{
this.clusterId = clusterId;
this.canBeBootstrapped = canBeBootstrapped;
Expand Down Expand Up @@ -79,20 +75,20 @@ public Optional<CoreAddresses> find( MemberId memberId )
@Override
public String toString()
{
return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}",
clusterId, canBeBootstrapped(), coreMembers );
return format( "{clusterId=%s, bootstrappable=%s, coreMembers=%s}", clusterId, canBeBootstrapped(),
coreMembers );
}

TopologyDifference difference( CoreTopology other )
{
Set<MemberId> members = coreMembers.keySet();
Set<MemberId> otherMembers = other.coreMembers.keySet();

Set<Difference> added = otherMembers.stream().filter( m -> !members.contains(m) )
Set<Difference> added = otherMembers.stream().filter( m -> !members.contains( m ) )
.map( memberId -> asDifference( other, memberId ) ).collect( toSet() );

Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains(m) )
.map( memberId -> asDifference(CoreTopology.this, memberId ) ).collect( toSet() );
Set<Difference> removed = members.stream().filter( m -> !otherMembers.contains( m ) )
.map( memberId -> asDifference( CoreTopology.this, memberId ) ).collect( toSet() );

return new TopologyDifference( added, removed );
}
Expand All @@ -102,6 +98,18 @@ private Difference asDifference( CoreTopology topology, MemberId memberId )
return new Difference( memberId, topology.find( memberId ).orElse( null ) );
}

public Optional<MemberId> anyCoreMemberId()
{
if ( coreMembers.keySet().size() == 0 )
{
return Optional.empty();
}
else
{
return coreMembers.keySet().stream().findAny();
}
}

class TopologyDifference
{
private Set<Difference> added;
Expand Down
Expand Up @@ -56,4 +56,17 @@ public String toString()
{
return String.format( "{readReplicas=%s}", readReplicaMembers );
}

public Optional<MemberId> anyReadReplicaMemberId()
{
if ( readReplicaMembers.keySet().size() == 0 )
{
return Optional.empty();
}
else
{
return readReplicaMembers.keySet().stream().findAny();
}

}
}
Expand Up @@ -34,7 +34,7 @@ public class ConnectToRandomCoreServer extends UpstreamDatabaseSelectionStrategy

public ConnectToRandomCoreServer()
{
super( "random" );
super( "connect-to-random-core-server" );
}

@Override
Expand Down
Expand Up @@ -186,9 +186,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout );
CatchUpClient catchUpClient = life.add(
new CatchUpClient( readReplicaTopologyService, logProvider, Clocks.systemClock(),
inactivityTimeoutMillis,

monitors ) );
inactivityTimeoutMillis, monitors ) );

final Supplier<DatabaseHealth> databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class );

Expand Down
Expand Up @@ -19,45 +19,57 @@
*/
package org.neo4j.causalclustering.readreplica;

import java.util.Iterator;
import java.util.Optional;
import java.util.Random;

import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class TypicallyConnectToRandomReadReplica extends UpstreamDatabaseSelectionStrategy
{
private final Random random = new Random();
private final ModuloCounter counter = new ModuloCounter( 10 );

public TypicallyConnectToRandomReadReplica()
{
super( "random" );
super( "typically-connect-to-random-read-replica" );
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
final CoreTopology coreTopology = readReplicaTopologyService.coreServers();

if ( coreTopology.members().size() == 0 )
if ( counter.shouldReturnCoreMemberId() )
{
throw new UpstreamDatabaseSelectionException( "No core servers available" );
return readReplicaTopologyService.coreServers().anyCoreMemberId();
}
else
{
return readReplicaTopologyService.readReplicas().anyReadReplicaMemberId();
}
}

int skippedServers = random.nextInt( coreTopology.members().size() );

final Iterator<MemberId> iterator = coreTopology.members().iterator();
private static class ModuloCounter
{
private final int modulo;
private int counter = 0;

MemberId member;
do
ModuloCounter( int modulo )
{
member = iterator.next();
// e.g. every 10th means 0-9
this.modulo = modulo -1;
}
while ( skippedServers-- > 0 );

return Optional.ofNullable( member );
boolean shouldReturnCoreMemberId()
{
if ( counter == modulo )
{
counter = 0;
return true;
}
else
{
counter++;
return false;
}
}
}
}

0 comments on commit 9c448c5

Please sign in to comment.