diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Cursors.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Cursors.java index c5cd559e881ea..0b9affb694a48 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Cursors.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Cursors.java @@ -19,12 +19,14 @@ */ package org.neo4j.kernel.impl.util; +import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.function.ToIntFunction; import org.neo4j.collection.primitive.PrimitiveIntIterator; import org.neo4j.cursor.Cursor; +import org.neo4j.cursor.IOCursor; import org.neo4j.graphdb.Resource; public class Cursors @@ -139,6 +141,29 @@ public T get() }; } + public static IOCursor io( Cursor cursor ) + { + return new IOCursor() + { + @Override + public boolean next() throws IOException + { + return cursor.next(); + } + + @Override + public void close() throws IOException + { + cursor.close(); + } + + @Override + public T get() + { + return cursor.get(); + } + }; + } public static PrimitiveIntIterator intIterator( final Cursor resourceCursor, final ToIntFunction map ) { return new CursorPrimitiveIntIterator<>( resourceCursor, map ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java index c7c40b8e8c34d..fe225e91df46b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchupServer.java @@ -139,7 +139,7 @@ protected void initChannel( SocketChannel ch ) throws Exception pipeline.addLast( new TxPullRequestDecoder( protocol ) ); pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, transactionIdStoreSupplier, logicalTransactionStoreSupplier, - monitors) ); + monitors, logProvider ) ); pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new GetStoreRequestDecoder( protocol ) ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java index 5d7ebdc897891..54082195c3435 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CoreClient.java @@ -100,9 +100,9 @@ public CompletableFuture requestCoreSnapshot( MemberId serverAddre return coreSnapshotFuture; } - public void pollForTransactions( MemberId serverAddress, long lastTransactionId ) + public void pollForTransactions( MemberId serverAddress, StoreId storeId, long lastTransactionId ) { - TxPullRequest txPullRequest = new TxPullRequest( lastTransactionId ); + TxPullRequest txPullRequest = new TxPullRequest( lastTransactionId, storeId ); send( serverAddress, RequestMessageType.TX_PULL_REQUEST, txPullRequest ); pullRequestMonitor.txPullRequest( lastTransactionId ); } @@ -168,9 +168,9 @@ public void onFileStreamingComplete( long lastCommittedTxBeforeStoreCopy ) } @Override - public void onTxStreamingComplete( long lastTransactionId ) + public void onTxStreamingComplete( long lastTransactionId, boolean success ) { - txStreamCompleteListeners.notify( listener -> listener.onTxStreamingComplete( lastTransactionId ) ); + txStreamCompleteListeners.notify( listener -> listener.onTxStreamingComplete( lastTransactionId, success ) ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java index 408c6e328c3d5..608e035c63573 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabase.java @@ -43,14 +43,16 @@ public class LocalDatabase implements Supplier, Lifecycle private final DataSourceManager dataSourceManager; private final Supplier transactionIdStoreSupplier; private final Supplier databaseHealthSupplier; + private final Log log; private volatile StoreId storeId; private volatile DatabaseHealth databaseHealth; - private final Log log; public LocalDatabase( File storeDir, CopiedStoreRecovery copiedStoreRecovery, StoreFiles storeFiles, - DataSourceManager dataSourceManager, Supplier transactionIdStoreSupplier, - Supplier databaseHealthSupplier, LogProvider logProvider ) + DataSourceManager dataSourceManager, + Supplier transactionIdStoreSupplier, + Supplier databaseHealthSupplier, + LogProvider logProvider ) { this.storeDir = storeDir; this.copiedStoreRecovery = copiedStoreRecovery; @@ -58,7 +60,7 @@ public LocalDatabase( File storeDir, CopiedStoreRecovery copiedStoreRecovery, St this.dataSourceManager = dataSourceManager; this.transactionIdStoreSupplier = transactionIdStoreSupplier; this.databaseHealthSupplier = databaseHealthSupplier; - log = logProvider.getLog( getClass() ); + this.log = logProvider.getLog( getClass() ); this.storeId = StoreId.DEFAULT; } @@ -116,15 +118,21 @@ private DatabaseHealth getDatabaseHealth() return databaseHealth; } - public void copyStoreFrom( MemberId from, StoreFetcher storeFetcher ) throws StoreCopyFailedException + public void bringUpToDateOrReplaceStoreFrom( MemberId source, StoreId wantedStoreId, StoreFetcher storeFetcher ) throws StoreCopyFailedException { try { - storeFiles.delete( storeDir ); - TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( storeDir ); - storeFetcher.copyStore( from, tempStore.storeDir() ); - copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() ); - storeFiles.moveTo( tempStore.storeDir(), storeDir ); + boolean successfullyCaughtUp = false; + if ( wantedStoreId.equals( storeId ) ) + { + successfullyCaughtUp = tryToCatchUp( source, storeFetcher ); + } + + if ( !successfullyCaughtUp ) + { + storeFiles.delete( storeDir ); + copyWholeStoreFrom( source, wantedStoreId, storeFetcher ); + } } catch ( IOException e ) { @@ -132,6 +140,19 @@ public void copyStoreFrom( MemberId from, StoreFetcher storeFetcher ) throws Sto } } + private boolean tryToCatchUp( MemberId source, StoreFetcher storeFetcher ) throws IOException, StoreCopyFailedException + { + return storeFetcher.tryCatchingUp( source, storeId, storeDir ); + } + + private void copyWholeStoreFrom( MemberId source, StoreId wantedStoreId, StoreFetcher storeFetcher ) throws IOException, StoreCopyFailedException + { + TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( storeDir ); + storeFetcher.copyStore( source, wantedStoreId, tempStore.storeDir() ); + copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() ); + storeFiles.moveTo( tempStore.storeDir(), storeDir ); + } + public boolean isEmpty() { return transactionIdStoreSupplier.get().getLastCommittedTransactionId() == TransactionIdStore.BASE_TX_ID; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java index b4ab18e17aa66..e981b93bad665 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcher.java @@ -29,6 +29,7 @@ import org.neo4j.coreedge.identity.StoreId; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -56,7 +57,28 @@ public StoreFetcher( LogProvider logProvider, log = logProvider.getLog( getClass() ); } - void copyStore( MemberId from, File storeDir ) throws StoreCopyFailedException + boolean tryCatchingUp( MemberId from, StoreId storeId, File storeDir ) throws StoreCopyFailedException, IOException + { + ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir ); + long lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId(); + + try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) ) + { + log.info( "Pulling transactions from: %d", lastCommittedTransactionId ); + try + { + long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastCommittedTransactionId, writer ); + log.info( "Txs streamed up to %d", lastPulledTxId ); + return true; + } + catch ( StoreCopyFailedException e ) + { + return false; + } + } + } + + void copyStore( MemberId from, StoreId storeId, File storeDir ) throws StoreCopyFailedException { try { @@ -67,7 +89,7 @@ void copyStore( MemberId from, File storeDir ) throws StoreCopyFailedException try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) ) { log.info( "Pulling transactions from: %d", lastFlushedTxId - 1 ); - long lastPulledTxId = txPullClient.pullTransactions( from, lastFlushedTxId - 1, writer ); + long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastFlushedTxId - 1, writer ); log.info( "Txs streamed up to %d", lastPulledTxId ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java index cc78093381e3b..a2f7c56eb7f8c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java @@ -19,11 +19,15 @@ */ package org.neo4j.coreedge.catchup.tx; +import java.util.function.Supplier; + import org.neo4j.coreedge.catchup.CoreClient; +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; import org.neo4j.coreedge.core.consensus.schedule.RenewableTimeoutService.TimeoutName; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; @@ -48,6 +52,8 @@ */ public class TxPollingClient extends LifecycleAdapter implements TxPullListener { + private final Supplier localDatabase; + enum Timeouts implements TimeoutName { TX_PULLER_TIMEOUT @@ -89,9 +95,12 @@ enum States private long unexpectedCount; private boolean streamingCompleted; - public TxPollingClient( LogProvider logProvider, CoreClient coreClient, CoreMemberSelectionStrategy connectionStrategy, - RenewableTimeoutService timeoutService, long txPullIntervalMillis, BatchingTxApplier applier ) + public TxPollingClient( LogProvider logProvider, Supplier localDatabase, + CoreClient coreClient, CoreMemberSelectionStrategy connectionStrategy, + RenewableTimeoutService timeoutService, long txPullIntervalMillis, + BatchingTxApplier applier ) { + this.localDatabase = localDatabase; this.log = logProvider.getLog( getClass() ); this.coreClient = coreClient; this.connectionStrategy = connectionStrategy; @@ -135,7 +144,7 @@ public synchronized void onTxReceived( TxPullResponse tx ) * End of tx responses received off the network. */ @Override - public synchronized void onTxStreamingComplete( long ignored ) + public synchronized void onTxStreamingComplete( long ignored, boolean success ) { state.handler.onTxStreamingComplete( this ); } @@ -179,7 +188,8 @@ public void onTimeout( TxPollingClient ctx ) try { transactionServer = ctx.connectionStrategy.coreMember(); - ctx.coreClient.pollForTransactions( transactionServer, ctx.applier.lastAppliedTxId() ); + ctx.coreClient.pollForTransactions( transactionServer, ctx.localDatabase.get(), + ctx.applier.lastAppliedTxId() ); } catch ( Exception e ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java index 070f09cc3cbf2..adb4a439be565 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java @@ -25,6 +25,8 @@ import org.neo4j.coreedge.catchup.CoreClient; import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException; public class TxPullClient { @@ -35,19 +37,28 @@ public TxPullClient( CoreClient coreClient ) this.coreClient = coreClient; } - public long pullTransactions( MemberId from, long startTxId, TxPullResponseListener txPullResponseListener ) + public long pullTransactions( MemberId from, StoreId storeId, long startTxId, TxPullResponseListener txPullResponseListener ) throws StoreCopyFailedException { coreClient.addTxPullResponseListener( txPullResponseListener ); CompletableFuture txId = new CompletableFuture<>(); - TxStreamCompleteListener streamCompleteListener = txId::complete; + TxStreamCompleteListener streamCompleteListener = ( value, success ) -> { + if ( success ) + { + txId.complete( value ); + } + else + { + txId.completeExceptionally( new NoSuchTransactionException( startTxId ) ); + } + }; coreClient.addTxStreamCompleteListener( streamCompleteListener ); try { - coreClient.pollForTransactions( from, startTxId ); + coreClient.pollForTransactions( from, storeId, startTxId ); return txId.get(); } catch ( InterruptedException | ExecutionException e ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequest.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequest.java index 778679b81a88e..9fe33cfd2aeeb 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequest.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequest.java @@ -19,21 +19,23 @@ */ package org.neo4j.coreedge.catchup.tx; -import org.neo4j.coreedge.messaging.Message; +import java.util.Objects; import org.neo4j.coreedge.catchup.RequestMessageType; - -import static java.lang.String.format; +import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.coreedge.messaging.Message; public class TxPullRequest implements Message { public static final RequestMessageType MESSAGE_TYPE = RequestMessageType.TX_PULL_REQUEST; private long txId; + private final StoreId storeId; - public TxPullRequest( long txId ) + public TxPullRequest( long txId, StoreId storeId ) { this.txId = txId; + this.storeId = storeId; } public long txId() @@ -41,6 +43,11 @@ public long txId() return txId; } + public StoreId storeId() + { + return storeId; + } + @Override public boolean equals( Object o ) { @@ -52,23 +59,19 @@ public boolean equals( Object o ) { return false; } - TxPullRequest that = (TxPullRequest) o; - - return txId == that.txId; + return txId == that.txId && Objects.equals( storeId, that.storeId ); } @Override public int hashCode() { - int result = (int) (txId ^ (txId >>> 32)); - result = 31 * result; - return result; + return Objects.hash( txId, storeId ); } @Override public String toString() { - return format( "TxPullRequest{txId=%d}", txId ); + return String.format( "TxPullRequest{txId=%d, storeId=%s}", txId, storeId ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestDecoder.java index 349c7d851847b..9c703c8257b1d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestDecoder.java @@ -27,6 +27,9 @@ import io.netty.handler.codec.MessageToMessageDecoder; import org.neo4j.coreedge.catchup.CatchupServerProtocol; +import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4; +import org.neo4j.coreedge.messaging.marsalling.storeid.StoreIdMarshal; import static org.neo4j.coreedge.catchup.CatchupServerProtocol.NextMessage.TX_PULL; @@ -45,7 +48,8 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out if ( protocol.isExpecting( TX_PULL ) ) { long txId = msg.readLong(); - out.add( new TxPullRequest( txId ) ); + StoreId storeId = StoreIdMarshal.unmarshal( new NetworkReadableClosableChannelNetty4( msg ) ); + out.add( new TxPullRequest( txId, storeId ) ); } else { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncoder.java index 36f41f8099dbf..29e4c26e7a7ab 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncoder.java @@ -25,6 +25,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; +import org.neo4j.coreedge.messaging.NetworkFlushableChannelNetty4; +import org.neo4j.coreedge.messaging.marsalling.storeid.StoreIdMarshal; + public class TxPullRequestEncoder extends MessageToMessageEncoder { @Override @@ -32,6 +35,7 @@ protected void encode( ChannelHandlerContext ctx, TxPullRequest request, List { @@ -41,18 +44,20 @@ public class TxPullRequestHandler extends SimpleChannelInboundHandler storeIdSupplier, Supplier transactionIdStoreSupplier, Supplier logicalTransactionStoreSupplier, - Monitors monitors ) + Monitors monitors, LogProvider logProvider ) { this.protocol = protocol; this.storeId = storeIdSupplier.get(); this.transactionIdStore = transactionIdStoreSupplier.get(); this.logicalTransactionStore = logicalTransactionStoreSupplier.get(); this.monitor = monitors.newMonitor( TxPullRequestsMonitor.class ); + this.log = logProvider.getLog( getClass() ); } @Override @@ -60,11 +65,12 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg { long startTxId = Math.max( msg.txId(), TransactionIdStore.BASE_TX_ID ); long endTxId = startTxId; + boolean success = true; if ( transactionIdStore.getLastCommittedTransactionId() > startTxId ) { try ( IOCursor cursor = - logicalTransactionStore.getTransactions( startTxId + 1 ) ) + logicalTransactionStore.getTransactions( startTxId + 1 ) ) { while ( cursor.next() ) { @@ -75,10 +81,14 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg } ctx.flush(); } + catch ( NoSuchTransactionException e ) + { + success = false; + log.info( "Failed to serve TxPullRequest for tx %d because the transaction does not exist.", endTxId ); + } } - ctx.write( ResponseMessageType.TX_STREAM_FINISHED ); - ctx.write( new TxStreamFinishedResponse( endTxId ) ); + ctx.write( new TxStreamFinishedResponse( endTxId, success ) ); ctx.flush(); monitor.increment(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamCompleteListener.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamCompleteListener.java index 75bd8c808088b..f3c82abaa01a8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamCompleteListener.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamCompleteListener.java @@ -21,5 +21,5 @@ public interface TxStreamCompleteListener { - void onTxStreamingComplete( long lastTransactionId ); + void onTxStreamingComplete( long lastTransactionId, boolean success ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponse.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponse.java index 4fe6aa7d01f90..a29c863e2aa50 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponse.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponse.java @@ -22,10 +22,17 @@ class TxStreamFinishedResponse { private final long lastTransactionIdSent; + private final boolean success; - TxStreamFinishedResponse( long lastTransactionIdSent ) + TxStreamFinishedResponse( long lastTransactionIdSent, boolean success ) { this.lastTransactionIdSent = lastTransactionIdSent; + this.success = success; + } + + boolean isSuccess() + { + return success; } long lastTransactionIdSent() @@ -47,7 +54,7 @@ public boolean equals( Object o ) TxStreamFinishedResponse that = (TxStreamFinishedResponse) o; - return lastTransactionIdSent == that.lastTransactionIdSent; + return lastTransactionIdSent == that.lastTransactionIdSent && success == that.success; } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java index 9b97ddf0788f8..3ff0a9c941bfd 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseDecoder.java @@ -45,7 +45,8 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out if ( protocol.isExpecting( NextMessage.TX_STREAM_FINISHED ) ) { long lastTransactionIdSent = msg.readLong(); - out.add( new TxStreamFinishedResponse( lastTransactionIdSent ) ); + boolean success = msg.readBoolean(); + out.add( new TxStreamFinishedResponse( lastTransactionIdSent, success ) ); } else { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncoder.java index 84ce0e8e37aad..5169cb12d6391 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncoder.java @@ -33,6 +33,7 @@ protected void encode( ChannelHandlerContext ctx, TxStreamFinishedResponse respo { ByteBuf encoded = ctx.alloc().buffer(); encoded.writeLong( response.lastTransactionIdSent() ); + encoded.writeBoolean( response.isSuccess() ); out.add( encoded ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseHandler.java index 699f42975e5e7..40b94da4a3aa1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseHandler.java @@ -41,7 +41,7 @@ public TxStreamFinishedResponseHandler( CatchupClientProtocol protocol, TxStream @Override protected void channelRead0( ChannelHandlerContext ctx, TxStreamFinishedResponse msg ) throws Exception { - listener.onTxStreamingComplete( msg.lastTransactionIdSent() ); + listener.onTxStreamingComplete( msg.lastTransactionIdSent(), msg.isSuccess() ); protocol.expect( NextMessage.MESSAGE_TYPE ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java index 8917836d7fde1..bc6f8400b46a9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java @@ -78,7 +78,7 @@ public void handle( RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) ConsensusOutcome outcome = raftMachine.handle( storeIdAwareMessage.message() ); if ( outcome.needsFreshSnapshot() ) { - notifyNeedFreshSnapshot(); + notifyNeedFreshSnapshot( storeId ); } else { @@ -123,7 +123,7 @@ public void handle( RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) */ log.info( "StoreId mismatch but store was empty so downloading new store from %s. Expected: " + "%s, Encountered: %s. ", message.from(), storeId, localDatabase.storeId() ); - downloadSnapshot( message.from() ); + downloadSnapshot( message.from(), storeId ); } else { @@ -148,11 +148,11 @@ private synchronized void notifyCommitted( long commitIndex ) applicationProcess.notifyCommitted( commitIndex ); } - private synchronized void notifyNeedFreshSnapshot() + private synchronized void notifyNeedFreshSnapshot( StoreId storeId ) { try { - downloadSnapshot( someoneElse.coreMember() ); + downloadSnapshot( someoneElse.coreMember(), storeId ); } catch ( CoreMemberSelectionException e ) { @@ -164,16 +164,18 @@ private synchronized void notifyNeedFreshSnapshot() * Attempts to download a fresh snapshot from another core instance. * * @param source The source address to attempt a download of a snapshot from. + * @param storeId */ - private void downloadSnapshot( MemberId source ) + private void downloadSnapshot( MemberId source, StoreId storeId ) { try { applicationProcess.sync(); - downloader.downloadSnapshot( source, this ); + downloader.downloadSnapshot( source, storeId, this ); } catch ( InterruptedException | StoreCopyFailedException e ) { + e.printStackTrace(); log.error( "Failed to download snapshot", e ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java index 7b7295ebac607..5ede135d12e7c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java @@ -28,6 +28,7 @@ import org.neo4j.coreedge.catchup.storecopy.StoreFetcher; import org.neo4j.coreedge.core.state.CoreState; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.identity.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -49,7 +50,7 @@ public CoreStateDownloader( LocalDatabase localDatabase, StoreFetcher storeFetch this.log = logProvider.getLog( getClass() ); } - public synchronized void downloadSnapshot( MemberId source, CoreState coreState ) + public synchronized void downloadSnapshot( MemberId source, StoreId storeId, CoreState coreState ) throws InterruptedException, StoreCopyFailedException { try @@ -76,7 +77,7 @@ public synchronized void downloadSnapshot( MemberId source, CoreState coreState throw new StoreCopyFailedException( e ); } - localDatabase.copyStoreFrom( source, storeFetcher ); // this deletes the current store + localDatabase.bringUpToDateOrReplaceStoreFrom( source, storeId, storeFetcher ); // this deletes the current store log.info( "Replaced store with one downloaded from %s", source ); /* We install the snapshot after the store has been downloaded, diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java index b64036fe898e9..22a3a7d2b7af1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EdgeStartupProcess.java @@ -28,6 +28,7 @@ import org.neo4j.coreedge.core.state.machines.tx.RetryStrategy; import org.neo4j.coreedge.discovery.EdgeTopologyService; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionException; import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy; import org.neo4j.kernel.configuration.Config; @@ -80,7 +81,8 @@ public void start() throws Throwable if ( localDatabase.isEmpty() ) { localDatabase.stop(); - localDatabase.copyStoreFrom( memberId, storeFetcher ); + StoreId storeId = storeFetcher.storeId( memberId ); + localDatabase.bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher ); localDatabase.start(); } else diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java index 02afaecfbc070..30dcfe43f32c8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/edge/EnterpriseEdgeEditionModule.java @@ -200,7 +200,16 @@ public void registerProcedures( Procedures procedures ) ContinuousJob txApplyJob = new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "tx-applier", NEW_THREAD ), batchingTxApplier ); DelayedRenewableTimeoutService txPullerTimeoutService = new DelayedRenewableTimeoutService( Clock.systemUTC(), logProvider ); - TxPollingClient txPuller = new TxPollingClient( logProvider, + + LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, + new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(), platformModule + .pageCache ), + new StoreFiles( new DefaultFileSystemAbstraction() ), + platformModule.dataSourceManager, + dependencies.provideDependency( TransactionIdStore.class ), + databaseHealthSupplier, logProvider ); + + TxPollingClient txPuller = new TxPollingClient( logProvider, localDatabase, edgeToCoreClient, new ConnectToRandomCoreMember( discoveryService ), txPullerTimeoutService, config.get( CoreEdgeClusterSettings.pull_interval ), batchingTxApplier ); @@ -215,12 +224,7 @@ edgeToCoreClient, new ConnectToRandomCoreMember( discoveryService ), new TransactionLogCatchUpFactory() ); life.add( new EdgeStartupProcess( storeFetcher, - new LocalDatabase( platformModule.storeDir, - new CopiedStoreRecovery( config, platformModule.kernelExtensions.listFactories(), platformModule.pageCache ), - new StoreFiles( new DefaultFileSystemAbstraction() ), - platformModule.dataSourceManager, - dependencies.provideDependency( TransactionIdStore.class ), - databaseHealthSupplier, logProvider ), + localDatabase, txPulling, new ConnectToRandomCoreMember( discoveryService ), new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider, discoveryService, config ) ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java index 6cfe39dd9e933..186ed0f60b6d9 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/LocalDatabaseTest.java @@ -19,10 +19,10 @@ */ package org.neo4j.coreedge.catchup.storecopy; -import org.junit.Test; - import java.io.File; +import org.junit.Test; + import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.identity.StoreId; import org.neo4j.io.fs.FileSystemAbstraction; @@ -36,9 +36,14 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import static org.neo4j.function.Suppliers.singleton; public class LocalDatabaseTest @@ -99,10 +104,67 @@ public void shouldThrowWhenDifferentStoreIds() throws Throwable } } + @Test + public void shouldCatchUpStoreIfPossible() throws Throwable + { + // given + File storeDir = new File( "directory" ); + StoreFiles storeFiles = mock( StoreFiles.class ); + StoreId storeId = new StoreId( 1, 2, 4, 5 ); + + DataSourceManager dataSourceManager = mock( DataSourceManager.class ); + NeoStoreDataSource neoStoreDataSource = mock( NeoStoreDataSource.class ); + when( dataSourceManager.getDataSource() ).thenReturn( neoStoreDataSource ); + when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 3, 4, 5 ) ); + + TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 12L ); + + LocalDatabase localDatabase = new LocalDatabase( storeDir, mock( CopiedStoreRecovery.class ), + storeFiles, dataSourceManager, () -> transactionIdStore, () -> mock( DatabaseHealth.class ), + NullLogProvider.getInstance() ); + + MemberId memberId = mock( MemberId.class ); + StoreFetcher storeFetcher = mock( StoreFetcher.class ); + when( storeFetcher.tryCatchingUp( memberId, storeId, storeDir ) ).thenReturn( true ); + + // when + localDatabase.start(); + localDatabase.bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher ); + + verify( storeFiles, never() ).delete( storeDir ); + verify( storeFetcher, never() ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) ); + } + + @Test + public void shouldCopyStoreIfCatchupFails() throws Exception + { + // given + StoreFiles storeFiles = mock( StoreFiles.class ); + StoreId storeId = new StoreId( 6, 7, 8, 9 ); + + File storeDir = new File( "directory" ); + LocalDatabase localDatabase = new LocalDatabase( storeDir, mock( CopiedStoreRecovery.class ), + storeFiles, null, singleton( mock( TransactionIdStore.class ) ), () -> mock( DatabaseHealth.class ), + NullLogProvider.getInstance() ); + + MemberId memberId = mock( MemberId.class ); + StoreFetcher storeFetcher = mock( StoreFetcher.class ); + when( storeFetcher.tryCatchingUp( memberId, storeId, storeDir ) ).thenReturn( false ); + + // when + localDatabase.bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher ); + + // then + verify( storeFiles ).delete( any( File.class ) ); + verify( storeFetcher ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) ); + } + @Test public void storeCopyShouldResetStoreId() throws Throwable { // given + StoreId storeId = new StoreId( 6, 7, 8, 9 ); NeoStoreDataSource mockDatasource = mock( NeoStoreDataSource.class ); when( mockDatasource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 3, 4, 5 ) ); StoreId copied = new StoreId( 5, 6, 7, 8 ); @@ -117,7 +179,7 @@ public void storeCopyShouldResetStoreId() throws Throwable // and stopped, the store copied, and restarted localDatabase.stop(); - localDatabase.copyStoreFrom( memberId, storeFetcher ); + localDatabase.bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher ); when( mockDatasource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 5, 6, 3, 7, 8 ) ); localDatabase.start(); @@ -159,6 +221,7 @@ public void beingStoppedShouldReturnNonMatchingStoreId() throws Throwable public void askingForStoreIdWhileStoreCopyingShouldStillLeaveNewStoreIdAfterCopyCompletes() throws Throwable { // given + StoreId storeId = new StoreId( 6, 7, 8, 9 ); NeoStoreDataSource mockDatasource = mock( NeoStoreDataSource.class ); when( mockDatasource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) ); StoreId copied = new StoreId( 5, 6, 7, 8 ); @@ -176,7 +239,7 @@ public void askingForStoreIdWhileStoreCopyingShouldStillLeaveNewStoreIdAfterCopy // and the storeId is asked for localDatabase.storeId(); // and the store copy happens - localDatabase.copyStoreFrom( memberId, storeFetcher ); + localDatabase.bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher ); when( mockDatasource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 5, 6, 12, 7, 8 ) ); localDatabase.start(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcherTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcherTest.java index 2ad98aa5cdbb6..9527b0beb52bd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcherTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/storecopy/StoreFetcherTest.java @@ -19,17 +19,18 @@ */ package org.neo4j.coreedge.catchup.storecopy; -import org.junit.Test; - import java.io.File; import java.io.IOException; import java.util.UUID; +import org.junit.Test; + import org.neo4j.coreedge.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.coreedge.catchup.tx.TransactionLogCatchUpWriter; import org.neo4j.coreedge.catchup.tx.TxPullClient; import org.neo4j.coreedge.catchup.tx.TxPullResponseListener; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.identity.StoreId; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.logging.LogProvider; @@ -49,6 +50,7 @@ public class StoreFetcherTest public void shouldCopyStoreFilesAndPullTransactions() throws Exception { // given + StoreId storeId = new StoreId( 1, 2, 3, 4 ); StoreCopyClient storeCopyClient = mock( StoreCopyClient.class ); TxPullClient txPullClient = mock( TxPullClient.class ); TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class ); @@ -59,11 +61,11 @@ public void shouldCopyStoreFilesAndPullTransactions() throws Exception // when MemberId localhost = new MemberId( UUID.randomUUID() ); - fetcher.copyStore( localhost, new File( "destination" ) ); + fetcher.copyStore( localhost, storeId, new File( "destination" ) ); // then verify( storeCopyClient ).copyStoreFiles( eq( localhost ), any( StoreFileStreams.class ) ); - verify( txPullClient ).pullTransactions( eq( localhost ), anyLong(), any( TxPullResponseListener.class ) ); + verify( txPullClient ).pullTransactions( eq( localhost ), eq( storeId ), anyLong(), any( TxPullResponseListener.class ) ); } @Test @@ -72,7 +74,7 @@ public void shouldSetLastPulledTransactionId() throws Exception // given long lastFlushedTxId = 12; long lastPulledTxId = 34; - + StoreId storeId = new StoreId( 1, 2, 3, 4 ); MemberId localhost = new MemberId( UUID.randomUUID() ); StoreCopyClient storeCopyClient = mock( StoreCopyClient.class ); @@ -80,7 +82,7 @@ public void shouldSetLastPulledTransactionId() throws Exception .thenReturn( lastFlushedTxId ); TxPullClient txPullClient = mock( TxPullClient.class ); - when( txPullClient.pullTransactions( eq( localhost ), anyLong(), any( TxPullResponseListener.class ) ) ) + when( txPullClient.pullTransactions( eq( localhost ), eq( storeId ), anyLong(), any( TxPullResponseListener.class ) ) ) .thenReturn( lastPulledTxId ); TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class ); @@ -90,16 +92,17 @@ public void shouldSetLastPulledTransactionId() throws Exception storeCopyClient, txPullClient, factory( writer ) ); // when - fetcher.copyStore( localhost, new File( "destination" ) ); + fetcher.copyStore( localhost, storeId, new File( "destination" ) ); // then - verify( txPullClient ).pullTransactions( eq( localhost ), eq( lastFlushedTxId - 1 ), any( TxPullResponseListener.class ) ); + verify( txPullClient ).pullTransactions( eq( localhost ), eq( storeId ), eq( lastFlushedTxId - 1 ), any( TxPullResponseListener.class ) ); } @Test public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception { // given + StoreId storeId = new StoreId( 1, 2, 3, 4 ); StoreCopyClient storeCopyClient = mock( StoreCopyClient.class ); TxPullClient txPullClient = mock( TxPullClient.class ); TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class ); @@ -109,12 +112,12 @@ public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception storeCopyClient, txPullClient, factory( writer ) ); doThrow( StoreCopyFailedException.class ).when( txPullClient ) - .pullTransactions( any( MemberId.class ), anyLong(), any( TransactionLogCatchUpWriter.class ) ); + .pullTransactions( any( MemberId.class ), eq( storeId ), anyLong(), any( TransactionLogCatchUpWriter.class ) ); // when try { - fetcher.copyStore( null, null ); + fetcher.copyStore( null, storeId, null ); } catch ( StoreCopyFailedException e ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java index 68a7068d4cba2..f7200584239a3 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java @@ -53,8 +53,10 @@ public class TxPollingClientTest private final ControlledRenewableTimeoutService timeoutService = new ControlledRenewableTimeoutService(); private final long txPullTimeoutMillis = 100; + private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); - private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), coreClient, serverSelection, + private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), () -> storeId, + coreClient, serverSelection, timeoutService, txPullTimeoutMillis, txApplier ); @Before @@ -76,7 +78,7 @@ public void shouldSendPullRequestOnTick() throws Throwable timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( coreClient ).pollForTransactions( any( MemberId.class ), eq( lastAppliedTxId ) ); + verify( coreClient ).pollForTransactions( any( MemberId.class ), eq( storeId ), eq( lastAppliedTxId ) ); } @Test @@ -89,7 +91,7 @@ public void shouldNotScheduleNewPullIfThereIsWorkPending() throws Exception timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( coreClient, never() ).pollForTransactions( any( MemberId.class ), anyLong() ); + verify( coreClient, never() ).pollForTransactions( any( MemberId.class ), eq( storeId ), anyLong() ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncodeDecodeTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncodeDecodeTest.java index a0ca983a4673b..0ef9c0d52b961 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncodeDecodeTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestEncodeDecodeTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.neo4j.coreedge.catchup.CatchupServerProtocol; +import org.neo4j.coreedge.identity.StoreId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; @@ -42,7 +43,7 @@ public void shouldEncodeAndDecodePullRequestMessage() // given final long arbitraryId = 23; - TxPullRequest sent = new TxPullRequest( arbitraryId ); + TxPullRequest sent = new TxPullRequest( arbitraryId, new StoreId( 1, 2, 3, 4 ) ); // when channel.writeOutbound( sent ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandlerTest.java new file mode 100644 index 0000000000000..e3af55b698f28 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPullRequestHandlerTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.catchup.tx; + +import io.netty.channel.ChannelHandlerContext; +import org.junit.Test; + +import org.neo4j.coreedge.catchup.CatchupServerProtocol; +import org.neo4j.coreedge.catchup.ResponseMessageType; +import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; +import org.neo4j.kernel.impl.transaction.command.Commands; +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; +import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException; +import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; +import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.logging.AssertableLogProvider; +import org.neo4j.logging.NullLogProvider; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import static org.neo4j.kernel.impl.transaction.command.Commands.createNode; +import static org.neo4j.kernel.impl.util.Cursors.cursor; +import static org.neo4j.kernel.impl.util.Cursors.io; +import static org.neo4j.logging.AssertableLogProvider.inLog; + +public class TxPullRequestHandlerTest +{ + @Test + public void shouldRespondWithStreamOfTransactions() throws Exception + { + // given + StoreId storeId = new StoreId( 1, 2, 3, 4 ); + + TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); + + LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class ); + when( logicalTransactionStore.getTransactions( 13L ) ).thenReturn( io( cursor( + tx( 13 ), + tx( 14 ), + tx( 15 ) + ) ) ); + + TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), + () -> storeId, () -> transactionIdStore, () -> logicalTransactionStore, + new Monitors(), NullLogProvider.getInstance() ); + ChannelHandlerContext context = mock( ChannelHandlerContext.class ); + + // when + txPullRequestHandler.channelRead0( context, new TxPullRequest( 12, storeId ) ); + + // then + verify( context, times( 3 ) ).write( ResponseMessageType.TX ); + verify( context ).write( new TxPullResponse( storeId, tx( 13 ) ) ); + verify( context ).write( new TxPullResponse( storeId, tx( 14 ) ) ); + verify( context ).write( new TxPullResponse( storeId, tx( 15 ) ) ); + + verify( context ).write( ResponseMessageType.TX_STREAM_FINISHED ); + verify( context ).write( new TxStreamFinishedResponse( 15, true ) ); + } + + @Test + public void shouldRespondWithoutTransactionsIfTheyDoNotExist() throws Exception + { + // given + StoreId storeId = new StoreId( 1, 2, 3, 4 ); + + TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); + when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); + + LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class ); + when( logicalTransactionStore.getTransactions( 13L ) ).thenThrow( new NoSuchTransactionException( 13 ) ); + + AssertableLogProvider logProvider = new AssertableLogProvider(); + TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), + () -> storeId, () -> transactionIdStore, () -> logicalTransactionStore, + new Monitors(), logProvider ); + ChannelHandlerContext context = mock( ChannelHandlerContext.class ); + + // when + txPullRequestHandler.channelRead0( context, new TxPullRequest( 12, storeId ) ); + + // then + verify( context ).write( ResponseMessageType.TX_STREAM_FINISHED ); + verify( context ).write( new TxStreamFinishedResponse( 12, false ) ); + logProvider.assertAtLeastOnce( inLog( TxPullRequestHandler.class ) + .info( "Failed to serve TxPullRequest for tx %d because the transaction does not exist.", 12L ) ); + } + + private static CommittedTransactionRepresentation tx( int id ) + { + return new CommittedTransactionRepresentation( + new LogEntryStart( id, id, id, id - 1, new byte[]{}, LogPosition.UNSPECIFIED ), + Commands.transactionRepresentation( createNode( 0 ) ), + new OnePhaseCommit( id, id ) ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java index 57cd750e2c2f4..daa20fed19bf3 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxStreamFinishedResponseEncodeDecodeTest.java @@ -42,7 +42,7 @@ public void shouldEncodeAndDecodePullRequestMessage() // given final long arbitraryId = 23; - TxStreamFinishedResponse sent = new TxStreamFinishedResponse( arbitraryId ); + TxStreamFinishedResponse sent = new TxStreamFinishedResponse( arbitraryId, true ); // when channel.writeOutbound( sent ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java index 14eb6a7b87cc3..079301c00b5ec 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/HazelcastClusterTopologyTest.java @@ -19,11 +19,6 @@ */ package org.neo4j.coreedge.discovery; -import com.hazelcast.client.impl.MemberImpl; -import com.hazelcast.core.Member; -import com.hazelcast.nio.Address; -import org.junit.Test; - import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -33,17 +28,23 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import com.hazelcast.client.impl.MemberImpl; +import com.hazelcast.core.Member; +import com.hazelcast.nio.Address; +import org.junit.Test; + import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.core.state.machines.tx.ConstantTimeRetryStrategy; -import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; +import org.neo4j.coreedge.catchup.storecopy.StoreFetcher; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; +import org.neo4j.coreedge.core.state.machines.tx.ConstantTimeRetryStrategy; +import org.neo4j.coreedge.edge.EdgeStartupProcess; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy; -import org.neo4j.coreedge.edge.EdgeStartupProcess; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.collection.Pair; import org.neo4j.kernel.configuration.Config; -import org.neo4j.kernel.impl.transaction.state.DataSourceManager; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.NullLog; import org.neo4j.logging.NullLogProvider; @@ -56,6 +57,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.buildMemberAttributes; import static org.neo4j.coreedge.discovery.HazelcastClusterTopology.extractMemberAttributes; @@ -65,6 +67,7 @@ public class HazelcastClusterTopologyTest public void edgeServersShouldRegisterThemselvesWithTheTopologyWhenTheyStart() throws Throwable { // given + MemberId memberId = new MemberId( UUID.randomUUID() ); final Map params = new HashMap<>(); params.put( new GraphDatabaseSettings.BoltConnector( "bolt" ).type.name(), "BOLT" ); @@ -78,11 +81,15 @@ public void edgeServersShouldRegisterThemselvesWithTheTopologyWhenTheyStart() th // when final CoreMemberSelectionStrategy connectionStrategy = mock( CoreMemberSelectionStrategy.class ); - when( connectionStrategy.coreMember() ).thenReturn( new MemberId( UUID.randomUUID() ) ); + when( connectionStrategy.coreMember() ).thenReturn( memberId ); LocalDatabase localDatabase = mock( LocalDatabase.class ); when( localDatabase.isEmpty() ).thenReturn( true ); - final EdgeStartupProcess startupProcess = new EdgeStartupProcess( null, + + StoreFetcher storeFetcher = mock( StoreFetcher.class ); + when( storeFetcher.storeId( memberId ) ).thenReturn( new StoreId( 1, 2, 3, 4 ) ); + + final EdgeStartupProcess startupProcess = new EdgeStartupProcess( storeFetcher, localDatabase, mock( Lifecycle.class ), connectionStrategy, @@ -131,9 +138,9 @@ public void shouldCollectMembersAsAMap() throws Exception coreMembers.add( memberId ); Config config = Config.defaults(); HashMap settings = new HashMap<>(); - settings.put( CoreEdgeClusterSettings.transaction_advertised_address.name(), "tx:" + (i + 1 )); - settings.put( CoreEdgeClusterSettings.raft_advertised_address.name(), "raft:" + (i + 1 )); - settings.put( GraphDatabaseSettings.bolt_advertised_address.name(), "bolt:" + (i + 1)); + settings.put( CoreEdgeClusterSettings.transaction_advertised_address.name(), "tx:" + (i + 1) ); + settings.put( CoreEdgeClusterSettings.raft_advertised_address.name(), "raft:" + (i + 1) ); + settings.put( GraphDatabaseSettings.bolt_advertised_address.name(), "bolt:" + (i + 1) ); config.augment( settings ); hazelcastMembers.add( new MemberImpl( new Address( "localhost", i ), null, buildMemberAttributes( memberId, config ).getAttributes() ) ); @@ -147,9 +154,9 @@ public void shouldCollectMembersAsAMap() throws Exception for ( int i = 0; i < 5; i++ ) { CoreAddresses coreAddresses = coreMemberMap.get( coreMembers.get( i ) ); - assertEquals( new AdvertisedSocketAddress( "tx:" + (i + 1 )), coreAddresses.getCatchupServer() ); - assertEquals( new AdvertisedSocketAddress( "raft:" + (i + 1 )), coreAddresses.getRaftServer() ); - assertEquals( new AdvertisedSocketAddress( "bolt:" + (i + 1 )), coreAddresses.getBoltServer() ); + assertEquals( new AdvertisedSocketAddress( "tx:" + (i + 1) ), coreAddresses.getCatchupServer() ); + assertEquals( new AdvertisedSocketAddress( "raft:" + (i + 1) ), coreAddresses.getRaftServer() ); + assertEquals( new AdvertisedSocketAddress( "bolt:" + (i + 1) ), coreAddresses.getBoltServer() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java index 1bb5a388dae65..dd40a9d62df3a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/edge/EdgeStartupProcessTest.java @@ -19,18 +19,12 @@ */ package org.neo4j.coreedge.edge; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; -import static org.neo4j.helpers.collection.Iterators.asSet; - import java.util.UUID; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; + import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.StoreFetcher; import org.neo4j.coreedge.core.state.machines.tx.ConstantTimeRetryStrategy; @@ -45,16 +39,29 @@ import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.NullLogProvider; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import static org.neo4j.helpers.collection.Iterators.asSet; + public class EdgeStartupProcessTest { @Test public void startShouldReplaceTheEmptyLocalStoreWithStoreFromCoreMemberAndStartPolling() throws Throwable { // given + MemberId memberId = new MemberId( UUID.randomUUID() ); + StoreId storeId = new StoreId( 1, 2, 3, 4 ); + StoreFetcher storeFetcher = mock( StoreFetcher.class ); + when( storeFetcher.storeId( memberId ) ).thenReturn( storeId ); + LocalDatabase localDatabase = mock( LocalDatabase.class ); - MemberId memberId = new MemberId( UUID.randomUUID() ); TopologyService hazelcastTopology = mock( TopologyService.class ); ClusterTopology clusterTopology = mock( ClusterTopology.class ); @@ -76,7 +83,7 @@ txPulling, new AlwaysChooseFirstMember( hazelcastTopology ), // then Mockito.verify( localDatabase ).isEmpty(); Mockito.verify( localDatabase ).stop(); - Mockito.verify( localDatabase ).copyStoreFrom( memberId, storeFetcher ); + Mockito.verify( localDatabase ).bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher ); Mockito.verify( localDatabase, times( 2 ) ).start(); // once for initial start, once for after store copy Mockito.verify( txPulling ).start(); Mockito.verifyNoMoreInteractions( localDatabase, txPulling ); @@ -90,7 +97,8 @@ public void startShouldNotReplaceTheNonEmptyLocalStoreWithStoreFromCoreMemberAnd MemberId memberId = new MemberId( UUID.randomUUID() ); LocalDatabase localDatabase = Mockito.mock( LocalDatabase.class ); Mockito.when( localDatabase.isEmpty() ).thenReturn( false ); - Mockito.doThrow( IllegalStateException.class ).when( localDatabase ).ensureSameStoreId( memberId, storeFetcher ); + Mockito.doThrow( IllegalStateException.class ).when( localDatabase ).ensureSameStoreId( memberId, + storeFetcher ); TopologyService hazelcastTopology = Mockito.mock( TopologyService.class ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java index 99ed7f314c313..282dc5ebbfd3c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.neo4j.coreedge.discovery.Cluster; @@ -162,7 +161,7 @@ public void laggingFollowerShouldDownloadSnapshot() throws Exception cluster.removeCoreMemberWithMemberId( 0 ); - createSomeData( 100, cluster ); + SampleData.createSomeData( 100, cluster ); for ( CoreClusterMember db : cluster.coreMembers() ) { @@ -197,7 +196,7 @@ public void badFollowerShouldNotJoinCluster() throws Exception cluster.removeCoreMemberWithMemberId( 0 ); changeStoreId( storeDir ); - createSomeData( 100, cluster ); + SampleData.createSomeData( 100, cluster ); for ( CoreClusterMember db : cluster.coreMembers() ) { @@ -227,7 +226,7 @@ public void aNewServerShouldJoinTheClusterByDownloadingASnapshot() throws Except tx.success(); } ); - createSomeData( 100, cluster ); + SampleData.createSomeData( 100, cluster ); for ( CoreClusterMember db : cluster.coreMembers() ) { @@ -252,19 +251,6 @@ private List storeDirs( Collection dbs ) return dbs.stream().map( CoreClusterMember::storeDir ).collect( Collectors.toList() ); } - private void createSomeData( int items, Cluster cluster ) throws TimeoutException, InterruptedException - { - for ( int i = 0; i < items; i++ ) - { - cluster.coreTx( ( db, tx ) -> - { - Node node = db.createNode( label( "boo" ) ); - node.setProperty( "foobar", "baz_bat" ); - tx.success(); - } ); - } - } - private void changeStoreId( File storeDir ) throws IOException { File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java index 584f67283ed12..2fddc16bae1fe 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CorePruningIT.java @@ -33,7 +33,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog.SEGMENTED_LOG_DIRECTORY_NAME; -import static org.neo4j.coreedge.scenarios.CoreToCoreCopySnapshotIT.createData; +import static org.neo4j.coreedge.scenarios.SampleData.createData; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_strategy; import static org.neo4j.test.assertion.Assert.assertEventually; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index 0420444897263..f0287bafb8416 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -24,25 +24,25 @@ import org.junit.Rule; import org.junit.Test; + import org.neo4j.coreedge.catchup.storecopy.StoreFiles; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.CoreGraphDatabase; import org.neo4j.coreedge.core.consensus.roles.Role; import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.CoreClusterMember; -import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.Relationship; -import org.neo4j.graphdb.RelationshipType; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.test.DbRepresentation; import org.neo4j.test.coreedge.ClusterRule; -import static org.junit.Assert.assertEquals; import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; + import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_frequency; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_strategy; +import static org.neo4j.coreedge.scenarios.SampleData.createData; import static org.neo4j.helpers.collection.MapUtil.stringMap; public class CoreToCoreCopySnapshotIT @@ -152,21 +152,6 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except follower.start(); } - static void createData( GraphDatabaseService db, int size ) - { - for ( int i = 0; i < size; i++ ) - { - Node node1 = db.createNode(); - Node node2 = db.createNode(); - - node1.setProperty( "hej", "svej" ); - node2.setProperty( "tjabba", "tjena" ); - - Relationship rel = node1.createRelationshipTo( node2, RelationshipType.withName( "halla" ) ); - rel.setProperty( "this", "that" ); - } - } - private int getOldestLogIdOn( CoreClusterMember clusterMember ) throws TimeoutException { return clusterMember.getLogFileNames().firstKey().intValue(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/SampleData.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/SampleData.java new file mode 100644 index 0000000000000..be817eda29666 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/SampleData.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.scenarios; + +import java.util.concurrent.TimeoutException; + +import org.neo4j.coreedge.discovery.Cluster; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; + +import static org.neo4j.graphdb.Label.label; + +class SampleData +{ + static void createSomeData( int items, Cluster cluster ) throws TimeoutException, InterruptedException + { + for ( int i = 0; i < items; i++ ) + { + cluster.coreTx( ( db, tx ) -> + { + Node node = db.createNode( label( "boo" ) ); + node.setProperty( "foobar", "baz_bat" ); + tx.success(); + } ); + } + } + + static void createData( GraphDatabaseService db, int size ) + { + for ( int i = 0; i < size; i++ ) + { + Node node1 = db.createNode(); + Node node2 = db.createNode(); + + node1.setProperty( "hej", "svej" ); + node2.setProperty( "tjabba", "tjena" ); + + Relationship rel = node1.createRelationshipTo( node2, RelationshipType.withName( "halla" ) ); + rel.setProperty( "this", "that" ); + } + } +}