Skip to content

Commit

Permalink
Replacing ClusterTopology with CoreTopology and EdgeTopology
Browse files Browse the repository at this point in the history
We're doing this so that we don't make a call to HZ on the
Raft critical path.

It was previously looking up edge servers every time we sent
a message out which is pointless as we're never going to send
a Raft message to them anyway
  • Loading branch information
Mark Needham committed Sep 6, 2016
1 parent 572826b commit 338112d
Show file tree
Hide file tree
Showing 31 changed files with 274 additions and 160 deletions.
Expand Up @@ -110,7 +110,7 @@ private synchronized void release( CatchUpChannel channel )
private synchronized CatchUpChannel acquireChannel( MemberId memberId ) throws NoKnownAddressesException
{
AdvertisedSocketAddress catchUpAddress =
discoveryService.currentTopology().coreAddresses( memberId ).getCatchupServer();
discoveryService.coreServers().find( memberId ).getCatchupServer();
CatchUpChannel channel = idleChannels.remove( catchUpAddress );
if ( channel == null )
{
Expand Down
Expand Up @@ -22,7 +22,7 @@
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.discovery.ClusterTopology;
import org.neo4j.coreedge.discovery.CoreTopology;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.logging.Log;

Expand All @@ -37,7 +37,7 @@ class BindingProcess
this.log = log;
}

ClusterId attempt( ClusterTopology topology ) throws InterruptedException, TimeoutException, BindingException
ClusterId attempt( CoreTopology topology ) throws InterruptedException, TimeoutException, BindingException
{
ClusterId commonClusterId = topology.clusterId();

Expand Down
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.discovery.ClusterTopology;
import org.neo4j.coreedge.discovery.CoreTopology;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.function.ThrowingAction;
Expand Down Expand Up @@ -84,15 +84,15 @@ private ClusterId bindToCluster() throws IOException, InterruptedException, Time

long endTime = clock.millis() + timeoutMillis;

ClusterTopology topology = topologyService.currentTopology();
CoreTopology topology = topologyService.coreServers();
ClusterId commonClusterId;

while ( (commonClusterId = binder.attempt( topology )) == null )
{
if ( clock.millis() < endTime )
{
retryWaiter.apply();
topology = topologyService.currentTopology();
topology = topologyService.coreServers();
}
else
{
Expand Down
Expand Up @@ -20,52 +20,50 @@
package org.neo4j.coreedge.discovery;

import java.util.Collection;
import java.util.HashMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;

public class ClusterTopology
public class CoreTopology
{
private ClusterId clusterId;
private final Map<MemberId, CoreAddresses> coreMembers;
private final Set<EdgeAddresses> edgeAddresses;
public static CoreTopology EMPTY = new CoreTopology( null, false, Collections.emptyMap() );

private final ClusterId clusterId;
private final boolean canBeBootstrapped;
private final Map<MemberId, CoreAddresses> coreMembers;

public ClusterTopology( ClusterId clusterId, boolean canBeBootstrapped,
Map<MemberId,CoreAddresses> coreMembers,
Set<EdgeAddresses> edgeAddresses )
public CoreTopology( ClusterId clusterId, boolean canBeBootstrapped, Map<MemberId, CoreAddresses> coreMembers )
{

this.clusterId = clusterId;
this.canBeBootstrapped = canBeBootstrapped;
this.edgeAddresses = edgeAddresses;
this.coreMembers = new HashMap<>( coreMembers );
this.coreMembers = coreMembers;
}

public Set<MemberId> coreMembers()
public Set<MemberId> members()
{
return coreMembers.keySet();
}

public Collection<CoreAddresses> coreMemberAddresses()
public ClusterId clusterId()
{
return coreMembers.values();
return clusterId;
}

public Collection<EdgeAddresses> edgeMemberAddresses()
public Collection<CoreAddresses> addresses()
{
return edgeAddresses;
return coreMembers.values();
}

public boolean canBeBootstrapped()
{
return canBeBootstrapped;
}

public CoreAddresses coreAddresses( MemberId memberId ) throws NoKnownAddressesException
public CoreAddresses find( MemberId memberId ) throws NoKnownAddressesException
{
CoreAddresses coreAddresses = coreMembers.get( memberId );
if ( coreAddresses == null )
Expand All @@ -78,12 +76,7 @@ public CoreAddresses coreAddresses( MemberId memberId ) throws NoKnownAddressesE
@Override
public String toString()
{
return String.format( "{coreMembers=%s, bootstrappable=%s, edgeMemberAddresses=%s}",
coreMembers.keySet(), canBeBootstrapped(), edgeAddresses );
return String.format( "{coreMembers=%s, bootstrappable=%s}", coreMembers.keySet(), canBeBootstrapped() );
}

public ClusterId clusterId()
{
return clusterId;
}
}
Expand Up @@ -31,11 +31,11 @@ void addCoreTopologyListener( CoreTopologyService.Listener listener )
listeners.add( listener );
}

void notifyListeners( ClusterTopology clusterTopology )
void notifyListeners( CoreTopology coreTopology )
{
for ( CoreTopologyService.Listener listener : listeners )
{
listener.onCoreTopologyChange( clusterTopology );
listener.onCoreTopologyChange( coreTopology );
}
}
}
Expand Up @@ -37,6 +37,6 @@ public interface CoreTopologyService extends TopologyService

interface Listener
{
void onCoreTopologyChange( ClusterTopology clusterTopology );
void onCoreTopologyChange( CoreTopology coreTopology );
}
}
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import java.util.Collections;
import java.util.Set;

import org.neo4j.coreedge.identity.ClusterId;

public class EdgeTopology
{
public static EdgeTopology EMPTY = new EdgeTopology( null, Collections.emptySet() );

private final ClusterId clusterId;
private final Set<EdgeAddresses> edgeMembers;

public EdgeTopology( ClusterId clusterId, Set<EdgeAddresses> edgeMembers )
{

this.clusterId = clusterId;
this.edgeMembers = edgeMembers;
}

public Set<EdgeAddresses> members()
{
return edgeMembers;
}
}
Expand Up @@ -31,8 +31,6 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.EDGE_SERVER_BOLT_ADDRESS_MAP_NAME;
Expand Down Expand Up @@ -61,18 +59,34 @@ class HazelcastClient extends LifecycleAdapter implements TopologyService
}

@Override
public ClusterTopology currentTopology()
public EdgeTopology edgeServers()
{
try
{
return retry( ( hazelcastInstance ) ->
HazelcastClusterTopology.getClusterTopology( hazelcastInstance, log ) );
HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log ) );
}
catch ( Exception e )
{
log.info( "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. "
+ "Connection will be reattempted on next polling attempt.", e );
return new ClusterTopology( null /* TODO */, false, emptyMap(), emptySet() );
return EdgeTopology.EMPTY;
}
}

@Override
public CoreTopology coreServers()
{
try
{
return retry( ( hazelcastInstance ) ->
HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log ) );
}
catch ( Exception e )
{
log.info( "Failed to read cluster topology from Hazelcast. Continuing with empty (disconnected) topology. "
+ "Connection will be reattempted on next polling attempt.", e );
return CoreTopology.EMPTY;
}
}

Expand Down
Expand Up @@ -63,10 +63,27 @@ class HazelcastClusterTopology
static final String RAFT_SERVER = "raft_server";
static final String BOLT_SERVER = "bolt_server";

static ClusterTopology getClusterTopology( HazelcastInstance hazelcastInstance, Log log )
static EdgeTopology getEdgeTopology( HazelcastInstance hazelcastInstance, Log log )
{
Map<MemberId,CoreAddresses> coreMembers = emptyMap();
Set<EdgeAddresses> edgeMembers = emptySet();
ClusterId clusterId = null;

if ( hazelcastInstance != null )
{
edgeMembers = edgeMembers( hazelcastInstance, log );
clusterId = getClusterId( hazelcastInstance );
}
else
{
log.info( "Cannot currently bind to distributed discovery service." );
}

return new EdgeTopology( clusterId, edgeMembers );
}

static CoreTopology getCoreTopology( HazelcastInstance hazelcastInstance, Log log )
{
Map<MemberId,CoreAddresses> coreMembers = emptyMap();
boolean canBeBootstrapped = false;
ClusterId clusterId = null;

Expand All @@ -76,7 +93,6 @@ static ClusterTopology getClusterTopology( HazelcastInstance hazelcastInstance,
canBeBootstrapped = canBeBootstrapped( hzMembers );

coreMembers = toCoreMemberMap( hzMembers, log );
edgeMembers = edgeMembers( hazelcastInstance, log );

clusterId = getClusterId( hazelcastInstance );
}
Expand All @@ -85,7 +101,7 @@ static ClusterTopology getClusterTopology( HazelcastInstance hazelcastInstance,
log.info( "Cannot currently bind to distributed discovery service." );
}

return new ClusterTopology( clusterId, canBeBootstrapped, coreMembers, edgeMembers );
return new CoreTopology( clusterId, canBeBootstrapped, coreMembers );
}

private static ClusterId getClusterId( HazelcastInstance hazelcastInstance )
Expand Down
Expand Up @@ -67,7 +67,7 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
public void addCoreTopologyListener( Listener listener )
{
listenerService.addCoreTopologyListener( listener );
listener.onCoreTopologyChange( currentTopology() );
listener.onCoreTopologyChange( coreServers() );
}

@Override
Expand All @@ -80,16 +80,16 @@ public boolean casClusterId( ClusterId clusterId )
public void memberAdded( MembershipEvent membershipEvent )
{
log.info( "Core member added %s", membershipEvent );
log.info( "Current topology is %s", currentTopology() );
listenerService.notifyListeners(currentTopology());
log.info( "Current core topology is %s", coreServers() );
listenerService.notifyListeners( coreServers());
}

@Override
public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );
log.info( "Current topology is %s", currentTopology() );
listenerService.notifyListeners(currentTopology());
log.info( "Current core topology is %s", coreServers() );
listenerService.notifyListeners( coreServers());
}

@Override
Expand All @@ -103,7 +103,7 @@ public void start()
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this );
listenerService.notifyListeners(currentTopology());
listenerService.notifyListeners( coreServers());
}

@Override
Expand Down Expand Up @@ -165,8 +165,14 @@ private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize(
}

@Override
public ClusterTopology currentTopology()
public EdgeTopology edgeServers()
{
return HazelcastClusterTopology.getClusterTopology( hazelcastInstance, log );
return HazelcastClusterTopology.getEdgeTopology( hazelcastInstance, log );
}

@Override
public CoreTopology coreServers()
{
return HazelcastClusterTopology.getCoreTopology( hazelcastInstance, log );
}
}
Expand Up @@ -41,8 +41,8 @@ public RaftDiscoveryServiceConnector( CoreTopologyService discoveryService, Raft
@Override
public void start() throws BootstrapException
{
ClusterTopology clusterTopology = discoveryService.currentTopology();
Set<MemberId> initialMembers = clusterTopology.coreMembers();
CoreTopology clusterTopology = discoveryService.coreServers();
Set<MemberId> initialMembers = clusterTopology.members();

if ( clusterTopology.canBeBootstrapped() )
{
Expand All @@ -53,9 +53,9 @@ public void start() throws BootstrapException
}

@Override
public synchronized void onCoreTopologyChange( ClusterTopology clusterTopology )
public synchronized void onCoreTopologyChange( CoreTopology coreTopology )
{
Set<MemberId> targetMembers = clusterTopology.coreMembers();
Set<MemberId> targetMembers = coreTopology.members();
raftMachine.setTargetMembershipSet( targetMembers );
}
}
Expand Up @@ -23,5 +23,7 @@

public interface TopologyService extends Lifecycle
{
ClusterTopology currentTopology();
EdgeTopology edgeServers();

CoreTopology coreServers();
}

0 comments on commit 338112d

Please sign in to comment.