diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupProtocolServerInstaller.java index 5bf2b3eb26d44..c8302bffe3e1f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupProtocolServerInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupProtocolServerInstaller.java @@ -27,7 +27,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import org.neo4j.causalclustering.catchup.storecopy.FileChunkEncoder; @@ -60,10 +59,10 @@ public class CatchupProtocolServerInstaller implements ProtocolInstaller { - public Factory( NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider, - Function handlerFactory ) + public Factory( NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider, CatchupServerHandler catchupServerHandler ) { - super( APPLICATION_PROTOCOL, modifiers -> new CatchupProtocolServerInstaller( pipelineBuilderFactory, modifiers, logProvider, handlerFactory ) ); + super( APPLICATION_PROTOCOL, + modifiers -> new CatchupProtocolServerInstaller( pipelineBuilderFactory, modifiers, logProvider, catchupServerHandler ) ); } } @@ -72,23 +71,22 @@ public Factory( NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider private final Log log; private final LogProvider logProvider; - private final Function handlerFactory; + private final CatchupServerHandler catchupServerHandler; - CatchupProtocolServerInstaller( NettyPipelineBuilderFactory pipelineBuilderFactory, List> modifiers, - LogProvider logProvider, Function handlerFactory ) + private CatchupProtocolServerInstaller( NettyPipelineBuilderFactory pipelineBuilderFactory, List> modifiers, + LogProvider logProvider, CatchupServerHandler catchupServerHandler ) { this.pipelineBuilderFactory = pipelineBuilderFactory; this.modifiers = modifiers; this.log = logProvider.getLog( getClass() ); this.logProvider = logProvider; - this.handlerFactory = handlerFactory; + this.catchupServerHandler = catchupServerHandler; } @Override public void install( Channel channel ) throws Exception { CatchupServerProtocol state = new CatchupServerProtocol(); - CatchupServerHandler handler = handlerFactory.apply( state ); pipelineBuilderFactory.server( channel, log ) .modify( modifiers ) @@ -106,12 +104,12 @@ public void install( Channel channel ) throws Exception .add( "in_req_type", serverMessageHandler( state ) ) .add( "dec_req_dispatch", requestDecoders( state ) ) .add( "out_chunked_write", new ChunkedWriteHandler() ) - .add( "hnd_req_tx", handler.txPullRequestHandler() ) - .add( "hnd_req_store_id", handler.getStoreIdRequestHandler() ) - .add( "hnd_req_store_listing", handler.storeListingRequestHandler() ) - .add( "hnd_req_store_file", handler.getStoreFileRequestHandler() ) - .add( "hnd_req_index_snapshot", handler.getIndexSnapshotRequestHandler() ) - .add( "hnd_req_snapshot", handler.snapshotHandler().map( Collections::singletonList ).orElse( emptyList() ) ) + .add( "hnd_req_tx", catchupServerHandler.txPullRequestHandler( state ) ) + .add( "hnd_req_store_id", catchupServerHandler.getStoreIdRequestHandler( state ) ) + .add( "hnd_req_store_listing", catchupServerHandler.storeListingRequestHandler( state ) ) + .add( "hnd_req_store_file", catchupServerHandler.getStoreFileRequestHandler( state ) ) + .add( "hnd_req_index_snapshot", catchupServerHandler.getIndexSnapshotRequestHandler( state ) ) + .add( "hnd_req_snapshot", catchupServerHandler.snapshotHandler( state ).map( Collections::singletonList ).orElse( emptyList() ) ) .install(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java index 868b1c5ce1d96..0f2ce6806549a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelInboundHandler; import java.util.Collection; -import java.util.function.Function; import org.neo4j.causalclustering.net.Server; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; @@ -47,7 +46,7 @@ public class CatchupServerBuilder { - private final Function handlerFactory; + private final CatchupServerHandler catchupServerHandler; private LogProvider debugLogProvider = NullLogProvider.getInstance(); private LogProvider userLogProvider = NullLogProvider.getInstance(); private NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VOID_WRAPPER ); @@ -57,9 +56,9 @@ public class CatchupServerBuilder private ListenSocketAddress listenAddress; private String serverName = "catchup-server"; - public CatchupServerBuilder( Function handlerFactory ) + public CatchupServerBuilder( CatchupServerHandler catchupServerHandler ) { - this.handlerFactory = handlerFactory; + this.catchupServerHandler = catchupServerHandler; } public CatchupServerBuilder catchupProtocols( ApplicationSupportedProtocols catchupProtocols ) @@ -116,7 +115,7 @@ public Server build() ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), modifierProtocols ); CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, debugLogProvider, - handlerFactory ); + catchupServerHandler ); ProtocolInstallerRepository protocolInstallerRepository = new ProtocolInstallerRepository<>( singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerHandler.java index d9b900f7440de..c082ae61fc7ae 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerHandler.java @@ -25,15 +25,15 @@ public interface CatchupServerHandler { - ChannelHandler txPullRequestHandler(); + ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol ); - ChannelHandler getStoreIdRequestHandler(); + ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupServerProtocol ); - ChannelHandler storeListingRequestHandler(); + ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol ); - ChannelHandler getStoreFileRequestHandler(); + ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol ); - ChannelHandler getIndexSnapshotRequestHandler(); + ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol ); - Optional snapshotHandler(); + Optional snapshotHandler( CatchupServerProtocol catchupServerProtocol ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java index 67d6bcb9167f7..34a4315747492 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java @@ -47,64 +47,77 @@ public class RegularCatchupServerHandler implements CatchupServerHandler { - private final GetStoreIdRequestHandler storeIdRequestHandler; - private final GetStoreFileRequestHandler storeFileRequestHandler; - private final GetIndexSnapshotRequestHandler indexSnapshotRequestHandler; - private final CoreSnapshotRequestHandler snapshotHandler; - private final PrepareStoreCopyRequestHandler storeListingRequestHandler; - private final TxPullRequestHandler txPullRequestHandler; - public RegularCatchupServerHandler( CatchupServerProtocol protocol, Monitors monitors, LogProvider logProvider, Supplier storeIdSupplier, + private final Monitors monitors; + private final LogProvider logProvider; + private final Supplier storeIdSupplier; + private final Supplier transactionIdStoreSupplier; + private final Supplier logicalTransactionStoreSupplier; + private final Supplier dataSourceSupplier; + private final BooleanSupplier dataSourceAvailabilitySupplier; + private final FileSystemAbstraction fs; + private final PageCache pageCache; + private final StoreCopyCheckPointMutex storeCopyCheckPointMutex; + private final CoreSnapshotService snapshotService; + private final Supplier checkPointerSupplier; + + public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier storeIdSupplier, Supplier transactionIdStoreSupplier, Supplier logicalTransactionStoreSupplier, Supplier dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs, PageCache pageCache, StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, Supplier checkPointerSupplier ) { - this.snapshotHandler = (snapshotService != null) ? new CoreSnapshotRequestHandler( protocol, snapshotService ) : null; - this.txPullRequestHandler = new TxPullRequestHandler( protocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier, - logicalTransactionStoreSupplier, monitors, logProvider ); - this.storeIdRequestHandler = new GetStoreIdRequestHandler( protocol, storeIdSupplier ); - PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider = new PrepareStoreCopyFilesProvider( pageCache, fs ); - this.storeListingRequestHandler = new PrepareStoreCopyRequestHandler( protocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier, - prepareStoreCopyFilesProvider ); - this.storeFileRequestHandler = new GetStoreFileRequestHandler( protocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), - pageCache, fs, logProvider ); - this.indexSnapshotRequestHandler = new GetIndexSnapshotRequestHandler( protocol, dataSourceSupplier, checkPointerSupplier, - new StoreFileStreamingProtocol(), pageCache, fs ); + + this.monitors = monitors; + this.logProvider = logProvider; + this.storeIdSupplier = storeIdSupplier; + this.transactionIdStoreSupplier = transactionIdStoreSupplier; + this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier; + this.dataSourceSupplier = dataSourceSupplier; + this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier; + this.fs = fs; + this.pageCache = pageCache; + this.storeCopyCheckPointMutex = storeCopyCheckPointMutex; + this.snapshotService = snapshotService; + this.checkPointerSupplier = checkPointerSupplier; } @Override - public ChannelHandler txPullRequestHandler() + public ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return txPullRequestHandler; + return new TxPullRequestHandler( catchupServerProtocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier, + logicalTransactionStoreSupplier, monitors, logProvider ); } @Override - public ChannelHandler getStoreIdRequestHandler() + public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return storeIdRequestHandler; + return new GetStoreIdRequestHandler( catchupServerProtocol, storeIdSupplier ); } @Override - public ChannelHandler storeListingRequestHandler() + public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return storeListingRequestHandler; + return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier, + new PrepareStoreCopyFilesProvider( pageCache, fs ) ); } @Override - public ChannelHandler getStoreFileRequestHandler() + public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return storeFileRequestHandler; + return new GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), pageCache, fs, + logProvider ); } @Override - public ChannelHandler getIndexSnapshotRequestHandler() + public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return indexSnapshotRequestHandler; + return new GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), pageCache, + fs ); } @Override - public Optional snapshotHandler() + public Optional snapshotHandler( CatchupServerProtocol catchupServerProtocol ) { - return Optional.ofNullable( snapshotHandler ); + return Optional.ofNullable( (snapshotService != null) ? new CoreSnapshotRequestHandler( catchupServerProtocol, snapshotService ) : null ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index 8620ca2d6295d..906de275d354b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -188,14 +188,14 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols ); ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), supportedModifierProtocols ); - Function handlerFactory = state -> new RegularCatchupServerHandler( state, platformModule.monitors, + CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, snapshotService, new CheckpointerSupplier( platformModule.dependencies ) ); CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory, - logProvider, handlerFactory ); + logProvider, catchupServerHandler ); ProtocolInstallerRepository protocolInstallerRepository = new ProtocolInstallerRepository<>( singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers ); @@ -203,7 +203,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( catchupProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, serverPipelineBuilderFactory, logProvider ); - catchupServer = new CatchupServerBuilder( handlerFactory ) + catchupServer = new CatchupServerBuilder( catchupServerHandler ) .serverHandler( installedProtocolsHandler ) .catchupProtocols( supportedCatchupProtocols ) .modifierProtocols( supportedModifierProtocols ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 0861f4dab5d6f..d14cb677f07a0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -316,13 +316,13 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols ); - Function handlerFactory = state -> new RegularCatchupServerHandler( state, platformModule.monitors, + RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, null, new CheckpointerSupplier( platformModule.dependencies ) ); CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory, - logProvider, handlerFactory ); + logProvider, catchupServerHandler ); ProtocolInstallerRepository serverProtocolInstallerRepository = new ProtocolInstallerRepository<>( singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeCatchupServer.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeCatchupServer.java index 2a77d4260ea8e..dbcf399207ca2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeCatchupServer.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeCatchupServer.java @@ -53,15 +53,12 @@ class TestCatchupServerHandler implements CatchupServerHandler private final Set indexFiles = new HashSet<>(); private final Map pathToRequestCountMapping = new HashMap<>(); private final Log log; - final CatchupServerProtocol protocol; private TestDirectory testDirectory; private FileSystemAbstraction fileSystemAbstraction; - TestCatchupServerHandler( LogProvider logProvider, CatchupServerProtocol protocol, TestDirectory testDirectory, - FileSystemAbstraction fileSystemAbstraction ) + TestCatchupServerHandler( LogProvider logProvider, TestDirectory testDirectory, FileSystemAbstraction fileSystemAbstraction ) { log = logProvider.getLog( TestCatchupServerHandler.class ); - this.protocol = protocol; this.testDirectory = testDirectory; this.fileSystemAbstraction = fileSystemAbstraction; } @@ -82,7 +79,7 @@ public int getRequestCount( String file ) } @Override - public ChannelHandler getStoreFileRequestHandler() + public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new SimpleChannelInboundHandler() { @@ -95,14 +92,14 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, GetSto { if ( handleFileDoesNotExist( channelHandlerContext, getStoreFileRequest ) ) { - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + catchupServerProtocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); return; } handleFileExists( channelHandlerContext, getStoreFileRequest.file() ); } finally { - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + catchupServerProtocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); } } }; @@ -118,12 +115,6 @@ private boolean handleFileDoesNotExist( ChannelHandlerContext channelHandlerCont failed( channelHandlerContext ); return true; } - else if ( file.getRemainingNoResponse() > 0 ) - { - log.info( "FakeServer not going to response for file %s", getStoreFileRequest.file() ); - file.setRemainingNoResponse( file.getRemainingNoResponse() - 1 ); - return true; // no response - } return false; } @@ -164,19 +155,19 @@ private StoreResource storeResourceFromEntry( File file ) } @Override - public ChannelHandler txPullRequestHandler() + public ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new ChannelInboundHandlerAdapter(); } @Override - public ChannelHandler getStoreIdRequestHandler() + public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new ChannelInboundHandlerAdapter(); } @Override - public ChannelHandler storeListingRequestHandler() + public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new SimpleChannelInboundHandler() { @@ -192,13 +183,13 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, Prepar PrimitiveLongSet indexIds = Primitive.longSet(); indexIds.add( 13 ); channelHandlerContext.writeAndFlush( PrepareStoreCopyResponse.success( files, indexIds, transactionId ) ); - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + catchupServerProtocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); } }; } @Override - public ChannelHandler getIndexSnapshotRequestHandler() + public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new SimpleChannelInboundHandler() { @@ -220,14 +211,14 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, GetInd } finally { - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + catchupServerProtocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); } } }; } @Override - public Optional snapshotHandler() + public Optional snapshotHandler( CatchupServerProtocol catchupServerProtocol ) { return Optional.empty(); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeFile.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeFile.java index 1c45beec4d340..9facb44cb4d02 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeFile.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/FakeFile.java @@ -78,26 +78,6 @@ public void setContent( String content ) this.content = content; } - /** - * Number of times that requests for this file will result in communication breakdown (i.e. no response) - * - * @return - */ - int getRemainingNoResponse() - { - return remainingNoResponse; - } - - void setRemainingNoResponse( int remainingNoResponse ) - { - this.remainingNoResponse = remainingNoResponse; - } - - public void setRelativePath( Path relativePath ) - { - this.relativePath = relativePath; - } - /** * Clear response that the file has failed to copy (safe connection close, communication, ...) * diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java index 183e72e9cedc3..6be6b9a60fd75 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java @@ -68,9 +68,7 @@ import org.neo4j.ports.allocation.PortAuthority; import org.neo4j.test.rule.TestDirectory; -import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -107,7 +105,7 @@ private static void writeContents( FileSystemAbstraction fileSystemAbstraction, @Before public void setup() { - serverHandler = new TestCatchupServerHandler( logProvider, new CatchupServerProtocol(), testDirectory, fsa ); + serverHandler = new TestCatchupServerHandler( logProvider, testDirectory, fsa ); serverHandler.addFile( fileA ); serverHandler.addFile( fileB ); serverHandler.addIndexFile( indexFileA ); @@ -116,7 +114,7 @@ public void setup() writeContents( fsa, relative( indexFileA.getFilename() ), indexFileA.getContent() ); ListenSocketAddress listenAddress = new ListenSocketAddress( "localhost", PortAuthority.allocatePort() ); - catchupServer = new CatchupServerBuilder( protocol -> serverHandler ).listenAddress( listenAddress ).build(); + catchupServer = new CatchupServerBuilder( serverHandler ).listenAddress( listenAddress ).build(); catchupServer.start(); CatchUpClient catchUpClient = new CatchupClientBuilder().build(); @@ -179,27 +177,6 @@ public void failedFileCopyShouldRetry() throws StoreCopyFailedException, IOExcep assertEquals( 1, serverHandler.getRequestCount( fileA.getFilename() ) ); } - @Test - public void reconnectingWorks() throws StoreCopyFailedException, IOException - { - // given local client has a store - InMemoryStoreStreamProvider storeFileStream = new InMemoryStoreStreamProvider(); - - // and file B is broken once (after retry it works) - fileB.setRemainingNoResponse( 1 ); - - // when catchup is performed for valid transactionId and StoreId - CatchupAddressProvider catchupAddressProvider = CatchupAddressProvider.fromSingleAddress( from( catchupServer.address().getPort() ) ); - subject.copyStoreFiles( catchupAddressProvider, serverHandler.getStoreId(), storeFileStream, () -> defaultTerminationCondition ); - - // then the catchup is possible to complete - assertEquals( fileContent( relative( fileA.getFilename() ) ), clientFileContents( storeFileStream, fileA.getFilename() ) ); - assertEquals( fileContent( relative( fileB.getFilename() ) ), clientFileContents( storeFileStream, fileB.getFilename() ) ); - - // and verify file was requested more than once - assertThat( serverHandler.getRequestCount( fileB.getFilename() ), greaterThan( 1 ) ); - } - @Test public void shouldNotAppendToFileWhenRetryingWithNewFile() throws Throwable { @@ -211,10 +188,10 @@ public void shouldNotAppendToFileWhenRetryingWithNewFile() throws Throwable Iterator contents = Iterators.iterator( unfinishedContent, finishedContent ); // and - TestCatchupServerHandler halfWayFailingServerhandler = new TestCatchupServerHandler( logProvider, new CatchupServerProtocol(), testDirectory, fsa ) + TestCatchupServerHandler halfWayFailingServerhandler = new TestCatchupServerHandler( logProvider, testDirectory, fsa ) { @Override - public ChannelHandler getStoreFileRequestHandler() + public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new SimpleChannelInboundHandler() { @@ -238,7 +215,7 @@ protected void channelRead0( ChannelHandlerContext ctx, GetStoreFileRequest msg StoreCopyFinishedResponse.Status status = contents.hasNext() ? StoreCopyFinishedResponse.Status.E_UNKNOWN : StoreCopyFinishedResponse.Status.SUCCESS; new StoreFileStreamingProtocol().end( ctx, status ); - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + catchupServerProtocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); } private void sendFile( ChannelHandlerContext ctx, File file, PageCache pageCache ) @@ -252,7 +229,7 @@ private void sendFile( ChannelHandlerContext ctx, File file, PageCache pageCache } @Override - public ChannelHandler storeListingRequestHandler() + public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new SimpleChannelInboundHandler() { @@ -261,13 +238,13 @@ protected void channelRead0( ChannelHandlerContext ctx, PrepareStoreCopyRequest { ctx.write( ResponseMessageType.PREPARE_STORE_COPY_RESPONSE ); ctx.writeAndFlush( PrepareStoreCopyResponse.success( new File[]{new File( fileName )}, new Empty.EmptyPrimitiveLongSet(), 1 ) ); - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + catchupServerProtocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); } }; } @Override - public ChannelHandler getIndexSnapshotRequestHandler() + public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol ) { return new SimpleChannelInboundHandler() { @@ -286,7 +263,7 @@ protected void channelRead0( ChannelHandlerContext ctx, GetIndexFilesRequest msg { // when ListenSocketAddress listenAddress = new ListenSocketAddress( "localhost", PortAuthority.allocatePort() ); - halfWayFailingServer = new CatchupServerBuilder( protocol -> halfWayFailingServerhandler ).listenAddress( listenAddress ).build(); + halfWayFailingServer = new CatchupServerBuilder( halfWayFailingServerhandler ).listenAddress( listenAddress ).build(); halfWayFailingServer.start(); CatchupAddressProvider addressProvider = diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java index 1fc5f6b318c55..53fa9b5d9a6b9 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java @@ -93,13 +93,13 @@ private static ChildInitializer childInitializer( FileSystemAbstraction fileSyst StoreId storeId = new StoreId( kernelStoreId.getCreationTime(), kernelStoreId.getRandomId(), kernelStoreId.getUpgradeTime(), kernelStoreId.getUpgradeId() ); - Function handlerFactory = state -> new RegularCatchupServerHandler( state, new Monitors(), logProvider, + RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( new Monitors(), logProvider, () -> storeId, dependencies.provideDependency( TransactionIdStore.class ), dependencies.provideDependency( LogicalTransactionStore.class ), dataSource, availability, fileSystem, pageCache, storeCopyCheckPointMutex, null, checkPointer ); NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VoidPipelineWrapperFactory.VOID_WRAPPER ); CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, logProvider, - handlerFactory ); + catchupServerHandler ); ProtocolInstallerRepository protocolInstallerRepository = new ProtocolInstallerRepository<>( singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );