Skip to content

Commit

Permalink
Use the random server selection strategy when pulling.
Browse files Browse the repository at this point in the history
The random strategy was already in use for store copy, but it also makes
sense for the recurring transaction pulling, as a first and simple solution
for load balancing, handling topology changes, etc.

Previously we would just select the first server in the stable topology.
  • Loading branch information
martinfurmanski committed Jan 21, 2016
1 parent bf7ca6e commit 51fd6e3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.coreedge.catchup.storecopy.edge.CoreClient;
import org.neo4j.coreedge.discovery.EdgeDiscoveryService;
import org.neo4j.coreedge.server.edge.EdgeToCoreConnectionStrategy;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -35,15 +36,15 @@ public class TxPollingClient extends LifecycleAdapter
{
private final JobScheduler jobScheduler;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final EdgeDiscoveryService edgeDiscoveryService;
private final EdgeToCoreConnectionStrategy connectionStrategy;
private final long pollingInterval;
private final CoreClient coreClient;
private final TxPullResponseListener txPullResponseListener;

public TxPollingClient( JobScheduler jobScheduler, long pollingInterval,
Supplier<TransactionIdStore> transactionIdStoreSupplier,
CoreClient coreClient, TxPullResponseListener txPullResponseListener,
EdgeDiscoveryService edgeDiscoveryService )
Supplier<TransactionIdStore> transactionIdStoreSupplier,
CoreClient coreClient, TxPullResponseListener txPullResponseListener,
EdgeToCoreConnectionStrategy connectionStrategy )
{
this.coreClient = coreClient;
this.txPullResponseListener = txPullResponseListener;
Expand All @@ -52,7 +53,7 @@ public TxPollingClient( JobScheduler jobScheduler, long pollingInterval,
this.pollingInterval = pollingInterval;

this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.edgeDiscoveryService = edgeDiscoveryService;
this.connectionStrategy = connectionStrategy;
}

public void startPolling()
Expand All @@ -61,7 +62,7 @@ public void startPolling()
final TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get();
jobScheduler.scheduleRecurring( pullUpdates,
() -> coreClient.pollForTransactions(
first( edgeDiscoveryService.currentTopology().getMembers() ).getCoreAddress(),
connectionStrategy.coreServer(),
transactionIdStore.getLastCommittedTransactionId() ), pollingInterval, MILLISECONDS );
}

Expand Down
Expand Up @@ -30,8 +30,9 @@
public class ConnectToRandomCoreServer implements EdgeToCoreConnectionStrategy
{
private final EdgeDiscoveryService discoveryService;
private final Random random = new Random();

public ConnectToRandomCoreServer( EdgeDiscoveryService discoveryService)
public ConnectToRandomCoreServer( EdgeDiscoveryService discoveryService )
{
this.discoveryService = discoveryService;
}
Expand All @@ -40,17 +41,18 @@ public ConnectToRandomCoreServer( EdgeDiscoveryService discoveryService)
@Override
public AdvertisedSocketAddress coreServer()
{
final Random random = new Random();

final ClusterTopology clusterTopology = discoveryService.currentTopology();
int randomSize = random.nextInt( clusterTopology.getMembers().size() );
int skippedServers = random.nextInt( clusterTopology.getMembers().size() );

final Iterator<CoreMember> iterator = clusterTopology.getMembers().iterator();
AdvertisedSocketAddress result = null;
for ( int i = 0; i <= randomSize; i++ )

CoreMember member;
do
{
result = iterator.next().getCoreAddress();
member = iterator.next();
}
return result;
while ( skippedServers --> 0 );

return member.getCoreAddress();
}
}
Expand Up @@ -159,7 +159,7 @@ public void assertSchemaWritesAllowed() throws InvalidTransactionTypeKernelExcep
TxPollingClient txPollingClient = life.add(
new TxPollingClient( platformModule.jobScheduler, config.get( HaSettings.pull_interval ),
platformModule.dependencies.provideDependency( TransactionIdStore.class ), edgeToCoreClient,
applyPulledTransactions, discoveryService ) );
applyPulledTransactions, new ConnectToRandomCoreServer( discoveryService ) ) );

StoreFetcher storeFetcher = new StoreFetcher( platformModule.logging.getInternalLogProvider(),
new DefaultFileSystemAbstraction(), platformModule.pageCache,
Expand Down
Expand Up @@ -19,15 +19,16 @@
*/
package org.neo4j.metrics;

import org.junit.Rule;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.junit.Rule;
import org.junit.Test;

import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.coreedge.server.edge.EdgeGraphDatabase;
import org.neo4j.function.ThrowingSupplier;
Expand All @@ -37,37 +38,24 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.metrics.source.CoreMetrics;
import org.neo4j.metrics.source.EdgeMetrics;
import org.neo4j.test.TargetDirectory;
import org.neo4j.tooling.GlobalGraphOperations;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.metrics.source.CoreMetrics.APPEND_INDEX;
import static org.neo4j.metrics.source.CoreMetrics.COMMIT_INDEX;
import static org.neo4j.metrics.source.CoreMetrics.IS_LEADER;
import static org.neo4j.metrics.source.CoreMetrics.LEADER_NOT_FOUND;
import static org.neo4j.metrics.source.CoreMetrics.TERM;
import static org.neo4j.metrics.source.CoreMetrics.TX_PULL_REQUESTS_RECEIVED;
import static org.neo4j.metrics.source.CoreMetrics.TX_RETRIES;
import static org.neo4j.metrics.source.EdgeMetrics.PULL_UPDATES;
import static org.neo4j.metrics.source.EdgeMetrics.PULL_UPDATE_HIGHEST_TX_ID_RECEIVED;
import static org.neo4j.metrics.source.EdgeMetrics.PULL_UPDATE_HIGHEST_TX_ID_REQUESTED;
import static org.neo4j.test.Assert.assertEventually;


import org.neo4j.coreedge.discovery.Cluster;

public class CoreEdgeMetricsIT
{
@Rule
Expand Down Expand Up @@ -120,7 +108,16 @@ public void shouldMonitorCoreEdge() throws Exception
equalTo ( 0L ), 5, TimeUnit.SECONDS );

assertEventually("tx pull requests received eventually accurate",
() -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) ),
() ->
{
long total = 0;
for ( final CoreGraphDatabase db : cluster.coreServers() )
{
File metricsDir = new File( db.getStoreDir(), "metrics" );
total += readLastValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) );
}
return total;
},
greaterThan ( 0L ), 5, TimeUnit.SECONDS );

assertEventually("tx retries eventually accurate",
Expand Down

0 comments on commit 51fd6e3

Please sign in to comment.