Skip to content

Commit

Permalink
Release the storecopy mutex after the store copy
Browse files Browse the repository at this point in the history
The store copy request handler only queues up requests
on the netty pipeline, so the release of the mutex must
be scheduled in response to the completion of the actual
handling. This is done by registering a listener on the
future supplied by netty.

Also did a general clean-up of the code here: a better flow,
cleaner code paragraphs, refactored methods and a few new comments.
  • Loading branch information
martinfurmanski committed Dec 11, 2017
1 parent a5b6603 commit 4fd8c3c
Show file tree
Hide file tree
Showing 15 changed files with 811 additions and 157 deletions.
Expand Up @@ -19,7 +19,11 @@
*/
package org.neo4j.kernel.impl.util;

import java.util.Iterator;

import org.neo4j.cursor.Cursor;
import org.neo4j.cursor.CursorValue;
import org.neo4j.cursor.RawCursor;

public class Cursors
{
Expand Down Expand Up @@ -65,4 +69,78 @@ public static int count( Cursor<?> cursor )
cursor.close();
}
}

/**
 * Creates a {@link RawCursor} over the supplied values.
 * <p>
 * The returned cursor surfaces one value per call to {@code next()}, in the
 * order given, and invalidates its current value when exhausted or closed.
 *
 * @param values the values to iterate over
 * @param <T> the type of the values
 * @param <EX> the type of exception declared (never actually thrown here)
 * @return a cursor over the supplied values
 */
@SafeVarargs // values array is only read, never written to or exposed
public static <T, EX extends Exception> RawCursor<T,EX> rawCursorOf( T... values )
{
    return new RawCursor<T,EX>()
    {
        // index of the next value to surface through next()
        private int idx;
        private final CursorValue<T> current = new CursorValue<>();

        @Override
        public T get()
        {
            return current.get();
        }

        @Override
        public boolean next() throws EX
        {
            if ( idx >= values.length )
            {
                current.invalidate();
                return false;
            }

            current.set( values[idx] );
            idx++;

            return true;
        }

        @Override
        public void close() throws EX
        {
            // jump past the end so any subsequent next() returns false
            idx = values.length;
            current.invalidate();
        }
    };
}

/**
 * Creates a {@link RawCursor} over the values of the supplied {@link Iterable}.
 * <p>
 * A single iterator is obtained eagerly; each call to {@code next()} advances
 * it by one element. Closing the cursor only invalidates the current value —
 * it does not release any resources held by the iterable itself.
 *
 * @param iterable the iterable to iterate over
 * @param <T> the type of the values
 * @param <EX> the type of exception declared (never actually thrown here)
 * @return a cursor over the values of the supplied iterable
 */
public static <T, EX extends Exception> RawCursor<T,EX> rawCursorOf( Iterable<T> iterable )
{
    return new RawCursor<T,EX>()
    {
        // note: unused 'idx' field removed — position is tracked by the iterator
        private final CursorValue<T> current = new CursorValue<>();
        private final Iterator<T> itr = iterable.iterator();

        @Override
        public T get()
        {
            return current.get();
        }

        @Override
        public boolean next() throws EX
        {
            if ( itr.hasNext() )
            {
                current.set( itr.next() );
                return true;
            }
            else
            {
                current.invalidate();
                return false;
            }
        }

        @Override
        public void close() throws EX
        {
            current.invalidate();
        }
    };
}
}
Expand Up @@ -43,6 +43,9 @@
import org.neo4j.causalclustering.catchup.CatchupServerProtocol.State;
import org.neo4j.causalclustering.catchup.storecopy.FileChunkEncoder;
import org.neo4j.causalclustering.catchup.storecopy.FileHeaderEncoder;
import org.neo4j.causalclustering.catchup.storecopy.StoreResourceStreamFactory;
import org.neo4j.causalclustering.catchup.storecopy.StoreStreamingProcess;
import org.neo4j.causalclustering.catchup.storecopy.StoreStreamingProtocol;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequestHandler;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdResponseEncoder;
Expand Down Expand Up @@ -181,7 +184,8 @@ protected void initChannel( SocketChannel ch ) throws Exception
pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, dataSourceAvailabilitySupplier,
transactionIdStoreSupplier, logicalTransactionStoreSupplier, monitors, logProvider ) );
pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
checkPointerSupplier, fs, pageCache, logProvider, storeCopyCheckPointMutex ) );
new StoreStreamingProcess( new StoreStreamingProtocol(), checkPointerSupplier, storeCopyCheckPointMutex,
new StoreResourceStreamFactory( pageCache, fs, dataSourceSupplier ) ) ) );

pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) );

Expand Down
Expand Up @@ -48,12 +48,6 @@ public int requiredAlignment()
return requiredAlignment;
}

@Override
public String toString()
{
return String.format( "FileHeader{fileName='%s'}", fileName );
}

@Override
public boolean equals( Object o )
{
Expand All @@ -66,12 +60,18 @@ public boolean equals( Object o )
return false;
}
FileHeader that = (FileHeader) o;
return Objects.equals( fileName, that.fileName );
return requiredAlignment == that.requiredAlignment && Objects.equals( fileName, that.fileName );
}

@Override
public int hashCode()
{
return Objects.hash( fileName );
return Objects.hash( fileName, requiredAlignment );
}

@Override
public String toString()
{
    // Same rendering as manual concatenation: FileHeader{fileName='<name>', requiredAlignment=<n>}
    return String.format( "FileHeader{fileName='%s', requiredAlignment=%d}", fileName, requiredAlignment );
}
}
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;

import static org.neo4j.causalclustering.catchup.storecopy.FileChunk.MAX_SIZE;
import static org.neo4j.causalclustering.catchup.storecopy.FileSender.State.FINISHED;
Expand All @@ -35,15 +36,16 @@

class FileSender implements ChunkedInput<FileChunk>
{
private final ReadableByteChannel channel;
private final StoreResource resource;
private final ByteBuffer byteBuffer;

private ReadableByteChannel channel;
private byte[] nextBytes;
private State state = PRE_INIT;

FileSender( ReadableByteChannel channel ) throws IOException
FileSender( StoreResource resource ) throws IOException
{
this.channel = channel;
this.resource = resource;
this.byteBuffer = ByteBuffer.allocateDirect( MAX_SIZE );
}

Expand All @@ -56,7 +58,7 @@ public boolean isEndOfInput() throws Exception
@Override
public void close() throws Exception
{
channel.close();
resource.close();
}

@Override
Expand All @@ -68,6 +70,7 @@ public FileChunk readChunk( ByteBufAllocator allocator ) throws Exception
}
else if ( state == PRE_INIT )
{
channel = resource.open();
nextBytes = prefetch();
if ( nextBytes == null )
{
Expand Down Expand Up @@ -128,6 +131,27 @@ public long progress()
return 0;
}

@Override
public boolean equals( Object o )
{
    // Identity-based equality on the underlying store resource only;
    // transient streaming state (buffer, channel, progress) is ignored.
    if ( o == null || getClass() != o.getClass() )
    {
        return false;
    }
    return this == o || Objects.equals( resource, ((FileSender) o).resource );
}

@Override
public int hashCode()
{
    // Must stay consistent with equals(), which compares only 'resource'.
    return Objects.hash( resource );
}

private byte[] prefetch() throws IOException
{
do
Expand Down
Expand Up @@ -19,103 +19,42 @@
*/
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.File;
import java.util.Optional;
import java.util.function.Supplier;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.function.Supplier;

import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status;
import org.neo4j.graphdb.Resource;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StoreFileMetadata;

import static org.neo4j.causalclustering.catchup.CatchupServerProtocol.State;
import static org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status.SUCCESS;
import static org.neo4j.io.fs.FileUtils.relativePath;
import static org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH;

public class GetStoreRequestHandler extends SimpleChannelInboundHandler<GetStoreRequest>
{
private final CatchupServerProtocol protocol;
private final Supplier<NeoStoreDataSource> dataSource;
private final Supplier<CheckPointer> checkPointerSupplier;
private final FileSystemAbstraction fs;
private final PageCache pageCache;
private final Log log;
private final StoreCopyCheckPointMutex mutex;

public GetStoreRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource,
Supplier<CheckPointer> checkPointerSupplier, FileSystemAbstraction fs, PageCache pageCache,
LogProvider logProvider, StoreCopyCheckPointMutex mutex )
private final StoreStreamingProcess storeStreamingProcess;

public GetStoreRequestHandler( CatchupServerProtocol protocol, Supplier<NeoStoreDataSource> dataSource, StoreStreamingProcess storeStreamingProcess )
{
this.protocol = protocol;
this.dataSource = dataSource;
this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs;
this.pageCache = pageCache;
this.mutex = mutex;
this.log = logProvider.getLog( getClass() );
this.storeStreamingProcess = storeStreamingProcess;
}

@Override
protected void channelRead0( ChannelHandlerContext ctx, GetStoreRequest msg ) throws Exception
{
if ( !msg.expectedStoreId().equalToKernelStoreId( dataSource.get().getStoreId() ) )
{
endStoreCopy( SUCCESS, ctx, -1 );
storeStreamingProcess.fail( ctx, E_STORE_ID_MISMATCH );
}
else
{
CheckPointer checkPointer = checkPointerSupplier.get();
long lastCheckPointedTx;
try ( Resource lock = mutex.storeCopy(
() -> checkPointer.tryCheckPoint( new SimpleTriggerInfo( "Store copy" ) ) );
ResourceIterator<StoreFileMetadata> files = dataSource.get().listStoreFiles( false ) )
{
lastCheckPointedTx = checkPointer.lastCheckPointedTransactionId();
while ( files.hasNext() )
{
StoreFileMetadata fileMetadata = files.next();
File file = fileMetadata.file();
log.debug( "Sending file " + file );
ctx.writeAndFlush( ResponseMessageType.FILE );
ctx.writeAndFlush( new FileHeader( relativePath( dataSource.get().getStoreDir(), file ),
fileMetadata.recordSize() ) );
Optional<PagedFile> existingMapping = pageCache.getExistingMapping( file );
if ( existingMapping.isPresent() )
{
try ( PagedFile pagedFile = existingMapping.get() )
{
ctx.writeAndFlush( new FileSender(
pagedFile.openReadableByteChannel() ) );
}
}
else
{
ctx.writeAndFlush( new FileSender( fs.open( file, "r" ) ) );
}
}
}
endStoreCopy( SUCCESS, ctx, lastCheckPointedTx );
storeStreamingProcess.perform( ctx );
}
protocol.expect( State.MESSAGE_TYPE );
}

private void endStoreCopy( Status status, ChannelHandlerContext ctx, long lastCommittedTxBeforeStoreCopy )
{
ctx.write( ResponseMessageType.STORE_COPY_FINISHED );
ctx.writeAndFlush( new StoreCopyFinishedResponse( status, lastCommittedTxBeforeStoreCopy ) );
}
}

This file was deleted.

This file was deleted.

0 comments on commit 4fd8c3c

Please sign in to comment.