Skip to content

Commit

Permalink
Store Copy catchup avoids self
Browse files Browse the repository at this point in the history
There were some cases where store catchup would pick itself as a target
to catch up from (even though it would retry if that were the case)

This change should prevent cores and RR from picking themselves as
targets to catch up from
  • Loading branch information
phughk committed Aug 16, 2018
1 parent 8c91ec6 commit 8cd5e81
Show file tree
Hide file tree
Showing 13 changed files with 603 additions and 57 deletions.
Expand Up @@ -26,9 +26,7 @@
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.helpers.AdvertisedSocketAddress;

/**
Expand Down Expand Up @@ -139,30 +137,4 @@ public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionExcept
return secondaryUpstreamStrategyAddressSupplier.get();
}
}

class UpstreamStrategyAddressSupplier implements ThrowingSupplier<AdvertisedSocketAddress,CatchupAddressResolutionException>
{
private final UpstreamDatabaseStrategySelector strategySelector;
private final TopologyService topologyService;

private UpstreamStrategyAddressSupplier( UpstreamDatabaseStrategySelector strategySelector, TopologyService topologyService )
{
this.strategySelector = strategySelector;
this.topologyService = topologyService;
}

@Override
public AdvertisedSocketAddress get() throws CatchupAddressResolutionException
{
try
{
MemberId upstreamMember = strategySelector.bestUpstreamDatabase();
return topologyService.findCatchupAddress( upstreamMember ).orElseThrow( () -> new CatchupAddressResolutionException( upstreamMember ) );
}
catch ( UpstreamDatabaseSelectionException e )
{
throw new CatchupAddressResolutionException( e );
}
}
}
}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.catchup;

import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.helpers.AdvertisedSocketAddress;

public class UpstreamStrategyAddressSupplier implements ThrowingSupplier<AdvertisedSocketAddress,CatchupAddressResolutionException>
{
private final UpstreamDatabaseStrategySelector strategySelector;
private final TopologyService topologyService;

UpstreamStrategyAddressSupplier( UpstreamDatabaseStrategySelector strategySelector, TopologyService topologyService )
{
this.strategySelector = strategySelector;
this.topologyService = topologyService;
}

@Override
public AdvertisedSocketAddress get() throws CatchupAddressResolutionException
{
try
{
MemberId upstreamMember = strategySelector.bestUpstreamDatabase();
return topologyService.findCatchupAddress( upstreamMember ).orElseThrow( () -> new CatchupAddressResolutionException( upstreamMember ) );
}
catch ( UpstreamDatabaseSelectionException e )
{
throw new CatchupAddressResolutionException( e );
}
}
}
Expand Up @@ -59,15 +59,6 @@ public String toString()
return String.format( "{readReplicas=%s}", readReplicaMembers );
}

public Optional<MemberId> randomReadReplicaMemberId()
{
if ( readReplicaMembers.isEmpty() )
{
return Optional.empty();
}
return readReplicaMembers.keySet().stream().skip( ThreadLocalRandom.current().nextInt( readReplicaMembers.size() ) ).findFirst();
}

@Override
public ReadReplicaTopology filterTopologyByDb( String dbName )
{
Expand Down
Expand Up @@ -23,6 +23,7 @@
package org.neo4j.causalclustering.upstream.strategies;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.neo4j.causalclustering.discovery.RoleInfo;
Expand Down Expand Up @@ -54,7 +55,7 @@ public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionExc
for ( Map.Entry<MemberId,RoleInfo> entry : memberRoles.entrySet() )
{
RoleInfo role = entry.getValue();
if ( role == RoleInfo.LEADER )
if ( role == RoleInfo.LEADER && !Objects.equals( myself, entry.getKey() ) )
{
return Optional.of( entry.getKey() );
}
Expand Down
Expand Up @@ -22,12 +22,20 @@
*/
package org.neo4j.causalclustering.upstream.strategies;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy;
import org.neo4j.helpers.Service;

import static org.neo4j.function.Predicates.not;

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
public class TypicallyConnectToRandomReadReplicaStrategy extends UpstreamDatabaseSelectionStrategy
{
Expand All @@ -50,27 +58,31 @@ public Optional<MemberId> upstreamDatabase()
{
if ( counter.shouldReturnCoreMemberId() )
{
return getCoreMemberId();
return randomCoreMember();
}
else
{
Optional<MemberId> memberId = getReadReplicaMemberId();
if ( !memberId.isPresent() )
{
memberId = getCoreMemberId();
}
return memberId;
}
}
// shuffled members
List<MemberId> readReplicaMembers = new ArrayList<>( topologyService.localReadReplicas().members().keySet() );
Collections.shuffle( readReplicaMembers );

private Optional<MemberId> getReadReplicaMemberId()
{
return topologyService.localReadReplicas().randomReadReplicaMemberId();
List<MemberId> coreMembers = new ArrayList<>( topologyService.localCoreServers().members().keySet() );
Collections.shuffle( coreMembers );

return Stream.concat( readReplicaMembers.stream(), coreMembers.stream() ).filter( not( myself::equals ) ).findFirst();
}
}

private Optional<MemberId> getCoreMemberId()
private Optional<MemberId> randomCoreMember()
{
return topologyService.localCoreServers().randomCoreMemberId();
List<MemberId> coreMembersNotSelf =
topologyService.localCoreServers().members().keySet().stream().filter( not( myself::equals ) ).collect( Collectors.toList() );
Collections.shuffle( coreMembersNotSelf );
if ( coreMembersNotSelf.size() == 0 )
{
return Optional.empty();
}
return Optional.of( coreMembersNotSelf.get( 0 ) );
}

private static class ModuloCounter
Expand Down
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.catchup;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;

import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionStrategy;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.NullLogProvider;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class UpstreamStrategyAddressSupplierTest
{
private MemberId defaultMember = new MemberId( UUID.randomUUID() );
private MemberId firstMember = new MemberId( UUID.randomUUID() );
private MemberId secondMember = new MemberId( UUID.randomUUID() );
private AdvertisedSocketAddress defaultAddress = new AdvertisedSocketAddress( "Default", 123 );
private AdvertisedSocketAddress firstAddress = new AdvertisedSocketAddress( "First", 456 );
private AdvertisedSocketAddress secondAddress = new AdvertisedSocketAddress( "Second", 789 );
private TopologyService topologyService = mock( TopologyService.class );

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Before
public void setup()
{
when( topologyService.findCatchupAddress( eq( defaultMember ) ) ).thenReturn( Optional.of( defaultAddress ) );
when( topologyService.findCatchupAddress( eq( firstMember ) ) ).thenReturn( Optional.of( firstAddress ) );
when( topologyService.findCatchupAddress( eq( secondMember ) ) ).thenReturn( Optional.of( secondAddress ) );
}

@Test
public void selectionPrioritiesAreKept() throws CatchupAddressResolutionException
{
// given various strategies with different priorities
UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector =
new UpstreamDatabaseStrategySelector( new CountedSelectionStrategy( defaultMember, 5 ),
Arrays.asList( new CountedSelectionStrategy( firstMember, 1 ), new CountedSelectionStrategy( secondMember, 1 ) ),
NullLogProvider.getInstance() );

// and
UpstreamStrategyAddressSupplier upstreamStrategyAddressSupplier =
new UpstreamStrategyAddressSupplier( upstreamDatabaseStrategySelector, topologyService );

// when
AdvertisedSocketAddress firstResult = upstreamStrategyAddressSupplier.get();
AdvertisedSocketAddress secondResult = upstreamStrategyAddressSupplier.get();
AdvertisedSocketAddress thirdResult = upstreamStrategyAddressSupplier.get();

// then
assertEquals( firstAddress, firstResult );
assertEquals( secondAddress, secondResult );
assertEquals( defaultAddress, thirdResult );
}

@Test
public void exceptionWhenStrategiesFail() throws CatchupAddressResolutionException
{
// given a guaranteed fail strategy
UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector =
new UpstreamDatabaseStrategySelector( new CountedSelectionStrategy( defaultMember, 0 ) );

// and
UpstreamStrategyAddressSupplier upstreamStrategyAddressSupplier =
new UpstreamStrategyAddressSupplier( upstreamDatabaseStrategySelector, topologyService );

// then
expectedException.expect( CatchupAddressResolutionException.class );

// when
upstreamStrategyAddressSupplier.get();
}

private class CountedSelectionStrategy extends UpstreamDatabaseSelectionStrategy
{
MemberId upstreamDatabase;
private int numberOfIterations;

CountedSelectionStrategy( MemberId upstreamDatabase, int numberOfIterations )
{
super( CountedSelectionStrategy.class.getName() );
this.upstreamDatabase = upstreamDatabase;
this.numberOfIterations = numberOfIterations;
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
MemberId consumed = upstreamDatabase;
numberOfIterations--;
if ( numberOfIterations < 0 )
{
upstreamDatabase = null;
}
return Optional.ofNullable( consumed );
}

@Override
public int hashCode()
{
return super.hashCode() + (upstreamDatabase.hashCode() * 17) + (31 * numberOfIterations);
}

@Override
public boolean equals( Object o )
{
if ( o == null || !(o instanceof CountedSelectionStrategy) )
{
return false;
}
CountedSelectionStrategy other = (CountedSelectionStrategy) o;
return this.upstreamDatabase.equals( other.upstreamDatabase ) && this.numberOfIterations == other.numberOfIterations;
}
}
}
Expand Up @@ -41,8 +41,7 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterables.iterable;

Expand Down Expand Up @@ -104,7 +103,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception
when( topologyService.localCoreServers() ).thenReturn(
new CoreTopology( new ClusterId( UUID.randomUUID() ), false, mapOf( memberId, mock( CoreServerInfo.class ) ) ) );

ConnectToRandomCoreServerStrategy shouldNotUse = new ConnectToRandomCoreServerStrategy();
ConnectToRandomCoreServerStrategy shouldNotUse = mock( ConnectToRandomCoreServerStrategy.class );

UpstreamDatabaseSelectionStrategy mockStrategy = mock( UpstreamDatabaseSelectionStrategy.class );
when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) );
Expand All @@ -116,7 +115,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception
selector.bestUpstreamDatabase();

// then
verify( mockStrategy, times( 2 ) ).upstreamDatabase();
verifyZeroInteractions( shouldNotUse );
}

@Service.Implementation( UpstreamDatabaseSelectionStrategy.class )
Expand Down

0 comments on commit 8cd5e81

Please sign in to comment.