Skip to content

Commit

Permalink
Fix cluster seeding
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Feb 14, 2018
1 parent ec32a90 commit bcdec94
Show file tree
Hide file tree
Showing 22 changed files with 193 additions and 94 deletions.
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup.storecopy;

import java.util.Optional;

class CommitState
{
private final long metaDataStoreIndex;
private final Long transactionLogIndex;

CommitState( long metaDataStoreIndex )
{
this.metaDataStoreIndex = metaDataStoreIndex;
this.transactionLogIndex = null;
}

CommitState( long metaDataStoreIndex, long transactionLogIndex )
{
assert transactionLogIndex >= metaDataStoreIndex;

this.metaDataStoreIndex = metaDataStoreIndex;
this.transactionLogIndex = transactionLogIndex;
}

long metaDataStoreIndex()
{
return metaDataStoreIndex;
}

Optional<Long> transactionLogIndex()
{
return Optional.ofNullable( transactionLogIndex );
}

@Override
public String toString()
{
return "CommitState{" + "metaDataStoreIndex=" + metaDataStoreIndex + ", transactionLogIndex=" + transactionLogIndex + '}';
}
}
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.store.StoreType; import org.neo4j.kernel.impl.store.StoreType;
import org.neo4j.kernel.impl.storemigration.LogFiles;
import org.neo4j.kernel.impl.storemigration.StoreFile; import org.neo4j.kernel.impl.storemigration.StoreFile;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager; import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
Expand Down Expand Up @@ -252,4 +253,15 @@ private void dropAvailabilityGuard()
availabilityGuard.fulfill( currentRequirement ); availabilityGuard.fulfill( currentRequirement );
currentRequirement = null; currentRequirement = null;
} }

public boolean hasTxLogs()
{
File[] files = storeDir.listFiles( LogFiles.FILENAME_FILTER );
if ( files == null )
{
throw new RuntimeException( "Files was null. Incorrect directory or I/O error?" );
}

return files.length > 0;
}
} }
Expand Up @@ -21,14 +21,14 @@


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;


import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchupResult; import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.TxPullRequestResult; import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter;
import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
Expand All @@ -38,12 +38,12 @@
import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore; import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionStore; import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor; import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static org.neo4j.causalclustering.catchup.CatchupResult.E_TRANSACTION_PRUNED;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_BATCH; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_BATCH;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM; import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
Expand All @@ -54,6 +54,7 @@
public class RemoteStore public class RemoteStore
{ {
private final Log log; private final Log log;
private final LocalDatabase localDatabase;
private final Monitors monitors; private final Monitors monitors;
private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
private final PageCache pageCache; private final PageCache pageCache;
Expand All @@ -62,11 +63,8 @@ public class RemoteStore
private final TxPullClient txPullClient; private final TxPullClient txPullClient;
private final TransactionLogCatchUpFactory transactionLogFactory; private final TransactionLogCatchUpFactory transactionLogFactory;


public RemoteStore( LogProvider logProvider, public RemoteStore( LogProvider logProvider, FileSystemAbstraction fs, PageCache pageCache, StoreCopyClient storeCopyClient, TxPullClient txPullClient,
FileSystemAbstraction fs, PageCache pageCache, TransactionLogCatchUpFactory transactionLogFactory, Monitors monitors, LocalDatabase localDatabase )
StoreCopyClient storeCopyClient, TxPullClient txPullClient,
TransactionLogCatchUpFactory transactionLogFactory,
Monitors monitors )
{ {
this.logProvider = logProvider; this.logProvider = logProvider;
this.storeCopyClient = storeCopyClient; this.storeCopyClient = storeCopyClient;
Expand All @@ -76,45 +74,48 @@ public RemoteStore( LogProvider logProvider,
this.transactionLogFactory = transactionLogFactory; this.transactionLogFactory = transactionLogFactory;
this.monitors = monitors; this.monitors = monitors;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.localDatabase = localDatabase;
} }


/** private CommitState getStoreState() throws IOException
* Later stages of the startup process require at least one transaction to
* figure out the mapping between the transaction log and the consensus log.
*
* If there are no transaction logs then we can pull from and including
* the index which the metadata store points to. This would be the case
* for example with a backup taken during an idle period of the system.
*
* However, if there are transaction logs then we want to find out where
* they end and pull from there, excluding the last one so that we do not
* get duplicate entries.
*/
private long getPullIndex( File storeDir ) throws IOException
{ {
/* this is the metadata store */ ReadOnlyTransactionIdStore metaDataStore = new ReadOnlyTransactionIdStore( pageCache, localDatabase.storeDir() );
ReadOnlyTransactionIdStore txIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir ); long metaDataStoreTxId = metaDataStore.getLastCommittedTransactionId();


/* Clean as in clean shutdown. Without transaction logs this should be the truth, Optional<Long> latestTransactionLogIndex = getLatestTransactionLogIndex( metaDataStoreTxId );
* but otherwise it can be used as a starting point for scanning the logs. */
long lastCleanTxId = txIdStore.getLastCommittedTransactionId();
log.info( "Last Clean Tx Id: %d", lastCleanTxId );


/* these are the transaction logs */ //noinspection OptionalIsPresent
ReadOnlyTransactionStore txStore = new ReadOnlyTransactionStore( pageCache, fs, storeDir, new Monitors() ); if ( latestTransactionLogIndex.isPresent() )
{
return new CommitState( metaDataStoreTxId, latestTransactionLogIndex.get() );
}
else
{
return new CommitState( metaDataStoreTxId );
}
}

private Optional<Long> getLatestTransactionLogIndex( long startTxId ) throws IOException
{
if ( !localDatabase.hasTxLogs() )
{
return Optional.empty();
}

// this is not really a read-only store, because it will create an empty transaction log if there is none
ReadOnlyTransactionStore txStore = new ReadOnlyTransactionStore( pageCache, fs, localDatabase.storeDir(), new Monitors() );


long lastTxId = BASE_TX_ID; long lastTxId = BASE_TX_ID;
try ( Lifespan ignored = new Lifespan( txStore ) ) try ( Lifespan ignored = new Lifespan( txStore ) )
{ {
TransactionCursor cursor; TransactionCursor cursor;
try try
{ {
cursor = txStore.getTransactions( lastCleanTxId ); cursor = txStore.getTransactions( startTxId );
} }
catch ( NoSuchTransactionException e ) catch ( NoSuchTransactionException e )
{ {
log.info( "No transaction logs found. Will use metadata store as base for pull request." ); return Optional.empty();
return Math.max( TransactionIdStore.BASE_TX_ID + 1, lastCleanTxId );
} }


while ( cursor.next() ) while ( cursor.next() )
Expand All @@ -123,20 +124,48 @@ private long getPullIndex( File storeDir ) throws IOException
lastTxId = tx.getCommitEntry().getTxId(); lastTxId = tx.getCommitEntry().getTxId();
} }


if ( lastTxId < lastCleanTxId ) return Optional.of( lastTxId );
{
throw new IllegalStateException( "Metadata index was higher than transaction log index." );
}

// we don't want to pull a transaction we already have in the log, hence +1
return lastTxId + 1;
} }
} }


public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir ) throws StoreCopyFailedException, IOException /**
* Later stages of the startup process require at least one transaction to
* figure out the mapping between the transaction log and the consensus log.
*
* If there are no transaction logs then we can pull from and including
* the index which the metadata store points to. This would be the case
* for example with a backup taken during an idle period of the system.
*
* However, if there are transaction logs then we want to find out where
* they end and pull from there, excluding the last one so that we do not
* get duplicate entries.
*/
public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId ) throws StoreCopyFailedException, IOException
{ {
long pullIndex = getPullIndex( storeDir ); CommitState commitState = getStoreState();
return pullTransactions( from, expectedStoreId, storeDir, pullIndex, false ); log.info( "Store commit state: " + commitState );

if ( commitState.transactionLogIndex().isPresent() )
{
return pullTransactions( from, expectedStoreId, localDatabase.storeDir(), commitState.transactionLogIndex().get() + 1, false );
}
else
{
CatchupResult catchupResult;
if ( commitState.metaDataStoreIndex() == BASE_TX_ID )
{
return pullTransactions( from, expectedStoreId, localDatabase.storeDir(), commitState.metaDataStoreIndex() + 1, false );
}
else
{
catchupResult = pullTransactions( from, expectedStoreId, localDatabase.storeDir(), commitState.metaDataStoreIndex(), false );
if ( catchupResult == E_TRANSACTION_PRUNED )
{
return pullTransactions( from, expectedStoreId, localDatabase.storeDir(), commitState.metaDataStoreIndex() + 1, false );
}
}
return catchupResult;
}
} }


public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File destDir ) public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File destDir )
Expand Down
Expand Up @@ -20,22 +20,16 @@
package org.neo4j.causalclustering.catchup.storecopy; package org.neo4j.causalclustering.catchup.storecopy;


import java.io.IOException; import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException; import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.lang.String.format;

public class StoreCopyClient public class StoreCopyClient
{ {
private final CatchUpClient catchUpClient; private final CatchUpClient catchUpClient;
Expand Down Expand Up @@ -77,7 +71,7 @@ public boolean onFileContent( CompletableFuture<Long> signal, FileChunk fileChun
public void onFileStreamingComplete( CompletableFuture<Long> signal, public void onFileStreamingComplete( CompletableFuture<Long> signal,
StoreCopyFinishedResponse response ) StoreCopyFinishedResponse response )
{ {
log.info( "Finished streaming %s", destination ); log.info( "Finished streaming" );
signal.complete( response.lastCommittedTxBeforeStoreCopy() ); signal.complete( response.lastCommittedTxBeforeStoreCopy() );
} }
} ); } );
Expand Down
Expand Up @@ -204,7 +204,7 @@ private CoreStateDownloader createCoreStateDownloader( LifeSupport servicesToSto


RemoteStore remoteStore = new RemoteStore( RemoteStore remoteStore = new RemoteStore(
logProvider, platformModule.fileSystem, platformModule.pageCache, new StoreCopyClient( catchUpClient, logProvider ), logProvider, platformModule.fileSystem, platformModule.pageCache, new StoreCopyClient( catchUpClient, logProvider ),
new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), platformModule.monitors ); new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), platformModule.monitors, localDatabase );


CopiedStoreRecovery copiedStoreRecovery = platformModule.life.add( CopiedStoreRecovery copiedStoreRecovery = platformModule.life.add(
new CopiedStoreRecovery( platformModule.config, platformModule.kernelExtensions.listFactories(), platformModule.pageCache ) ); new CopiedStoreRecovery( platformModule.config, platformModule.kernelExtensions.listFactories(), platformModule.pageCache ) );
Expand Down
Expand Up @@ -77,9 +77,16 @@ void downloadSnapshot( MemberId source ) throws StoreCopyFailedException
/* Extract some key properties before shutting it down. */ /* Extract some key properties before shutting it down. */
boolean isEmptyStore = localDatabase.isEmpty(); boolean isEmptyStore = localDatabase.isEmpty();


if ( !isEmptyStore ) /*
* There is no reason to try to recover if there are no transaction logs and in fact it is
* also problematic for the initial transaction pull during the snapshot download because the
* kernel will create a transaction log with a header where previous index points to the same
* index as that written down into the metadata store. This is problematic because we have no
* guarantee that there are later transactions and we need at least one transaction in
* the log to figure out the Raft log index (see {@link RecoverConsensusLogIndex}).
*/
if ( localDatabase.hasTxLogs() )
{ {
/* make sure it's recovered before we start messing with catchup */
localDatabase.start(); localDatabase.start();
localDatabase.stop(); localDatabase.stop();
} }
Expand Down Expand Up @@ -120,7 +127,7 @@ public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot
else else
{ {
StoreId localStoreId = localDatabase.storeId(); StoreId localStoreId = localDatabase.storeId();
CatchupResult catchupResult = remoteStore.tryCatchingUp( fromAddress, localStoreId, localDatabase.storeDir() ); CatchupResult catchupResult = remoteStore.tryCatchingUp( fromAddress, localStoreId );


if ( catchupResult == E_TRANSACTION_PRUNED ) if ( catchupResult == E_TRANSACTION_PRUNED )
{ {
Expand Down
Expand Up @@ -227,7 +227,7 @@ public class EnterpriseReadReplicaEditionModule extends EditionModule
new RemoteStore( platformModule.logging.getInternalLogProvider(), fileSystem, platformModule.pageCache, new RemoteStore( platformModule.logging.getInternalLogProvider(), fileSystem, platformModule.pageCache,
new StoreCopyClient( catchUpClient, logProvider ), new StoreCopyClient( catchUpClient, logProvider ),
new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(), new TxPullClient( catchUpClient, platformModule.monitors ), new TransactionLogCatchUpFactory(),
platformModule.monitors ); platformModule.monitors, localDatabase );


CopiedStoreRecovery copiedStoreRecovery = CopiedStoreRecovery copiedStoreRecovery =
new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(), new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(),
Expand Down

0 comments on commit bcdec94

Please sign in to comment.