Skip to content

Commit

Permalink
First draft of block device support for Causal Cluster
Browse files Browse the repository at this point in the history
* For doing store copy, store files are now read and written through the page
  cache.
* File operations - copy, move, delete - now go through the page cache for
  store files.
* Tests have been changed to not rely on the file system to e.g. copy store
  directories around, and also to allow the tests to be configured with block
  device storage in the blockdevice repository.
* The test changes primarily involve CoreToCoreCopySnapshotIT and
  ReadReplicaReplicationIT.
* We now send an alignment requirement over with the `FileHeader` message.
  _This is a protocol change!_ This is only a preliminary approach, and an
  invitation to discuss the design and what the best way is to solve this
  problem.
* The checks that made Causal Clustering and Custom IO (block device) mutually
  exclusive have been removed.

A few things are still left to do:

* Make sure TemporaryStoreDirectory cleans up after itself not only via the
  file system, but also via the page cache.
* Take a closer look at LocalDatabase and the catch-up code to make sure there
  are no latent bugs in there.
* Possibly extract some duplication that has been discovered between HA and CC?
  • Loading branch information
ragadeeshu authored and chrisvest committed Jan 30, 2017
1 parent 0dbf244 commit d25df80
Show file tree
Hide file tree
Showing 26 changed files with 231 additions and 199 deletions.
Expand Up @@ -47,6 +47,7 @@ public boolean onFileContent( CompletableFuture<T> signal, FileChunk response )

/**
 * Rejects a store-copy-finished message by failing the pending request.
 *
 * @param signal future of the outstanding request; it is completed exceptionally
 * @param response the message that was received, included in the failure message
 * @throws IOException declared in the signature; this implementation itself does not throw it
 */
@Override
public void onFileStreamingComplete( CompletableFuture<T> signal, StoreCopyFinishedResponse response )
        throws IOException
{
    CatchUpProtocolViolationException violation =
            new CatchUpProtocolViolationException( "Unexpected response: %s", response );
    signal.completeExceptionally( violation );
}
Expand Down
Expand Up @@ -36,7 +36,7 @@ public interface CatchUpResponseCallback<T>

boolean onFileContent( CompletableFuture<T> signal, FileChunk fileChunk ) throws IOException;

void onFileStreamingComplete( CompletableFuture<T> signal, StoreCopyFinishedResponse response );
void onFileStreamingComplete( CompletableFuture<T> signal, StoreCopyFinishedResponse response ) throws IOException;

void onTxPullResponse( CompletableFuture<T> signal, TxPullResponse tx );

Expand Down
Expand Up @@ -39,7 +39,7 @@ public interface CatchUpResponseHandler
*/
boolean onFileContent( FileChunk fileChunk ) throws IOException;

void onFileStreamingComplete( StoreCopyFinishedResponse response );
void onFileStreamingComplete( StoreCopyFinishedResponse response ) throws IOException;

void onTxPullResponse( TxPullResponse tx );

Expand Down
Expand Up @@ -64,6 +64,7 @@
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
Expand All @@ -87,6 +88,7 @@ public class CatchupServer extends LifecycleAdapter
private final Supplier<NeoStoreDataSource> dataSourceSupplier;
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final PageCache pageCache;

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" );
private final CoreState coreState;
Expand All @@ -102,7 +104,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier,
CoreState coreState, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier,
FileSystemAbstraction fs )
FileSystemAbstraction fs, PageCache pageCache )
{
this.coreState = coreState;
this.listenAddress = config.get( CausalClusteringSettings.transaction_listen_address );
Expand All @@ -118,6 +120,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
this.dataSourceSupplier = dataSourceSupplier;
this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs;
this.pageCache = pageCache;
}

@Override
Expand Down Expand Up @@ -169,7 +172,7 @@ protected void initChannel( SocketChannel ch )
monitors, logProvider ) );
pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
checkPointerSupplier, fs, logProvider ) );
checkPointerSupplier, fs, pageCache, logProvider ) );
pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) );
pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) );

Expand Down
Expand Up @@ -76,7 +76,7 @@ public boolean onFileContent( FileChunk fileChunk ) throws IOException
}

@Override
public void onFileStreamingComplete( StoreCopyFinishedResponse response )
public void onFileStreamingComplete( StoreCopyFinishedResponse response ) throws IOException
{
if ( !requestOutcomeSignal.isCancelled() )
{
Expand Down
Expand Up @@ -24,17 +24,30 @@
public class FileHeader
{
private final String fileName;
private final int requiredAlignment;

/**
 * Creates a header for the given file name without any particular alignment requirement.
 * Delegates to the two-argument constructor with a required alignment of 1.
 *
 * @param fileName name of the file this header announces
 */
public FileHeader( String fileName )
{
// A required alignment of 1 basically means that any alignment will do.
this( fileName, 1 );
}

/**
 * Creates a header carrying both the file name and its alignment requirement.
 * Neither argument is validated here; both are stored as given.
 *
 * @param fileName name of the file this header announces
 * @param requiredAlignment alignment requirement for the file; 1 means any alignment will do
 */
public FileHeader( String fileName, int requiredAlignment )
{
    this.requiredAlignment = requiredAlignment;
    this.fileName = fileName;
}

/**
 * @return the file name this header was constructed with
 */
public String fileName()
{
    return this.fileName;
}

/**
 * @return the alignment requirement this header was constructed with; 1 means any alignment will do
 */
public int requiredAlignment()
{
    return this.requiredAlignment;
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -36,6 +36,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out
byte[] bytes = new byte[length];
msg.readBytes( bytes );
String name = UTF8.decode( bytes );
out.add( new FileHeader( name ) );
int requiredAlignment = msg.readInt();
out.add( new FileHeader( name, requiredAlignment ) );
}
}
Expand Up @@ -34,5 +34,6 @@ protected void encode( ChannelHandlerContext ctx, FileHeader msg, ByteBuf out )
byte[] bytes = UTF8.encode( name );
out.writeInt( bytes.length );
out.writeBytes( bytes );
out.writeInt( msg.requiredAlignment() );
}
}
Expand Up @@ -19,24 +19,23 @@
*/
package org.neo4j.causalclustering.catchup.storecopy;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

import org.neo4j.io.fs.StoreChannel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;

class FileSender implements ChunkedInput<FileChunk>
{
private final StoreChannel channel;
private final ReadableByteChannel channel;
private final ByteBuffer byteBuffer;
private boolean endOfInput = false;
private boolean sentChunk = false;
private byte[] preFetchedBytes;

public FileSender( StoreChannel channel ) throws IOException
public FileSender( ReadableByteChannel channel ) throws IOException
{
this.channel = channel;
byteBuffer = ByteBuffer.allocateDirect( FileChunk.MAX_SIZE );
Expand Down Expand Up @@ -67,7 +66,16 @@ public FileChunk readChunk( ByteBufAllocator allocator ) throws Exception
sentChunk = true;
}

byte[] next = prefetch();
byte[] next;
if ( !endOfInput )
{
next = prefetch();
}
else
{
next = null;
}

FileChunk fileChunk = FileChunk.create( preFetchedBytes == null ? new byte[0] : preFetchedBytes, next == null );

preFetchedBytes = next;
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.File;
import java.util.Optional;
import java.util.function.Supplier;

import io.netty.channel.ChannelHandlerContext;
Expand All @@ -30,6 +31,8 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
Expand All @@ -47,16 +50,18 @@ public class GetStoreRequestHandler extends SimpleChannelInboundHandler<GetStore
private final Supplier<NeoStoreDataSource> dataSource;
private final Supplier<CheckPointer> checkPointerSupplier;
private final FileSystemAbstraction fs;
private PageCache pageCache;
private final Log log;

public GetStoreRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource,
Supplier<CheckPointer> checkPointerSupplier, FileSystemAbstraction fs,
LogProvider logProvider )
Supplier<CheckPointer> checkPointerSupplier, FileSystemAbstraction fs, PageCache pageCache,
LogProvider logProvider )
{
this.protocol = protocol;
this.dataSource = dataSource;
this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs;
this.pageCache = pageCache;
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -74,12 +79,25 @@ protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) th
{
while ( files.hasNext() )
{
File file = files.next().file();
StoreFileMetadata fileMetadata = files.next();
File file = fileMetadata.file();
log.debug( "Sending file " + file );

ctx.writeAndFlush( ResponseMessageType.FILE );
ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ) ) );
ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) );
ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ),
fileMetadata.recordSize() ) );
Optional<PagedFile> existingMapping = pageCache.getExistingMapping( file );
if ( existingMapping.isPresent() )
{
try ( PagedFile pagedFile = existingMapping.get() )
{
ctx.writeAndFlush( new FileSender(
pagedFile.openReadableByteChannel() ) );
}
}
else
{
ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) );
}
}
}
endStoreCopy( SUCCESS, ctx, lastCheckPointedTx );
Expand Down
Expand Up @@ -21,10 +21,12 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.StandardCopyOption;
import java.util.function.Supplier;

import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.FileHandle;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
Expand Down Expand Up @@ -57,8 +59,7 @@ public class LocalDatabase implements Lifecycle

private volatile TransactionCommitProcess localCommit;

public LocalDatabase( File storeDir, StoreFiles storeFiles,
DataSourceManager dataSourceManager,
public LocalDatabase( File storeDir, StoreFiles storeFiles, DataSourceManager dataSourceManager,
PageCache pageCache, FileSystemAbstraction fileSystemAbstraction,
Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider )
{
Expand Down Expand Up @@ -172,7 +173,7 @@ private boolean hasStoreFiles()
for ( StoreType storeType : StoreType.values() )
{
StoreFile storeFile = storeType.getStoreFile();
if(storeFile != null)
if ( storeFile != null )
{
boolean exists = fileSystemAbstraction.fileExists( new File( storeDir, storeFile.storeFileName() ) );
if ( exists )
Expand Down
Expand Up @@ -142,7 +142,8 @@ public void copy( MemberId from, StoreId expectedStoreId, File destDir )
try
{
log.info( "Copying store from %s", from );
long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs, monitors ) );
long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs,
pageCache, monitors ) );

log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId );

Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.catchup.CatchUpClient;
Expand All @@ -42,35 +41,39 @@ public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider )
log = logProvider.getLog( getClass() );
}

long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams ) throws StoreCopyFailedException
long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams storeFileStreams )
throws StoreCopyFailedException
{
try
{
return catchUpClient.makeBlockingRequest( from, new GetStoreRequest( expectedStoreId ),
new CatchUpResponseAdaptor<Long>()
{
private String destination;
private int requiredAlignment;

@Override
public void onFileHeader( CompletableFuture<Long> requestOutcomeSignal, FileHeader fileHeader )
{
this.destination = fileHeader.fileName();
this.requiredAlignment = fileHeader.requiredAlignment();
}

@Override
public boolean onFileContent( CompletableFuture<Long> signal, FileChunk fileChunk )
throws IOException
{
try ( OutputStream outputStream = storeFileStreams.createStream( destination ) )
storeFileStreams.write( destination, requiredAlignment, fileChunk.bytes() );
if ( fileChunk.isLast() )
{
outputStream.write( fileChunk.bytes() );
storeFileStreams.finish( destination );
}
return fileChunk.isLast();
}

@Override
public void onFileStreamingComplete( CompletableFuture<Long> signal,
StoreCopyFinishedResponse response )
StoreCopyFinishedResponse response ) throws IOException
{
log.info( "Finished streaming %s", destination );
signal.complete( response.lastCommittedTxBeforeStoreCopy() );
Expand All @@ -87,16 +90,16 @@ StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException
{
try
{
return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(),
new CatchUpResponseAdaptor<StoreId>()
{
@Override
public void onGetStoreIdResponse( CompletableFuture<StoreId> signal,
GetStoreIdResponse response )
{
signal.complete( response.storeId() );
}
} );
CatchUpResponseAdaptor<StoreId> responseHandler = new CatchUpResponseAdaptor<StoreId>()
{
@Override
public void onGetStoreIdResponse( CompletableFuture<StoreId> signal,
GetStoreIdResponse response )
{
signal.complete( response.storeId() );
}
};
return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), responseHandler );
}
catch ( CatchUpClientException e )
{
Expand Down
Expand Up @@ -20,9 +20,9 @@
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Destination for the file data that {@code StoreCopyClient} receives while copying store files.
 */
public interface StoreFileStreams
{
/**
 * Opens an output stream for the given destination.
 * NOTE(review): this declaration is shown side by side with {@code write}/{@code finish};
 * it looks like the pre-change method that those two replace - confirm before relying on it.
 *
 * @param destination name of the file to write to
 * @return a stream the caller writes the file bytes to
 * @throws IOException if the stream cannot be created
 */
OutputStream createStream( String destination ) throws IOException;
/**
 * Writes one piece of data for the given destination. {@code StoreCopyClient} calls this
 * once per received file chunk, passing the chunk's bytes.
 *
 * @param destination name of the file the data belongs to, as announced by the file header
 * @param requiredAlignment alignment requirement announced by the file header
 * @param data the bytes to write
 * @throws IOException if the data cannot be written
 */
void write( String destination, int requiredAlignment, byte[] data ) throws IOException;
/**
 * Signals that no more data will be written for the given destination.
 * {@code StoreCopyClient} calls this after writing the last chunk of a file.
 *
 * @param destination name of the file that is complete
 * @throws IOException if finishing the file fails
 */
void finish( String destination ) throws IOException;
}

0 comments on commit d25df80

Please sign in to comment.