Skip to content

Commit

Permalink
add more to the GetServersV2 framework
Browse files Browse the repository at this point in the history
Extract common functionality from the V1 code, split
procedure functionality from load balancing logic and
add an injectable strategy.

There is a concrete AllServersStrategy hooked in, but it
is merely a plot device for driving out the framework.
  • Loading branch information
martinfurmanski committed Feb 8, 2017
1 parent 1a32929 commit 4e5bb64
Show file tree
Hide file tree
Showing 29 changed files with 969 additions and 363 deletions.
Expand Up @@ -42,8 +42,11 @@
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.GetServersProcedureV1;
import org.neo4j.causalclustering.load_balancing.GetServersProcedureV2;
import org.neo4j.causalclustering.load_balancing.LoadBalancingStrategy;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV1;
import org.neo4j.causalclustering.load_balancing.procedure.GetServersProcedureV2;
import org.neo4j.causalclustering.load_balancing.strategy.AllServersStrategy;
import org.neo4j.causalclustering.load_balancing.strategy.ServerShufflingStrategy;
import org.neo4j.causalclustering.logging.BetterMessageLogger;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.logging.NullMessageLogger;
Expand Down Expand Up @@ -110,11 +113,14 @@ public enum RaftLogImplementation
@Override
public void registerEditionSpecificProcedures( Procedures procedures ) throws KernelException
{
LoadBalancingStrategy loadBalancingStrategy = new ServerShufflingStrategy(
new AllServersStrategy( topologyService, consensusModule.raftMachine(), config ) );

procedures.registerProcedure( EnterpriseBuiltInDbmsProcedures.class, true );
procedures.register(
new GetServersProcedureV1( topologyService, consensusModule.raftMachine(), config, logProvider ) );
procedures.register(
new GetServersProcedureV2( topologyService, consensusModule.raftMachine(), config, logProvider ) );
new GetServersProcedureV2( loadBalancingStrategy ) );
procedures.register(
new ClusterOverviewProcedure( topologyService, consensusModule.raftMachine(), logProvider ) );
procedures.register( new CoreRoleProcedure( consensusModule.raftMachine() ) );
Expand Down
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

public interface ClientConnector
{
ClientConnectorAddresses connectors();
}
Expand Up @@ -66,7 +66,7 @@ static ClientConnectorAddresses extractFromConfig( Config config )
return new ClientConnectorAddresses( connectorUris );
}

public AdvertisedSocketAddress getBoltAddress()
public AdvertisedSocketAddress boltAddress()
{
return connectorUris.stream().filter( connectorUri -> connectorUri.scheme == bolt ).findFirst().orElseThrow(
() -> new IllegalArgumentException( "A Bolt connector must be configured to run a cluster" ) )
Expand Down
Expand Up @@ -21,7 +21,7 @@

import org.neo4j.helpers.AdvertisedSocketAddress;

public class CoreAddresses
public class CoreAddresses implements ClientConnector
{
private final AdvertisedSocketAddress raftServer;
private final AdvertisedSocketAddress catchupServer;
Expand All @@ -45,7 +45,7 @@ public AdvertisedSocketAddress getCatchupServer()
return catchupServer;
}

public ClientConnectorAddresses getClientConnectorAddresses()
public ClientConnectorAddresses connectors()
{
return clientConnectorAddresses;
}
Expand Down
Expand Up @@ -19,16 +19,16 @@
*/
package org.neo4j.causalclustering.discovery;

public class ReadReplicaAddresses
public class ReadReplicaAddresses implements ClientConnector
{
private final ClientConnectorAddresses clientConnectorAddresses;

public ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses )
ReadReplicaAddresses( ClientConnectorAddresses clientConnectorAddresses )
{
this.clientConnectorAddresses = clientConnectorAddresses;
}

public ClientConnectorAddresses getClientConnectorAddresses()
public ClientConnectorAddresses connectors()
{
return clientConnectorAddresses;
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
for ( MemberId memberId : coreMembers )
{
Optional<ClientConnectorAddresses> clientConnectorAddresses =
coreTopology.find( memberId ).map( CoreAddresses::getClientConnectorAddresses );
coreTopology.find( memberId ).map( CoreAddresses::connectors );
if ( clientConnectorAddresses.isPresent() )
{
Role role = memberId.equals( leader ) ? Role.LEADER : Role.FOLLOWER;
Expand All @@ -101,7 +101,7 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
}
for ( ReadReplicaAddresses readReplicaAddresses : discoveryService.readReplicas().members() )
{
endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.getClientConnectorAddresses(), Role.READ_REPLICA ) );
endpoints.add( new ReadWriteEndPoint( readReplicaAddresses.connectors(), Role.READ_REPLICA ) );
}

Collections.sort( endpoints, ( o1, o2 ) -> o1.addresses().toString().compareTo( o2.addresses().toString() ) );
Expand Down
Expand Up @@ -19,13 +19,15 @@
*/
package org.neo4j.causalclustering.load_balancing;

import java.util.Objects;

import org.neo4j.helpers.AdvertisedSocketAddress;

/**
* This class binds a certain role with an address and
* thus defines a reachable endpoint with defined capabilities.
*/
class EndPoint
public class EndPoint
{
private final AdvertisedSocketAddress address;
private final Role role;
Expand All @@ -35,7 +37,7 @@ public String address()
return address.toString();
}

private EndPoint( AdvertisedSocketAddress address, Role role )
public EndPoint( AdvertisedSocketAddress address, Role role )
{
this.address = address;
this.role = role;
Expand All @@ -51,11 +53,29 @@ public static EndPoint read( AdvertisedSocketAddress address )
return new EndPoint( address, Role.READ );
}

static EndPoint route( AdvertisedSocketAddress address )
public static EndPoint route( AdvertisedSocketAddress address )
{
return new EndPoint( address, Role.ROUTE );
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{ return true; }
if ( o == null || getClass() != o.getClass() )
{ return false; }
EndPoint endPoint = (EndPoint) o;
return Objects.equals( address, endPoint.address ) &&
role == endPoint.role;
}

@Override
public int hashCode()
{
return Objects.hash( address, role );
}

@Override
public String toString()
{
Expand Down

This file was deleted.

0 comments on commit 4e5bb64

Please sign in to comment.