From d25df8073cf9544ec913eb53db29a78fdf44dcff Mon Sep 17 00:00:00 2001 From: Ragnar Mellbin Date: Wed, 11 Jan 2017 17:00:15 +0100 Subject: [PATCH] First draft of block device support for Causal Cluster * For doing store copy, store files are now read and written through the page cache. * File operations - copy, move, delete - now go through the page cache for store files. * Tests have been changed to not rely on the file system to e.g. copy store directories around, and also to allow the tests to be configured with block device storage in the blockdevice repository. * The test changes primarily involve CoreToCoreCopySnapshotIT and ReadReplicaReplicationIT. * We now send an alignment requirement over with the `FileHeader` message. _This is a protocol change!_ This is only a preliminary approach, and an invitation to discuss the design and what the best way is to solve this problem. * The checks that mutually excludes Causal Clustering and Custom IO (block device) have been removed. A few things are still left to do: * Make sure TemporaryStoreDirectory cleans up after itself not only via the file system, but also via the page cache. * Closer looks at LocalDatabase and catch-up code to make sure there are no latent bugs in there. * Possibly extract some duplication that has been discovered between HA and CC? --- .../catchup/CatchUpResponseAdaptor.java | 1 + .../catchup/CatchUpResponseCallback.java | 2 +- .../catchup/CatchUpResponseHandler.java | 2 +- .../catchup/CatchupServer.java | 7 +- .../catchup/TrackingResponseHandler.java | 2 +- .../catchup/storecopy/FileHeader.java | 13 +++ .../catchup/storecopy/FileHeaderDecoder.java | 3 +- .../catchup/storecopy/FileHeaderEncoder.java | 1 + .../catchup/storecopy/FileSender.java | 24 +++-- .../storecopy/GetStoreRequestHandler.java | 30 ++++-- .../catchup/storecopy/LocalDatabase.java | 7 +- .../catchup/storecopy/RemoteStore.java | 3 +- .../catchup/storecopy/StoreCopyClient.java | 33 ++++--- .../catchup/storecopy/StoreFileStreams.java | 4 +- .../catchup/storecopy/StoreFiles.java | 40 +++++++- .../catchup/storecopy/StreamToDisk.java | 54 +++++++++- .../core/CoreGraphDatabase.java | 7 -- .../core/EnterpriseCoreEditionModule.java | 8 +- .../core/server/CoreServerModule.java | 3 +- .../EnterpriseReadReplicaEditionModule.java | 3 +- .../readreplica/ReadReplicaGraphDatabase.java | 5 - .../causalclustering/RejectCustomIOTest.java | 98 ------------------- .../catchup/storecopy/FileSenderTest.java | 21 ++-- .../scenarios/ConnectionInfoIT.java | 3 +- .../scenarios/CoreToCoreCopySnapshotIT.java | 6 +- .../scenarios/ReadReplicaReplicationIT.java | 50 +++++----- 26 files changed, 231 insertions(+), 199 deletions(-) delete mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java index db4623868f3ed..a42049ce1f6b8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java @@ -47,6 +47,7 @@ public boolean onFileContent( CompletableFuture signal, FileChunk response ) @Override public void onFileStreamingComplete( CompletableFuture signal, StoreCopyFinishedResponse response ) + throws IOException { signal.completeExceptionally( new CatchUpProtocolViolationException( "Unexpected response: %s", response ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java index 1be0f9f01a164..9c874a07e6ed1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java @@ -36,7 +36,7 @@ public interface CatchUpResponseCallback boolean onFileContent( CompletableFuture signal, FileChunk fileChunk ) throws IOException; - void onFileStreamingComplete( CompletableFuture signal, StoreCopyFinishedResponse response ); + void onFileStreamingComplete( CompletableFuture signal, StoreCopyFinishedResponse response ) throws IOException; void onTxPullResponse( CompletableFuture signal, TxPullResponse tx ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java index ab83c270b3192..5d45242b59a69 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java @@ -39,7 +39,7 @@ public interface CatchUpResponseHandler */ boolean onFileContent( FileChunk fileChunk ) throws IOException; - void onFileStreamingComplete( StoreCopyFinishedResponse response ); + void onFileStreamingComplete( StoreCopyFinishedResponse response ) throws IOException; void onTxPullResponse( TxPullResponse tx ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java index cf92bd45c9b19..135bcbc4b0122 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java @@ -64,6 +64,7 @@ import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; @@ -87,6 +88,7 @@ public class CatchupServer extends LifecycleAdapter private final Supplier dataSourceSupplier; private final BooleanSupplier dataSourceAvailabilitySupplier; private final FileSystemAbstraction fs; + private final PageCache pageCache; private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" ); private final CoreState coreState; @@ -102,7 +104,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp Supplier logicalTransactionStoreSupplier, Supplier dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, CoreState coreState, Config config, Monitors monitors, Supplier checkPointerSupplier, - FileSystemAbstraction fs ) + FileSystemAbstraction fs, PageCache pageCache ) { this.coreState = coreState; this.listenAddress = config.get( CausalClusteringSettings.transaction_listen_address ); @@ -118,6 +120,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp this.dataSourceSupplier = dataSourceSupplier; this.checkPointerSupplier = checkPointerSupplier; this.fs = fs; + this.pageCache = pageCache; } @Override @@ -169,7 +172,7 @@ protected void initChannel( SocketChannel ch ) monitors, logProvider ) ); pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier, - checkPointerSupplier, fs, logProvider ) ); + checkPointerSupplier, fs, pageCache, logProvider ) ); pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) ); pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java index c12e398aa7fc9..b672f303d800f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java @@ -76,7 +76,7 @@ public boolean onFileContent( FileChunk fileChunk ) throws IOException } @Override - public void onFileStreamingComplete( StoreCopyFinishedResponse response ) + public void onFileStreamingComplete( StoreCopyFinishedResponse response ) throws IOException { if ( !requestOutcomeSignal.isCancelled() ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java index dd000db847598..045e7c308272b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java @@ -24,10 +24,18 @@ public class FileHeader { private final String fileName; + private final int requiredAlignment; public FileHeader( String fileName ) + { + // A required alignment of 1 basically means that any alignment will do. + this( fileName, 1 ); + } + + public FileHeader( String fileName, int requiredAlignment ) { this.fileName = fileName; + this.requiredAlignment = requiredAlignment; } public String fileName() @@ -35,6 +43,11 @@ public String fileName() return fileName; } + public int requiredAlignment() + { + return requiredAlignment; + } + @Override public String toString() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java index dcc98ec668983..0278edd93256f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java @@ -36,6 +36,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out byte[] bytes = new byte[length]; msg.readBytes( bytes ); String name = UTF8.decode( bytes ); - out.add( new FileHeader( name ) ); + int requiredAlignment = msg.readInt(); + out.add( new FileHeader( name, requiredAlignment ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java index cc6491d64012f..e1064953c1d08 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java @@ -34,5 +34,6 @@ protected void encode( ChannelHandlerContext ctx, FileHeader msg, ByteBuf out ) byte[] bytes = UTF8.encode( name ); out.writeInt( bytes.length ); out.writeBytes( bytes ); + out.writeInt( msg.requiredAlignment() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java index b8cba6aa052b7..af47226b7ef44 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java @@ -19,24 +19,23 @@ */ package org.neo4j.causalclustering.catchup.storecopy; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.stream.ChunkedInput; - import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; -import org.neo4j.io.fs.StoreChannel; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.stream.ChunkedInput; class FileSender implements ChunkedInput { - private final StoreChannel channel; + private final ReadableByteChannel channel; private final ByteBuffer byteBuffer; private boolean endOfInput = false; private boolean sentChunk = false; private byte[] preFetchedBytes; - public FileSender( StoreChannel channel ) throws IOException + public FileSender( ReadableByteChannel channel ) throws IOException { this.channel = channel; byteBuffer = ByteBuffer.allocateDirect( FileChunk.MAX_SIZE ); @@ -67,7 +66,16 @@ public FileChunk readChunk( ByteBufAllocator allocator ) throws Exception sentChunk = true; } - byte[] next = prefetch(); + byte[] next; + if ( !endOfInput ) + { + next = prefetch(); + } + else + { + next = null; + } + FileChunk fileChunk = FileChunk.create( preFetchedBytes == null ? new byte[0] : preFetchedBytes, next == null ); preFetchedBytes = next; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java index a5c05a58773cc..b5f94db9871fc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.File; +import java.util.Optional; import java.util.function.Supplier; import io.netty.channel.ChannelHandlerContext; @@ -30,6 +31,8 @@ import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status; import org.neo4j.graphdb.ResourceIterator; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo; @@ -47,16 +50,18 @@ public class GetStoreRequestHandler extends SimpleChannelInboundHandler dataSource; private final Supplier checkPointerSupplier; private final FileSystemAbstraction fs; + private PageCache pageCache; private final Log log; public GetStoreRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, - Supplier checkPointerSupplier, FileSystemAbstraction fs, - LogProvider logProvider ) + Supplier checkPointerSupplier, FileSystemAbstraction fs, PageCache pageCache, + LogProvider logProvider ) { this.protocol = protocol; this.dataSource = dataSource; this.checkPointerSupplier = checkPointerSupplier; this.fs = fs; + this.pageCache = pageCache; this.log = logProvider.getLog( getClass() ); } @@ -74,12 +79,25 @@ protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) th { while ( files.hasNext() ) { - File file = files.next().file(); + StoreFileMetadata fileMetadata = files.next(); + File file = fileMetadata.file(); log.debug( "Sending file " + file ); - ctx.writeAndFlush( ResponseMessageType.FILE ); - ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ) ) ); - ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) ); + ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ), + fileMetadata.recordSize() ) ); + Optional existingMapping = pageCache.getExistingMapping( file ); + if ( existingMapping.isPresent() ) + { + try ( PagedFile pagedFile = existingMapping.get() ) + { + ctx.writeAndFlush( new FileSender( + pagedFile.openReadableByteChannel() ) ); + } + } + else + { + ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) ); + } } } endStoreCopy( SUCCESS, ctx, lastCheckPointedTx ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java index 66dec877d2b35..91048065ab271 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java @@ -21,10 +21,12 @@ import java.io.File; import java.io.IOException; +import java.nio.file.StandardCopyOption; import java.util.function.Supplier; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.FileHandle; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.api.TransactionCommitProcess; @@ -57,8 +59,7 @@ public class LocalDatabase implements Lifecycle private volatile TransactionCommitProcess localCommit; - public LocalDatabase( File storeDir, StoreFiles storeFiles, - DataSourceManager dataSourceManager, + public LocalDatabase( File storeDir, StoreFiles storeFiles, DataSourceManager dataSourceManager, PageCache pageCache, FileSystemAbstraction fileSystemAbstraction, Supplier databaseHealthSupplier, LogProvider logProvider ) { @@ -172,7 +173,7 @@ private boolean hasStoreFiles() for ( StoreType storeType : StoreType.values() ) { StoreFile storeFile = storeType.getStoreFile(); - if(storeFile != null) + if ( storeFile != null ) { boolean exists = fileSystemAbstraction.fileExists( new File( storeDir, storeFile.storeFileName() ) ); if ( exists ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java index 33643cac2927f..215521d195bab 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java @@ -142,7 +142,8 @@ public void copy( MemberId from, StoreId expectedStoreId, File destDir ) try { log.info( "Copying store from %s", from ); - long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs, monitors ) ); + long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs, + pageCache, monitors ) ); log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java index b8bbbabfaac86..2b6632362489e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.IOException; -import java.io.OutputStream; import java.util.concurrent.CompletableFuture; import org.neo4j.causalclustering.catchup.CatchUpClient; @@ -42,7 +41,8 @@ public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider ) log = logProvider.getLog( getClass() ); } - long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) throws StoreCopyFailedException + long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) + throws StoreCopyFailedException { try { @@ -50,27 +50,30 @@ long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams st new CatchUpResponseAdaptor() { private String destination; + private int requiredAlignment; @Override public void onFileHeader( CompletableFuture requestOutcomeSignal, FileHeader fileHeader ) { this.destination = fileHeader.fileName(); + this.requiredAlignment = fileHeader.requiredAlignment(); } @Override public boolean onFileContent( CompletableFuture signal, FileChunk fileChunk ) throws IOException { - try ( OutputStream outputStream = storeFileStreams.createStream( destination ) ) + storeFileStreams.write( destination, requiredAlignment, fileChunk.bytes() ); + if ( fileChunk.isLast() ) { - outputStream.write( fileChunk.bytes() ); + storeFileStreams.finish( destination ); } return fileChunk.isLast(); } @Override public void onFileStreamingComplete( CompletableFuture signal, - StoreCopyFinishedResponse response ) + StoreCopyFinishedResponse response ) throws IOException { log.info( "Finished streaming %s", destination ); signal.complete( response.lastCommittedTxBeforeStoreCopy() ); @@ -87,16 +90,16 @@ StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException { try { - return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), - new CatchUpResponseAdaptor() - { - @Override - public void onGetStoreIdResponse( CompletableFuture signal, - GetStoreIdResponse response ) - { - signal.complete( response.storeId() ); - } - } ); + CatchUpResponseAdaptor responseHandler = new CatchUpResponseAdaptor() + { + @Override + public void onGetStoreIdResponse( CompletableFuture signal, + GetStoreIdResponse response ) + { + signal.complete( response.storeId() ); + } + }; + return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), responseHandler ); } catch ( CatchUpClientException e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java index e2cd604c9e6dc..171273e7b6c4b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java @@ -20,9 +20,9 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.IOException; -import java.io.OutputStream; public interface StoreFileStreams { - OutputStream createStream( String destination ) throws IOException; + void write( String destination, int requiredAlignment, byte[] data ) throws IOException; + void finish( String destination ) throws IOException; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java index 2b4f0d4ea4fde..f870e92dfbef7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java @@ -22,23 +22,28 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.FileHandle; +import org.neo4j.io.pagecache.PageCache; public class StoreFiles { - private static final FilenameFilter STORE_FILE_FILTER = ( dir, name ) -> - { + private static final FilenameFilter STORE_FILE_FILTER = ( dir, name ) -> { // Skip log files and tx files from temporary database return !name.startsWith( "metrics" ) && !name.startsWith( "temp-copy" ) && - !name.startsWith( "raft-messages." ) && !name.startsWith( "debug." ) && - !name.startsWith( "data" ) && !name.startsWith( "store_lock" ); + !name.startsWith( "raft-messages." ) && !name.startsWith( "debug." ) && + !name.startsWith( "data" ) && !name.startsWith( "store_lock" ); }; private FileSystemAbstraction fs; + private PageCache pageCache; - public StoreFiles( FileSystemAbstraction fs ) + public StoreFiles( FileSystemAbstraction fs, PageCache pageCache ) { this.fs = fs; + this.pageCache = pageCache; } public void delete( File storeDir ) throws IOException @@ -47,7 +52,23 @@ public void delete( File storeDir ) throws IOException { fs.deleteRecursively( file ); } + Iterable fileHandles = pageCache.streamFilesRecursive( storeDir )::iterator; + for ( FileHandle fh : fileHandles ) + { + Path storePath = storeDir.toPath(); + Path filePath = fh.getFile().toPath(); + Path relative = storePath.relativize( filePath ); + if ( STORE_FILE_FILTER.accept( storeDir, getRootFileName( relative ) ) ) + { + fh.delete(); + } + } + } + private String getRootFileName( Path path ) + { + Path root = path.getRoot(); + return (root == null) ? path.toString() : root.toString(); } void moveTo( File source, File target ) throws IOException @@ -56,5 +77,14 @@ void moveTo( File source, File target ) throws IOException { fs.moveToDirectory( candidate, target ); } + + Iterable fileHandles = pageCache.streamFilesRecursive( source )::iterator; + for ( FileHandle fh : fileHandles ) + { + if ( STORE_FILE_FILTER.accept( source, fh.getFile().getName() ) ) + { + fh.rename( new File( target, fh.getFile().getName() ), StandardCopyOption.REPLACE_EXISTING ); + } + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java index 67ff5f9384fa0..5352e9eb65805 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java @@ -22,33 +22,81 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; import org.neo4j.kernel.monitoring.Monitors; class StreamToDisk implements StoreFileStreams { private final File storeDir; private final FileSystemAbstraction fs; + private final PageCache pageCache; private final FileCopyMonitor fileCopyMonitor; + private final Map channels; + private final Map pagedFiles; - StreamToDisk( File storeDir, FileSystemAbstraction fs, Monitors monitors ) throws IOException + StreamToDisk( File storeDir, FileSystemAbstraction fs, PageCache pageCache, Monitors monitors ) throws IOException { this.storeDir = storeDir; this.fs = fs; + this.pageCache = pageCache; fs.mkdirs( storeDir ); this.fileCopyMonitor = monitors.newMonitor( FileCopyMonitor.class ); + channels = new HashMap(); + pagedFiles = new HashMap(); + } @Override - public OutputStream createStream( String destination ) throws IOException + public void write( String destination, int requiredAlignment, byte[] data ) throws IOException { File fileName = new File( storeDir, destination ); fs.mkdirs( fileName.getParentFile() ); fileCopyMonitor.copyFile( fileName ); + if ( destination.endsWith( ".id" ) ) + { + try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) ) + { + outputStream.write( data ); + } + } + else + { + WritableByteChannel channel = channels.get( destination ); + if ( channel == null ) + { + int filePageSize = pageCache.pageSize() - pageCache.pageSize() % requiredAlignment; + PagedFile pagedFile = pageCache.map( fileName, filePageSize, StandardOpenOption.CREATE ); + channel = pagedFile.openWritableByteChannel(); + pagedFiles.put( destination, pagedFile ); + channels.put( destination, channel ); + } + + ByteBuffer buffer = ByteBuffer.wrap( data ); + while ( buffer.hasRemaining() ) + { + channel.write( buffer ); + } + } + } - return fs.openAsOutputStream( fileName, true ); + @Override + public void finish( String destination ) throws IOException + { + PagedFile pagedFile = pagedFiles.get( destination ); + if ( pagedFile != null ) + { + channels.get( destination ).close(); + pagedFile.close(); + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java index 1beeb7e6a478a..948f7576c8e9e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java @@ -26,20 +26,15 @@ import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.EditionModule; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory; import org.neo4j.kernel.impl.factory.PlatformModule; -import org.neo4j.kernel.impl.util.CustomIOConfigValidator; public class CoreGraphDatabase extends GraphDatabaseFacade { - public static final String CUSTOM_IO_EXCEPTION_MESSAGE = - "Core cluster mode is not allowed with custom IO integrations"; - public CoreGraphDatabase( File storeDir, Config config, GraphDatabaseFacadeFactory.Dependencies dependencies ) { @@ -49,8 +44,6 @@ public CoreGraphDatabase( File storeDir, Config config, public CoreGraphDatabase( File storeDir, Config config, GraphDatabaseFacadeFactory.Dependencies dependencies, DiscoveryServiceFactory discoveryServiceFactory ) { - CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, - CUSTOM_IO_EXCEPTION_MESSAGE ); Function factory = ( platformModule ) -> new EnterpriseCoreEditionModule( platformModule, discoveryServiceFactory ); new GraphDatabaseFacadeFactory( DatabaseInfo.CORE, factory ).initFacade( storeDir, config, dependencies, this ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 81e32ab471ab7..dd285af8ffe13 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -144,8 +144,12 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke logProvider = logging.getInternalLogProvider(); final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); - LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( fileSystem ), - platformModule.dataSourceManager, platformModule.pageCache, fileSystem, databaseHealthSupplier, + LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, + new StoreFiles( fileSystem, platformModule.pageCache ), + platformModule.dataSourceManager, + platformModule.pageCache, + fileSystem, + databaseHealthSupplier, logProvider ); IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory.get() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index 3e3e94f923a45..b5d968b3342bc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -192,7 +192,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data platformModule.dependencies.provideDependency( TransactionIdStore.class ), platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, coreState, config, - platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem ); + platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem, + platformModule.pageCache ); servicesToStopOnStoreCopy.add( catchupServer ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 28b835f8c3faa..673dba3d89b47 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -198,7 +198,8 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke DelayedRenewableTimeoutService catchupTimeoutService = new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ); - LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( fileSystem ), + StoreFiles storeFiles = new StoreFiles( fileSystem, pageCache ); + LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, storeFiles, platformModule.dataSourceManager, pageCache, fileSystem, databaseHealthSupplier, logProvider ); RemoteStore remoteStore = diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java index 6d0daf03546c5..1df88416d57b6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java @@ -31,13 +31,9 @@ import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory; import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory.Dependencies; import org.neo4j.kernel.impl.factory.PlatformModule; -import org.neo4j.kernel.impl.util.CustomIOConfigValidator; public class ReadReplicaGraphDatabase extends GraphDatabaseFacade { - public static final String CUSTOM_IO_EXCEPTION_MESSAGE = - "Read replica mode is not allowed with custom IO integrations"; - public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies dependencies ) { this( storeDir, config, dependencies, new HazelcastDiscoveryServiceFactory() ); @@ -46,7 +42,6 @@ public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies depe public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies dependencies, DiscoveryServiceFactory discoveryServiceFactory ) { - CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, CUSTOM_IO_EXCEPTION_MESSAGE ); Function factory = ( platformModule ) -> new EnterpriseReadReplicaEditionModule( platformModule, discoveryServiceFactory ); new GraphDatabaseFacadeFactory( DatabaseInfo.READ_REPLICA, factory ).initFacade( storeDir, config, diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java deleted file mode 100644 index 5f0a692b54aa7..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering; - -import org.junit.Rule; -import org.junit.Test; - -import java.util.Map; - -import org.neo4j.causalclustering.core.CoreGraphDatabase; -import org.neo4j.causalclustering.discovery.CoreClusterMember; -import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; -import org.neo4j.causalclustering.discovery.ReadReplica; -import org.neo4j.causalclustering.discovery.SharedDiscoveryService; -import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.test.rule.TestDirectory; - -import static java.util.Collections.emptyMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.neo4j.causalclustering.discovery.Cluster.buildAddresses; -import static org.neo4j.helpers.collection.Iterators.set; -import static org.neo4j.helpers.collection.MapUtil.stringMap; -import static org.neo4j.kernel.impl.pagecache.PageSwapperFactoryForTesting.TEST_PAGESWAPPER_NAME; -import static org.neo4j.kernel.impl.store.format.RecordFormatSelector.defaultFormat; - -/** - * At the time of writing this test, certain operations required by Causal Clustering - * do not work with custom IO configurations. This test ensures - * that we fail gracefully with a helpful error message if the user - * tries to combine Causal Clustering with custom IO configurations. - * Specifically, the functionality that Causal Clustering needs that - * custom IO configurations does not support is store copying. - */ - -public class RejectCustomIOTest -{ - private DiscoveryServiceFactory discovery = new SharedDiscoveryService(); - - @Rule - public TestDirectory storeDir = TestDirectory.testDirectory(); - - @Test - public void shouldFailToStartWithCustomIOConfigurationInCoreModeTest() throws Exception - { - try - { - Map extraParams = - stringMap( GraphDatabaseSettings.pagecache_swapper.name(), TEST_PAGESWAPPER_NAME ); - CoreClusterMember clusterMember = new CoreClusterMember( 0, 3, buildAddresses( set( 0, 1, 2 ) ), discovery, - defaultFormat().toString(), storeDir.directory(), extraParams, emptyMap() ); - clusterMember.start(); - fail( "Should not have created database with custom IO configuration in Core Mode." ); - } - catch ( RuntimeException ex ) - { - assertEquals( CoreGraphDatabase.CUSTOM_IO_EXCEPTION_MESSAGE, ex.getMessage() ); - } - } - - @Test - public void shouldFailToStartWithCustomIOConfigurationInReadReplicaModeTest() throws Exception - { - try - { - DiscoveryServiceFactory discovery = new SharedDiscoveryService(); - Map extraParams = - stringMap( GraphDatabaseSettings.pagecache_swapper.name(), TEST_PAGESWAPPER_NAME ); - ReadReplica clusterMember = - new ReadReplica( storeDir.directory(), 2, discovery, buildAddresses( set( 0, 1, 2 ) ), - extraParams, emptyMap(), defaultFormat().toString() ); - clusterMember.start(); - fail( "Should not have created database with custom IO configuration in Read Replica Mode." ); - } - catch ( RuntimeException ex ) - { - assertEquals( ReadReplicaGraphDatabase.CUSTOM_IO_EXCEPTION_MESSAGE, ex.getMessage() ); - } - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java index 79c64b5cec5e2..5208fa47c957f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java @@ -48,7 +48,7 @@ public class FileSenderTest @Rule public EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule(); @Rule - public TestDirectory testDirectory = TestDirectory.testDirectory( fsRule.get()); + public TestDirectory testDirectory = TestDirectory.testDirectory( fsRule.get() ); private final FileSystemAbstraction fs = fsRule.get(); private final Random random = new Random(); @@ -60,7 +60,7 @@ public void sendEmptyFile() throws Exception // given File emptyFile = testDirectory.file( "emptyFile" ); fs.create( emptyFile ).close(); - FileSender fileSender = new FileSender(fs.open( emptyFile, "r" )); + FileSender fileSender = new FileSender( fs.open( emptyFile, "r" ) ); // when + then assertFalse( fileSender.isEndOfInput() ); @@ -77,7 +77,7 @@ public void sendSmallFile() throws Exception random.nextBytes( bytes ); File smallFile = testDirectory.file( "smallFile" ); - try( StoreChannel storeChannel = fs.create( smallFile ) ) + try ( StoreChannel storeChannel = fs.create( smallFile ) ) { storeChannel.write( ByteBuffer.wrap( bytes ) ); } @@ -100,17 +100,18 @@ public void sendLargeFile() throws Exception random.nextBytes( bytes ); File smallFile = testDirectory.file( "smallFile" ); - try( StoreChannel storeChannel = fs.create( smallFile ) ) + try ( StoreChannel storeChannel = fs.create( smallFile ) ) { storeChannel.write( ByteBuffer.wrap( bytes ) ); } - FileSender fileSender = new FileSender(fs.open( smallFile, "r" )); + FileSender fileSender = new FileSender( fs.open( smallFile, "r" ) ); // when + then assertFalse( fileSender.isEndOfInput() ); assertEquals( FileChunk.create( copyOfRange( bytes, 0, MAX_SIZE ), false ), fileSender.readChunk( allocator ) ); - assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, bytes.length ), true ), fileSender.readChunk( allocator ) ); + assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, bytes.length ), true ), + fileSender.readChunk( allocator ) ); assertNull( fileSender.readChunk( allocator ) ); assertTrue( fileSender.isEndOfInput() ); } @@ -123,7 +124,7 @@ public void sendLargeFileWithSizeMultipleOfTheChunkSize() throws Exception random.nextBytes( bytes ); File smallFile = testDirectory.file( "smallFile" ); - try( StoreChannel storeChannel = fs.create( smallFile ) ) + try ( StoreChannel storeChannel = fs.create( smallFile ) ) { storeChannel.write( ByteBuffer.wrap( bytes ) ); } @@ -133,8 +134,10 @@ public void sendLargeFileWithSizeMultipleOfTheChunkSize() throws Exception // when + then assertFalse( fileSender.isEndOfInput() ); assertEquals( FileChunk.create( copyOfRange( bytes, 0, MAX_SIZE ), false ), fileSender.readChunk( allocator ) ); - assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, MAX_SIZE * 2 ), false ), fileSender.readChunk( allocator ) ); - assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE * 2, bytes.length ), true ), fileSender.readChunk( allocator ) ); + assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, MAX_SIZE * 2 ), false ), + fileSender.readChunk( allocator ) ); + assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE * 2, bytes.length ), true ), + fileSender.readChunk( allocator ) ); assertNull( fileSender.readChunk( allocator ) ); assertTrue( fileSender.isEndOfInput() ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java index 3f60308e0c509..5b258bfcbc6e5 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java @@ -38,6 +38,7 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.BoltConnector; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.HttpConnector; import org.neo4j.kernel.impl.util.Neo4jJobScheduler; @@ -85,7 +86,7 @@ public void catchupServerMessage() throws Throwable CatchupServer catchupServer = new CatchupServer( logProvider, userLogProvider, mockSupplier(), mockSupplier(), mockSupplier(), mockSupplier(), mock( BooleanSupplier.class ), coreState, config, new Monitors(), - mockSupplier(), mock( FileSystemAbstraction.class ) ); + mockSupplier(), mock( FileSystemAbstraction.class ), mock( PageCache.class ) ); //then try diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java index 126a5a19d6a23..7ffd53884da1c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java @@ -49,11 +49,15 @@ import static org.neo4j.causalclustering.scenarios.SampleData.createData; import static org.neo4j.helpers.collection.MapUtil.stringMap; +/** + * Note that this test is extended in the blockdevice repository. + */ public class CoreToCoreCopySnapshotIT { + protected static final int NR_CORE_MEMBERS = 3; @Rule public final ClusterRule clusterRule = new ClusterRule( getClass() ) - .withNumberOfCoreMembers( 3 ) + .withNumberOfCoreMembers( NR_CORE_MEMBERS ) .withNumberOfReadReplicas( 0 ); @Test diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java index 5a6d5f3b266c9..f7f1c5f85c113 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java @@ -25,7 +25,6 @@ import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -34,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; +import java.util.function.Function; import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor; import org.neo4j.causalclustering.core.CausalClusteringSettings; @@ -55,7 +55,6 @@ import org.neo4j.graphdb.security.WriteOperationsNotAllowedException; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.fs.FileUtils; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.impl.muninn.StandalonePageCacheFactory; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; @@ -75,7 +74,6 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.startsWith; @@ -95,13 +93,20 @@ import static org.neo4j.kernel.impl.store.MetaDataStore.Position.TIME; import static org.neo4j.test.assertion.Assert.assertEventually; +/** + * Note that this test is extended in the blockdevice repository. + */ public class ReadReplicaReplicationIT { + // This test is extended in the blockdevice repository, and these constants are required there as well. + public static final int NR_CORE_MEMBERS = 3; + public static final int NR_READ_REPLICAS = 1; + @Rule - public final ClusterRule clusterRule = - new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 1 ) - .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" ) - .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ); + public final ClusterRule clusterRule = new ClusterRule( getClass() ).withNumberOfCoreMembers( NR_CORE_MEMBERS ) + .withNumberOfReadReplicas( NR_READ_REPLICAS ) + .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" ) + .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ); @Test public void shouldNotBeAbleToWriteToReadReplica() throws Exception @@ -217,13 +222,16 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreI cluster.coreTx( createSomeData ); - CoreClusterMember follower = cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 2, TimeUnit.SECONDS ); - // Shutdown server before copying its data, because Windows can't copy open files. - follower.shutdown(); + cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 2, TimeUnit.SECONDS ); + // Get a read replica and make sure that it is operational ReadReplica readReplica = cluster.addReadReplicaWithId( 4 ); - putSomeDataWithDifferentStoreId( readReplica.storeDir(), follower.storeDir() ); - follower.start(); + readReplica.start(); + readReplica.database().beginTx().close(); + readReplica.shutdown(); + + // Change the store id, so it should fail to join the cluster again + changeStoreId( readReplica.storeDir() ); try { @@ -264,15 +272,12 @@ public void aReadReplicShouldBeAbleToRejoinTheCluster() throws Exception awaitEx( () -> readReplicasUpToDateAsTheLeader( cluster.awaitLeader(), cluster.readReplicas() ), 1, TimeUnit.MINUTES ); - List coreStoreDirs = - cluster.coreMembers().stream().map( CoreClusterMember::storeDir ).collect( toList() ); - List readReplicaStoreDirs = - cluster.readReplicas().stream().map( ReadReplica::storeDir ).collect( toList() ); + Function toRep = db -> DbRepresentation.of( db.database() ); + Set dbs = cluster.coreMembers().stream().map( toRep ).collect( toSet() ); + dbs.addAll( cluster.readReplicas().stream().map( toRep ).collect( toSet() ) ); cluster.shutdown(); - Set dbs = coreStoreDirs.stream().map( DbRepresentation::of ).collect( toSet() ); - dbs.addAll( readReplicaStoreDirs.stream().map( DbRepresentation::of ).collect( toSet() ) ); assertEquals( 1, dbs.size() ); } @@ -339,6 +344,7 @@ public void shouldBeAbleToDownloadANewStoreAfterPruning() throws Exception () -> DbRepresentation.of( readReplica.database() ), equalTo( DbRepresentation.of( cluster.awaitLeader().database() ) ), 10, TimeUnit.SECONDS ); } + @Test public void shouldBeAbleToPullTxAfterHavingDownloadedANewStoreAfterPruning() throws Exception { @@ -404,12 +410,6 @@ private boolean readReplicasUpToDateAsTheLeader( CoreClusterMember leader, .reduce( true, ( acc, txId ) -> acc && txId == leaderTxId, Boolean::logicalAnd ); } - private void putSomeDataWithDifferentStoreId( File storeDir, File coreStoreDir ) throws IOException - { - FileUtils.copyRecursively( coreStoreDir, storeDir ); - changeStoreId( storeDir ); - } - private void changeStoreId( File storeDir ) throws IOException { File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); @@ -505,7 +505,7 @@ public void shouldBeAbleToCopyStoresFromCoreToReadReplica() throws Exception SECONDS ); // when - cluster.addReadReplicaWithIdAndRecordFormat( 42, HighLimit.NAME ).start(); + cluster.addReadReplicaWithIdAndRecordFormat( 4, HighLimit.NAME ).start(); // then for ( final ReadReplica readReplica : cluster.readReplicas() )