Skip to content

Commit

Permalink
Make sure that handlers share state on test catchup server
Browse files Browse the repository at this point in the history
This caused wrong decoders to be used in the test server. Eventually requests would
time out which meant that the channel would be replaced. Hence tests
would eventually be successful but much slower.

This refactors the catchup server handler by adding the protocol in the methods
instead of the constructor. For more easy to use.
  • Loading branch information
RagnarW committed Mar 29, 2018
1 parent 2af74eb commit fe1217e
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 135 deletions.
Expand Up @@ -27,7 +27,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.catchup.storecopy.FileChunkEncoder;
Expand Down Expand Up @@ -60,10 +59,10 @@ public class CatchupProtocolServerInstaller implements ProtocolInstaller<Orienta

public static class Factory extends ProtocolInstaller.Factory<Orientation.Server,CatchupProtocolServerInstaller>
{
public Factory( NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider,
Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory )
public Factory( NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider, CatchupServerHandler catchupServerHandler )
{
super( APPLICATION_PROTOCOL, modifiers -> new CatchupProtocolServerInstaller( pipelineBuilderFactory, modifiers, logProvider, handlerFactory ) );
super( APPLICATION_PROTOCOL,
modifiers -> new CatchupProtocolServerInstaller( pipelineBuilderFactory, modifiers, logProvider, catchupServerHandler ) );
}
}

Expand All @@ -72,23 +71,22 @@ public Factory( NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider
private final Log log;

private final LogProvider logProvider;
private final Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory;
private final CatchupServerHandler catchupServerHandler;

CatchupProtocolServerInstaller( NettyPipelineBuilderFactory pipelineBuilderFactory, List<ModifierProtocolInstaller<Orientation.Server>> modifiers,
LogProvider logProvider, Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory )
private CatchupProtocolServerInstaller( NettyPipelineBuilderFactory pipelineBuilderFactory, List<ModifierProtocolInstaller<Orientation.Server>> modifiers,
LogProvider logProvider, CatchupServerHandler catchupServerHandler )
{
this.pipelineBuilderFactory = pipelineBuilderFactory;
this.modifiers = modifiers;
this.log = logProvider.getLog( getClass() );
this.logProvider = logProvider;
this.handlerFactory = handlerFactory;
this.catchupServerHandler = catchupServerHandler;
}

@Override
public void install( Channel channel ) throws Exception
{
CatchupServerProtocol state = new CatchupServerProtocol();
CatchupServerHandler handler = handlerFactory.apply( state );

pipelineBuilderFactory.server( channel, log )
.modify( modifiers )
Expand All @@ -106,12 +104,12 @@ public void install( Channel channel ) throws Exception
.add( "in_req_type", serverMessageHandler( state ) )
.add( "dec_req_dispatch", requestDecoders( state ) )
.add( "out_chunked_write", new ChunkedWriteHandler() )
.add( "hnd_req_tx", handler.txPullRequestHandler() )
.add( "hnd_req_store_id", handler.getStoreIdRequestHandler() )
.add( "hnd_req_store_listing", handler.storeListingRequestHandler() )
.add( "hnd_req_store_file", handler.getStoreFileRequestHandler() )
.add( "hnd_req_index_snapshot", handler.getIndexSnapshotRequestHandler() )
.add( "hnd_req_snapshot", handler.snapshotHandler().map( Collections::singletonList ).orElse( emptyList() ) )
.add( "hnd_req_tx", catchupServerHandler.txPullRequestHandler( state ) )
.add( "hnd_req_store_id", catchupServerHandler.getStoreIdRequestHandler( state ) )
.add( "hnd_req_store_listing", catchupServerHandler.storeListingRequestHandler( state ) )
.add( "hnd_req_store_file", catchupServerHandler.getStoreFileRequestHandler( state ) )
.add( "hnd_req_index_snapshot", catchupServerHandler.getIndexSnapshotRequestHandler( state ) )
.add( "hnd_req_snapshot", catchupServerHandler.snapshotHandler( state ).map( Collections::singletonList ).orElse( emptyList() ) )
.install();
}

Expand Down
Expand Up @@ -22,7 +22,6 @@
import io.netty.channel.ChannelInboundHandler;

import java.util.Collection;
import java.util.function.Function;

import org.neo4j.causalclustering.net.Server;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
Expand All @@ -47,7 +46,7 @@

public class CatchupServerBuilder
{
private final Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory;
private final CatchupServerHandler catchupServerHandler;
private LogProvider debugLogProvider = NullLogProvider.getInstance();
private LogProvider userLogProvider = NullLogProvider.getInstance();
private NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VOID_WRAPPER );
Expand All @@ -57,9 +56,9 @@ public class CatchupServerBuilder
private ListenSocketAddress listenAddress;
private String serverName = "catchup-server";

public CatchupServerBuilder( Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory )
public CatchupServerBuilder( CatchupServerHandler catchupServerHandler )
{
this.handlerFactory = handlerFactory;
this.catchupServerHandler = catchupServerHandler;
}

public CatchupServerBuilder catchupProtocols( ApplicationSupportedProtocols catchupProtocols )
Expand Down Expand Up @@ -116,7 +115,7 @@ public Server build()
ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), modifierProtocols );

CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, debugLogProvider,
handlerFactory );
catchupServerHandler );

ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository = new ProtocolInstallerRepository<>(
singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );
Expand Down
Expand Up @@ -25,15 +25,15 @@

public interface CatchupServerHandler
{
ChannelHandler txPullRequestHandler();
ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol );

ChannelHandler getStoreIdRequestHandler();
ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupServerProtocol );

ChannelHandler storeListingRequestHandler();
ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol );

ChannelHandler getStoreFileRequestHandler();
ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol );

ChannelHandler getIndexSnapshotRequestHandler();
ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol );

Optional<ChannelHandler> snapshotHandler();
Optional<ChannelHandler> snapshotHandler( CatchupServerProtocol catchupServerProtocol );
}
Expand Up @@ -47,64 +47,77 @@

public class RegularCatchupServerHandler implements CatchupServerHandler
{
private final GetStoreIdRequestHandler storeIdRequestHandler;
private final GetStoreFileRequestHandler storeFileRequestHandler;
private final GetIndexSnapshotRequestHandler indexSnapshotRequestHandler;
private final CoreSnapshotRequestHandler snapshotHandler;
private final PrepareStoreCopyRequestHandler storeListingRequestHandler;
private final TxPullRequestHandler txPullRequestHandler;

public RegularCatchupServerHandler( CatchupServerProtocol protocol, Monitors monitors, LogProvider logProvider, Supplier<StoreId> storeIdSupplier,
private final Monitors monitors;
private final LogProvider logProvider;
private final Supplier<StoreId> storeIdSupplier;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier;
private final Supplier<NeoStoreDataSource> dataSourceSupplier;
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final PageCache pageCache;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CoreSnapshotService snapshotService;
private final Supplier<CheckPointer> checkPointerSupplier;

public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs, PageCache pageCache,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, CoreSnapshotService snapshotService, Supplier<CheckPointer> checkPointerSupplier )
{
this.snapshotHandler = (snapshotService != null) ? new CoreSnapshotRequestHandler( protocol, snapshotService ) : null;
this.txPullRequestHandler = new TxPullRequestHandler( protocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier,
logicalTransactionStoreSupplier, monitors, logProvider );
this.storeIdRequestHandler = new GetStoreIdRequestHandler( protocol, storeIdSupplier );
PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider = new PrepareStoreCopyFilesProvider( pageCache, fs );
this.storeListingRequestHandler = new PrepareStoreCopyRequestHandler( protocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier,
prepareStoreCopyFilesProvider );
this.storeFileRequestHandler = new GetStoreFileRequestHandler( protocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(),
pageCache, fs, logProvider );
this.indexSnapshotRequestHandler = new GetIndexSnapshotRequestHandler( protocol, dataSourceSupplier, checkPointerSupplier,
new StoreFileStreamingProtocol(), pageCache, fs );

this.monitors = monitors;
this.logProvider = logProvider;
this.storeIdSupplier = storeIdSupplier;
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier;
this.dataSourceSupplier = dataSourceSupplier;
this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier;
this.fs = fs;
this.pageCache = pageCache;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
this.snapshotService = snapshotService;
this.checkPointerSupplier = checkPointerSupplier;
}

@Override
public ChannelHandler txPullRequestHandler()
public ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return txPullRequestHandler;
return new TxPullRequestHandler( catchupServerProtocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier,
logicalTransactionStoreSupplier, monitors, logProvider );
}

@Override
public ChannelHandler getStoreIdRequestHandler()
public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return storeIdRequestHandler;
return new GetStoreIdRequestHandler( catchupServerProtocol, storeIdSupplier );
}

@Override
public ChannelHandler storeListingRequestHandler()
public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return storeListingRequestHandler;
return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, storeCopyCheckPointMutex, dataSourceSupplier,
new PrepareStoreCopyFilesProvider( pageCache, fs ) );
}

@Override
public ChannelHandler getStoreFileRequestHandler()
public ChannelHandler getStoreFileRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return storeFileRequestHandler;
return new GetStoreFileRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), pageCache, fs,
logProvider );
}

@Override
public ChannelHandler getIndexSnapshotRequestHandler()
public ChannelHandler getIndexSnapshotRequestHandler( CatchupServerProtocol catchupServerProtocol )
{
return indexSnapshotRequestHandler;
return new GetIndexSnapshotRequestHandler( catchupServerProtocol, dataSourceSupplier, checkPointerSupplier, new StoreFileStreamingProtocol(), pageCache,
fs );
}

@Override
public Optional<ChannelHandler> snapshotHandler()
public Optional<ChannelHandler> snapshotHandler( CatchupServerProtocol catchupServerProtocol )
{
return Optional.ofNullable( snapshotHandler );
return Optional.ofNullable( (snapshotService != null) ? new CoreSnapshotRequestHandler( catchupServerProtocol, snapshotService ) : null );
}
}
Expand Up @@ -188,22 +188,22 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols );
ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository( ModifierProtocols.values(), supportedModifierProtocols );

Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory = state -> new RegularCatchupServerHandler( state, platformModule.monitors,
CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, snapshotService,
new CheckpointerSupplier( platformModule.dependencies ) );

CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory,
logProvider, handlerFactory );
logProvider, catchupServerHandler );

ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository = new ProtocolInstallerRepository<>(
singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );

HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( catchupProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, serverPipelineBuilderFactory, logProvider );

catchupServer = new CatchupServerBuilder( handlerFactory )
catchupServer = new CatchupServerBuilder( catchupServerHandler )
.serverHandler( installedProtocolsHandler )
.catchupProtocols( supportedCatchupProtocols )
.modifierProtocols( supportedModifierProtocols )
Expand Down
Expand Up @@ -316,13 +316,13 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,

ApplicationProtocolRepository catchupProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), supportedCatchupProtocols );

Function<CatchupServerProtocol,CatchupServerHandler> handlerFactory = state -> new RegularCatchupServerHandler( state, platformModule.monitors,
RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors,
logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
fileSystem, platformModule.pageCache, platformModule.storeCopyCheckPointMutex, null, new CheckpointerSupplier( platformModule.dependencies ) );

CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( serverPipelineBuilderFactory,
logProvider, handlerFactory );
logProvider, catchupServerHandler );

ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> serverProtocolInstallerRepository = new ProtocolInstallerRepository<>(
singletonList( catchupProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );
Expand Down

0 comments on commit fe1217e

Please sign in to comment.