Skip to content

Commit

Permalink
Adding a Data Center Local upstream strategy.
Browse files Browse the repository at this point in the history
A sensible out of the box plugin for allowing read replicas to catch up from named others within their locality.
Log the loaded strategies and their order of precedence.
Does not catchup from self.
  • Loading branch information
jimwebber committed Mar 13, 2017
1 parent 2691ca8 commit 20231d2
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 24 deletions.
Expand Up @@ -239,7 +239,7 @@ private boolean pullAndApplyBatchOfTransactions( MemberId upstream, StoreId loca
long lastQueuedTxId = applier.lastQueuedTxId();
pullRequestMonitor.txPullRequest( lastQueuedTxId );
TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount );
log.debug( "Pull transactions from %s where tx id > %d [batch #%d]", upstream, lastQueuedTxId, batchCount );

TxStreamFinishedResponse response;
try
Expand Down Expand Up @@ -275,7 +275,7 @@ public void onTxStreamFinishedResponse( CompletableFuture<TxStreamFinishedRespon
case SUCCESS_END_OF_BATCH:
return true;
case SUCCESS_END_OF_STREAM:
log.debug( "Successfully pulled transactions from %d", lastQueuedTxId );
log.debug( "Successfully pulled transactions from tx id %d", lastQueuedTxId );
upToDateFuture.complete( true );
return false;
case E_TRANSACTION_PRUNED:
Expand Down
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.readreplica;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.ReadReplicaInfo;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;

@Service.Implementation(UpstreamDatabaseSelectionStrategy.class)
public class ConnectWithinDataCenter extends UpstreamDatabaseSelectionStrategy
{
private Random random = new Random();;

public ConnectWithinDataCenter()
{
super( "connect-within-data-center" );
}

@Override
public Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException
{
Map<MemberId, ReadReplicaInfo> replicas = topologyService.readReplicas().replicaMembers();

List<String> tags = config.get( CausalClusteringSettings.server_tags );
if ( tags.isEmpty() )
{
return Optional.empty();
}

String myTag = tags.get( 0 );

List<Map.Entry<MemberId, ReadReplicaInfo>> choices = replicas.entrySet().stream()
.filter( entry -> entry.getValue().tags().contains( myTag ) && !entry.getKey().equals( myself ) )
.collect( Collectors.toList() );

if ( choices.isEmpty() )
{
return Optional.empty();
}
else
{
return Optional.of( choices.get( random.nextInt( choices.size() ) ).getKey() );
}
}
}
Expand Up @@ -248,10 +248,12 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

ConnectToRandomCoreServer defaultStrategy = new ConnectToRandomCoreServer();
defaultStrategy.setTopologyService( topologyService );
defaultStrategy.setConfig( config );
defaultStrategy.setMyself( myself );

UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector =
new UpstreamDatabaseStrategySelector( defaultStrategy,
new UpstreamDatabaseStrategiesLoader( topologyService, config ), myself );
new UpstreamDatabaseStrategiesLoader( topologyService, config, myself, logProvider ), myself, logProvider );

CatchupPollingProcess catchupProcess =
new CatchupPollingProcess( logProvider, localDatabase, servicesToStopOnStoreCopy, catchUpClient,
Expand Down
Expand Up @@ -24,10 +24,13 @@
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;

public abstract class UpstreamDatabaseSelectionStrategy extends Service
{
TopologyService topologyService;
protected TopologyService topologyService;
protected Config config;
protected MemberId myself;

public UpstreamDatabaseSelectionStrategy( String key, String... altKeys )
{
Expand All @@ -40,5 +43,37 @@ void setTopologyService( TopologyService topologyService )
this.topologyService = topologyService;
}

void setConfig( Config config )
{
this.config = config;
}

void setMyself(MemberId myself) { this.myself = myself;}

public abstract Optional<MemberId> upstreamDatabase() throws UpstreamDatabaseSelectionException;

@Override
public String toString()
{
return nicelyCommaSeparatedList( getKeys() );
}

private static String nicelyCommaSeparatedList( Iterable<String> keys )
{
StringBuilder sb = new StringBuilder();
for ( String key : keys )
{
sb.append( key );
sb.append( "," );
sb.append( " " );
}

int trimThese = sb.lastIndexOf( ", " );
if ( trimThese > 1 )
{
sb.replace( trimThese, sb.length(), "" );
}

return sb.toString();
}
}
Expand Up @@ -19,13 +19,17 @@
*/
package org.neo4j.causalclustering.readreplica;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
* Loads and initialises any service implementations of <class>UpstreamDatabaseSelectionStrategy</class>.
Expand All @@ -35,11 +39,15 @@ public class UpstreamDatabaseStrategiesLoader implements Iterable<UpstreamDataba
{
private final TopologyService topologyService;
private final Config config;
private final MemberId myself;
private final Log log;

UpstreamDatabaseStrategiesLoader( TopologyService topologyService, Config config )
UpstreamDatabaseStrategiesLoader( TopologyService topologyService, Config config, MemberId myself, LogProvider logProvider )
{
this.topologyService = topologyService;
this.config = config;
this.myself = myself;
this.log = logProvider.getLog( this.getClass() );
}

@Override
Expand All @@ -57,10 +65,40 @@ public Iterator<UpstreamDatabaseSelectionStrategy> iterator()
if ( candidate.getKeys().iterator().next().equals( key ) )
{
candidate.setTopologyService( topologyService );
candidate.setConfig( config );
candidate.setMyself( myself );
candidates.add( candidate );
}
}
}

log( candidates );

return candidates.iterator();
}

private void log( LinkedHashSet<UpstreamDatabaseSelectionStrategy> candidates )
{
log.debug( "Upstream database strategies loaded in order of precedence: " +
nicelyCommaSeparatedList( candidates ) );
}

private static String nicelyCommaSeparatedList( Collection<UpstreamDatabaseSelectionStrategy> items )
{
StringBuilder sb = new StringBuilder();
for ( UpstreamDatabaseSelectionStrategy strategy : items )
{
sb.append( strategy.toString() );
sb.append( "," );
sb.append( " " );
}

int trimThese = sb.lastIndexOf( ", " );
if ( trimThese > 1 )
{
sb.replace( trimThese, sb.length(), "" );
}

return sb.toString();
}
}
Expand Up @@ -24,23 +24,30 @@
import java.util.NoSuchElementException;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

import static org.neo4j.helpers.collection.Iterables.empty;

public class UpstreamDatabaseStrategySelector
{
private LinkedHashSet<UpstreamDatabaseSelectionStrategy> strategies = new LinkedHashSet<>();
private MemberId myself;
private Log log;

UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy )
{
this( defaultStrategy, empty(), null );
this( defaultStrategy, empty(), null, NullLogProvider.getInstance() );
}

UpstreamDatabaseStrategySelector( UpstreamDatabaseSelectionStrategy defaultStrategy,
Iterable<UpstreamDatabaseSelectionStrategy> otherStrategies, MemberId myself )
Iterable<UpstreamDatabaseSelectionStrategy> otherStrategies, MemberId myself,
LogProvider logProvider )
{
this.myself = myself;
this.log = logProvider.getLog( getClass() );

if ( otherStrategies != null )
{
for ( UpstreamDatabaseSelectionStrategy otherStrategy : otherStrategies )
Expand All @@ -56,6 +63,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException
MemberId result = null;
for ( UpstreamDatabaseSelectionStrategy strategy : strategies )
{
log.debug( "Trying selection strategy [%s]", strategy.toString() );
try
{
if ( strategy.upstreamDatabase().isPresent() )
Expand All @@ -76,6 +84,7 @@ public MemberId bestUpstreamDatabase() throws UpstreamDatabaseSelectionException
"Could not find an upstream database with which to connect." );
}

log.debug( "Selected upstream database [%s]", result );
return result;
}
}
@@ -1 +1,2 @@
org.neo4j.causalclustering.readreplica.ConnectToRandomCoreServer
org.neo4j.causalclustering.readreplica.ConnectWithinDataCenter
Expand Up @@ -22,9 +22,12 @@
import org.junit.Test;

import java.util.Set;
import java.util.UUID;

import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.NullLogProvider;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
Expand All @@ -33,15 +36,19 @@

public class UpstreamDatabaseStrategiesLoaderTest
{

private MemberId myself = new MemberId( UUID.randomUUID() );

@Test
public void shouldReturnConfiguredClassesOnly() throws Exception
{
// given
Config config = Config.defaults();
config.augment( stringMap( "causal_clustering.upstream_selection_strategy", "dummy" ) );

UpstreamDatabaseStrategiesLoader
strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config );
UpstreamDatabaseStrategiesLoader strategies =
new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config,
myself, NullLogProvider.getInstance() );

// when
Set<UpstreamDatabaseSelectionStrategy> upstreamDatabaseSelectionStrategies = asSet( strategies.iterator() );
Expand All @@ -61,8 +68,9 @@ public void shouldReturnTheFirstStrategyThatWorksFromThoseConfigured() throws Ex
stringMap( "causal_clustering.upstream_selection_strategy", "yet-another-dummy,dummy,another-dummy" ) );

// when
UpstreamDatabaseStrategiesLoader
strategies = new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config );
UpstreamDatabaseStrategiesLoader strategies =
new UpstreamDatabaseStrategiesLoader( mock( TopologyService.class ), config,
myself, NullLogProvider.getInstance() );

// then
assertEquals( UpstreamDatabaseStrategySelectorTest.YetAnotherDummyUpstreamDatabaseSelectionStrategy.class,
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.Service;
import org.neo4j.logging.NullLogProvider;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void shouldReturnTheMemberIdFromFirstSucessfulStrategy() throws Exception
when( goodOne.upstreamDatabase() ).thenReturn( Optional.of( theMemberId ) );

UpstreamDatabaseStrategySelector selector =
new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null );
new UpstreamDatabaseStrategySelector( badOne, iterable( goodOne, anotherBadOne ), null, NullLogProvider.getInstance() );

// when
MemberId result = selector.bestUpstreamDatabase();
Expand Down Expand Up @@ -104,7 +105,7 @@ public void shouldUseSpecifiedStrategyInPreferenceToDefault() throws Exception
when( mockStrategy.upstreamDatabase() ).thenReturn( Optional.of( new MemberId( UUID.randomUUID() ) ) );

UpstreamDatabaseStrategySelector selector =
new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null );
new UpstreamDatabaseStrategySelector( shouldNotUse, iterable( mockStrategy ), null, NullLogProvider.getInstance() );

// when
selector.bestUpstreamDatabase();
Expand Down

0 comments on commit 20231d2

Please sign in to comment.