diff --git a/community/common/src/main/java/org/neo4j/scheduler/LockingExecutor.java b/community/common/src/main/java/org/neo4j/scheduler/LockingExecutor.java
new file mode 100644
index 0000000000000..3788c537050f4
--- /dev/null
+++ b/community/common/src/main/java/org/neo4j/scheduler/LockingExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2002-2018 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.scheduler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LockingExecutor implements Executor
+{
+ private final JobScheduler jobScheduler;
+ private final JobScheduler.Group group;
+ private final AtomicBoolean latch = new AtomicBoolean( false );
+
+ public LockingExecutor( JobScheduler jobScheduler, JobScheduler.Group group )
+ {
+ this.jobScheduler = jobScheduler;
+ this.group = group;
+ }
+
+ @Override
+ public void execute( Runnable runnable )
+ {
+ if ( latch.compareAndSet( false, true ) )
+ {
+ jobScheduler.schedule( group, () ->
+ {
+ try
+ {
+ runnable.run();
+ }
+ finally
+ {
+ latch.set( false );
+ }
+ } );
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CheckPointerService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CheckPointerService.java
new file mode 100644
index 0000000000000..aacaa7c31f7b0
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CheckPointerService.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2002-2018 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j Enterprise Edition. The included source
+ * code can be redistributed and/or modified under the terms of the
+ * GNU AFFERO GENERAL PUBLIC LICENSE Version 3
+ * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
+ * Commons Clause, as found in the associated LICENSE.txt file.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * Neo4j object code can be licensed independently from the source
+ * under separate terms from the AGPL. Inquiries can be directed to:
+ * licensing@neo4j.com
+ *
+ * More information is also available at:
+ * https://neo4j.com/licensing/
+ */
+package org.neo4j.causalclustering.catchup;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
+import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
+import org.neo4j.scheduler.JobScheduler;
+import org.neo4j.scheduler.LockingExecutor;
+
+public class CheckPointerService
+{
+ private final Supplier checkPointerSupplier;
+ private final Executor lockingCheckpointExecutor;
+
+ public CheckPointerService( Supplier checkPointerSupplier, JobScheduler jobScheduler, JobScheduler.Group group )
+ {
+ this.checkPointerSupplier = checkPointerSupplier;
+ this.lockingCheckpointExecutor = new LockingExecutor( jobScheduler, group );
+ }
+
+ public CheckPointer getCheckPointer()
+ {
+ return checkPointerSupplier.get();
+ }
+
+ public long lastCheckPointedTransactionId()
+ {
+ return checkPointerSupplier.get().lastCheckPointedTransactionId();
+ }
+
+ public void tryAsyncCheckpoint( Consumer exceptionHandler )
+ {
+ lockingCheckpointExecutor.execute( () ->
+ {
+ try
+ {
+ getCheckPointer().tryCheckPoint( new SimpleTriggerInfo( "Store file copy" ) );
+ }
+ catch ( IOException e )
+ {
+ exceptionHandler.accept( e );
+ }
+ } );
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java
index 4e7a36568ea71..c6f255ae06783 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java
@@ -42,7 +42,6 @@
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
-import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
@@ -61,14 +60,13 @@ public class RegularCatchupServerHandler implements CatchupServerHandler
private final PageCache pageCache;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CoreSnapshotService snapshotService;
- private final Supplier checkPointerSupplier;
+ private final CheckPointerService checkPointerService;
public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier storeIdSupplier,
Supplier transactionIdStoreSupplier, Supplier logicalTransactionStoreSupplier,
Supplier dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs, PageCache pageCache,
- StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, Supplier checkPointerSupplier )
+ StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, CheckPointerService checkPointerService )
{
-
this.monitors = monitors;
this.logProvider = logProvider;
this.storeIdSupplier = storeIdSupplier;
@@ -80,7 +78,7 @@ public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider,
this.pageCache = pageCache;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
this.snapshotService = snapshotService;
- this.checkPointerSupplier = checkPointerSupplier;
+ this.checkPointerService = checkPointerService;
}
@Override
@@ -99,22 +97,21 @@ public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupSer
@Override
public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
- return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier,
- new PrepareStoreCopyFilesProvider( pageCache, fs ) );
+ return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerService::getCheckPointer, storeCopyCheckPointMutex,
+ dataSourceSupplier, new PrepareStoreCopyFilesProvider( pageCache, fs ) );
}
@Override
public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
- return new StoreCopyRequestHandler.GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier,
- new StoreFileStreamingProtocol(), pageCache, fs,
- logProvider );
+ return new StoreCopyRequestHandler.GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerService,
+ new StoreFileStreamingProtocol(), pageCache, fs, logProvider );
}
@Override
public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
- return new StoreCopyRequestHandler.GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier,
+ return new StoreCopyRequestHandler.GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerService,
new StoreFileStreamingProtocol(), pageCache, fs, logProvider );
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/DataSourceChecks.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/DataSourceChecks.java
index 917d7a4a9a2ff..5cf29c5697370 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/DataSourceChecks.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/DataSourceChecks.java
@@ -22,12 +22,8 @@
*/
package org.neo4j.causalclustering.catchup.storecopy;
-import java.io.IOException;
-
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.kernel.NeoStoreDataSource;
-import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
-import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
class DataSourceChecks
{
@@ -35,31 +31,6 @@ private DataSourceChecks()
{
}
- static boolean isTransactionWithinReach( long requiredTxId, CheckPointer checkpointer )
- {
- if ( isWithinLastCheckPoint( requiredTxId, checkpointer ) )
- {
- return true;
- }
- else
- {
- try
- {
- checkpointer.tryCheckPoint( new SimpleTriggerInfo( "Store file copy" ) );
- return isWithinLastCheckPoint( requiredTxId, checkpointer );
- }
- catch ( IOException e )
- {
- return false;
- }
- }
- }
-
- private static boolean isWithinLastCheckPoint( long atLeast, CheckPointer checkPointer )
- {
- return checkPointer.lastCheckPointedTransactionId() >= atLeast;
- }
-
static boolean hasSameStoreId( StoreId storeId, NeoStoreDataSource dataSource )
{
return storeId.equalToKernelStoreId( dataSource.getStoreId() );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java
index 25307aec0b8e1..33a5228e8752a 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java
@@ -34,42 +34,41 @@
import java.util.stream.Collectors;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
+import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.messaging.StoreCopyRequest;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
-import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StoreFileMetadata;
import static java.lang.String.format;
import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.hasSameStoreId;
-import static org.neo4j.causalclustering.catchup.storecopy.DataSourceChecks.isTransactionWithinReach;
import static org.neo4j.io.fs.FileUtils.relativePath;
public abstract class StoreCopyRequestHandler extends SimpleChannelInboundHandler
{
private final CatchupServerProtocol protocol;
private final Supplier dataSource;
- private final Supplier checkpointerSupplier;
+ private final CheckPointerService checkPointerService;
private final StoreFileStreamingProtocol storeFileStreamingProtocol;
private final PageCache pageCache;
private final FileSystemAbstraction fs;
private final Log log;
- StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, Supplier checkpointerSupplier,
+ StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, CheckPointerService checkPointerService,
StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider )
{
this.protocol = protocol;
this.dataSource = dataSource;
- this.checkpointerSupplier = checkpointerSupplier;
this.storeFileStreamingProtocol = storeFileStreamingProtocol;
this.pageCache = pageCache;
this.fs = fs;
this.log = logProvider.getLog( StoreCopyRequestHandler.class );
+ this.checkPointerService = checkPointerService;
}
@Override
@@ -84,9 +83,11 @@ protected void channelRead0( ChannelHandlerContext ctx, T request ) throws Excep
{
responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH;
}
- else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpointerSupplier.get() ) )
+ else if ( !isTransactionWithinReach( request.requiredTransactionId() ) )
{
responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND;
+ checkPointerService.tryAsyncCheckpoint(
+ e -> log.error( "Failed to do a checkpoint that was invoked after a too far behind error on store copy request", e ) );
}
else
{
@@ -111,6 +112,11 @@ else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpoint
}
}
+ private boolean isTransactionWithinReach( long transactionId )
+ {
+ return checkPointerService.lastCheckPointedTransactionId() >= transactionId;
+ }
+
abstract ResourceIterator files( T request, NeoStoreDataSource neoStoreDataSource ) throws IOException;
private static Iterator onlyOne( List files, String description )
@@ -129,10 +135,11 @@ private static Predicate matchesRequested( String fileName )
public static class GetStoreFileRequestHandler extends StoreCopyRequestHandler
{
- public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, Supplier checkpointerSupplier,
- StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider )
+ public GetStoreFileRequestHandler( CatchupServerProtocol protocol, Supplier dataSource,
+ CheckPointerService checkPointerService, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
+ FileSystemAbstraction fs, LogProvider logProvider )
{
- super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider );
+ super( protocol, dataSource, checkPointerService, storeFileStreamingProtocol, pageCache, fs, logProvider );
}
@Override
@@ -150,10 +157,10 @@ ResourceIterator files( GetStoreFileRequest request, NeoStore
public static class GetIndexSnapshotRequestHandler extends StoreCopyRequestHandler
{
public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier dataSource,
- Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
+ CheckPointerService checkPointerService, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider )
{
- super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider );
+ super( protocol, dataSource, checkPointerService, storeFileStreamingProtocol, pageCache, fs, logProvider );
}
@Override
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java
index 6887ed7cfcc6e..a25e8550898dd 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java
@@ -35,6 +35,7 @@
import org.neo4j.causalclustering.catchup.CatchupProtocolServerInstaller;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerHandler;
+import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CommitStateHelper;
@@ -192,11 +193,13 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols );
ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), supportedModifierProtocols );
+ CheckPointerService checkPointerService =
+ new CheckPointerService( new CheckpointerSupplier( platformModule.dependencies ), jobScheduler, JobScheduler.Groups.checkPoint );
CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, snapshotService,
- new CheckpointerSupplier( platformModule.dependencies ) );
+ checkPointerService );
CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory,
logProvider, catchupServerHandler );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
index 1eb138c6b92a7..13f269bd0f717 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
@@ -39,6 +39,7 @@
import org.neo4j.causalclustering.catchup.CatchUpResponseHandler;
import org.neo4j.causalclustering.catchup.CatchupProtocolClientInstaller;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
+import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
@@ -139,6 +140,7 @@
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.logging.LogProvider;
+import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.time.Clocks;
import org.neo4j.udc.UsageData;
@@ -320,10 +322,12 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
life.add( new ReadReplicaStartupProcess( remoteStore, localDatabase, txPulling, upstreamDatabaseStrategySelector, retryStrategy, logProvider,
platformModule.logging.getUserLogProvider(), storeCopyProcess, topologyService ) );
- RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
- logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
+ RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, logProvider, localDatabase::storeId,
+ platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
- fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, null, new CheckpointerSupplier( platformModule.dependencies ) );
+ fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, null,
+ new CheckPointerService( new CheckpointerSupplier( platformModule.dependencies ),
+ platformModule.jobScheduler, JobScheduler.Groups.checkPoint ) );
InstalledProtocolHandler installedProtocolHandler = new InstalledProtocolHandler(); // TODO: hook into a procedure
Server catchupServer = new CatchupServerBuilder( catchupServerHandler )
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java
index 4e65110644d33..b51197481b21f 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java
@@ -28,9 +28,16 @@
import java.io.File;
import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
+import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.StoreCopyRequest;
@@ -44,6 +51,7 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
+import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StoreFileMetadata;
import static org.junit.Assert.assertEquals;
@@ -59,10 +67,13 @@ public class StoreCopyRequestHandlerTest
private final DefaultFileSystemAbstraction fileSystemAbstraction = new DefaultFileSystemAbstraction();
private final NeoStoreDataSource neoStoreDataSource = mock( NeoStoreDataSource.class );
- private final CheckPointer checkPointer = new FakeCheckPointer();
+ private final FakeCheckPointer checkPointer = new FakeCheckPointer();
private final PageCache pageCache = mock( PageCache.class );
private EmbeddedChannel embeddedChannel;
private CatchupServerProtocol catchupServerProtocol;
+ private JobScheduler jobScheduler = new FakeSingleThreadedJobScheduler();
+ private CheckPointerService checkPointerService =
+ new CheckPointerService( () -> checkPointer, jobScheduler, JobScheduler.Groups.checkPoint );
@Before
public void setup()
@@ -70,7 +81,7 @@ public void setup()
catchupServerProtocol = new CatchupServerProtocol();
catchupServerProtocol.expect( CatchupServerProtocol.State.GET_STORE_FILE );
StoreCopyRequestHandler storeCopyRequestHandler =
- new NiceStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, () -> checkPointer, new StoreFileStreamingProtocol(),
+ new NiceStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, checkPointerService, new StoreFileStreamingProtocol(),
pageCache, fileSystemAbstraction, NullLogProvider.getInstance() );
when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) );
embeddedChannel = new EmbeddedChannel( storeCopyRequestHandler );
@@ -122,10 +133,10 @@ public void shouldResetProtocolAndGiveErrorOnUncheckedException()
}
@Test
- public void shoulResetProtoclAndGiveErrorIfFilesThrowException()
+ public void shouldResetProtocolAndGiveErrorIfFilesThrowException()
{
EmbeddedChannel alternativeChannel = new EmbeddedChannel(
- new EvilStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, () -> checkPointer, new StoreFileStreamingProtocol(),
+ new EvilStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, checkPointerService, new StoreFileStreamingProtocol(),
pageCache, fileSystemAbstraction, NullLogProvider.getInstance() ) );
try
{
@@ -134,7 +145,7 @@ public void shoulResetProtoclAndGiveErrorIfFilesThrowException()
}
catch ( IllegalStateException ignore )
{
-
+ // do nothing
}
assertEquals( ResponseMessageType.STORE_COPY_FINISHED, alternativeChannel.readOutbound() );
StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_UNKNOWN );
@@ -143,13 +154,38 @@ public void shoulResetProtoclAndGiveErrorIfFilesThrowException()
assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) );
}
+ @Test
+ public void transactionsTooFarBehindStartCheckpointAsynchronously() throws IOException
+ {
+ // given checkpoint will fail if performed
+ checkPointer._tryCheckPoint = Optional.empty();
+
+ // when
+ try
+ {
+ embeddedChannel.writeInbound( new GetStoreFileRequest( STORE_ID_MATCHING, new File( "some-file" ), 123 ) );
+ fail();
+ }
+ catch ( RuntimeException e )
+ {
+ assertEquals( "FakeCheckPointer", e.getMessage() );
+ }
+
+ // then should have received error message
+ assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() );
+
+ // and should have failed on async
+ assertEquals( 1, checkPointer.invocationCounter.get() );
+ assertEquals( 1, checkPointer.failCounter.get() );
+ }
+
private class NiceStoreCopyRequestHandler extends StoreCopyRequestHandler
{
private NiceStoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource,
- Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
+ CheckPointerService checkPointerService, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider )
{
- super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider );
+ super( protocol, dataSource, checkPointerService, storeFileStreamingProtocol, pageCache, fs, logProvider );
}
@Override
@@ -162,10 +198,10 @@ ResourceIterator files( StoreCopyRequest request, NeoStoreDat
private class EvilStoreCopyRequestHandler extends StoreCopyRequestHandler
{
private EvilStoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource,
- Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
+ CheckPointerService checkPointerService, StoreFileStreamingProtocol storeFileStreamingProtocol, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider )
{
- super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, pageCache, fs, logProvider );
+ super( protocol, dataSource, checkPointerService, storeFileStreamingProtocol, pageCache, fs, logProvider );
}
@Override
@@ -177,28 +213,126 @@ ResourceIterator files( StoreCopyRequest request, NeoStoreDat
private class FakeCheckPointer implements CheckPointer
{
+ Optional _checkPointIfNeeded = Optional.of( 1L );
+ Optional _tryCheckPoint = Optional.of( 1L );
+ Optional _forceCheckPoint = Optional.of( 1L );
+ Optional _lastCheckPointedTransactionId = Optional.of( 1L );
+ Supplier exceptionIfEmpty = () -> new RuntimeException( "FakeCheckPointer" );
+ AtomicInteger invocationCounter = new AtomicInteger();
+ AtomicInteger failCounter = new AtomicInteger();
+
@Override
public long checkPointIfNeeded( TriggerInfo triggerInfo )
{
- return 1;
+ incrementInvocationCounter( _checkPointIfNeeded );
+ return _checkPointIfNeeded.orElseThrow( exceptionIfEmpty );
}
@Override
public long tryCheckPoint( TriggerInfo triggerInfo )
{
- return 1;
+ incrementInvocationCounter( _tryCheckPoint );
+ return _tryCheckPoint.orElseThrow( exceptionIfEmpty );
}
@Override
public long forceCheckPoint( TriggerInfo triggerInfo )
{
- return 1;
+ incrementInvocationCounter( _forceCheckPoint );
+ return _forceCheckPoint.orElseThrow( exceptionIfEmpty );
}
@Override
public long lastCheckPointedTransactionId()
{
- return 1;
+ incrementInvocationCounter( _lastCheckPointedTransactionId );
+ return _lastCheckPointedTransactionId.orElseThrow( exceptionIfEmpty );
+ }
+
+ private void incrementInvocationCounter( Optional variable )
+ {
+ if ( variable.isPresent() )
+ {
+ invocationCounter.getAndIncrement();
+ return;
+ }
+ failCounter.getAndIncrement();
+ }
+ }
+
+ class FakeSingleThreadedJobScheduler implements JobScheduler
+ {
+ @Override
+ public void setTopLevelGroupName( String name )
+ {
+ // do nothing
+ }
+
+ @Override
+ public Executor executor( Group group )
+ {
+ throw new RuntimeException( "Unimplemented" );
+ }
+
+ @Override
+ public ExecutorService workStealingExecutor( Group group, int parallelism )
+ {
+ throw new RuntimeException( "Unimplemented" );
+ }
+
+ @Override
+ public ThreadFactory threadFactory( Group group )
+ {
+ throw new RuntimeException( "Unimplemented" );
+ }
+
+ @Override
+ public JobHandle schedule( Group group, Runnable job )
+ {
+ job.run();
+ return mock( JobHandle.class );
+ }
+
+ @Override
+ public JobHandle schedule( Group group, Runnable runnable, long initialDelay, TimeUnit timeUnit )
+ {
+ throw new RuntimeException( "Unimplemented" );
+ }
+
+ @Override
+ public JobHandle scheduleRecurring( Group group, Runnable runnable, long period, TimeUnit timeUnit )
+ {
+ throw new RuntimeException( "Unimplemented" );
+ }
+
+ @Override
+ public JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay, long period, TimeUnit timeUnit )
+ {
+ throw new RuntimeException( "Unimplemented" );
+ }
+
+ @Override
+ public void init() throws Throwable
+ {
+ // do nothing
+ }
+
+ @Override
+ public void start() throws Throwable
+ {
+ // do nothing
+ }
+
+ @Override
+ public void stop() throws Throwable
+ {
+ // do nothing
+ }
+
+ @Override
+ public void shutdown() throws Throwable
+ {
+ // do nothing
}
}
}
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java
index 9c916c32628b9..c4e1a7e0e33dc 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java
@@ -23,12 +23,10 @@
package org.neo4j.causalclustering.catchup.storecopy;
import java.util.function.BooleanSupplier;
-import java.util.function.Function;
import java.util.function.Supplier;
import org.neo4j.causalclustering.catchup.CatchupProtocolServerInstaller;
-import org.neo4j.causalclustering.catchup.CatchupServerHandler;
-import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
+import org.neo4j.causalclustering.catchup.CheckPointerService;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.StoreId;
@@ -51,6 +49,7 @@
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.NeoStoreDataSource;
+import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
@@ -60,6 +59,7 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;
+import org.neo4j.scheduler.JobScheduler;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@@ -98,7 +98,8 @@ private static ChildInitializer childInitializer( FileSystemAbstraction fileSyst
RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( new Monitors(), logProvider,
() -> storeId, dependencies.provideDependency( TransactionIdStore.class ), dependencies.provideDependency( LogicalTransactionStore.class ),
- dataSource, availability, fileSystem, pageCache, storeCopyCheckPointMutex, null, checkPointer );
+ dataSource, availability, fileSystem, pageCache, storeCopyCheckPointMutex, null,
+ new CheckPointerService( checkPointer, new CentralJobScheduler(), JobScheduler.Groups.checkPoint ) );
NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VoidPipelineWrapperFactory.VOID_WRAPPER );
CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, logProvider,