Skip to content

Commit

Permalink
Revert "Redesign StoreFileStreams"
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW committed Mar 22, 2018
1 parent a678ee2 commit a46d897
Show file tree
Hide file tree
Showing 19 changed files with 302 additions and 529 deletions.
Expand Up @@ -85,7 +85,8 @@ public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpReque
channel.setResponseHandler( responseHandler, future ); channel.setResponseHandler( responseHandler, future );
channel.send( request ); channel.send( request );


String operation = format( "Completed exceptionally when executing operation %s on %s ", request, upstream ); String operation = format( "Timed out executing operation %s on %s ",
request, upstream );


return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log ); return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log );
} }
Expand Down
Expand Up @@ -40,7 +40,7 @@ public void onFileHeader( CompletableFuture<T> signal, FileHeader response )
} }


@Override @Override
public boolean onFileContent( CompletableFuture<T> signal, FileChunk response ) public boolean onFileContent( CompletableFuture<T> signal, FileChunk response ) throws IOException
{ {
unimplementedMethod( signal, response ); unimplementedMethod( signal, response );
return false; return false;
Expand Down
Expand Up @@ -35,7 +35,7 @@ public interface CatchUpResponseCallback<T>
{ {
void onFileHeader( CompletableFuture<T> signal, FileHeader fileHeader ); void onFileHeader( CompletableFuture<T> signal, FileHeader fileHeader );


boolean onFileContent( CompletableFuture<T> signal, FileChunk fileChunk ); boolean onFileContent( CompletableFuture<T> signal, FileChunk fileChunk ) throws IOException;


void onFileStreamingComplete( CompletableFuture<T> signal, StoreCopyFinishedResponse response ); void onFileStreamingComplete( CompletableFuture<T> signal, StoreCopyFinishedResponse response );


Expand Down
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
* Used client side for event handling of a store listing request
*/
public class PrepareStoreCopyResponseAdaptor extends CatchUpResponseAdaptor<PrepareStoreCopyResponse>
{
private final StoreFileStreams storeFileStreams;
private final Log log;
private String destination;
private int requiredAlignment;

public PrepareStoreCopyResponseAdaptor( StoreFileStreams storeFileStreams, LogProvider logProvider )
{
this.storeFileStreams = storeFileStreams;
log = logProvider.getLog( PrepareStoreCopyResponseAdaptor.class );
}

@Override
public void onStoreListingResponse( CompletableFuture<PrepareStoreCopyResponse> signal, PrepareStoreCopyResponse response )
{
log.debug( "Complete download of file %s", destination );
signal.complete( response );
}

@Override
public void onFileHeader( CompletableFuture<PrepareStoreCopyResponse> requestOutcomeSignal, FileHeader fileHeader )
{
log.debug( "Received file header for file %s", fileHeader.fileName() );
this.destination = fileHeader.fileName();
this.requiredAlignment = fileHeader.requiredAlignment();
}

@Override
public boolean onFileContent( CompletableFuture<PrepareStoreCopyResponse> signal, FileChunk fileChunk ) throws IOException
{
log.debug( "Received %b bytes for file %s", fileChunk.bytes(), destination );
storeFileStreams.write( destination, requiredAlignment, fileChunk.bytes() );
return fileChunk.isLast();
}
}
Expand Up @@ -124,8 +124,10 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreI
try try
{ {
long lastFlushedTxId; long lastFlushedTxId;
StreamToDiskProvider streamToDiskProvider = new StreamToDiskProvider( destDir, fs, pageCache, monitors ); try ( StreamToDisk storeFileStreams = new StreamToDisk( destDir, fs, pageCache, monitors ) )
lastFlushedTxId = storeCopyClient.copyStoreFiles( addressProvider, expectedStoreId, streamToDiskProvider, DEFAULT_TERMINATION_CONDITIONS ); {
lastFlushedTxId = storeCopyClient.copyStoreFiles( addressProvider, expectedStoreId, storeFileStreams, DEFAULT_TERMINATION_CONDITIONS );
}


log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId ); log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId );


Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.catchup.storecopy; package org.neo4j.causalclustering.catchup.storecopy;


import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier; import java.util.function.Supplier;


Expand All @@ -34,29 +35,28 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.lang.String.format;

public class StoreCopyClient public class StoreCopyClient
{ {
private final CatchUpClient catchUpClient; private final CatchUpClient catchUpClient;
private final Log log; private final Log log;
private final LogProvider logProvider;


public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider ) public StoreCopyClient( CatchUpClient catchUpClient, LogProvider logProvider )
{ {
this.catchUpClient = catchUpClient; this.catchUpClient = catchUpClient;
log = logProvider.getLog( getClass() ); log = logProvider.getLog( getClass() );
this.logProvider = logProvider;
} }


long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expectedStoreId, StoreFileStreamProvider storeFileStreamProvider, long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expectedStoreId, StoreFileStreams storeFileStreams,
Supplier<TerminationCondition> requestWiseTerminationCondition ) Supplier<TerminationCondition> requestWiseTerminationCondition )
throws StoreCopyFailedException throws StoreCopyFailedException
{ {
try try
{ {
PrepareStoreCopyResponse prepareStoreCopyResponse = prepareStoreCopy( catchupAddressProvider.primary(), expectedStoreId, storeFileStreamProvider ); PrepareStoreCopyResponse prepareStoreCopyResponse = listFiles( catchupAddressProvider.primary(), expectedStoreId, storeFileStreams );
copyFilesIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreamProvider, copyFilesIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreams, requestWiseTerminationCondition );
requestWiseTerminationCondition ); copyIndexSnapshotIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreams,
copyIndexSnapshotIndividually( prepareStoreCopyResponse, expectedStoreId, catchupAddressProvider, storeFileStreamProvider,
requestWiseTerminationCondition ); requestWiseTerminationCondition );
return prepareStoreCopyResponse.lastTransactionId(); return prepareStoreCopyResponse.lastTransactionId();
} }
Expand All @@ -67,9 +67,9 @@ long copyStoreFiles( CatchupAddressProvider catchupAddressProvider, StoreId expe
} }


private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, CatchupAddressProvider addressProvider, private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, CatchupAddressProvider addressProvider,
StoreFileStreamProvider storeFileStream, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException StoreFileStreams storeFileStreams, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException
{ {
CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ); CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = new StoreFileCopyResponseAdaptor( storeFileStreams, log );
long lastTransactionId = prepareStoreCopyResponse.lastTransactionId(); long lastTransactionId = prepareStoreCopyResponse.lastTransactionId();
for ( File file : prepareStoreCopyResponse.getFiles() ) for ( File file : prepareStoreCopyResponse.getFiles() )
{ {
Expand All @@ -80,7 +80,7 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes
try try
{ {
AdvertisedSocketAddress from = addressProvider.primary(); AdvertisedSocketAddress from = addressProvider.primary();
log.info( format( "Downloading file '%s' from '%s'", file, from ) ); log.info( String.format( "Downloading file '%s' from '%s'", file, from ) );
StoreCopyFinishedResponse response = StoreCopyFinishedResponse response =
catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), copyHandler ); catchUpClient.makeBlockingRequest( from, new GetStoreFileRequest( expectedStoreId, file, lastTransactionId ), copyHandler );
successful = successfulFileDownload( response ); successful = successfulFileDownload( response );
Expand All @@ -100,10 +100,10 @@ private void copyFilesIndividually( PrepareStoreCopyResponse prepareStoreCopyRes
} }


private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId, private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStoreCopyResponse, StoreId expectedStoreId,
CatchupAddressProvider addressProvider, StoreFileStreamProvider storeFileStream, Supplier<TerminationCondition> terminationConditions ) CatchupAddressProvider addressProvider,
throws StoreCopyFailedException StoreFileStreams storeFileStreams, Supplier<TerminationCondition> terminationConditions ) throws StoreCopyFailedException
{ {
CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = StoreCopyResponseAdaptors.filesCopyAdaptor( storeFileStream, log ); CatchUpResponseAdaptor<StoreCopyFinishedResponse> copyHandler = new StoreFileCopyResponseAdaptor( storeFileStreams, log );
long lastTransactionId = prepareStoreCopyResponse.lastTransactionId(); long lastTransactionId = prepareStoreCopyResponse.lastTransactionId();
PrimitiveLongIterator indexIds = prepareStoreCopyResponse.getIndexIds().iterator(); PrimitiveLongIterator indexIds = prepareStoreCopyResponse.getIndexIds().iterator();
while ( indexIds.hasNext() ) while ( indexIds.hasNext() )
Expand All @@ -116,7 +116,7 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor
try try
{ {
AdvertisedSocketAddress from = addressProvider.primary(); AdvertisedSocketAddress from = addressProvider.primary();
log.info( format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) ); log.info( String.format( "Downloading snapshot of index '%s' from '%s'", indexId, from ) );
StoreCopyFinishedResponse response = StoreCopyFinishedResponse response =
catchUpClient.makeBlockingRequest( from, new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ), catchUpClient.makeBlockingRequest( from, new GetIndexFilesRequest( expectedStoreId, indexId, lastTransactionId ),
copyHandler ); copyHandler );
Expand All @@ -136,12 +136,12 @@ private void copyIndexSnapshotIndividually( PrepareStoreCopyResponse prepareStor
} }
} }


private PrepareStoreCopyResponse prepareStoreCopy( AdvertisedSocketAddress from, StoreId expectedStoreId, StoreFileStreamProvider storeFileStream ) private PrepareStoreCopyResponse listFiles( AdvertisedSocketAddress from, StoreId expectedStoreId, StoreFileStreams storeFileStreams )
throws CatchUpClientException, StoreCopyFailedException throws CatchUpClientException, StoreCopyFailedException
{ {
log.info( "Requesting store listing from: " + from ); log.info( "Requesting store listing from: " + from );
PrepareStoreCopyResponse prepareStoreCopyResponse = catchUpClient.makeBlockingRequest( from, new PrepareStoreCopyRequest( expectedStoreId ), PrepareStoreCopyResponse prepareStoreCopyResponse = catchUpClient.makeBlockingRequest( from, new PrepareStoreCopyRequest( expectedStoreId ),
StoreCopyResponseAdaptors.prepareStoreCopyAdaptor( storeFileStream, log ) ); new PrepareStoreCopyResponseAdaptor( storeFileStreams, logProvider ) );
if ( prepareStoreCopyResponse.status() != PrepareStoreCopyResponse.Status.SUCCESS ) if ( prepareStoreCopyResponse.status() != PrepareStoreCopyResponse.Status.SUCCESS )
{ {
throw new StoreCopyFailedException( "Preparing store failed due to: " + prepareStoreCopyResponse.status() ); throw new StoreCopyFailedException( "Preparing store failed due to: " + prepareStoreCopyResponse.status() );
Expand Down Expand Up @@ -173,25 +173,56 @@ private boolean successfulFileDownload( StoreCopyFinishedResponse response ) thr
{ {
StoreCopyFinishedResponse.Status responseStatus = response.status(); StoreCopyFinishedResponse.Status responseStatus = response.status();
log.debug( "Request for individual file resulted in response type: %s", response.status() ); log.debug( "Request for individual file resulted in response type: %s", response.status() );
if ( responseStatus == StoreCopyFinishedResponse.Status.SUCCESS ) if ( responseStatus == StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND )
{
return true;
}
else if ( responseStatus == StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND )
{ {
return false; return false;
} }
else if ( responseStatus == StoreCopyFinishedResponse.Status.E_UNKNOWN ) else if ( responseStatus == StoreCopyFinishedResponse.Status.SUCCESS )
{ {
return false; return true;
} }
else if ( responseStatus == StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH ) else if ( responseStatus == StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH )
{ {
return false; throw new StoreCopyFailedException( "Store id mismatch" );
} }
else else
{ {
throw new StoreCopyFailedException( "Unknown response type: " + responseStatus ); throw new StoreCopyFailedException( "Unknown response type: " + responseStatus );
} }
} }

public static class StoreFileCopyResponseAdaptor extends CatchUpResponseAdaptor<StoreCopyFinishedResponse>
{
private final StoreFileStreams storeFileStreams;
private final Log log;
private String destination;
private int requiredAlignment;

StoreFileCopyResponseAdaptor( StoreFileStreams storeFileStreams, Log log )
{
this.storeFileStreams = storeFileStreams;
this.log = log;
}

@Override
public void onFileHeader( CompletableFuture<StoreCopyFinishedResponse> requestOutcomeSignal, FileHeader fileHeader )
{
this.destination = fileHeader.fileName();
this.requiredAlignment = fileHeader.requiredAlignment();
}

@Override
public boolean onFileContent( CompletableFuture<StoreCopyFinishedResponse> signal, FileChunk fileChunk ) throws IOException
{
storeFileStreams.write( destination, requiredAlignment, fileChunk.bytes() );
return fileChunk.isLast();
}

@Override
public void onFileStreamingComplete( CompletableFuture<StoreCopyFinishedResponse> signal, StoreCopyFinishedResponse response )
{
log.info( "Finished streaming" );
signal.complete( response );
}
}
} }

This file was deleted.

0 comments on commit a46d897

Please sign in to comment.