Skip to content

Commit

Permalink
Return servers in a shuffled order in the getServers proc.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Oct 21, 2016
1 parent 862485b commit 47d7646
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 21 deletions.
Expand Up @@ -20,9 +20,9 @@
package org.neo4j.causalclustering.discovery.procedures;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -45,9 +45,9 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.Collections.emptySet;
import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature;

Expand Down Expand Up @@ -87,9 +87,9 @@ public GetServersProcedure( CoreTopologyService discoveryService, LeaderLocator
@Override
public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] input ) throws ProcedureException
{
Set<ReadWriteRouteEndPoint> writeEndpoints = emptySet();
Set<ReadWriteRouteEndPoint> readEndpoints = readEndpoints();
Set<ReadWriteRouteEndPoint> routeEndpoints = routeEndpoints();
List<ReadWriteRouteEndPoint> writeEndpoints = emptyList();
List<ReadWriteRouteEndPoint> readEndpoints = readEndpoints();
List<ReadWriteRouteEndPoint> routeEndpoints = routeEndpoints();
try
{
AdvertisedSocketAddress leaderAddress =
Expand All @@ -105,21 +105,26 @@ public RawIterator<Object[],ProcedureException> apply( Context ctx, Object[] inp
return wrapUpEndpoints( routeEndpoints, writeEndpoints, readEndpoints );
}

private Set<ReadWriteRouteEndPoint> routeEndpoints()
private List<ReadWriteRouteEndPoint> routeEndpoints()
{
Stream<AdvertisedSocketAddress> routers =
discoveryService.coreServers().addresses().stream()
.map( server -> server.getClientConnectorAddresses().getBoltAddress() );

return routers.map( ReadWriteRouteEndPoint::route ).collect( toSet() );
List<ReadWriteRouteEndPoint> routeEndpoints = routers.map( ReadWriteRouteEndPoint::route ).collect( toList() );
Collections.shuffle( routeEndpoints );
return routeEndpoints;
}

private Set<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress leader )
private List<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress leader )
{
return Stream.of( leader ).map( ReadWriteRouteEndPoint::write ).collect( Collectors.toSet() );
List<ReadWriteRouteEndPoint> writeEndPoints =
Stream.of( leader ).map( ReadWriteRouteEndPoint::write ).collect( Collectors.toList() );
Collections.shuffle( writeEndPoints );
return writeEndPoints;

}

private Set<ReadWriteRouteEndPoint> readEndpoints()
private List<ReadWriteRouteEndPoint> readEndpoints()
{
Stream<AdvertisedSocketAddress> readReplica =
discoveryService.readReplicas().members().stream()
Expand All @@ -128,16 +133,18 @@ private Set<ReadWriteRouteEndPoint> readEndpoints()
Stream<AdvertisedSocketAddress> readCore =
discoveryService.coreServers().addresses().stream()
.map( server -> server.getClientConnectorAddresses().getBoltAddress() );

return concat( readReplica, readCore ).map( ReadWriteRouteEndPoint::read ).collect( toSet() );
List<ReadWriteRouteEndPoint> readEndPoints =
concat( readReplica, readCore ).map( ReadWriteRouteEndPoint::read ).collect( toList() );
Collections.shuffle(readEndPoints);
return readEndPoints;
}

private RawIterator<Object[],ProcedureException> wrapUpEndpoints( Set<ReadWriteRouteEndPoint> routeEndpoints,
Set<ReadWriteRouteEndPoint> writeEndpoints, Set<ReadWriteRouteEndPoint> readEndpoints )
private RawIterator<Object[],ProcedureException> wrapUpEndpoints( List<ReadWriteRouteEndPoint> routeEndpoints,
List<ReadWriteRouteEndPoint> writeEndpoints, List<ReadWriteRouteEndPoint> readEndpoints )
{
Object[] routers = routeEndpoints.stream().map( ReadWriteRouteEndPoint::address ).sorted().toArray();
Object[] readers = readEndpoints.stream().map( ReadWriteRouteEndPoint::address ).sorted().toArray();
Object[] writers = writeEndpoints.stream().map( ReadWriteRouteEndPoint::address ).sorted().toArray();
Object[] routers = routeEndpoints.stream().map( ReadWriteRouteEndPoint::address ).toArray();
Object[] readers = readEndpoints.stream().map( ReadWriteRouteEndPoint::address ).toArray();
Object[] writers = writeEndpoints.stream().map( ReadWriteRouteEndPoint::address ).toArray();

List<Map<String,Object>> servers = new ArrayList<>();

Expand Down
Expand Up @@ -49,7 +49,6 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -408,7 +407,7 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws
Object[] rows = results.get( 0 );

long ttl = (long) rows[0];
assertEquals( (long) config.get( cluster_routing_ttl) / 1000, ttl );
assertEquals( (long) config.get( cluster_routing_ttl ) / 1000, ttl );

List<Map<String,Object[]>> servers = (List<Map<String,Object[]>>) rows[1];

Expand All @@ -423,6 +422,57 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws
containsInAnyOrder( coreAddresses( 0 ).getRaftServer().toString() ) );
}

@Test
public void shouldReturnReadEndpointsInDifferentOrders() throws Exception
{
// given
final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class );

LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( member( 0 ) );

Map<MemberId,CoreAddresses> coreMembers = new HashMap<>();
coreMembers.put( member( 0 ), coreAddresses( 0 ) );
coreMembers.put( member( 1 ), coreAddresses( 1 ) );
coreMembers.put( member( 2 ), coreAddresses( 2 ) );

final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers );
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.edgeServers() ).thenReturn( new EdgeTopology( clusterId, emptySet() ) );

final GetServersProcedure proc =
new GetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
Object[] readServers = getReadServers( proc );

//then
Object[] readServersDifferentOrder = getReadServers( proc );
for ( int i = 0; i < 100; i++ )
{
if ( Arrays.deepEquals( readServersDifferentOrder, readServers ) )
{
readServersDifferentOrder = getReadServers( proc );
}
else
{
//Different order of servers, no need to retry.
break;
}
}
assertFalse( Arrays.deepEquals( readServers, readServersDifferentOrder ) );
}

private Object[] getReadServers( GetServersProcedure proc )
throws org.neo4j.kernel.api.exceptions.ProcedureException
{
List<Object[]> results = asList( proc.apply( null, new Object[0] ) );
Object[] rows = results.get( 0 );
List<Map<String,Object[]>> servers = (List<Map<String,Object[]>>) rows[1];
Map<String,Object[]> readServers = servers.get( 1 );
return readServers.get( "addresses" );
}

static Set<ReadReplicaAddresses> addresses( int... ids )
{
return Arrays.stream( ids ).mapToObj( GetServersProcedureTest::readReplicaAddresses ).collect( Collectors.toSet() );
Expand Down

0 comments on commit 47d7646

Please sign in to comment.