diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java index 72d883d3f0de6..e83cf93bcb9ec 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClient.java @@ -69,7 +69,6 @@ long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expe private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, CatchupAddressProvider addressProvider, StoreFileStreamProvider storeFileStream, Supplier terminationConditions ) throws StoreCopyFailedException { - CatchUpResponseAdaptor copyHandler = StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ); long lastTransactionId = prepareStoreCopyResponse.lastTransactionId(); for ( File file : prepareStoreCopyResponse.getFiles() ) { @@ -82,7 +81,8 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes AdvertisedSocketAddress from = addressProvider.primary(); log.info( format( "Downloading file '%s' from '%s'", file, from ) ); StoreCopyFinishedResponse response = - catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), copyHandler ); + catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), + StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) ); successful = successfulFileDownload( response ); } catch ( CatchUpClientException | CatchupAddressResolutionException e ) @@ -103,7 +103,6 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor CatchupAddressProvider addressProvider, StoreFileStreamProvider storeFileStream, Supplier terminationConditions ) throws StoreCopyFailedException { - CatchUpResponseAdaptor copyHandler = StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ); long lastTransactionId = prepareStoreCopyResponse.lastTransactionId(); PrimitiveLongIterator indexIds = prepareStoreCopyResponse.getIndexIds().iterator(); while ( indexIds.hasNext() ) @@ -119,7 +118,7 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor log.info( format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) ); StoreCopyFinishedResponse response = catchUpClient.makeBlockingRequest( from, new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ), - copyHandler ); + StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ) ); successful = successfulFileDownload( response ); } catch ( CatchUpClientException | CatchupAddressResolutionException e ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyResponseAdaptors.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyResponseAdaptors.java index 1da1aac517f48..f3037d8f77e0f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyResponseAdaptors.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyResponseAdaptors.java @@ -20,10 +20,13 @@ package org.neo4j.causalclustering.catchup.storecopy; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; import org.neo4j.logging.Log; +import static java.lang.String.format; + public abstract class StoreCopyResponseAdaptors extends CatchUpResponseAdaptor { static StoreCopyResponseAdaptors filesCopyAdaptor( StoreFileStreamProvider storeFileStreamProvider, Log log ) @@ -46,23 +49,21 @@ private StoreCopyResponseAdaptors( StoreFileStreamProvider storeFileStreamProvid this.log = log; } + /** + * Files will be sent in order but multiple files may be sent during one response. + * + * @param requestOutcomeSignal signal + * @param fileHeader header for most resent file being sent + */ @Override public void onFileHeader( CompletableFuture requestOutcomeSignal, FileHeader fileHeader ) { try { - storeFileStream = storeFileStreamProvider.acquire( fileHeader.fileName(), fileHeader.requiredAlignment() ); - requestOutcomeSignal.whenComplete( ( storeCopyFinishedResponse, throwable ) -> - { - try - { - storeFileStream.close(); - } - catch ( Exception e ) - { - log.error( "Unable to close store file stream", e ); - } - } ); + final StoreFileStream fileStream = storeFileStreamProvider.acquire( fileHeader.fileName(), fileHeader.requiredAlignment() ); + // Make sure that each stream closes on complete but only the latest is written to + requestOutcomeSignal.whenComplete( new CloseFileStreamOnComplete<>( fileStream, fileHeader.fileName() ) ); + this.storeFileStream = fileStream; } catch ( Exception e ) { @@ -111,4 +112,29 @@ public void onFileStreamingComplete( CompletableFuture implements BiConsumer + { + private final StoreFileStream fileStream; + private String fileName; + + private CloseFileStreamOnComplete( StoreFileStream fileStream, String fileName ) + { + this.fileStream = fileStream; + this.fileName = fileName; + } + + @Override + public void accept( RESPONSE response, Throwable throwable ) + { + try + { + fileStream.close(); + } + catch ( Exception e ) + { + log.error( format( "Unable to close store file stream for file '%s'", fileName ), e ); + } + } + } }