Skip to content

Commit

Permalink
Multi-DC switch for Core Instances
Browse files Browse the repository at this point in the history
Core instances now have their load balancing enabled/disabled by the
same config switch.
Previous behaviour is preserved if the config setting is omitted.
  • Loading branch information
jimwebber committed Mar 14, 2017
1 parent bf70f51 commit 6aaa633
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 35 deletions.
Expand Up @@ -45,8 +45,9 @@
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV2;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureForSingleDC;
import org.neo4j.causalclustering.load_balancing.procedure.LegacyGetServersProcedure;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureForMultiDC;
import org.neo4j.causalclustering.logging.BetterMessageLogger;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.logging.NullMessageLogger;
Expand Down Expand Up @@ -131,9 +132,18 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
{
procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true );
procedures.register(
new GetServersProcedureV1( topologyService, consensusModule.raftMachine(), config, logProvider ) );
procedures.register(
new GetServersProcedureV2( getLoadBalancingProcessor() ) );
new LegacyGetServersProcedure( topologyService, consensusModule.raftMachine(), config, logProvider ) );

if ( config.get( CausalClusteringSettings.multi_dc_license ) )
{
procedures.register( new GetServersProcedureForMultiDC( getLoadBalancingProcessor() ) );
}
else
{
procedures.register( new GetServersProcedureForSingleDC( topologyService, consensusModule.raftMachine(),
config, logProvider ) );
}

procedures.register(
new ClusterOverviewProcedure( topologyService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new CoreRoleProcedure( consensusModule.raftMachine() ) );
Expand Down
Expand Up @@ -43,7 +43,7 @@
* key-value pairs to be supplied to and used by the concrete load
* balancing strategies.
*/
public class GetServersProcedureV2 implements CallableProcedure
public class GetServersProcedureForMultiDC implements CallableProcedure
{
private final String DESCRIPTION = "Returns cluster endpoints and their capabilities.";

Expand All @@ -57,7 +57,7 @@ public class GetServersProcedureV2 implements CallableProcedure

private final LoadBalancingProcessor loadBalancingProcessor;

public GetServersProcedureV2( LoadBalancingProcessor loadBalancingProcessor )
public GetServersProcedureForMultiDC( LoadBalancingProcessor loadBalancingProcessor )
{
this.loadBalancingProcessor = loadBalancingProcessor;
}
Expand Down
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.load_balancing.procedure;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.discovery.CoreServerInfo;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.Endpoint;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
import org.neo4j.causalclustering.load_balancing.LoadBalancingResult;
import org.neo4j.collection.RawIterator;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.Context;
import org.neo4j.kernel.api.proc.Neo4jTypes;
import org.neo4j.kernel.api.proc.ProcedureSignature;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.cluster_allow_reads_on_followers;
import static org.neo4j.causalclustering.load_balancing.Util.asList;
import static org.neo4j.causalclustering.load_balancing.Util.extractBoltAddress;
import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.CONTEXT;
import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.SERVERS;
import static org.neo4j.causalclustering.load_balancing.procedure.ParameterNames.TTL;
import static org.neo4j.causalclustering.load_balancing.procedure.ProcedureNames.GET_SERVERS_V1;
import static org.neo4j.causalclustering.load_balancing.procedure.ProcedureNames.GET_SERVERS_V2;
import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature;

/**
 * Returns cluster endpoints and their capabilities for a single data center deployment.
 *
 * Registered under the V2 procedure name ({@code GET_SERVERS_V2}) and with the V2
 * signature (it accepts a client context map), so clients can call the same routing
 * procedure regardless of cluster mode. The context argument is ignored here: routing
 * follows the fixed single-DC policy — the leader is the sole write endpoint, read
 * replicas (optionally augmented with follower cores, per
 * {@code cluster_allow_reads_on_followers}) serve reads, and every core is a router.
 */
public class GetServersProcedureForSingleDC implements CallableProcedure
{
    private static final String DESCRIPTION =
            "Returns cluster endpoints and their capabilities for single data center setup.";

    private final ProcedureSignature procedureSignature =
            procedureSignature( GET_SERVERS_V2.fullyQualifiedProcedureName() )
                    .in( CONTEXT.parameterName(), Neo4jTypes.NTMap )
                    .out( TTL.parameterName(), Neo4jTypes.NTInteger )
                    .out( SERVERS.parameterName(), Neo4jTypes.NTMap )
                    .description( DESCRIPTION )
                    .build();

    private final TopologyService topologyService;
    private final LeaderLocator leaderLocator;
    private final Config config;
    private final Log log;

    /**
     * @param topologyService source of core and read-replica topology information
     * @param leaderLocator used to resolve the current Raft leader (the write endpoint)
     * @param config supplies TTL and read-routing settings
     * @param logProvider provider for this procedure's log
     */
    public GetServersProcedureForSingleDC( TopologyService topologyService, LeaderLocator leaderLocator,
            Config config, LogProvider logProvider )
    {
        this.topologyService = topologyService;
        this.leaderLocator = leaderLocator;
        this.config = config;
        this.log = logProvider.getLog( getClass() );
    }

    @Override
    public ProcedureSignature signature()
    {
        return procedureSignature;
    }

    /**
     * Computes the current routing table. The {@code input} (client context) is
     * intentionally ignored in single-DC mode.
     */
    @Override
    public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
    {
        List<Endpoint> routeEndpoints = routeEndpoints();
        List<Endpoint> writeEndpoints = writeEndpoints();
        List<Endpoint> readEndpoints = readEndpoints();

        return RawIterator.<Object[],ProcedureException>of( ResultFormatV1.build(
                new LoadBalancingResult( routeEndpoints, writeEndpoints, readEndpoints,
                        config.get( CausalClusteringSettings.cluster_routing_ttl ) ) ) );
    }

    /**
     * Resolves the leader's advertised Bolt address, or empty when no leader is
     * currently known (e.g. during a leader switch).
     */
    private Optional<AdvertisedSocketAddress> leaderBoltAddress()
    {
        MemberId leader;
        try
        {
            leader = leaderLocator.getLeader();
        }
        catch ( NoLeaderFoundException e )
        {
            log.debug( "No leader server found. This can happen during a leader switch. No write end points available" );
            return Optional.empty();
        }

        return topologyService.coreServers().find( leader ).map( extractBoltAddress() );
    }

    /**
     * All cores act as routers; shuffled so that clients spread their load.
     */
    private List<Endpoint> routeEndpoints()
    {
        Stream<AdvertisedSocketAddress> routers = topologyService.coreServers()
                .members().values().stream().map( extractBoltAddress() );
        List<Endpoint> routeEndpoints = routers.map( Endpoint::route ).collect( toList() );
        Collections.shuffle( routeEndpoints );
        return routeEndpoints;
    }

    /**
     * The leader is the only write endpoint; empty when no leader is known.
     */
    private List<Endpoint> writeEndpoints()
    {
        return asList( leaderBoltAddress().map( Endpoint::write ) );
    }

    /**
     * Read replicas serve reads; follower cores are added when configured to, or as a
     * fallback when no read replicas exist. Shuffled so that clients spread their load.
     */
    private List<Endpoint> readEndpoints()
    {
        List<AdvertisedSocketAddress> readReplicas = topologyService.readReplicas().allMemberInfo().stream()
                .map( extractBoltAddress() ).collect( toList() );
        boolean addFollowers = readReplicas.isEmpty() || config.get( cluster_allow_reads_on_followers );
        Stream<AdvertisedSocketAddress> readCore = addFollowers ? coreReadEndPoints() : Stream.empty();
        List<Endpoint> readEndPoints =
                concat( readReplicas.stream(), readCore ).map( Endpoint::read ).collect( toList() );
        Collections.shuffle( readEndPoints );
        return readEndPoints;
    }

    /**
     * Core Bolt addresses eligible for reads: all cores, minus the leader when the
     * leader is known and is not the only core member.
     */
    private Stream<AdvertisedSocketAddress> coreReadEndPoints()
    {
        Optional<AdvertisedSocketAddress> leader = leaderBoltAddress();
        Collection<CoreServerInfo> coreServerInfo = topologyService.coreServers().members().values();
        Stream<AdvertisedSocketAddress> boltAddresses = topologyService.coreServers()
                .members().values().stream().map( extractBoltAddress() );

        // if the leader is present and it is not alone filter it out from the read end points
        if ( leader.isPresent() && coreServerInfo.size() > 1 )
        {
            AdvertisedSocketAddress advertisedSocketAddress = leader.get();
            return boltAddresses.filter( address -> !advertisedSocketAddress.equals( address ) );
        }

        // if there is only the leader return it as read end point
        // or if we cannot locate the leader return all cores as read end points
        return boltAddresses;
    }
}
Expand Up @@ -56,7 +56,8 @@
/**
* Returns endpoints and their capabilities.
*/
public class GetServersProcedureV1 implements CallableProcedure
@Deprecated
public class LegacyGetServersProcedure implements CallableProcedure
{
private final String DESCRIPTION = "Returns cluster endpoints and their capabilities.";

Expand All @@ -72,7 +73,7 @@ public class GetServersProcedureV1 implements CallableProcedure
private final Config config;
private final Log log;

public GetServersProcedureV1( TopologyService topologyService, LeaderLocator leaderLocator,
public LegacyGetServersProcedure( TopologyService topologyService, LeaderLocator leaderLocator,
Config config, LogProvider logProvider )
{
this.topologyService = topologyService;
Expand Down
Expand Up @@ -267,7 +267,6 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector =
new UpstreamDatabaseStrategySelector( defaultStrategy, loader, myself, logProvider );


CatchupPollingProcess catchupProcess =
new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, catchUpClient,
upstreamDatabaseStrategySelector, catchupTimeoutService,
Expand Down
Expand Up @@ -83,8 +83,8 @@ public void shouldReturnEndpointsInDifferentOrders() throws Exception
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) );

final GetServersProcedureV1 proc =
new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() );
final LegacyGetServersProcedure proc =
new LegacyGetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
Object[] endpoints = getEndpoints( proc );
Expand All @@ -106,7 +106,7 @@ public void shouldReturnEndpointsInDifferentOrders() throws Exception
assertFalse( Arrays.deepEquals( endpoints, endpointsInDifferentOrder ) );
}

private Object[] getEndpoints( GetServersProcedureV1 proc )
private Object[] getEndpoints( LegacyGetServersProcedure proc )
throws org.neo4j.kernel.api.exceptions.ProcedureException
{
List<Object[]> results = asList( proc.apply( null, new Object[0] ) );
Expand Down
Expand Up @@ -109,8 +109,8 @@ public void ttlShouldBeInSeconds() throws Exception
// set the TTL in minutes
config = config.augment( stringMap( cluster_routing_ttl.name(), "10m" ) );

final GetServersProcedureV1 proc =
new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() );
final LegacyGetServersProcedure proc =
new LegacyGetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
List<Object[]> results = asList( proc.apply( null, new Object[0] ) );
Expand All @@ -125,7 +125,7 @@ public void ttlShouldBeInSeconds() throws Exception
public void shouldHaveCorrectSignature() throws Exception
{
// given
final GetServersProcedureV1 proc = new GetServersProcedureV1( null, null, config, getInstance() );
final LegacyGetServersProcedure proc = new LegacyGetServersProcedure( null, null, config, getInstance() );

// when
ProcedureSignature signature = proc.signature();
Expand All @@ -151,8 +151,8 @@ public void shouldProvideReaderAndRouterForSingleCoreSetup() throws Exception
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) );

final GetServersProcedureV1 proc =
new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() );
final LegacyGetServersProcedure proc =
new LegacyGetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( proc );
Expand Down Expand Up @@ -183,8 +183,8 @@ public void shouldReturnCoreServersWithRouteAllCoresButLeaderAsReadAndSingleWrit
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) );

final GetServersProcedureV1 proc =
new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() );
final LegacyGetServersProcedure proc =
new LegacyGetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( proc );
Expand Down Expand Up @@ -217,8 +217,8 @@ public void shouldReturnSelfIfOnlyMemberOfTheCluster() throws Exception
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( emptyMap() ) );

final GetServersProcedureV1 proc =
new GetServersProcedureV1( coreTopologyService, leaderLocator, config, getInstance() );
final LegacyGetServersProcedure proc =
new LegacyGetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( proc );
Expand Down Expand Up @@ -248,8 +248,8 @@ public void shouldReturnTheCoreLeaderForWriteAndReadReplicasAndCoresForReads() t
LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( theLeader );

GetServersProcedureV1 procedure =
new GetServersProcedureV1( topologyService, leaderLocator, config, getInstance() );
LegacyGetServersProcedure procedure =
new LegacyGetServersProcedure( topologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( procedure );
Expand Down Expand Up @@ -283,8 +283,8 @@ public void shouldReturnCoreMemberAsReadServerIfNoReadReplicasAvailable() throws
LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( theLeader );

GetServersProcedureV1 procedure =
new GetServersProcedureV1( topologyService, leaderLocator, config, getInstance() );
LegacyGetServersProcedure procedure =
new LegacyGetServersProcedure( topologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( procedure );
Expand Down Expand Up @@ -313,8 +313,8 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoLeader() throws Exception
LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenThrow( new NoLeaderFoundException() );

GetServersProcedureV1 procedure =
new GetServersProcedureV1( topologyService, leaderLocator, config, getInstance() );
LegacyGetServersProcedure procedure =
new LegacyGetServersProcedure( topologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( procedure );
Expand Down Expand Up @@ -342,8 +342,8 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws
LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( member( 1 ) );

GetServersProcedureV1 procedure =
new GetServersProcedureV1( topologyService, leaderLocator, config, getInstance() );
LegacyGetServersProcedure procedure =
new LegacyGetServersProcedure( topologyService, leaderLocator, config, getInstance() );

// when
ClusterView clusterView = run( procedure );
Expand All @@ -358,7 +358,7 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws
}

@SuppressWarnings( "unchecked" )
private ClusterView run( GetServersProcedureV1 proc ) throws ProcedureException
private ClusterView run( LegacyGetServersProcedure proc ) throws ProcedureException
{
final Object[] rows = asList( proc.apply( null, new Object[0] ) ).get( 0 );
assertEquals( config.get( cluster_routing_ttl ) / 1000, /* ttl */(long) rows[0] );
Expand Down
Expand Up @@ -43,7 +43,7 @@ public class GetServersProcedureV2Test
public void shouldHaveCorrectSignature() throws Exception
{
// given
GetServersProcedureV2 proc = new GetServersProcedureV2( null );
GetServersProcedureForMultiDC proc = new GetServersProcedureForMultiDC( null );

// when
ProcedureSignature signature = proc.signature();
Expand All @@ -63,7 +63,7 @@ public void shouldPassClientContextToPlugin() throws Exception
// given
LoadBalancingPlugin plugin = mock( LoadBalancingPlugin.class );
when( plugin.run( anyMap() ) ).thenReturn( mock( LoadBalancingPlugin.Result.class ) );
GetServersProcedureV2 getServers = new GetServersProcedureV2( plugin );
GetServersProcedureForMultiDC getServers = new GetServersProcedureForMultiDC( plugin );
Map<String,String> clientContext = stringMap( "key", "value", "key2", "value2" );

// when
Expand Down

0 comments on commit 6aaa633

Please sign in to comment.