MemberId address resolution performed earlier
Address resolution of cluster members is now performed earlier.
TopologyService has been removed from many classes that will later be used by the backup client.
A retry mechanism has been introduced to handle the fact that it is now possible to request an address for a member
before the table of member addresses has been refreshed.
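
The retry itself is not visible in the hunks below, which only show the new call sites. As a rough sketch of the mechanism described above (the helper name, attempt count, and delay are illustrative assumptions, not the actual implementation), resolving a catchup address with retries could look like this:

// Hypothetical sketch only -- illustrates the retry described above, not the committed code.
// TopologyService.findCatchupAddress( MemberId ) and TopologyLookupException appear in the hunks below.
private AdvertisedSocketAddress resolveCatchupAddress( TopologyService topologyService, MemberId member )
        throws TopologyLookupException
{
    int attempts = 5;          // illustrative retry budget
    long delayMillis = 200;    // illustrative pause between attempts
    for ( int i = 0; i < attempts; i++ )
    {
        Optional<AdvertisedSocketAddress> address = topologyService.findCatchupAddress( member );
        if ( address.isPresent() )
        {
            return address.get();
        }
        try
        {
            Thread.sleep( delayMillis ); // wait for the member address table to refresh
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            break;
        }
    }
    throw new TopologyLookupException( member );
}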
Przemek Hugh Kaznowski authored and phughk committed Aug 11, 2017
1 parent b4b798a commit 36970c3
Showing 38 changed files with 671 additions and 170 deletions.
@@ -31,8 +31,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
@@ -42,13 +40,13 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.neo4j.causalclustering.catchup.TimeoutLoop.waitForCompletion;

public class CatchUpClient extends LifecycleAdapter
{
private final LogProvider logProvider;
private final TopologyService topologyService;
private final Log log;
private final Clock clock;
private final Monitors monitors;
@@ -58,30 +56,24 @@ public class CatchUpClient extends LifecycleAdapter

private NioEventLoopGroup eventLoopGroup;

public CatchUpClient( TopologyService topologyService, LogProvider logProvider, Clock clock,
public CatchUpClient( LogProvider logProvider, Clock clock,
long inactivityTimeoutMillis, Monitors monitors, SslPolicy sslPolicy )
{
this.logProvider = logProvider;
this.topologyService = topologyService;
this.log = logProvider.getLog( getClass() );
this.clock = clock;
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.monitors = monitors;
this.sslPolicy = sslPolicy;
}

public <T> T makeBlockingRequest( MemberId upstream, CatchUpRequest request,
CatchUpResponseCallback<T> responseHandler ) throws CatchUpClientException
public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request,
CatchUpResponseCallback<T> responseHandler )
throws CatchUpClientException
{
CompletableFuture<T> future = new CompletableFuture<>();
Optional<AdvertisedSocketAddress> catchUpAddress = topologyService.findCatchupAddress( upstream );

if ( !catchUpAddress.isPresent() )
{
throw new CatchUpClientException( "Cannot find the target member socket address" );
}

CatchUpChannel channel = pool.acquire( catchUpAddress.get() );
CatchUpChannel channel = pool.acquire( upstream );

future.whenComplete( ( result, e ) ->
{
@@ -98,8 +90,8 @@ public <T> T makeBlockingRequest( MemberId upstream, CatchUpRequest request,
channel.setResponseHandler( responseHandler, future );
channel.send( request );

String operation = String.format( "Timed out executing operation %s on %s (%s)",
request, upstream, catchUpAddress.get() );
String operation = format( "Timed out executing operation %s on %s",
request, upstream );

return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log );
}
@@ -30,6 +30,7 @@
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
@@ -132,13 +133,13 @@ private long getPullIndex( File storeDir ) throws IOException
}
}

public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File storeDir ) throws StoreCopyFailedException, IOException
public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir ) throws StoreCopyFailedException, IOException
{
long pullIndex = getPullIndex( storeDir );
return pullTransactions( from, expectedStoreId, storeDir, pullIndex, false );
}

public void copy( MemberId from, StoreId expectedStoreId, File destDir )
public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File destDir )
throws StoreCopyFailedException, StreamingTransactionsFailedException
{
try
@@ -164,7 +165,7 @@ public void copy( MemberId from, StoreId expectedStoreId, File destDir )
}
}

private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy ) throws IOException, StoreCopyFailedException
private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy ) throws IOException, StoreCopyFailedException
{
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider, fromTxId, asPartOfStoreCopy ) )
{
@@ -189,7 +190,7 @@ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId,
}
}

public StoreId getStoreId( MemberId from ) throws StoreIdDownloadFailedException
public StoreId getStoreId( AdvertisedSocketAddress from ) throws StoreIdDownloadFailedException
{
return storeCopyClient.fetchStoreId( from );
}
@@ -20,16 +20,22 @@
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

public class StoreCopyClient
{
private final CatchUpClient catchUpClient;
@@ -41,7 +47,7 @@ public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider )
log = logProvider.getLog( getClass() );
}

long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams )
long copyStoreFiles( AdvertisedSocketAddress from, StoreId expectedStoreId, StoreFileStreams storeFileStreams )
throws StoreCopyFailedException
{
try
@@ -82,7 +88,7 @@ public void onFileStreamingComplete( CompletableFuture<Long> signal,
}
}

StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException
StoreId fetchStoreId( AdvertisedSocketAddress fromAddress ) throws StoreIdDownloadFailedException
{
try
{
@@ -95,7 +101,7 @@ public void onGetStoreIdResponse( CompletableFuture<StoreId> signal,
signal.complete( response.storeId() );
}
};
return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), responseHandler );
return catchUpClient.makeBlockingRequest( fromAddress, new GetStoreIdRequest(), responseHandler );
}
catch ( CatchUpClientException e )
{
@@ -21,8 +21,8 @@

import java.io.IOException;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.logging.Log;
@@ -48,7 +48,7 @@ public StoreCopyProcess( FileSystemAbstraction fs, PageCache pageCache, LocalDat
this.log = logProvider.getLog( getClass() );
}

public void replaceWithStoreFrom( MemberId source, StoreId expectedStoreId )
public void replaceWithStoreFrom( AdvertisedSocketAddress source, StoreId expectedStoreId )
throws IOException, StoreCopyFailedException, StreamingTransactionsFailedException
{
try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, pageCache,
@@ -34,10 +34,13 @@
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName;
import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.Lifecycle;
@@ -47,7 +50,6 @@
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.CANCELLED;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING;
@@ -88,6 +90,7 @@ enum State
private final long txPullIntervalMillis;
private final BatchingTxApplier applier;
private final PullRequestMonitor pullRequestMonitor;
private final TopologyService topologyService;

private RenewableTimeout timeout;
private volatile State state = TX_PULLING;
@@ -99,7 +102,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba
Lifecycle startStopOnStoreCopy, CatchUpClient catchUpClient,
UpstreamDatabaseStrategySelector selectionStrategy, RenewableTimeoutService timeoutService,
long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors,
StoreCopyProcess storeCopyProcess, Supplier<DatabaseHealth> databaseHealthSupplier )
StoreCopyProcess storeCopyProcess, Supplier<DatabaseHealth> databaseHealthSupplier, TopologyService topologyService )

{
this.localDatabase = localDatabase;
@@ -113,6 +116,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
this.storeCopyProcess = storeCopyProcess;
this.databaseHealthSupplier = databaseHealthSupplier;
this.topologyService = topologyService;
}

@Override
@@ -246,10 +250,11 @@ private boolean pullAndApplyBatchOfTransactions( MemberId upstream, StoreId loca
TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
log.debug( "Pull transactions from %s where tx id > %d [batch #%d]", upstream, lastQueuedTxId, batchCount );

AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( upstream ).orElseThrow( () -> new TopologyLookupException( upstream ) );
TxStreamFinishedResponse response;
try
{
response = catchUpClient.makeBlockingRequest( upstream, txPullRequest, new CatchUpResponseAdaptor<TxStreamFinishedResponse>()
response = catchUpClient.makeBlockingRequest( fromAddress, txPullRequest, new CatchUpResponseAdaptor<TxStreamFinishedResponse>()
{
@Override
public void onTxPullResponse( CompletableFuture<TxStreamFinishedResponse> signal, TxPullResponse response )
@@ -323,9 +328,10 @@ private void downloadDatabase( MemberId upstream, StoreId localStoreId )
throw new RuntimeException( throwable );
}

AdvertisedSocketAddress fromAddress = topologyService.findCatchupAddress( upstream ).orElseThrow( () -> new TopologyLookupException( upstream ) );
try
{
storeCopyProcess.replaceWithStoreFrom( upstream, localStoreId );
storeCopyProcess.replaceWithStoreFrom( fromAddress, localStoreId );
}
catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e )
{
@@ -25,8 +25,8 @@
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.monitoring.Monitors;

public class TxPullClient
@@ -40,12 +40,11 @@ public TxPullClient( CatchUpClient catchUpClient, Monitors monitors )
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
}

public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long previousTxId,
TxPullResponseListener txPullResponseListener )
public TxPullRequestResult pullTransactions( AdvertisedSocketAddress fromAddress, StoreId storeId, long previousTxId, TxPullResponseListener txPullResponseListener )
throws CatchUpClientException
{
pullRequestMonitor.txPullRequest( previousTxId );
return catchUpClient.makeBlockingRequest( from, new TxPullRequest( previousTxId, storeId ),
return catchUpClient.makeBlockingRequest( fromAddress, new TxPullRequest( previousTxId, storeId ),
new CatchUpResponseAdaptor<TxPullRequestResult>()
{
private long lastTxIdReceived = previousTxId;
@@ -27,7 +27,7 @@
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.replication.session.OperationContext;
import org.neo4j.causalclustering.helper.RetryStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.graphdb.DatabaseShutdownException;
@@ -46,21 +46,21 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List
private final Outbound<MemberId,RaftMessages.RaftMessage> outbound;
private final ProgressTracker progressTracker;
private final LocalSessionPool sessionPool;
private final RetryStrategy retryStrategy;
private final TimeoutStrategy timeoutStrategy;
private final AvailabilityGuard availabilityGuard;
private final LeaderLocator leaderLocator;
private final Log log;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, RetryStrategy retryStrategy, AvailabilityGuard availabilityGuard,
ProgressTracker progressTracker, TimeoutStrategy timeoutStrategy, AvailabilityGuard availabilityGuard,
LogProvider logProvider )
{
this.me = me;
this.outbound = outbound;
this.progressTracker = progressTracker;
this.sessionPool = sessionPool;
this.retryStrategy = retryStrategy;
this.timeoutStrategy = timeoutStrategy;
this.availabilityGuard = availabilityGuard;

this.leaderLocator = leaderLocator;
@@ -76,7 +76,7 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() );
Progress progress = progressTracker.start( operation );

RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
TimeoutStrategy.Timeout timeout = timeoutStrategy.newTimeout();
do
{
assertDatabaseNotShutdown();
@@ -56,6 +56,7 @@
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloader;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.core.state.storage.StateStorage;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
@@ -99,6 +100,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
final LifeSupport life = platformModule.life;
final Monitors monitors = platformModule.monitors;
final JobScheduler jobScheduler = platformModule.jobScheduler;
final TopologyService topologyService = clusteringModule.topologyService();

LogProvider logProvider = logging.getInternalLogProvider();
LogProvider userLogProvider = logging.getUserLogProvider();
@@ -120,13 +122,10 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla

long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis();
CatchUpClient catchUpClient = life
.add( new CatchUpClient( clusteringModule.topologyService(), logProvider, Clocks.systemClock(),
inactivityTimeoutMillis, monitors, sslPolicy ) );
.add(new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMillis, monitors, sslPolicy ) );

RemoteStore remoteStore = new RemoteStore( logProvider, fileSystem, platformModule.pageCache,
new StoreCopyClient( catchUpClient, logProvider ),
new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(),
platformModule.monitors );
RemoteStore remoteStore = new RemoteStore( logProvider, fileSystem, platformModule.pageCache, new StoreCopyClient( catchUpClient, logProvider ),
new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), platformModule.monitors );

CopiedStoreRecovery copiedStoreRecovery = new CopiedStoreRecovery( config,
platformModule.kernelExtensions.listFactories(), platformModule.pageCache );
@@ -177,7 +176,7 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, servicesToStopOnStoreCopy,
remoteStore, catchUpClient, logProvider, storeCopyProcess, coreStateMachinesModule.coreStateMachines,
snapshotService, commandApplicationProcess );
snapshotService, commandApplicationProcess, topologyService );

RaftMessageHandler messageHandler = new RaftMessageHandler( localDatabase, logProvider,
consensusModule.raftMachine(), downloader, commandApplicationProcess );
