Skip to content

Commit

Permalink
When possible, catch up from another server instead of deleting store.
Browse files Browse the repository at this point in the history
  • Loading branch information
apcj committed Aug 10, 2016
1 parent 77c326a commit 0d13fc5
Show file tree
Hide file tree
Showing 32 changed files with 511 additions and 146 deletions.
Expand Up @@ -19,12 +19,14 @@
*/ */
package org.neo4j.kernel.impl.util; package org.neo4j.kernel.impl.util;


import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.function.ToIntFunction; import java.util.function.ToIntFunction;


import org.neo4j.collection.primitive.PrimitiveIntIterator; import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.cursor.Cursor; import org.neo4j.cursor.Cursor;
import org.neo4j.cursor.IOCursor;
import org.neo4j.graphdb.Resource; import org.neo4j.graphdb.Resource;


public class Cursors public class Cursors
Expand Down Expand Up @@ -139,6 +141,29 @@ public T get()
}; };
} }


/**
 * Wraps a {@link Cursor} so that it can be used where an {@link IOCursor} is expected.
 * Every call on the returned object is forwarded to the same-named method of {@code cursor}.
 *
 * @param cursor the cursor to delegate to
 * @param <T> type of the item exposed by the cursor
 * @return an {@link IOCursor} whose {@code next()}, {@code get()} and {@code close()} delegate to {@code cursor}
 */
public static <T> IOCursor<T> io( Cursor<T> cursor )
{
    final IOCursor<T> adapter = new IOCursor<T>()
    {
        @Override
        public T get()
        {
            return cursor.get();
        }

        @Override
        public boolean next() throws IOException
        {
            // The signature allows IOException because the IOCursor method is declared with it.
            return cursor.next();
        }

        @Override
        public void close() throws IOException
        {
            cursor.close();
        }
    };
    return adapter;
}
public static <T> PrimitiveIntIterator intIterator( final Cursor<T> resourceCursor, final ToIntFunction<T> map ) public static <T> PrimitiveIntIterator intIterator( final Cursor<T> resourceCursor, final ToIntFunction<T> map )
{ {
return new CursorPrimitiveIntIterator<>( resourceCursor, map ); return new CursorPrimitiveIntIterator<>( resourceCursor, map );
Expand Down
Expand Up @@ -139,7 +139,7 @@ protected void initChannel( SocketChannel ch ) throws Exception
pipeline.addLast( new TxPullRequestDecoder( protocol ) ); pipeline.addLast( new TxPullRequestDecoder( protocol ) );
pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier,
transactionIdStoreSupplier, logicalTransactionStoreSupplier, transactionIdStoreSupplier, logicalTransactionStoreSupplier,
monitors) ); monitors, logProvider ) );


pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new GetStoreRequestDecoder( protocol ) ); pipeline.addLast( new GetStoreRequestDecoder( protocol ) );
Expand Down
Expand Up @@ -100,9 +100,9 @@ public CompletableFuture<CoreSnapshot> requestCoreSnapshot( MemberId serverAddre
return coreSnapshotFuture; return coreSnapshotFuture;
} }


public void pollForTransactions( MemberId serverAddress, long lastTransactionId ) public void pollForTransactions( MemberId serverAddress, StoreId storeId, long lastTransactionId )
{ {
TxPullRequest txPullRequest = new TxPullRequest( lastTransactionId ); TxPullRequest txPullRequest = new TxPullRequest( lastTransactionId, storeId );
send( serverAddress, RequestMessageType.TX_PULL_REQUEST, txPullRequest ); send( serverAddress, RequestMessageType.TX_PULL_REQUEST, txPullRequest );
pullRequestMonitor.txPullRequest( lastTransactionId ); pullRequestMonitor.txPullRequest( lastTransactionId );
} }
Expand Down Expand Up @@ -168,9 +168,9 @@ public void onFileStreamingComplete( long lastCommittedTxBeforeStoreCopy )
} }


@Override @Override
public void onTxStreamingComplete( long lastTransactionId ) public void onTxStreamingComplete( long lastTransactionId, boolean success )
{ {
txStreamCompleteListeners.notify( listener -> listener.onTxStreamingComplete( lastTransactionId ) ); txStreamCompleteListeners.notify( listener -> listener.onTxStreamingComplete( lastTransactionId, success ) );
} }


@Override @Override
Expand Down
Expand Up @@ -43,22 +43,24 @@ public class LocalDatabase implements Supplier<StoreId>, Lifecycle
private final DataSourceManager dataSourceManager; private final DataSourceManager dataSourceManager;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier; private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final Supplier<DatabaseHealth> databaseHealthSupplier; private final Supplier<DatabaseHealth> databaseHealthSupplier;
private final Log log;


private volatile StoreId storeId; private volatile StoreId storeId;
private volatile DatabaseHealth databaseHealth; private volatile DatabaseHealth databaseHealth;
private final Log log;


public LocalDatabase( File storeDir, CopiedStoreRecovery copiedStoreRecovery, StoreFiles storeFiles, public LocalDatabase( File storeDir, CopiedStoreRecovery copiedStoreRecovery, StoreFiles storeFiles,
DataSourceManager dataSourceManager, Supplier<TransactionIdStore> transactionIdStoreSupplier, DataSourceManager dataSourceManager,
Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider ) Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<DatabaseHealth> databaseHealthSupplier,
LogProvider logProvider )
{ {
this.storeDir = storeDir; this.storeDir = storeDir;
this.copiedStoreRecovery = copiedStoreRecovery; this.copiedStoreRecovery = copiedStoreRecovery;
this.storeFiles = storeFiles; this.storeFiles = storeFiles;
this.dataSourceManager = dataSourceManager; this.dataSourceManager = dataSourceManager;
this.transactionIdStoreSupplier = transactionIdStoreSupplier; this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.databaseHealthSupplier = databaseHealthSupplier; this.databaseHealthSupplier = databaseHealthSupplier;
log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.storeId = StoreId.DEFAULT; this.storeId = StoreId.DEFAULT;
} }


Expand Down Expand Up @@ -116,22 +118,41 @@ private DatabaseHealth getDatabaseHealth()
return databaseHealth; return databaseHealth;
} }


public void copyStoreFrom( MemberId from, StoreFetcher storeFetcher ) throws StoreCopyFailedException public void bringUpToDateOrReplaceStoreFrom( MemberId source, StoreId wantedStoreId, StoreFetcher storeFetcher ) throws StoreCopyFailedException
{ {
try try
{ {
storeFiles.delete( storeDir ); boolean successfullyCaughtUp = false;
TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( storeDir ); if ( wantedStoreId.equals( storeId ) )
storeFetcher.copyStore( from, tempStore.storeDir() ); {
copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() ); successfullyCaughtUp = tryToCatchUp( source, storeFetcher );
storeFiles.moveTo( tempStore.storeDir(), storeDir ); }

if ( !successfullyCaughtUp )
{
storeFiles.delete( storeDir );
copyWholeStoreFrom( source, wantedStoreId, storeFetcher );
}
} }
catch ( IOException e ) catch ( IOException e )
{ {
throw new StoreCopyFailedException( e ); throw new StoreCopyFailedException( e );
} }
} }


/**
 * Delegates to {@code storeFetcher.tryCatchingUp}, passing this database's current
 * {@code storeId} and {@code storeDir}.
 *
 * @param source the member to catch up from
 * @param storeFetcher the fetcher that performs the catch-up attempt
 * @return whatever {@code storeFetcher.tryCatchingUp} returns
 * @throws IOException propagated from the fetcher
 * @throws StoreCopyFailedException propagated from the fetcher
 */
private boolean tryToCatchUp( MemberId source, StoreFetcher storeFetcher ) throws IOException, StoreCopyFailedException
{
    boolean caughtUp = storeFetcher.tryCatchingUp( source, storeId, storeDir );
    return caughtUp;
}

/**
 * Copies a store from {@code source} via a temporary store directory and then moves the
 * result into {@code storeDir}.
 *
 * @param source the member to copy the store from
 * @param wantedStoreId the store id handed to {@code storeFetcher.copyStore}
 * @param storeFetcher the fetcher that performs the copy
 * @throws IOException propagated from the steps below
 * @throws StoreCopyFailedException propagated from the steps below
 */
private void copyWholeStoreFrom( MemberId source, StoreId wantedStoreId, StoreFetcher storeFetcher ) throws IOException, StoreCopyFailedException
{
    // Temporary directory object derived from the real store directory.
    TemporaryStoreDirectory scratch = new TemporaryStoreDirectory( storeDir );

    // 1. Fetch the store into the temporary location.
    storeFetcher.copyStore( source, wantedStoreId, scratch.storeDir() );

    // 2. Run recovery on the fetched copy while it is still in the temporary location.
    copiedStoreRecovery.recoverCopiedStore( scratch.storeDir() );

    // 3. Move the recovered copy into the real store directory.
    storeFiles.moveTo( scratch.storeDir(), storeDir );
}

public boolean isEmpty() public boolean isEmpty()
{ {
return transactionIdStoreSupplier.get().getLastCommittedTransactionId() == TransactionIdStore.BASE_TX_ID; return transactionIdStoreSupplier.get().getLastCommittedTransactionId() == TransactionIdStore.BASE_TX_ID;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


Expand Down Expand Up @@ -56,7 +57,28 @@ public StoreFetcher( LogProvider logProvider,
log = logProvider.getLog( getClass() ); log = logProvider.getLog( getClass() );
} }


void copyStore( MemberId from, File storeDir ) throws StoreCopyFailedException boolean tryCatchingUp( MemberId from, StoreId storeId, File storeDir ) throws StoreCopyFailedException, IOException
{
    // tryCatchingUp: attempts to pull transactions from `from`, starting at the last committed
    // transaction id read from the store in `storeDir`, into a TransactionLogCatchUpWriter.
    // Returns true when the pull completes, false when it fails with StoreCopyFailedException.
    ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir );
    long lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId();

    try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
    {
        log.info( "Pulling transactions from: %d", lastCommittedTransactionId );
        try
        {
            long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastCommittedTransactionId, writer );
            log.info( "Txs streamed up to %d", lastPulledTxId );
            return true;
        }
        catch ( StoreCopyFailedException e )
        {
            // Still reported to the caller as "did not catch up" (false), but no longer silently:
            // record why the attempt failed so the fallback taken by the caller can be diagnosed.
            log.warn( String.format( "Unable to catch up from %s starting at transaction %d",
                    from, lastCommittedTransactionId ), e );
            return false;
        }
    }
}

void copyStore( MemberId from, StoreId storeId, File storeDir ) throws StoreCopyFailedException
{ {
try try
{ {
Expand All @@ -67,7 +89,7 @@ void copyStore( MemberId from, File storeDir ) throws StoreCopyFailedException
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) ) try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
{ {
log.info( "Pulling transactions from: %d", lastFlushedTxId - 1 ); log.info( "Pulling transactions from: %d", lastFlushedTxId - 1 );
long lastPulledTxId = txPullClient.pullTransactions( from, lastFlushedTxId - 1, writer ); long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastFlushedTxId - 1, writer );
log.info( "Txs streamed up to %d", lastPulledTxId ); log.info( "Txs streamed up to %d", lastPulledTxId );
} }
} }
Expand Down
Expand Up @@ -19,11 +19,15 @@
*/ */
package org.neo4j.coreedge.catchup.tx; package org.neo4j.coreedge.catchup.tx;


import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.CoreClient; import org.neo4j.coreedge.catchup.CoreClient;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout;
import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService.TimeoutName; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService.TimeoutName;
import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy; import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand All @@ -48,6 +52,8 @@
*/ */
public class TxPollingClient extends LifecycleAdapter implements TxPullListener public class TxPollingClient extends LifecycleAdapter implements TxPullListener
{ {
private final Supplier<StoreId> localDatabase;

enum Timeouts implements TimeoutName enum Timeouts implements TimeoutName
{ {
TX_PULLER_TIMEOUT TX_PULLER_TIMEOUT
Expand Down Expand Up @@ -89,9 +95,12 @@ enum States
private long unexpectedCount; private long unexpectedCount;
private boolean streamingCompleted; private boolean streamingCompleted;


public TxPollingClient( LogProvider logProvider, CoreClient coreClient, CoreMemberSelectionStrategy connectionStrategy, public TxPollingClient( LogProvider logProvider, Supplier<StoreId> localDatabase,
RenewableTimeoutService timeoutService, long txPullIntervalMillis, BatchingTxApplier applier ) CoreClient coreClient, CoreMemberSelectionStrategy connectionStrategy,
RenewableTimeoutService timeoutService, long txPullIntervalMillis,
BatchingTxApplier applier )
{ {
this.localDatabase = localDatabase;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.coreClient = coreClient; this.coreClient = coreClient;
this.connectionStrategy = connectionStrategy; this.connectionStrategy = connectionStrategy;
Expand Down Expand Up @@ -135,7 +144,7 @@ public synchronized void onTxReceived( TxPullResponse tx )
* End of tx responses received off the network. * End of tx responses received off the network.
*/ */
@Override @Override
public synchronized void onTxStreamingComplete( long ignored ) public synchronized void onTxStreamingComplete( long ignored, boolean success )
{ {
state.handler.onTxStreamingComplete( this ); state.handler.onTxStreamingComplete( this );
} }
Expand Down Expand Up @@ -179,7 +188,8 @@ public void onTimeout( TxPollingClient ctx )
try try
{ {
transactionServer = ctx.connectionStrategy.coreMember(); transactionServer = ctx.connectionStrategy.coreMember();
ctx.coreClient.pollForTransactions( transactionServer, ctx.applier.lastAppliedTxId() ); ctx.coreClient.pollForTransactions( transactionServer, ctx.localDatabase.get(),
ctx.applier.lastAppliedTxId() );
} }
catch ( Exception e ) catch ( Exception e )
{ {
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.neo4j.coreedge.catchup.CoreClient; import org.neo4j.coreedge.catchup.CoreClient;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;


public class TxPullClient public class TxPullClient
{ {
Expand All @@ -35,19 +37,28 @@ public TxPullClient( CoreClient coreClient )
this.coreClient = coreClient; this.coreClient = coreClient;
} }


public long pullTransactions( MemberId from, long startTxId, TxPullResponseListener txPullResponseListener ) public long pullTransactions( MemberId from, StoreId storeId, long startTxId, TxPullResponseListener txPullResponseListener )
throws StoreCopyFailedException throws StoreCopyFailedException
{ {
coreClient.addTxPullResponseListener( txPullResponseListener ); coreClient.addTxPullResponseListener( txPullResponseListener );


CompletableFuture<Long> txId = new CompletableFuture<>(); CompletableFuture<Long> txId = new CompletableFuture<>();


TxStreamCompleteListener streamCompleteListener = txId::complete; TxStreamCompleteListener streamCompleteListener = ( value, success ) -> {
if ( success )
{
txId.complete( value );
}
else
{
txId.completeExceptionally( new NoSuchTransactionException( startTxId ) );
}
};
coreClient.addTxStreamCompleteListener( streamCompleteListener ); coreClient.addTxStreamCompleteListener( streamCompleteListener );


try try
{ {
coreClient.pollForTransactions( from, startTxId ); coreClient.pollForTransactions( from, storeId, startTxId );
return txId.get(); return txId.get();
} }
catch ( InterruptedException | ExecutionException e ) catch ( InterruptedException | ExecutionException e )
Expand Down
Expand Up @@ -19,28 +19,35 @@
*/ */
package org.neo4j.coreedge.catchup.tx; package org.neo4j.coreedge.catchup.tx;


import org.neo4j.coreedge.messaging.Message; import java.util.Objects;


import org.neo4j.coreedge.catchup.RequestMessageType; import org.neo4j.coreedge.catchup.RequestMessageType;

import org.neo4j.coreedge.identity.StoreId;
import static java.lang.String.format; import org.neo4j.coreedge.messaging.Message;


public class TxPullRequest implements Message public class TxPullRequest implements Message
{ {
public static final RequestMessageType MESSAGE_TYPE = RequestMessageType.TX_PULL_REQUEST; public static final RequestMessageType MESSAGE_TYPE = RequestMessageType.TX_PULL_REQUEST;


private long txId; private long txId;
private final StoreId storeId;


public TxPullRequest( long txId ) public TxPullRequest( long txId, StoreId storeId )
{ {
this.txId = txId; this.txId = txId;
this.storeId = storeId;
} }


public long txId() public long txId()
{ {
return txId; return txId;
} }


/**
 * @return the store id this request was constructed with
 */
public StoreId storeId()
{
    return this.storeId;
}

@Override @Override
public boolean equals( Object o ) public boolean equals( Object o )
{ {
Expand All @@ -52,23 +59,19 @@ public boolean equals( Object o )
{ {
return false; return false;
} }

TxPullRequest that = (TxPullRequest) o; TxPullRequest that = (TxPullRequest) o;

return txId == that.txId && Objects.equals( storeId, that.storeId );
return txId == that.txId;
} }


@Override @Override
public int hashCode() public int hashCode()
{ {
int result = (int) (txId ^ (txId >>> 32)); return Objects.hash( txId, storeId );
result = 31 * result;
return result;
} }


@Override @Override
public String toString() public String toString()
{ {
return format( "TxPullRequest{txId=%d}", txId ); return String.format( "TxPullRequest{txId=%d, storeId=%s}", txId, storeId );
} }
} }
Expand Up @@ -27,6 +27,9 @@
import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.MessageToMessageDecoder;


import org.neo4j.coreedge.catchup.CatchupServerProtocol; import org.neo4j.coreedge.catchup.CatchupServerProtocol;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.coreedge.messaging.marsalling.storeid.StoreIdMarshal;


import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage.TX_PULL; import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage.TX_PULL;


Expand All @@ -45,7 +48,8 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out
if ( protocol.isExpecting( TX_PULL ) ) if ( protocol.isExpecting( TX_PULL ) )
{ {
long txId = msg.readLong(); long txId = msg.readLong();
out.add( new TxPullRequest( txId ) ); StoreId storeId = StoreIdMarshal.unmarshal( new NetworkReadableClosableChannelNetty4( msg ) );
out.add( new TxPullRequest( txId, storeId ) );
} }
else else
{ {
Expand Down

0 comments on commit 0d13fc5

Please sign in to comment.