From 51fd6e3d07acc293ea727ccb5b1d221947b5ed4b Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 21 Jan 2016 14:54:55 +0100 Subject: [PATCH] Use the random server selection strategy when pulling. The random strategy was already in use for store copy, but it also makes sense for the recurring transaction pulling, as a first and simple solution for load balancing, handling topology changes, etc. Previously we would just select the first server in the stable topology. --- .../catchup/tx/edge/TxPollingClient.java | 13 ++++---- .../edge/ConnectToRandomCoreServer.java | 18 ++++++----- .../edge/EnterpriseEdgeEditionModule.java | 2 +- .../org/neo4j/metrics/CoreEdgeMetricsIT.java | 31 +++++++++---------- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/edge/TxPollingClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/edge/TxPollingClient.java index 0069f610e9fe..c1fe51f4a9a9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/edge/TxPollingClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/edge/TxPollingClient.java @@ -23,6 +23,7 @@ import org.neo4j.coreedge.catchup.storecopy.edge.CoreClient; import org.neo4j.coreedge.discovery.EdgeDiscoveryService; +import org.neo4j.coreedge.server.edge.EdgeToCoreConnectionStrategy; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -35,15 +36,15 @@ public class TxPollingClient extends LifecycleAdapter { private final JobScheduler jobScheduler; private final Supplier transactionIdStoreSupplier; - private final EdgeDiscoveryService edgeDiscoveryService; + private final EdgeToCoreConnectionStrategy connectionStrategy; private final long pollingInterval; private final CoreClient coreClient; private final TxPullResponseListener txPullResponseListener; public TxPollingClient( JobScheduler jobScheduler, long pollingInterval, - Supplier transactionIdStoreSupplier, - CoreClient coreClient, TxPullResponseListener txPullResponseListener, - EdgeDiscoveryService edgeDiscoveryService ) + Supplier transactionIdStoreSupplier, + CoreClient coreClient, TxPullResponseListener txPullResponseListener, + EdgeToCoreConnectionStrategy connectionStrategy ) { this.coreClient = coreClient; this.txPullResponseListener = txPullResponseListener; @@ -52,7 +53,7 @@ public TxPollingClient( JobScheduler jobScheduler, long pollingInterval, this.pollingInterval = pollingInterval; this.transactionIdStoreSupplier = transactionIdStoreSupplier; - this.edgeDiscoveryService = edgeDiscoveryService; + this.connectionStrategy = connectionStrategy; } public void startPolling() @@ -61,7 +62,7 @@ public void startPolling() final TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get(); jobScheduler.scheduleRecurring( pullUpdates, () -> coreClient.pollForTransactions( - first( edgeDiscoveryService.currentTopology().getMembers() ).getCoreAddress(), + connectionStrategy.coreServer(), transactionIdStore.getLastCommittedTransactionId() ), pollingInterval, MILLISECONDS ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/ConnectToRandomCoreServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/ConnectToRandomCoreServer.java index afd7fd0354a7..fa3694b2471c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/ConnectToRandomCoreServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/ConnectToRandomCoreServer.java @@ -30,8 +30,9 @@ public class ConnectToRandomCoreServer implements EdgeToCoreConnectionStrategy { private final EdgeDiscoveryService discoveryService; + private final Random random = new Random(); - public ConnectToRandomCoreServer( EdgeDiscoveryService discoveryService) + public ConnectToRandomCoreServer( EdgeDiscoveryService discoveryService ) { this.discoveryService = discoveryService; } @@ -40,17 +41,18 @@ public ConnectToRandomCoreServer( EdgeDiscoveryService discoveryService) @Override public AdvertisedSocketAddress coreServer() { - final Random random = new Random(); - final ClusterTopology clusterTopology = discoveryService.currentTopology(); - int randomSize = random.nextInt( clusterTopology.getMembers().size() ); + int skippedServers = random.nextInt( clusterTopology.getMembers().size() ); final Iterator iterator = clusterTopology.getMembers().iterator(); - AdvertisedSocketAddress result = null; - for ( int i = 0; i <= randomSize; i++ ) + + CoreMember member; + do { - result = iterator.next().getCoreAddress(); + member = iterator.next(); } - return result; + while ( skippedServers --> 0 ); + + return member.getCoreAddress(); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java index 16c8fd355caa..f714cf5f5fca 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/edge/EnterpriseEdgeEditionModule.java @@ -159,7 +159,7 @@ public void assertSchemaWritesAllowed() throws InvalidTransactionTypeKernelExcep TxPollingClient txPollingClient = life.add( new TxPollingClient( platformModule.jobScheduler, config.get( HaSettings.pull_interval ), platformModule.dependencies.provideDependency( TransactionIdStore.class ), edgeToCoreClient, - applyPulledTransactions, discoveryService ) ); + applyPulledTransactions, new ConnectToRandomCoreServer( discoveryService ) ) ); StoreFetcher storeFetcher = new StoreFetcher( platformModule.logging.getInternalLogProvider(), new DefaultFileSystemAbstraction(), platformModule.pageCache, diff --git a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java index 8431a2f081db..fcebb85287a0 100644 --- a/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java +++ b/enterprise/metrics/src/test/java/org/neo4j/metrics/CoreEdgeMetricsIT.java @@ -19,15 +19,16 @@ */ package org.neo4j.metrics; +import org.junit.Rule; +import org.junit.Test; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.concurrent.TimeUnit; -import org.junit.Rule; -import org.junit.Test; - +import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.server.core.CoreGraphDatabase; import org.neo4j.coreedge.server.edge.EdgeGraphDatabase; import org.neo4j.function.ThrowingSupplier; @@ -37,37 +38,24 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.metrics.source.CoreMetrics; -import org.neo4j.metrics.source.EdgeMetrics; import org.neo4j.test.TargetDirectory; import org.neo4j.tooling.GlobalGraphOperations; import static java.util.concurrent.TimeUnit.SECONDS; - import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; - import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address; import static org.neo4j.graphdb.Label.label; import static org.neo4j.helpers.collection.Iterables.count; -import static org.neo4j.metrics.source.CoreMetrics.APPEND_INDEX; -import static org.neo4j.metrics.source.CoreMetrics.COMMIT_INDEX; -import static org.neo4j.metrics.source.CoreMetrics.IS_LEADER; -import static org.neo4j.metrics.source.CoreMetrics.LEADER_NOT_FOUND; -import static org.neo4j.metrics.source.CoreMetrics.TERM; -import static org.neo4j.metrics.source.CoreMetrics.TX_PULL_REQUESTS_RECEIVED; -import static org.neo4j.metrics.source.CoreMetrics.TX_RETRIES; import static org.neo4j.metrics.source.EdgeMetrics.PULL_UPDATES; import static org.neo4j.metrics.source.EdgeMetrics.PULL_UPDATE_HIGHEST_TX_ID_RECEIVED; import static org.neo4j.metrics.source.EdgeMetrics.PULL_UPDATE_HIGHEST_TX_ID_REQUESTED; import static org.neo4j.test.Assert.assertEventually; - -import org.neo4j.coreedge.discovery.Cluster; - public class CoreEdgeMetricsIT { @Rule @@ -120,7 +108,16 @@ public void shouldMonitorCoreEdge() throws Exception equalTo ( 0L ), 5, TimeUnit.SECONDS ); assertEventually("tx pull requests received eventually accurate", - () -> readLastValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) ), + () -> + { + long total = 0; + for ( final CoreGraphDatabase db : cluster.coreServers() ) + { + File metricsDir = new File( db.getStoreDir(), "metrics" ); + total += readLastValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) ); + } + return total; + }, greaterThan ( 0L ), 5, TimeUnit.SECONDS ); assertEventually("tx retries eventually accurate",