Skip to content

Commit

Permalink
Revert "Store Copy catchup avoids self"
Browse files Browse the repository at this point in the history
This reverts commit 588b170.
  • Loading branch information
phughk committed Jul 20, 2018
1 parent ed7d3b9 commit e4a24f0
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 603 deletions.
Expand Up @@ -26,7 +26,9 @@
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.helpers.AdvertisedSocketAddress;

/**
Expand Down Expand Up @@ -137,4 +139,30 @@ public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionExcept
return secondaryUpstreamStrategyAddressSupplier.get();
}
}

class UpstreamStrategyAddressSupplier implements ThrowingSupplier<AdvertisedSocketAddress,CatchupAddressResolutionException>
{
private final UpstreamDatabaseStrategySelector strategySelector;
private final TopologyService topologyService;

private UpstreamStrategyAddressSupplier( UpstreamDatabaseStrategySelector strategySelector, TopologyService topologyService )
{
this.strategySelector = strategySelector;
this.topologyService = topologyService;
}

@Override
public AdvertisedSocketAddress get() throws CatchupAddressResolutionException
{
try
{
MemberId upstreamMember = strategySelector.bestUpstreamDatabase();
return topologyService.findCatchupAddress( upstreamMember ).orElseThrow( () -> new CatchupAddressResolutionException( upstreamMember ) );
}
catch ( UpstreamDatabaseSelectionException e )
{
throw new CatchupAddressResolutionException( e );
}
}
}
}

This file was deleted.

Expand Up @@ -59,6 +59,15 @@ public String toString()
return String.format( "{readReplicas=%s}", readReplicaMembers );
}

public Optional<MemberId> randomReadReplicaMemberId()
{
if ( readReplicaMembers.isEmpty() )
{
return Optional.empty();
}
return readReplicaMembers.keySet().stream().skip( ThreadLocalRandom.current().nextInt( readReplicaMembers.size() ) ).findFirst();
}

@Override
public ReadReplicaTopology filterTopologyByDb( String dbName )
{
Expand Down
Expand Up @@ -23,7 +23,6 @@
package org.neo4j.causalclustering.upstream.strategies;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.neo4j.causalclustering.discovery.RoleInfo;
Expand Down Expand Up @@ -55,7 +54,7 @@ public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionExc
for ( Map.Entry<MemberId,RoleInfo> entry : memberRoles.entrySet() )
{
RoleInfo role = entry.getValue();
if ( role == RoleInfo.LEADER && !Objects.equals( myself, entry.getKey() ) )
if ( role == RoleInfo.LEADER )
{
return Optional.of( entry.getKey() );
}
Expand Down
Expand Up @@ -22,20 +22,12 @@
*/
package org.neo4j.causalclustering.upstream.strategies;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy;
import org.neo4j.helpers.Service;

import static org.neo4j.function.Predicates.not;

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class TypicallyConnectToRandomReadReplicaStrategy extends UpstreamDatabaseSelectionStrategy
{
Expand All @@ -58,31 +50,27 @@ public Optional<MemberId> upstreamDatabase()
{
if ( counter.shouldReturnCoreMemberId() )
{
return randomCoreMember();
return getCoreMemberId();
}
else
{
// shuffled members
List<MemberId> readReplicaMembers = new ArrayList<>( topologyService.localReadReplicas().members().keySet() );
Collections.shuffle( readReplicaMembers );

List<MemberId> coreMembers = new ArrayList<>( topologyService.localCoreServers().members().keySet() );
Collections.shuffle( coreMembers );

return Stream.concat( readReplicaMembers.stream(), coreMembers.stream() ).filter( not( myself::equals ) ).findFirst();
Optional<MemberId> memberId = getReadReplicaMemberId();
if ( !memberId.isPresent() )
{
memberId = getCoreMemberId();
}
return memberId;
}
}

private Optional<MemberId> randomCoreMember()
private Optional<MemberId> getReadReplicaMemberId()
{
List<MemberId> coreMembersNotSelf =
topologyService.localCoreServers().members().keySet().stream().filter( not( myself::equals ) ).collect( Collectors.toList() );
Collections.shuffle( coreMembersNotSelf );
if ( coreMembersNotSelf.size() == 0 )
{
return Optional.empty();
}
return Optional.of( coreMembersNotSelf.get( 0 ) );
return topologyService.localReadReplicas().randomReadReplicaMemberId();
}

private Optional<MemberId> getCoreMemberId()
{
return topologyService.localCoreServers().randomCoreMemberId();
}

private static class ModuloCounter
Expand Down

This file was deleted.

Expand Up @@ -41,7 +41,8 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterables.iterable;

Expand Down Expand Up @@ -103,7 +104,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception
when( topologyService.localCoreServers() ).thenReturn(
new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreServerInfo.class ) ) ) );

ConnectToRandomCoreServerStrategy shouldNotUse = mock( ConnectToRandomCoreServerStrategy.class );
ConnectToRandomCoreServerStrategy shouldNotUse = new ConnectToRandomCoreServerStrategy();

UpstreamDatabaseSelectionStrategy mockStrategy = mock( UpstreamDatabaseSelectionStrategy.class );
when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) );
Expand All @@ -115,7 +116,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception
selector.bestUpstreamDatabase();

// then
verifyZeroInteractions( shouldNotUse );
verify( mockStrategy, times( 2 ) ).upstreamDatabase();
}

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
Expand Down

0 comments on commit e4a24f0

Please sign in to comment.