diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java index db4623868f3e..a42049ce1f6b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseAdaptor.java @@ -47,6 +47,7 @@ public boolean onFileContent( CompletableFuture signal, FileChunk response ) @Override public void onFileStreamingComplete( CompletableFuture signal, StoreCopyFinishedResponse response ) + throws IOException { signal.completeExceptionally( new CatchUpProtocolViolationException( "Unexpected response: %s", response ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java index 1be0f9f01a16..9c874a07e6ed 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseCallback.java @@ -36,7 +36,7 @@ public interface CatchUpResponseCallback boolean onFileContent( CompletableFuture signal, FileChunk fileChunk ) throws IOException; - void onFileStreamingComplete( CompletableFuture signal, StoreCopyFinishedResponse response ); + void onFileStreamingComplete( CompletableFuture signal, StoreCopyFinishedResponse response ) throws IOException; void onTxPullResponse( CompletableFuture signal, TxPullResponse tx ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java index ab83c270b319..5d45242b59a6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpResponseHandler.java @@ -39,7 +39,7 @@ public interface CatchUpResponseHandler */ boolean onFileContent( FileChunk fileChunk ) throws IOException; - void onFileStreamingComplete( StoreCopyFinishedResponse response ); + void onFileStreamingComplete( StoreCopyFinishedResponse response ) throws IOException; void onTxPullResponse( TxPullResponse tx ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java index cf92bd45c9b1..135bcbc4b012 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java @@ -64,6 +64,7 @@ import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; @@ -87,6 +88,7 @@ public class CatchupServer extends LifecycleAdapter private final Supplier dataSourceSupplier; private final BooleanSupplier 
dataSourceAvailabilitySupplier; private final FileSystemAbstraction fs; + private final PageCache pageCache; private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" ); private final CoreState coreState; @@ -102,7 +104,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp Supplier logicalTransactionStoreSupplier, Supplier dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, CoreState coreState, Config config, Monitors monitors, Supplier checkPointerSupplier, - FileSystemAbstraction fs ) + FileSystemAbstraction fs, PageCache pageCache ) { this.coreState = coreState; this.listenAddress = config.get( CausalClusteringSettings.transaction_listen_address ); @@ -118,6 +120,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp this.dataSourceSupplier = dataSourceSupplier; this.checkPointerSupplier = checkPointerSupplier; this.fs = fs; + this.pageCache = pageCache; } @Override @@ -169,7 +172,7 @@ protected void initChannel( SocketChannel ch ) monitors, logProvider ) ); pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier, - checkPointerSupplier, fs, logProvider ) ); + checkPointerSupplier, fs, pageCache, logProvider ) ); pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) ); pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java index c12e398aa7fc..b672f303d800 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TrackingResponseHandler.java @@ -76,7 +76,7 @@ public boolean onFileContent( FileChunk fileChunk ) throws IOException } @Override - public void onFileStreamingComplete( StoreCopyFinishedResponse response ) + public void onFileStreamingComplete( StoreCopyFinishedResponse response ) throws IOException { if ( !requestOutcomeSignal.isCancelled() ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java index dd000db84759..045e7c308272 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeader.java @@ -24,10 +24,18 @@ public class FileHeader { private final String fileName; + private final int requiredAlignment; public FileHeader( String fileName ) + { + // A required alignment of 1 basically means that any alignment will do. 
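The required alignment carried in this header is the record size of the store file being streamed (GetStoreRequestHandler sends fileMetadata.recordSize()); the receiving side of this patch (StreamToDisk) rounds the page cache's page size down to the nearest multiple of it so that records do not straddle page boundaries when the file is written through the page cache. A minimal, self-contained sketch of that arithmetic, not part of this patch and with made-up sizes:

    // Illustrative only: derive a file page size that is a whole multiple of the record size.
    final class AlignmentSketch
    {
        static int filePageSize( int cachePageSize, int requiredAlignment )
        {
            return cachePageSize - cachePageSize % requiredAlignment;
        }

        public static void main( String[] args )
        {
            // With 8192-byte cache pages and a hypothetical 9-byte record:
            // 8192 - 8192 % 9 = 8190, which holds exactly 910 whole records.
            System.out.println( filePageSize( 8192, 9 ) );
        }
    }

With a required alignment of 1 the subtraction removes nothing, which is why 1 means any alignment will do.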
+ this( fileName, 1 ); + } + + public FileHeader( String fileName, int requiredAlignment ) { this.fileName = fileName; + this.requiredAlignment = requiredAlignment; } public String fileName() @@ -35,6 +43,11 @@ public String fileName() return fileName; } + public int requiredAlignment() + { + return requiredAlignment; + } + @Override public String toString() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java index dcc98ec66898..0278edd93256 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java @@ -36,6 +36,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out byte[] bytes = new byte[length]; msg.readBytes( bytes ); String name = UTF8.decode( bytes ); - out.add( new FileHeader( name ) ); + int requiredAlignment = msg.readInt(); + out.add( new FileHeader( name, requiredAlignment ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java index cc6491d64012..e1064953c1d0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java @@ -34,5 +34,6 @@ protected void encode( ChannelHandlerContext ctx, FileHeader msg, ByteBuf out ) byte[] bytes = UTF8.encode( name ); out.writeInt( bytes.length ); out.writeBytes( bytes ); + out.writeInt( msg.requiredAlignment() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java index b8cba6aa052b..af47226b7ef4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileSender.java @@ -19,24 +19,23 @@ */ package org.neo4j.causalclustering.catchup.storecopy; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.stream.ChunkedInput; - import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; -import org.neo4j.io.fs.StoreChannel; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.stream.ChunkedInput; class FileSender implements ChunkedInput { - private final StoreChannel channel; + private final ReadableByteChannel channel; private final ByteBuffer byteBuffer; private boolean endOfInput = false; private boolean sentChunk = false; private byte[] preFetchedBytes; - public FileSender( StoreChannel channel ) throws IOException + public FileSender( ReadableByteChannel channel ) throws IOException { this.channel = channel; byteBuffer = ByteBuffer.allocateDirect( FileChunk.MAX_SIZE ); @@ -67,7 +66,16 @@ public FileChunk readChunk( ByteBufAllocator allocator ) throws Exception sentChunk = true; } - byte[] next = prefetch(); + byte[] 
next; + if ( !endOfInput ) + { + next = prefetch(); + } + else + { + next = null; + } + FileChunk fileChunk = FileChunk.create( preFetchedBytes == null ? new byte[0] : preFetchedBytes, next == null ); preFetchedBytes = next; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java index a5c05a58773c..b5f94db9871f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestHandler.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.File; +import java.util.Optional; import java.util.function.Supplier; import io.netty.channel.ChannelHandlerContext; @@ -30,6 +31,8 @@ import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status; import org.neo4j.graphdb.ResourceIterator; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo; @@ -47,16 +50,18 @@ public class GetStoreRequestHandler extends SimpleChannelInboundHandler dataSource; private final Supplier checkPointerSupplier; private final FileSystemAbstraction fs; + private PageCache pageCache; private final Log log; public GetStoreRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, - Supplier checkPointerSupplier, FileSystemAbstraction fs, - LogProvider logProvider ) + Supplier checkPointerSupplier, FileSystemAbstraction fs, PageCache pageCache, + LogProvider logProvider ) { this.protocol = protocol; this.dataSource = dataSource; this.checkPointerSupplier = checkPointerSupplier; this.fs = fs; + this.pageCache = pageCache; this.log = logProvider.getLog( getClass() ); } @@ -74,12 +79,25 @@ protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) th { while ( files.hasNext() ) { - File file = files.next().file(); + StoreFileMetadata fileMetadata = files.next(); + File file = fileMetadata.file(); log.debug( "Sending file " + file ); - ctx.writeAndFlush( ResponseMessageType.FILE ); - ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ) ) ); - ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) ); + ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ), + fileMetadata.recordSize() ) ); + Optional existingMapping = pageCache.getExistingMapping( file ); + if ( existingMapping.isPresent() ) + { + try ( PagedFile pagedFile = existingMapping.get() ) + { + ctx.writeAndFlush( new FileSender( + pagedFile.openReadableByteChannel() ) ); + } + } + else + { + ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) ); + } } } endStoreCopy( SUCCESS, ctx, lastCheckPointedTx ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java index 66dec877d2b3..91048065ab27 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java +++ 
b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/LocalDatabase.java @@ -21,10 +21,12 @@ import java.io.File; import java.io.IOException; +import java.nio.file.StandardCopyOption; import java.util.function.Supplier; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.FileHandle; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.api.TransactionCommitProcess; @@ -57,8 +59,7 @@ public class LocalDatabase implements Lifecycle private volatile TransactionCommitProcess localCommit; - public LocalDatabase( File storeDir, StoreFiles storeFiles, - DataSourceManager dataSourceManager, + public LocalDatabase( File storeDir, StoreFiles storeFiles, DataSourceManager dataSourceManager, PageCache pageCache, FileSystemAbstraction fileSystemAbstraction, Supplier databaseHealthSupplier, LogProvider logProvider ) { @@ -172,7 +173,7 @@ private boolean hasStoreFiles() for ( StoreType storeType : StoreType.values() ) { StoreFile storeFile = storeType.getStoreFile(); - if(storeFile != null) + if ( storeFile != null ) { boolean exists = fileSystemAbstraction.fileExists( new File( storeDir, storeFile.storeFileName() ) ); if ( exists ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java index 33643cac2927..215521d195ba 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java @@ -142,7 +142,8 @@ public void copy( MemberId from, StoreId expectedStoreId, File destDir ) try { log.info( "Copying store from %s", from ); - long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs, monitors ) ); + long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs, + pageCache, monitors ) ); log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java index b8bbbabfaac8..2b6632362489 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.IOException; -import java.io.OutputStream; import java.util.concurrent.CompletableFuture; import org.neo4j.causalclustering.catchup.CatchUpClient; @@ -42,7 +41,8 @@ public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider ) log = logProvider.getLog( getClass() ); } - long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) throws StoreCopyFailedException + long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) + throws StoreCopyFailedException { try { @@ -50,27 +50,30 @@ long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams st new 
CatchUpResponseAdaptor() { private String destination; + private int requiredAlignment; @Override public void onFileHeader( CompletableFuture requestOutcomeSignal, FileHeader fileHeader ) { this.destination = fileHeader.fileName(); + this.requiredAlignment = fileHeader.requiredAlignment(); } @Override public boolean onFileContent( CompletableFuture signal, FileChunk fileChunk ) throws IOException { - try ( OutputStream outputStream = storeFileStreams.createStream( destination ) ) + storeFileStreams.write( destination, requiredAlignment, fileChunk.bytes() ); + if ( fileChunk.isLast() ) { - outputStream.write( fileChunk.bytes() ); + storeFileStreams.finish( destination ); } return fileChunk.isLast(); } @Override public void onFileStreamingComplete( CompletableFuture signal, - StoreCopyFinishedResponse response ) + StoreCopyFinishedResponse response ) throws IOException { log.info( "Finished streaming %s", destination ); signal.complete( response.lastCommittedTxBeforeStoreCopy() ); @@ -87,16 +90,16 @@ StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException { try { - return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), - new CatchUpResponseAdaptor() - { - @Override - public void onGetStoreIdResponse( CompletableFuture signal, - GetStoreIdResponse response ) - { - signal.complete( response.storeId() ); - } - } ); + CatchUpResponseAdaptor responseHandler = new CatchUpResponseAdaptor() + { + @Override + public void onGetStoreIdResponse( CompletableFuture signal, + GetStoreIdResponse response ) + { + signal.complete( response.storeId() ); + } + }; + return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), responseHandler ); } catch ( CatchUpClientException e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java index e2cd604c9e6d..171273e7b6c4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFileStreams.java @@ -20,9 +20,9 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.io.IOException; -import java.io.OutputStream; public interface StoreFileStreams { - OutputStream createStream( String destination ) throws IOException; + void write( String destination, int requiredAlignment, byte[] data ) throws IOException; + void finish( String destination ) throws IOException; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java index 2b4f0d4ea4fd..f870e92dfbef 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFiles.java @@ -22,23 +22,28 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.FileHandle; +import org.neo4j.io.pagecache.PageCache; public class StoreFiles { - private static final FilenameFilter STORE_FILE_FILTER = ( dir, name ) -> - { + private static final 
FilenameFilter STORE_FILE_FILTER = ( dir, name ) -> { // Skip log files and tx files from temporary database return !name.startsWith( "metrics" ) && !name.startsWith( "temp-copy" ) && - !name.startsWith( "raft-messages." ) && !name.startsWith( "debug." ) && - !name.startsWith( "data" ) && !name.startsWith( "store_lock" ); + !name.startsWith( "raft-messages." ) && !name.startsWith( "debug." ) && + !name.startsWith( "data" ) && !name.startsWith( "store_lock" ); }; private FileSystemAbstraction fs; + private PageCache pageCache; - public StoreFiles( FileSystemAbstraction fs ) + public StoreFiles( FileSystemAbstraction fs, PageCache pageCache ) { this.fs = fs; + this.pageCache = pageCache; } public void delete( File storeDir ) throws IOException @@ -47,7 +52,23 @@ public void delete( File storeDir ) throws IOException { fs.deleteRecursively( file ); } + Iterable fileHandles = pageCache.streamFilesRecursive( storeDir )::iterator; + for ( FileHandle fh : fileHandles ) + { + Path storePath = storeDir.toPath(); + Path filePath = fh.getFile().toPath(); + Path relative = storePath.relativize( filePath ); + if ( STORE_FILE_FILTER.accept( storeDir, getRootFileName( relative ) ) ) + { + fh.delete(); + } + } + } + private String getRootFileName( Path path ) + { + Path root = path.getRoot(); + return (root == null) ? path.toString() : root.toString(); } void moveTo( File source, File target ) throws IOException @@ -56,5 +77,14 @@ void moveTo( File source, File target ) throws IOException { fs.moveToDirectory( candidate, target ); } + + Iterable fileHandles = pageCache.streamFilesRecursive( source )::iterator; + for ( FileHandle fh : fileHandles ) + { + if ( STORE_FILE_FILTER.accept( source, fh.getFile().getName() ) ) + { + fh.rename( new File( target, fh.getFile().getName() ), StandardCopyOption.REPLACE_EXISTING ); + } + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java index 67ff5f9384fa..5352e9eb6580 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java @@ -22,33 +22,81 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.io.pagecache.PagedFile; import org.neo4j.kernel.monitoring.Monitors; class StreamToDisk implements StoreFileStreams { private final File storeDir; private final FileSystemAbstraction fs; + private final PageCache pageCache; private final FileCopyMonitor fileCopyMonitor; + private final Map channels; + private final Map pagedFiles; - StreamToDisk( File storeDir, FileSystemAbstraction fs, Monitors monitors ) throws IOException + StreamToDisk( File storeDir, FileSystemAbstraction fs, PageCache pageCache, Monitors monitors ) throws IOException { this.storeDir = storeDir; this.fs = fs; + this.pageCache = pageCache; fs.mkdirs( storeDir ); this.fileCopyMonitor = monitors.newMonitor( FileCopyMonitor.class ); + channels = new HashMap(); + pagedFiles = new HashMap(); + } @Override - 
public OutputStream createStream( String destination ) throws IOException + public void write( String destination, int requiredAlignment, byte[] data ) throws IOException { File fileName = new File( storeDir, destination ); fs.mkdirs( fileName.getParentFile() ); fileCopyMonitor.copyFile( fileName ); + if ( destination.endsWith( ".id" ) ) + { + try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) ) + { + outputStream.write( data ); + } + } + else + { + WritableByteChannel channel = channels.get( destination ); + if ( channel == null ) + { + int filePageSize = pageCache.pageSize() - pageCache.pageSize() % requiredAlignment; + PagedFile pagedFile = pageCache.map( fileName, filePageSize, StandardOpenOption.CREATE ); + channel = pagedFile.openWritableByteChannel(); + pagedFiles.put( destination, pagedFile ); + channels.put( destination, channel ); + } + + ByteBuffer buffer = ByteBuffer.wrap( data ); + while ( buffer.hasRemaining() ) + { + channel.write( buffer ); + } + } + } - return fs.openAsOutputStream( fileName, true ); + @Override + public void finish( String destination ) throws IOException + { + PagedFile pagedFile = pagedFiles.get( destination ); + if ( pagedFile != null ) + { + channels.get( destination ).close(); + pagedFile.close(); + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java index 1beeb7e6a478..948f7576c8e9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CoreGraphDatabase.java @@ -26,20 +26,15 @@ import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.HazelcastDiscoveryServiceFactory; -import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.EditionModule; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory; import org.neo4j.kernel.impl.factory.PlatformModule; -import org.neo4j.kernel.impl.util.CustomIOConfigValidator; public class CoreGraphDatabase extends GraphDatabaseFacade { - public static final String CUSTOM_IO_EXCEPTION_MESSAGE = - "Core cluster mode is not allowed with custom IO integrations"; - public CoreGraphDatabase( File storeDir, Config config, GraphDatabaseFacadeFactory.Dependencies dependencies ) { @@ -49,8 +44,6 @@ public CoreGraphDatabase( File storeDir, Config config, public CoreGraphDatabase( File storeDir, Config config, GraphDatabaseFacadeFactory.Dependencies dependencies, DiscoveryServiceFactory discoveryServiceFactory ) { - CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, - CUSTOM_IO_EXCEPTION_MESSAGE ); Function factory = ( platformModule ) -> new EnterpriseCoreEditionModule( platformModule, discoveryServiceFactory ); new GraphDatabaseFacadeFactory( DatabaseInfo.CORE, factory ).initFacade( storeDir, config, dependencies, this ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 81e32ab471ab..dd285af8ffe1 100644 --- 
a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -144,8 +144,12 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke logProvider = logging.getInternalLogProvider(); final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); - LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( fileSystem ), - platformModule.dataSourceManager, platformModule.pageCache, fileSystem, databaseHealthSupplier, + LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, + new StoreFiles( fileSystem, platformModule.pageCache ), + platformModule.dataSourceManager, + platformModule.pageCache, + fileSystem, + databaseHealthSupplier, logProvider ); IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory.get() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index 3e3e94f923a4..b5d968b3342b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -192,7 +192,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data platformModule.dependencies.provideDependency( TransactionIdStore.class ), platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, coreState, config, - platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem ); + platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem, + platformModule.pageCache ); servicesToStopOnStoreCopy.add( catchupServer ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 28b835f8c3fa..673dba3d89b4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -198,7 +198,8 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke DelayedRenewableTimeoutService catchupTimeoutService = new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ); - LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( fileSystem ), + StoreFiles storeFiles = new StoreFiles( fileSystem, pageCache ); + LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, storeFiles, platformModule.dataSourceManager, pageCache, fileSystem, databaseHealthSupplier, logProvider ); RemoteStore remoteStore = diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java index 6d0daf03546c..1df88416d57b 
100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaGraphDatabase.java @@ -31,13 +31,9 @@ import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory; import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory.Dependencies; import org.neo4j.kernel.impl.factory.PlatformModule; -import org.neo4j.kernel.impl.util.CustomIOConfigValidator; public class ReadReplicaGraphDatabase extends GraphDatabaseFacade { - public static final String CUSTOM_IO_EXCEPTION_MESSAGE = - "Read replica mode is not allowed with custom IO integrations"; - public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies dependencies ) { this( storeDir, config, dependencies, new HazelcastDiscoveryServiceFactory() ); @@ -46,7 +42,6 @@ public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies depe public ReadReplicaGraphDatabase( File storeDir, Config config, Dependencies dependencies, DiscoveryServiceFactory discoveryServiceFactory ) { - CustomIOConfigValidator.assertCustomIOConfigNotUsed( config, CUSTOM_IO_EXCEPTION_MESSAGE ); Function factory = ( platformModule ) -> new EnterpriseReadReplicaEditionModule( platformModule, discoveryServiceFactory ); new GraphDatabaseFacadeFactory( DatabaseInfo.READ_REPLICA, factory ).initFacade( storeDir, config, diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java deleted file mode 100644 index 5f0a692b54aa..000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/RejectCustomIOTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- */ -package org.neo4j.causalclustering; - -import org.junit.Rule; -import org.junit.Test; - -import java.util.Map; - -import org.neo4j.causalclustering.core.CoreGraphDatabase; -import org.neo4j.causalclustering.discovery.CoreClusterMember; -import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; -import org.neo4j.causalclustering.discovery.ReadReplica; -import org.neo4j.causalclustering.discovery.SharedDiscoveryService; -import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase; -import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.test.rule.TestDirectory; - -import static java.util.Collections.emptyMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.neo4j.causalclustering.discovery.Cluster.buildAddresses; -import static org.neo4j.helpers.collection.Iterators.set; -import static org.neo4j.helpers.collection.MapUtil.stringMap; -import static org.neo4j.kernel.impl.pagecache.PageSwapperFactoryForTesting.TEST_PAGESWAPPER_NAME; -import static org.neo4j.kernel.impl.store.format.RecordFormatSelector.defaultFormat; - -/** - * At the time of writing this test, certain operations required by Causal Clustering - * do not work with custom IO configurations. This test ensures - * that we fail gracefully with a helpful error message if the user - * tries to combine Causal Clustering with custom IO configurations. - * Specifically, the functionality that Causal Clustering needs that - * custom IO configurations does not support is store copying. - */ - -public class RejectCustomIOTest -{ - private DiscoveryServiceFactory discovery = new SharedDiscoveryService(); - - @Rule - public TestDirectory storeDir = TestDirectory.testDirectory(); - - @Test - public void shouldFailToStartWithCustomIOConfigurationInCoreModeTest() throws Exception - { - try - { - Map extraParams = - stringMap( GraphDatabaseSettings.pagecache_swapper.name(), TEST_PAGESWAPPER_NAME ); - CoreClusterMember clusterMember = new CoreClusterMember( 0, 3, buildAddresses( set( 0, 1, 2 ) ), discovery, - defaultFormat().toString(), storeDir.directory(), extraParams, emptyMap() ); - clusterMember.start(); - fail( "Should not have created database with custom IO configuration in Core Mode." ); - } - catch ( RuntimeException ex ) - { - assertEquals( CoreGraphDatabase.CUSTOM_IO_EXCEPTION_MESSAGE, ex.getMessage() ); - } - } - - @Test - public void shouldFailToStartWithCustomIOConfigurationInReadReplicaModeTest() throws Exception - { - try - { - DiscoveryServiceFactory discovery = new SharedDiscoveryService(); - Map extraParams = - stringMap( GraphDatabaseSettings.pagecache_swapper.name(), TEST_PAGESWAPPER_NAME ); - ReadReplica clusterMember = - new ReadReplica( storeDir.directory(), 2, discovery, buildAddresses( set( 0, 1, 2 ) ), - extraParams, emptyMap(), defaultFormat().toString() ); - clusterMember.start(); - fail( "Should not have created database with custom IO configuration in Read Replica Mode." 
); - } - catch ( RuntimeException ex ) - { - assertEquals( ReadReplicaGraphDatabase.CUSTOM_IO_EXCEPTION_MESSAGE, ex.getMessage() ); - } - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java index 79c64b5cec5e..5208fa47c957 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FileSenderTest.java @@ -48,7 +48,7 @@ public class FileSenderTest @Rule public EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule(); @Rule - public TestDirectory testDirectory = TestDirectory.testDirectory( fsRule.get()); + public TestDirectory testDirectory = TestDirectory.testDirectory( fsRule.get() ); private final FileSystemAbstraction fs = fsRule.get(); private final Random random = new Random(); @@ -60,7 +60,7 @@ public void sendEmptyFile() throws Exception // given File emptyFile = testDirectory.file( "emptyFile" ); fs.create( emptyFile ).close(); - FileSender fileSender = new FileSender(fs.open( emptyFile, "r" )); + FileSender fileSender = new FileSender( fs.open( emptyFile, "r" ) ); // when + then assertFalse( fileSender.isEndOfInput() ); @@ -77,7 +77,7 @@ public void sendSmallFile() throws Exception random.nextBytes( bytes ); File smallFile = testDirectory.file( "smallFile" ); - try( StoreChannel storeChannel = fs.create( smallFile ) ) + try ( StoreChannel storeChannel = fs.create( smallFile ) ) { storeChannel.write( ByteBuffer.wrap( bytes ) ); } @@ -100,17 +100,18 @@ public void sendLargeFile() throws Exception random.nextBytes( bytes ); File smallFile = testDirectory.file( "smallFile" ); - try( StoreChannel storeChannel = fs.create( smallFile ) ) + try ( StoreChannel storeChannel = fs.create( smallFile ) ) { storeChannel.write( ByteBuffer.wrap( bytes ) ); } - FileSender fileSender = new FileSender(fs.open( smallFile, "r" )); + FileSender fileSender = new FileSender( fs.open( smallFile, "r" ) ); // when + then assertFalse( fileSender.isEndOfInput() ); assertEquals( FileChunk.create( copyOfRange( bytes, 0, MAX_SIZE ), false ), fileSender.readChunk( allocator ) ); - assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, bytes.length ), true ), fileSender.readChunk( allocator ) ); + assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, bytes.length ), true ), + fileSender.readChunk( allocator ) ); assertNull( fileSender.readChunk( allocator ) ); assertTrue( fileSender.isEndOfInput() ); } @@ -123,7 +124,7 @@ public void sendLargeFileWithSizeMultipleOfTheChunkSize() throws Exception random.nextBytes( bytes ); File smallFile = testDirectory.file( "smallFile" ); - try( StoreChannel storeChannel = fs.create( smallFile ) ) + try ( StoreChannel storeChannel = fs.create( smallFile ) ) { storeChannel.write( ByteBuffer.wrap( bytes ) ); } @@ -133,8 +134,10 @@ public void sendLargeFileWithSizeMultipleOfTheChunkSize() throws Exception // when + then assertFalse( fileSender.isEndOfInput() ); assertEquals( FileChunk.create( copyOfRange( bytes, 0, MAX_SIZE ), false ), fileSender.readChunk( allocator ) ); - assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, MAX_SIZE * 2 ), false ), fileSender.readChunk( allocator ) ); - assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE * 2, bytes.length ), true ), fileSender.readChunk( allocator ) ); + 
assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE, MAX_SIZE * 2 ), false ), + fileSender.readChunk( allocator ) ); + assertEquals( FileChunk.create( copyOfRange( bytes, MAX_SIZE * 2, bytes.length ), true ), + fileSender.readChunk( allocator ) ); assertNull( fileSender.readChunk( allocator ) ); assertTrue( fileSender.isEndOfInput() ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java index 3f60308e0c50..5b258bfcbc6e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java @@ -38,6 +38,7 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.BoltConnector; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.HttpConnector; import org.neo4j.kernel.impl.util.Neo4jJobScheduler; @@ -85,7 +86,7 @@ public void catchupServerMessage() throws Throwable CatchupServer catchupServer = new CatchupServer( logProvider, userLogProvider, mockSupplier(), mockSupplier(), mockSupplier(), mockSupplier(), mock( BooleanSupplier.class ), coreState, config, new Monitors(), - mockSupplier(), mock( FileSystemAbstraction.class ) ); + mockSupplier(), mock( FileSystemAbstraction.class ), mock( PageCache.class ) ); //then try diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java index 126a5a19d6a2..7ffd53884da1 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java @@ -49,11 +49,15 @@ import static org.neo4j.causalclustering.scenarios.SampleData.createData; import static org.neo4j.helpers.collection.MapUtil.stringMap; +/** + * Note that this test is extended in the blockdevice repository. 
+ */ public class CoreToCoreCopySnapshotIT { + protected static final int NR_CORE_MEMBERS = 3; @Rule public final ClusterRule clusterRule = new ClusterRule( getClass() ) - .withNumberOfCoreMembers( 3 ) + .withNumberOfCoreMembers( NR_CORE_MEMBERS ) .withNumberOfReadReplicas( 0 ); @Test diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java index 5a6d5f3b266c..f7f1c5f85c11 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java @@ -25,7 +25,6 @@ import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -34,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; +import java.util.function.Function; import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor; import org.neo4j.causalclustering.core.CausalClusteringSettings; @@ -55,7 +55,6 @@ import org.neo4j.graphdb.security.WriteOperationsNotAllowedException; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.fs.FileUtils; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.impl.muninn.StandalonePageCacheFactory; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; @@ -75,7 +74,6 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.startsWith; @@ -95,13 +93,20 @@ import static org.neo4j.kernel.impl.store.MetaDataStore.Position.TIME; import static org.neo4j.test.assertion.Assert.assertEventually; +/** + * Note that this test is extended in the blockdevice repository. + */ public class ReadReplicaReplicationIT { + // This test is extended in the blockdevice repository, and these constants are required there as well. + public static final int NR_CORE_MEMBERS = 3; + public static final int NR_READ_REPLICAS = 1; + @Rule - public final ClusterRule clusterRule = - new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 1 ) - .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" ) - .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ); + public final ClusterRule clusterRule = new ClusterRule( getClass() ).withNumberOfCoreMembers( NR_CORE_MEMBERS ) + .withNumberOfReadReplicas( NR_READ_REPLICAS ) + .withSharedCoreParam( CausalClusteringSettings.cluster_topology_refresh, "5s" ) + .withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ); @Test public void shouldNotBeAbleToWriteToReadReplica() throws Exception @@ -217,13 +222,16 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreI cluster.coreTx( createSomeData ); - CoreClusterMember follower = cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 2, TimeUnit.SECONDS ); - // Shutdown server before copying its data, because Windows can't copy open files. 
- follower.shutdown(); + cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 2, TimeUnit.SECONDS ); + // Get a read replica and make sure that it is operational ReadReplica readReplica = cluster.addReadReplicaWithId( 4 ); - putSomeDataWithDifferentStoreId( readReplica.storeDir(), follower.storeDir() ); - follower.start(); + readReplica.start(); + readReplica.database().beginTx().close(); + readReplica.shutdown(); + + // Change the store id, so it should fail to join the cluster again + changeStoreId( readReplica.storeDir() ); try { @@ -264,15 +272,12 @@ public void aReadReplicShouldBeAbleToRejoinTheCluster() throws Exception awaitEx( () -> readReplicasUpToDateAsTheLeader( cluster.awaitLeader(), cluster.readReplicas() ), 1, TimeUnit.MINUTES ); - List coreStoreDirs = - cluster.coreMembers().stream().map( CoreClusterMember::storeDir ).collect( toList() ); - List readReplicaStoreDirs = - cluster.readReplicas().stream().map( ReadReplica::storeDir ).collect( toList() ); + Function toRep = db -> DbRepresentation.of( db.database() ); + Set dbs = cluster.coreMembers().stream().map( toRep ).collect( toSet() ); + dbs.addAll( cluster.readReplicas().stream().map( toRep ).collect( toSet() ) ); cluster.shutdown(); - Set dbs = coreStoreDirs.stream().map( DbRepresentation::of ).collect( toSet() ); - dbs.addAll( readReplicaStoreDirs.stream().map( DbRepresentation::of ).collect( toSet() ) ); assertEquals( 1, dbs.size() ); } @@ -339,6 +344,7 @@ public void shouldBeAbleToDownloadANewStoreAfterPruning() throws Exception () -> DbRepresentation.of( readReplica.database() ), equalTo( DbRepresentation.of( cluster.awaitLeader().database() ) ), 10, TimeUnit.SECONDS ); } + @Test public void shouldBeAbleToPullTxAfterHavingDownloadedANewStoreAfterPruning() throws Exception { @@ -404,12 +410,6 @@ private boolean readReplicasUpToDateAsTheLeader( CoreClusterMember leader, .reduce( true, ( acc, txId ) -> acc && txId == leaderTxId, Boolean::logicalAnd ); } - private void putSomeDataWithDifferentStoreId( File storeDir, File coreStoreDir ) throws IOException - { - FileUtils.copyRecursively( coreStoreDir, storeDir ); - changeStoreId( storeDir ); - } - private void changeStoreId( File storeDir ) throws IOException { File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); @@ -505,7 +505,7 @@ public void shouldBeAbleToCopyStoresFromCoreToReadReplica() throws Exception SECONDS ); // when - cluster.addReadReplicaWithIdAndRecordFormat( 42, HighLimit.NAME ).start(); + cluster.addReadReplicaWithIdAndRecordFormat( 4, HighLimit.NAME ).start(); // then for ( final ReadReplica readReplica : cluster.readReplicas() )
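The StoreFileStreams change in this patch replaces the per-file OutputStream factory with a write/finish pair, which lets an implementation keep a channel (or PagedFile) open across chunks and close it only once the last chunk has arrived; StoreCopyClient calls write(...) for every FileChunk and finish(...) when fileChunk.isLast(). A minimal in-memory sketch of the new contract, illustrative only; the class name and buffering strategy are not part of this patch:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.neo4j.causalclustering.catchup.storecopy.StoreFileStreams;

    // Buffers each destination in memory and releases it on finish(), mirroring how
    // StreamToDisk keeps a WritableByteChannel/PagedFile per destination until the
    // last chunk has been written. The requiredAlignment parameter is unused here;
    // an on-disk implementation uses it to pick the file page size, as shown above.
    class InMemoryStoreFileStreams implements StoreFileStreams
    {
        private final Map<String,ByteArrayOutputStream> buffers = new HashMap<>();

        @Override
        public void write( String destination, int requiredAlignment, byte[] data ) throws IOException
        {
            buffers.computeIfAbsent( destination, d -> new ByteArrayOutputStream() ).write( data );
        }

        @Override
        public void finish( String destination ) throws IOException
        {
            ByteArrayOutputStream buffer = buffers.remove( destination );
            if ( buffer != null )
            {
                buffer.close(); // no-op for a byte array; a real implementation closes its channel here
            }
        }
    }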