Skip to content

Commit

Permalink
Merge pull request #11348 from RagnarW/3.4-write-to-beginning-store-copy
Browse files Browse the repository at this point in the history
Redesign store file stream
  • Loading branch information
RagnarW committed Mar 22, 2018
2 parents 6b8d980 + 8d58d25 commit 9fde820
Show file tree
Hide file tree
Showing 19 changed files with 505 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpReque
channel.setResponseHandler( responseHandler, future );
channel.send( request );

String operation = format( "Timed out executing operation %s on %s ",
request, upstream );
String operation = format( "Completed exceptionally when executing operation %s on %s ", request, upstream );

return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void onFileHeader( CompletableFuture<T> signal, FileHeader response )
}

@Override
public boolean onFileContent( CompletableFuture<T> signal, FileChunk response ) throws IOException
public boolean onFileContent( CompletableFuture<T> signal, FileChunk response )
{
unimplementedMethod( signal, response );
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface CatchUpResponseCallback<T>
{
void onFileHeader( CompletableFuture<T> signal, FileHeader fileHeader );

boolean onFileContent( CompletableFuture<T> signal, FileChunk fileChunk ) throws IOException;
boolean onFileContent( CompletableFuture<T> signal, FileChunk fileChunk );

void onFileStreamingComplete( CompletableFuture<T> signal, StoreCopyFinishedResponse response );

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreI
try
{
long lastFlushedTxId;
try ( StreamToDisk storeFileStreams = new StreamToDisk( destDir, fs, pageCache, monitors ) )
{
lastFlushedTxId = storeCopyClient.copyStoreFiles( addressProvider, expectedStoreId, storeFileStreams, DEFAULT_TERMINATION_CONDITIONS );
}
StreamToDiskProvider streamToDiskProvider = new StreamToDiskProvider( destDir, fs, pageCache, monitors );
lastFlushedTxId = storeCopyClient.copyStoreFiles( addressProvider, expectedStoreId, streamToDiskProvider, DEFAULT_TERMINATION_CONDITIONS );

log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand All @@ -35,28 +34,29 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

public class StoreCopyClient
{
private final CatchUpClient catchUpClient;
private final Log log;
private final LogProvider logProvider;

public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider )
{
this.catchUpClient = catchUpClient;
log = logProvider.getLog( getClass() );
this.logProvider = logProvider;
}

long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expectedStoreId, StoreFileStreams storeFileStreams,
long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expectedStoreId, StoreFileStreamProvider storeFileStreamProvider,
Supplier<TerminationCondition> requestWiseTerminationCondition )
throws StoreCopyFailedException
{
try
{
PrepareStoreCopyResponse prepareStoreCopyResponse = listFiles( catchupAddressProvider.primary(), expectedStoreId, storeFileStreams );
copyFilesIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreams, requestWiseTerminationCondition );
copyIndexSnapshotIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreams,
PrepareStoreCopyResponse prepareStoreCopyResponse = prepareStoreCopy( catchupAddressProvider.primary(), expectedStoreId, storeFileStreamProvider );
copyFilesIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreamProvider,
requestWiseTerminationCondition );
copyIndexSnapshotIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreamProvider,
requestWiseTerminationCondition );
return prepareStoreCopyResponse.lastTransactionId();
}
Expand All @@ -67,9 +67,8 @@ long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expe
}

private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, CatchupAddressProvider addressProvider,
StoreFileStreams storeFileStreams, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException
StoreFileStreamProvider storeFileStream, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException
{
CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = new StoreFileCopyResponseAdaptor( storeFileStreams, log );
long lastTransactionId = prepareStoreCopyResponse.lastTransactionId();
for ( File file : prepareStoreCopyResponse.getFiles() )
{
Expand All @@ -80,9 +79,10 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes
try
{
AdvertisedSocketAddress from = addressProvider.primary();
log.info( String.format( "Downloading file '%s' from '%s'", file, from ) );
log.info( format( "Downloading file '%s' from '%s'", file, from ) );
StoreCopyFinishedResponse response =
catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), copyHandler );
catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ),
StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) );
successful = successfulFileDownload( response );
}
catch ( CatchUpClientException | CatchupAddressResolutionException e )
Expand All @@ -100,10 +100,9 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes
}

private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId,
CatchupAddressProvider addressProvider,
StoreFileStreams storeFileStreams, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException
CatchupAddressProvider addressProvider, StoreFileStreamProvider storeFileStream, Supplier<TerminationCondition> terminationConditions )
throws StoreCopyFailedException
{
CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = new StoreFileCopyResponseAdaptor( storeFileStreams, log );
long lastTransactionId = prepareStoreCopyResponse.lastTransactionId();
PrimitiveLongIterator indexIds = prepareStoreCopyResponse.getIndexIds().iterator();
while ( indexIds.hasNext() )
Expand All @@ -116,10 +115,10 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor
try
{
AdvertisedSocketAddress from = addressProvider.primary();
log.info( String.format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) );
log.info( format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) );
StoreCopyFinishedResponse response =
catchUpClient.makeBlockingRequest( from, new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ),
copyHandler );
StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) );
successful = successfulFileDownload( response );
}
catch ( CatchUpClientException | CatchupAddressResolutionException e )
Expand All @@ -136,12 +135,12 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor
}
}

private PrepareStoreCopyResponse listFiles( AdvertisedSocketAddress from, StoreId expectedStoreId, StoreFileStreams storeFileStreams )
private PrepareStoreCopyResponse prepareStoreCopy( AdvertisedSocketAddress from, StoreId expectedStoreId, StoreFileStreamProvider storeFileStream )
throws CatchUpClientException, StoreCopyFailedException
{
log.info( "Requesting store listing from: " + from );
PrepareStoreCopyResponse prepareStoreCopyResponse = catchUpClient.makeBlockingRequest( from, new PrepareStoreCopyRequest( expectedStoreId ),
new PrepareStoreCopyResponseAdaptor( storeFileStreams, logProvider ) );
StoreCopyResponseAdaptors.prepareStoreCopyAdaptor( storeFileStream, log ) );
if ( prepareStoreCopyResponse.status() != PrepareStoreCopyResponse.Status.SUCCESS )
{
throw new StoreCopyFailedException( "Preparing store failed due to: " + prepareStoreCopyResponse.status() );
Expand Down Expand Up @@ -173,56 +172,25 @@ private boolean successfulFileDownload( StoreCopyFinishedResponse response ) thr
{
StoreCopyFinishedResponse.Status responseStatus = response.status();
log.debug( "Request for individual file resulted in response type: %s", response.status() );
if ( responseStatus == StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND )
if ( responseStatus == StoreCopyFinishedResponse.Status.SUCCESS )
{
return true;
}
else if ( responseStatus == StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND )
{
return false;
}
else if ( responseStatus == StoreCopyFinishedResponse.Status.SUCCESS )
else if ( responseStatus == StoreCopyFinishedResponse.Status.E_UNKNOWN )
{
return true;
return false;
}
else if ( responseStatus == StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH )
{
throw new StoreCopyFailedException( "Store id mismatch" );
return false;
}
else
{
throw new StoreCopyFailedException( "Unknown response type: " + responseStatus );
}
}

public static class StoreFileCopyResponseAdaptor extends CatchUpResponseAdaptor<StoreCopyFinishedResponse>
{
private final StoreFileStreams storeFileStreams;
private final Log log;
private String destination;
private int requiredAlignment;

StoreFileCopyResponseAdaptor( StoreFileStreams storeFileStreams, Log log )
{
this.storeFileStreams = storeFileStreams;
this.log = log;
}

@Override
public void onFileHeader( CompletableFuture<StoreCopyFinishedResponse> requestOutcomeSignal, FileHeader fileHeader )
{
this.destination = fileHeader.fileName();
this.requiredAlignment = fileHeader.requiredAlignment();
}

@Override
public boolean onFileContent( CompletableFuture<StoreCopyFinishedResponse> signal, FileChunk fileChunk ) throws IOException
{
storeFileStreams.write( destination, requiredAlignment, fileChunk.bytes() );
return fileChunk.isLast();
}

@Override
public void onFileStreamingComplete( CompletableFuture<StoreCopyFinishedResponse> signal, StoreCopyFinishedResponse response )
{
log.info( "Finished streaming" );
signal.complete( response );
}
}
}

0 comments on commit 9fde820

Please sign in to comment.