Skip to content

Commit

Permalink
Return both read and route endpoints in shuffled order.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Oct 21, 2016
1 parent 47d7646 commit 46167dd
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 55 deletions.
Expand Up @@ -117,10 +117,7 @@ private List<ReadWriteRouteEndPoint> routeEndpoints()

private List<ReadWriteRouteEndPoint> writeEndpoints( AdvertisedSocketAddress leader )
{
List<ReadWriteRouteEndPoint> writeEndPoints =
Stream.of( leader ).map( ReadWriteRouteEndPoint::write ).collect( Collectors.toList() );
Collections.shuffle( writeEndPoints );
return writeEndPoints;
return Stream.of( leader ).map( ReadWriteRouteEndPoint::write ).collect( Collectors.toList() );

}

Expand Down
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery.procedures;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.discovery.CoreAddresses;
import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.discovery.CoreTopologyService;
import org.neo4j.causalclustering.discovery.ReadReplicaTopology;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.configuration.Config;

import static java.util.Collections.emptySet;
import static org.junit.Assert.assertFalse;
import static org.junit.runners.Parameterized.Parameters;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.causalclustering.discovery.procedures.GetServersProcedureTest.coreAddresses;
import static org.neo4j.causalclustering.identity.RaftTestMember.member;
import static org.neo4j.helpers.collection.Iterators.asList;
import static org.neo4j.logging.NullLogProvider.getInstance;

@RunWith( Parameterized.class )
public class GetServersProcedureRoutingTest
{
@Parameters
public static Collection<Object> data()
{
return Arrays.asList( 1, 2 );
} //the write endpoints are always index 0

@Parameter
public int serverClass;

private ClusterId clusterId = new ClusterId( UUID.randomUUID() );
private Config config = Config.defaults();

@Test
public void shouldReturnEndpointsInDifferentOrders() throws Exception
{
// given
final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class );

LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( member( 0 ) );

Map<MemberId,CoreAddresses> coreMembers = new HashMap<>();
coreMembers.put( member( 0 ), coreAddresses( 0 ) );
coreMembers.put( member( 1 ), coreAddresses( 1 ) );
coreMembers.put( member( 2 ), coreAddresses( 2 ) );

final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers );
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.readReplicas() ).thenReturn( new ReadReplicaTopology( clusterId, emptySet() ) );

final GetServersProcedure proc =
new GetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
Object[] endpoints = getEndpoints( proc );

//then
Object[] endpointsInDifferentOrder = getEndpoints( proc );
for ( int i = 0; i < 100; i++ )
{
if ( Arrays.deepEquals( endpointsInDifferentOrder, endpoints ) )
{
endpointsInDifferentOrder = getEndpoints( proc );
}
else
{
//Different order of servers, no need to retry.
break;
}
}
assertFalse( Arrays.deepEquals( endpoints, endpointsInDifferentOrder ) );
}

private Object[] getEndpoints( GetServersProcedure proc )
throws org.neo4j.kernel.api.exceptions.ProcedureException
{
List<Object[]> results = asList( proc.apply( null, new Object[0] ) );
Object[] rows = results.get( 0 );
List<Map<String,Object[]>> servers = (List<Map<String,Object[]>>) rows[1];
Map<String,Object[]> endpoints = servers.get( serverClass );
return endpoints.get( "addresses" );
}
}
Expand Up @@ -422,57 +422,6 @@ public void shouldReturnNoWriteEndpointsIfThereIsNoAddressForTheLeader() throws
containsInAnyOrder( coreAddresses( 0 ).getRaftServer().toString() ) );
}

@Test
public void shouldReturnReadEndpointsInDifferentOrders() throws Exception
{
// given
final CoreTopologyService coreTopologyService = mock( CoreTopologyService.class );

LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( member( 0 ) );

Map<MemberId,CoreAddresses> coreMembers = new HashMap<>();
coreMembers.put( member( 0 ), coreAddresses( 0 ) );
coreMembers.put( member( 1 ), coreAddresses( 1 ) );
coreMembers.put( member( 2 ), coreAddresses( 2 ) );

final CoreTopology clusterTopology = new CoreTopology( clusterId, false, coreMembers );
when( coreTopologyService.coreServers() ).thenReturn( clusterTopology );
when( coreTopologyService.edgeServers() ).thenReturn( new EdgeTopology( clusterId, emptySet() ) );

final GetServersProcedure proc =
new GetServersProcedure( coreTopologyService, leaderLocator, config, getInstance() );

// when
Object[] readServers = getReadServers( proc );

//then
Object[] readServersDifferentOrder = getReadServers( proc );
for ( int i = 0; i < 100; i++ )
{
if ( Arrays.deepEquals( readServersDifferentOrder, readServers ) )
{
readServersDifferentOrder = getReadServers( proc );
}
else
{
//Different order of servers, no need to retry.
break;
}
}
assertFalse( Arrays.deepEquals( readServers, readServersDifferentOrder ) );
}

private Object[] getReadServers( GetServersProcedure proc )
throws org.neo4j.kernel.api.exceptions.ProcedureException
{
List<Object[]> results = asList( proc.apply( null, new Object[0] ) );
Object[] rows = results.get( 0 );
List<Map<String,Object[]>> servers = (List<Map<String,Object[]>>) rows[1];
Map<String,Object[]> readServers = servers.get( 1 );
return readServers.get( "addresses" );
}

static Set<ReadReplicaAddresses> addresses( int... ids )
{
return Arrays.stream( ids ).mapToObj( GetServersProcedureTest::readReplicaAddresses ).collect( Collectors.toSet() );
Expand Down

0 comments on commit 46167dd

Please sign in to comment.