Skip to content

Commit

Permalink
StoreCopy E_TOO_FAR_BEHIND response before async checkpoint
Browse files Browse the repository at this point in the history
Performing a store copy against a machine that is too far behind
will
cause a checkpoint to be performed before the erroneous response is
sent.

This change sends the response first then asynchronously performs a
checkpoint.
  • Loading branch information
phughk committed Jul 30, 2018
1 parent c9d73b9 commit 3d725ac
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 72 deletions.
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.scheduler;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class LockingExecutor implements Executor
{
private final JobScheduler jobScheduler;
private final JobScheduler.Group group;
private final AtomicBoolean latch = new AtomicBoolean( false );

public LockingExecutor( JobScheduler jobScheduler, JobScheduler.Group group )
{
this.jobScheduler = jobScheduler;
this.group = group;
}

@Override
public void execute( Runnable runnable )
{
if ( latch.compareAndSet( false, true ) )
{
jobScheduler.schedule( group, () ->
{
try
{
runnable.run();
}
finally
{
latch.set( false );
}
} );
}
}
}
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.catchup;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.scheduler.LockingExecutor;

public class CheckPointerService
{
private final Supplier<CheckPointer> checkPointerSupplier;
private final Executor lockingCheckpointExecutor;

public CheckPointerService( Supplier<CheckPointer> checkPointerSupplier, JobScheduler jobScheduler, JobScheduler.Group group )
{
this.checkPointerSupplier = checkPointerSupplier;
this.lockingCheckpointExecutor = new LockingExecutor( jobScheduler, group );
}

public CheckPointer getCheckPointer()
{
return checkPointerSupplier.get();
}

public long lastCheckPointedTransactionId()
{
return checkPointerSupplier.get().lastCheckPointedTransactionId();
}

public void tryAsyncCheckpoint( Consumer<IOException> exceptionHandler )
{
lockingCheckpointExecutor.execute( () ->
{
try
{
getCheckPointer().tryCheckPoint( new SimpleTriggerInfo( "Store file copy" ) );
}
catch ( IOException e )
{
exceptionHandler.accept( e );
}
} );
}
}
Expand Up @@ -42,7 +42,6 @@
import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex; import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand All @@ -61,14 +60,13 @@ public class RegularCatchupServerHandler implements CatchupServerHandler
private final PageCache pageCache; private final PageCache pageCache;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex; private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CoreSnapshotService snapshotService; private final CoreSnapshotService snapshotService;
private final Supplier<CheckPointer> checkPointerSupplier; private final CheckPointerService checkPointerService;


public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier<StoreId> storeIdSupplier, public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs, PageCache pageCache, Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs, PageCache pageCache,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, Supplier<CheckPointer> checkPointerSupplier ) StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, CheckPointerService checkPointerService )
{ {

this.monitors = monitors; this.monitors = monitors;
this.logProvider = logProvider; this.logProvider = logProvider;
this.storeIdSupplier = storeIdSupplier; this.storeIdSupplier = storeIdSupplier;
Expand All @@ -80,7 +78,7 @@ public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider,
this.pageCache = pageCache; this.pageCache = pageCache;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex; this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
this.snapshotService = snapshotService; this.snapshotService = snapshotService;
this.checkPointerSupplier = checkPointerSupplier; this.checkPointerService = checkPointerService;
} }


@Override @Override
Expand All @@ -99,22 +97,21 @@ public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupSer
@Override @Override
public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol ) public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol )
{ {
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier, return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerService::getCheckPointer, storeCopyCheckPointMutex,
new PrepareStoreCopyFilesProvider( pageCache, fs ) ); dataSourceSupplier, new PrepareStoreCopyFilesProvider( pageCache, fs ) );
} }


@Override @Override
public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol ) public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol )
{ {
return new StoreCopyRequestHandler.GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, return new StoreCopyRequestHandler.GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerService,
new StoreFileStreamingProtocol(), pageCache, fs, new StoreFileStreamingProtocol(), pageCache, fs, logProvider );
logProvider );
} }


@Override @Override
public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol ) public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol )
{ {
return new StoreCopyRequestHandler.GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, return new StoreCopyRequestHandler.GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerService,
new StoreFileStreamingProtocol(), pageCache, fs, logProvider ); new StoreFileStreamingProtocol(), pageCache, fs, logProvider );
} }


Expand Down
Expand Up @@ -22,44 +22,15 @@
*/ */
package org.neo4j.causalclustering.catchup.storecopy; package org.neo4j.causalclustering.catchup.storecopy;


import java.io.IOException;

import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;


class DataSourceChecks class DataSourceChecks
{ {
private DataSourceChecks() private DataSourceChecks()
{ {
} }


static boolean isTransactionWithinReach( long requiredTxId, CheckPointer checkpointer )
{
if ( isWithinLastCheckPoint( requiredTxId, checkpointer ) )
{
return true;
}
else
{
try
{
checkpointer.tryCheckPoint( new SimpleTriggerInfo( "Store file copy" ) );
return isWithinLastCheckPoint( requiredTxId, checkpointer );
}
catch ( IOException e )
{
return false;
}
}
}

private static boolean isWithinLastCheckPoint( long atLeast, CheckPointer checkPointer )
{
return checkPointer.lastCheckPointedTransactionId() >= atLeast;
}

static boolean hasSameStoreId( StoreId storeId, NeoStoreDataSource dataSource ) static boolean hasSameStoreId( StoreId storeId, NeoStoreDataSource dataSource )
{ {
return storeId.equalToKernelStoreId( dataSource.getStoreId() ); return storeId.equalToKernelStoreId( dataSource.getStoreId() );
Expand Down
Expand Up @@ -34,42 +34,41 @@
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.causalclustering.catchup.CatchupServerProtocol; import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.messaging.StoreCopyRequest; import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StoreFileMetadata; import org.neo4j.storageengine.api.StoreFileMetadata;


import static java.lang.String.format; import static java.lang.String.format;
import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.hasSameStoreId; import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.hasSameStoreId;
import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.isTransactionWithinReach;
import static org.neo4j.io.fs.FileUtils.relativePath; import static org.neo4j.io.fs.FileUtils.relativePath;


public abstract class StoreCopyRequestHandler<T extends StoreCopyRequest> extends SimpleChannelInboundHandler<T> public abstract class StoreCopyRequestHandler<T extends StoreCopyRequest> extends SimpleChannelInboundHandler<T>
{ {
private final CatchupServerProtocol protocol; private final CatchupServerProtocol protocol;
private final Supplier<NeoStoreDataSource> dataSource; private final Supplier<NeoStoreDataSource> dataSource;
private final Supplier<CheckPointer> checkpointerSupplier; private final CheckPointerService checkPointerService;
private final StoreFileStreamingProtocol storeFileStreamingProtocol; private final StoreFileStreamingProtocol storeFileStreamingProtocol;
private final PageCache pageCache; private final PageCache pageCache;
private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
private final Log log; private final Log log;


StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier, StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, CheckPointerService checkPointerService,
StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider ) StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider )
{ {
this.protocol = protocol; this.protocol = protocol;
this.dataSource = dataSource; this.dataSource = dataSource;
this.checkpointerSupplier = checkpointerSupplier;
this.storeFileStreamingProtocol = storeFileStreamingProtocol; this.storeFileStreamingProtocol = storeFileStreamingProtocol;
this.pageCache = pageCache; this.pageCache = pageCache;
this.fs = fs; this.fs = fs;
this.log = logProvider.getLog( StoreCopyRequestHandler.class ); this.log = logProvider.getLog( StoreCopyRequestHandler.class );
this.checkPointerService = checkPointerService;
} }


@Override @Override
Expand All @@ -84,9 +83,11 @@ protected void channelRead0( ChannelHandlerContext ctx, T request ) throws Excep
{ {
responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH; responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH;
} }
else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpointerSupplier.get() ) ) else if ( !isTransactionWithinReach( request.requiredTransactionId() ) )
{ {
responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND; responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND;
checkPointerService.tryAsyncCheckpoint(
e -> log.error( "Failed to do a checkpoint that was invoked after a too far behind error on store copy request", e ) );
} }
else else
{ {
Expand All @@ -111,6 +112,11 @@ else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpoint
} }
} }


private boolean isTransactionWithinReach( long transactionId )
{
return checkPointerService.lastCheckPointedTransactionId() >= transactionId;
}

abstract ResourceIterator<StoreFileMetadata> files( T request, NeoStoreDataSource neoStoreDataSource ) throws IOException; abstract ResourceIterator<StoreFileMetadata> files( T request, NeoStoreDataSource neoStoreDataSource ) throws IOException;


private static Iterator<StoreFileMetadata> onlyOne( List<StoreFileMetadata> files, String description ) private static Iterator<StoreFileMetadata> onlyOne( List<StoreFileMetadata> files, String description )
Expand All @@ -129,10 +135,11 @@ private static Predicate<StoreFileMetadata> matchesRequested( String fileName )


public static class GetStoreFileRequestHandler extends StoreCopyRequestHandler<GetStoreFileRequest> public static class GetStoreFileRequestHandler extends StoreCopyRequestHandler<GetStoreFileRequest>
{ {
public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, Supplier<CheckPointer> checkpointerSupplier, public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource,
StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider ) CheckPointerService checkPointerService, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider )
{ {
super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider ); super( protocol, dataSource, checkPointerService, storeFileStreamingProtocol, pageCache, fs, logProvider );
} }


@Override @Override
Expand All @@ -150,10 +157,10 @@ ResourceIterator<StoreFileMetadata> files( GetStoreFileRequest request, NeoStore
public static class GetIndexSnapshotRequestHandler extends StoreCopyRequestHandler<GetIndexFilesRequest> public static class GetIndexSnapshotRequestHandler extends StoreCopyRequestHandler<GetIndexFilesRequest>
{ {
public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource,
Supplier<CheckPointer> checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, CheckPointerService checkPointerService, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider ) FileSystemAbstraction fs, LogProvider logProvider )
{ {
super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider ); super( protocol, dataSource, checkPointerService, storeFileStreamingProtocol, pageCache, fs, logProvider );
} }


@Override @Override
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.causalclustering.catchup.CatchupProtocolServerInstaller; import org.neo4j.causalclustering.catchup.CatchupProtocolServerInstaller;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder; import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerHandler; import org.neo4j.causalclustering.catchup.CatchupServerHandler;
import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier; import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler; import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CommitStateHelper; import org.neo4j.causalclustering.catchup.storecopy.CommitStateHelper;
Expand Down Expand Up @@ -192,11 +193,13 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols ); ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols );
ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), supportedModifierProtocols ); ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), supportedModifierProtocols );


CheckPointerService checkPointerService =
new CheckPointerService( new CheckpointerSupplier( platformModule.dependencies ), jobScheduler, JobScheduler.Groups.checkPoint );
CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, snapshotService, fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, snapshotService,
new CheckpointerSupplier( platformModule.dependencies ) ); checkPointerService );


CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory, CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory,
logProvider, catchupServerHandler ); logProvider, catchupServerHandler );
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.neo4j.causalclustering.catchup.CatchUpResponseHandler; import org.neo4j.causalclustering.catchup.CatchUpResponseHandler;
import org.neo4j.causalclustering.catchup.CatchupProtocolClientInstaller; import org.neo4j.causalclustering.catchup.CatchupProtocolClientInstaller;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder; import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier; import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler; import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery; import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
Expand Down Expand Up @@ -139,6 +140,7 @@
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleStatus; import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;
Expand Down Expand Up @@ -320,10 +322,12 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, upstreamDatabaseStrategySelector, retryStrategy, logProvider, life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, upstreamDatabaseStrategySelector, retryStrategy, logProvider,
platformModule.logging.getUserLogProvider(), storeCopyProcess, topologyService ) ); platformModule.logging.getUserLogProvider(), storeCopyProcess, topologyService ) );


RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, logProvider, localDatabase::storeId,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, null, new CheckpointerSupplier( platformModule.dependencies ) ); fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, null,
new CheckPointerService( new CheckpointerSupplier( platformModule.dependencies ),
platformModule.jobScheduler, JobScheduler.Groups.checkPoint ) );


InstalledProtocolHandler installedProtocolHandler = new InstalledProtocolHandler(); // TODO: hook into a procedure InstalledProtocolHandler installedProtocolHandler = new InstalledProtocolHandler(); // TODO: hook into a procedure
Server catchupServer = new CatchupServerBuilder( catchupServerHandler ) Server catchupServer = new CatchupServerBuilder( catchupServerHandler )
Expand Down

0 comments on commit 3d725ac

Please sign in to comment.