Skip to content

Commit

Permalink
rename and move store copy process class
Browse files Browse the repository at this point in the history
It was badly named, placed and instantiated.
  • Loading branch information
martinfurmanski committed Jan 18, 2017
1 parent 750d97c commit 92f13c9
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 22 deletions.
Expand Up @@ -17,29 +17,23 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.readreplica;
package org.neo4j.causalclustering.catchup.storecopy;

import java.io.IOException;

import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.catchup.storecopy.TemporaryStoreDirectory;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.Log;

public class CopyStoreSafely
public class StoreCopyProcess
{
private final FileSystemAbstraction fs;
private final LocalDatabase localDatabase;
private final CopiedStoreRecovery copiedStoreRecovery;
private final Log log;

public CopyStoreSafely( FileSystemAbstraction fs, LocalDatabase localDatabase, CopiedStoreRecovery copiedStoreRecovery, Log log )
public StoreCopyProcess( FileSystemAbstraction fs, LocalDatabase localDatabase, CopiedStoreRecovery copiedStoreRecovery, Log log )
{
this.fs = fs;
this.localDatabase = localDatabase;
Expand Down
Expand Up @@ -40,7 +40,7 @@
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionException;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.internal.DatabaseHealth;
Expand Down Expand Up @@ -322,7 +322,7 @@ private void downloadDatabase( MemberId core, StoreId localStoreId )

try
{
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ).
new StoreCopyProcess( fs, localDatabase, copiedStoreRecovery, log ).
copyWholeStoreFrom( core, localStoreId, storeFetcher );
}
catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e )
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.core.state.CoreState;
import org.neo4j.causalclustering.readreplica.CopyStoreSafely;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -66,6 +66,8 @@ public CoreStateDownloader( FileSystemAbstraction fs, LocalDatabase localDatabas
public synchronized void downloadSnapshot( MemberId source, CoreState coreState ) throws StoreCopyFailedException
{
// TODO: Think about recovery scenarios.
StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fs, localDatabase, copiedStoreRecovery, log );

try
{
/* Extract some key properties before shutting it down. */
Expand Down Expand Up @@ -101,8 +103,7 @@ public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot

if ( isEmptyStore )
{
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ).
copyWholeStoreFrom( source, remoteStoreId, storeFetcher );
storeCopyProcess.copyWholeStoreFrom( source, remoteStoreId, storeFetcher );
}
else
{
Expand All @@ -114,8 +115,8 @@ public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot
{
log.info( "Failed to pull transactions from " + source + ". They may have been pruned away." );
localDatabase.delete();
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ).
copyWholeStoreFrom( source, localStoreId, storeFetcher );

storeCopyProcess.copyWholeStoreFrom( source, localStoreId, storeFetcher );
}
else if ( catchupResult != SUCCESS_END_OF_STREAM )
{
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
Expand All @@ -41,7 +42,6 @@

class ReadReplicaStartupProcess implements Lifecycle
{
private final FileSystemAbstraction fs;
private final StoreFetcher storeFetcher;
private final LocalDatabase localDatabase;
private final Lifecycle txPulling;
Expand All @@ -50,22 +50,21 @@ class ReadReplicaStartupProcess implements Lifecycle
private final Log userLog;

private final RetryStrategy retryStrategy;
private final CopiedStoreRecovery copiedStoreRecovery;
private String lastIssue;
private final StoreCopyProcess storeCopyProcess;

ReadReplicaStartupProcess( FileSystemAbstraction fs, StoreFetcher storeFetcher, LocalDatabase localDatabase,
Lifecycle txPulling, CoreMemberSelectionStrategy connectionStrategy, RetryStrategy retryStrategy,
LogProvider debugLogProvider, LogProvider userLogProvider, CopiedStoreRecovery copiedStoreRecovery )
{
this.fs = fs;
this.storeFetcher = storeFetcher;
this.localDatabase = localDatabase;
this.txPulling = txPulling;
this.connectionStrategy = connectionStrategy;
this.retryStrategy = retryStrategy;
this.copiedStoreRecovery = copiedStoreRecovery;
this.debugLog = debugLogProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
this.storeCopyProcess = new StoreCopyProcess( fs, localDatabase, copiedStoreRecovery, debugLog );
}

@Override
Expand Down Expand Up @@ -160,8 +159,7 @@ private void syncStoreWithCore( MemberId source ) throws IOException, StoreIdDow

debugLog.info( "Copying store from core server %s", source );
localDatabase.delete();
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, debugLog )
.copyWholeStoreFrom( source, storeId, storeFetcher );
storeCopyProcess.copyWholeStoreFrom( source, storeId, storeFetcher );

debugLog.info( "Restarting local database after copy.", source );
}
Expand Down

0 comments on commit 92f13c9

Please sign in to comment.