Skip to content

Commit

Permalink
Make sure all store file streams are closed in the response adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW committed Mar 22, 2018
1 parent 7a2e62f commit 8d58d25
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
Expand Up @@ -69,7 +69,6 @@ long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expe
private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, CatchupAddressProvider addressProvider,
StoreFileStreamProvider storeFileStream, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException
{
CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log );
long lastTransactionId = prepareStoreCopyResponse.lastTransactionId();
for ( File file : prepareStoreCopyResponse.getFiles() )
{
Expand All @@ -82,7 +81,8 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes
AdvertisedSocketAddress from = addressProvider.primary();
log.info( format( "Downloading file '%s' from '%s'", file, from ) );
StoreCopyFinishedResponse response =
catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), copyHandler );
catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ),
StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) );
successful = successfulFileDownload( response );
}
catch ( CatchUpClientException | CatchupAddressResolutionException e )
Expand All @@ -103,7 +103,6 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor
CatchupAddressProvider addressProvider, StoreFileStreamProvider storeFileStream, Supplier<TerminationCondition> terminationConditions )
throws StoreCopyFailedException
{
CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log );
long lastTransactionId = prepareStoreCopyResponse.lastTransactionId();
PrimitiveLongIterator indexIds = prepareStoreCopyResponse.getIndexIds().iterator();
while ( indexIds.hasNext() )
Expand All @@ -119,7 +118,7 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor
log.info( format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) );
StoreCopyFinishedResponse response =
catchUpClient.makeBlockingRequest( from, new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ),
copyHandler );
StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) );
successful = successfulFileDownload( response );
}
catch ( CatchUpClientException | CatchupAddressResolutionException e )
Expand Down
Expand Up @@ -20,10 +20,13 @@
package org.neo4j.causalclustering.catchup.storecopy;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.logging.Log;

import static java.lang.String.format;

public abstract class StoreCopyResponseAdaptors<T> extends CatchUpResponseAdaptor<T>
{
static StoreCopyResponseAdaptors<StoreCopyFinishedResponse> filesCopyAdaptor( StoreFileStreamProvider storeFileStreamProvider, Log log )
Expand All @@ -46,23 +49,21 @@ private StoreCopyResponseAdaptors( StoreFileStreamProvider storeFileStreamProvid
this.log = log;
}

/**
* Files will be sent in order but multiple files may be sent during one response.
*
* @param requestOutcomeSignal signal
* @param fileHeader header for most resent file being sent
*/
@Override
public void onFileHeader( CompletableFuture<T> requestOutcomeSignal, FileHeader fileHeader )
{
try
{
storeFileStream = storeFileStreamProvider.acquire( fileHeader.fileName(), fileHeader.requiredAlignment() );
requestOutcomeSignal.whenComplete( ( storeCopyFinishedResponse, throwable ) ->
{
try
{
storeFileStream.close();
}
catch ( Exception e )
{
log.error( "Unable to close store file stream", e );
}
} );
final StoreFileStream fileStream = storeFileStreamProvider.acquire( fileHeader.fileName(), fileHeader.requiredAlignment() );
// Make sure that each stream closes on complete but only the latest is written to
requestOutcomeSignal.whenComplete( new CloseFileStreamOnComplete<>( fileStream, fileHeader.fileName() ) );
this.storeFileStream = fileStream;
}
catch ( Exception e )
{
Expand Down Expand Up @@ -111,4 +112,29 @@ public void onFileStreamingComplete( CompletableFuture<StoreCopyFinishedResponse
signal.complete( response );
}
}

private class CloseFileStreamOnComplete<RESPONSE> implements BiConsumer<RESPONSE,Throwable>
{
private final StoreFileStream fileStream;
private String fileName;

private CloseFileStreamOnComplete( StoreFileStream fileStream, String fileName )
{
this.fileStream = fileStream;
this.fileName = fileName;
}

@Override
public void accept( RESPONSE response, Throwable throwable )
{
try
{
fileStream.close();
}
catch ( Exception e )
{
log.error( format( "Unable to close store file stream for file '%s'", fileName ), e );
}
}
}
}

0 comments on commit 8d58d25

Please sign in to comment.