Probably made read replica catch up from read replica.
jimwebber committed Feb 8, 2017
1 parent 865adff commit 962ebee
Showing 39 changed files with 740 additions and 214 deletions.
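
Taken together with the file summary above, the shape of the change is: catch-up address lookups now go through a combined view of core members and read replicas (the new ClusterTopology and CatchupServerAddress further down) rather than the core-only topology, which is what lets a read replica catch up from another read replica. Below is a minimal, self-contained sketch of that before/after lookup; the class, member ids and addresses are hypothetical stand-ins, not the real discovery types.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the lookup change in this commit; "CatchupLookupSketch"
// and the ids/addresses below are made up and are not Neo4j types.
public class CatchupLookupSketch
{
    private final Map<String,String> coreCatchupAddresses;        // memberId -> host:port
    private final Map<String,String> readReplicaCatchupAddresses; // memberId -> host:port

    CatchupLookupSketch( Map<String,String> cores, Map<String,String> readReplicas )
    {
        this.coreCatchupAddresses = cores;
        this.readReplicaCatchupAddresses = readReplicas;
    }

    // Roughly the old behaviour: only core members are searched, so an
    // upstream that happens to be a read replica cannot be resolved.
    Optional<String> coreOnlyLookup( String upstream )
    {
        return Optional.ofNullable( coreCatchupAddresses.get( upstream ) );
    }

    // Roughly the new behaviour: cores first, then read replicas, mirroring
    // ClusterTopology.find further down in this diff.
    Optional<String> combinedLookup( String upstream )
    {
        Optional<String> core = Optional.ofNullable( coreCatchupAddresses.get( upstream ) );
        return core.isPresent() ? core : Optional.ofNullable( readReplicaCatchupAddresses.get( upstream ) );
    }

    public static void main( String[] args )
    {
        Map<String,String> replicas = new HashMap<>();
        replicas.put( "replica-1", "replica1:6000" );

        CatchupLookupSketch cluster = new CatchupLookupSketch(
                Collections.singletonMap( "core-1", "core1:6000" ), replicas );

        System.out.println( cluster.coreOnlyLookup( "replica-1" ) );  // Optional.empty
        System.out.println( cluster.combinedLookup( "replica-1" ) );  // Optional[replica1:6000]
    }
}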
@@ -435,6 +435,8 @@ public void start()
}
catch ( Throwable e )
{
System.out.println("problems --> ");
e.printStackTrace();
currentStatus = changedStatus( instance, currentStatus, LifecycleStatus.STOPPED );
if( e instanceof LifecycleException )
{
@@ -31,8 +31,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.discovery.CoreAddresses;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.discovery.CatchupServerAddress;
import org.neo4j.causalclustering.discovery.ReadReplicaTopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
@@ -48,7 +48,7 @@
public class CatchUpClient extends LifecycleAdapter
{
private final LogProvider logProvider;
private final TopologyService discoveryService;
private final ReadReplicaTopologyService discoveryService;
private final Log log;
private final Clock clock;
private final Monitors monitors;
@@ -57,7 +57,7 @@ public class CatchUpClient extends LifecycleAdapter

private NioEventLoopGroup eventLoopGroup;

public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, Clock clock,
public CatchUpClient( ReadReplicaTopologyService discoveryService, LogProvider logProvider, Clock clock,
long inactivityTimeoutMillis, Monitors monitors )
{
this.logProvider = logProvider;
@@ -68,12 +68,13 @@ public CatchUpClient( TopologyService discoveryService, LogProvider logProvider,
this.monitors = monitors;
}

public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
public <T> T makeBlockingRequest( MemberId upstream, CatchUpRequest request,
CatchUpResponseCallback<T> responseHandler ) throws CatchUpClientException
{
CompletableFuture<T> future = new CompletableFuture<>();
Optional<AdvertisedSocketAddress> catchUpAddress =
discoveryService.coreServers().find( target ).map( CoreAddresses::getCatchupServer );
discoveryService.allServers().find( upstream ).map( CatchupServerAddress::getCatchupServer );


CatchUpChannel channel = pool.acquire( catchUpAddress.orElseThrow(
() -> new CatchUpClientException( "Cannot find the target member socket address" ) ) );
@@ -93,7 +94,7 @@ public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
channel.send( request );

String operation = String.format( "Timed out executing operation %s on %s (%s)",
request, target, catchUpAddress.get() );
request, upstream, catchUpAddress.get() );

return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log );
}
@@ -137,11 +137,8 @@ public synchronized void start() throws Throwable

workerGroup = new NioEventLoopGroup( 0, threadFactory );

ServerBootstrap bootstrap = new ServerBootstrap()
.group( workerGroup )
.channel( NioServerSocketChannel.class )
.localAddress( listenAddress.socketAddress() )
.childHandler( new ChannelInitializer<SocketChannel>()
ServerBootstrap bootstrap = new ServerBootstrap().group( workerGroup ).channel( NioServerSocketChannel.class )
.localAddress( listenAddress.socketAddress() ).childHandler( new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel( SocketChannel ch )
@@ -175,14 +172,21 @@ protected void initChannel( SocketChannel ch )
transactionIdStoreSupplier, logicalTransactionStoreSupplier, txPullBatchSize,
monitors, logProvider ) );
pipeline.addLast( new ChunkedWriteHandler() );

pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
checkPointerSupplier, fs, pageCache, logProvider, storeCopyCheckPointMutex ) );

pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) );
pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) );

if ( coreState != null )
{
pipeline.addLast( new CoreSnapshotRequestHandler( protocol, coreState ) );
}

pipeline.addLast( new ExceptionLoggingHandler( log ) );
pipeline.addLast( new ExceptionMonitoringHandler(
monitors.newMonitor( ExceptionMonitoringHandler.Monitor.class, CatchupServer.class ) ) );
monitors.newMonitor( ExceptionMonitoringHandler.Monitor.class,
CatchupServer.class ) ) );
pipeline.addLast( new ExceptionSwallowingHandler() );
}
} );
@@ -191,28 +195,31 @@ protected void initChannel( SocketChannel ch )
{
channel = bootstrap.bind().syncUninterruptibly().channel();
}
catch( Exception e )
catch ( Exception e )
{
// thanks to netty we need to catch everything and do an instanceof because it does not declare properly
// checked exception but it still throws them with some black magic at runtime.
//noinspection ConstantConditions
if ( e instanceof BindException )
{
userLog.error( "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + listenAddress );
log.error( "Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address + " with value: " + listenAddress, e );
userLog.error(
"Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address +
" with value: " + listenAddress );
log.error(
"Address is already bound for setting: " + CausalClusteringSettings.transaction_listen_address +
" with value: " + listenAddress, e );
throw e;
}
}
}

private ChannelInboundHandler decoders( CatchupServerProtocol protocol )
{
RequestDecoderDispatcher<State> decoderDispatcher =
new RequestDecoderDispatcher<>( protocol, logProvider );
RequestDecoderDispatcher<State> decoderDispatcher = new RequestDecoderDispatcher<>( protocol, logProvider );
decoderDispatcher.register( State.TX_PULL, new TxPullRequestDecoder() );
decoderDispatcher.register( State.GET_STORE, new GetStoreRequestDecoder() );
decoderDispatcher.register( State.GET_STORE_ID, new SimpleRequestDecoder( GetStoreIdRequest::new ) );
decoderDispatcher.register( State.GET_CORE_SNAPSHOT, new SimpleRequestDecoder( CoreSnapshotRequest::new) );
decoderDispatcher.register( State.GET_CORE_SNAPSHOT, new SimpleRequestDecoder( CoreSnapshotRequest::new ) );
return decoderDispatcher;
}

@@ -287,10 +287,10 @@ public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,

private void copyStore()
{
MemberId core;
MemberId upstream;
try
{
core = selectionStrategyPipeline.bestUpstreamDatabase();
upstream = selectionStrategyPipeline.bestUpstreamDatabase();
}
catch ( UpstreamDatabaseSelectionException e )
{
@@ -299,10 +299,10 @@ private void copyStore()
}

StoreId localStoreId = localDatabase.storeId();
downloadDatabase( core, localStoreId );
downloadDatabase( upstream, localStoreId );
}

private void downloadDatabase( MemberId core, StoreId localStoreId )
private void downloadDatabase( MemberId upstream, StoreId localStoreId )
{
try
{
@@ -316,11 +316,11 @@ private void downloadDatabase( MemberId core, StoreId localStoreId )

try
{
storeCopyProcess.replaceWithStoreFrom( core, localStoreId );
storeCopyProcess.replaceWithStoreFrom( upstream, localStoreId );
}
catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e )
{
log.warn( String.format( "Error copying store from: %s. Will retry shortly.", core ) );
log.warn( String.format( "Error copying store from: %s. Will retry shortly.", upstream ) );
return;
}

@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import org.neo4j.helpers.AdvertisedSocketAddress;

public interface CatchupServerAddress
{
AdvertisedSocketAddress getCatchupServer();
}
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery;

import java.util.Optional;

import org.neo4j.causalclustering.identity.MemberId;

public class ClusterTopology
{
private final CoreTopology coreTopology;
private final ReadReplicaTopology readReplicaTopology;

public ClusterTopology( CoreTopology coreTopology, ReadReplicaTopology readReplicaTopology )
{
this.coreTopology = coreTopology;
this.readReplicaTopology = readReplicaTopology;
}

public Optional<CatchupServerAddress> find( MemberId upstream )
{
Optional<CoreAddresses> coreAddresses = coreTopology.find( upstream );
Optional<ReadReplicaAddresses> readReplicaAddresses = readReplicaTopology.find( upstream );

if ( coreAddresses.isPresent() )
{
return Optional.of( coreAddresses.get() );
}
else if ( readReplicaAddresses.isPresent() )
{
return Optional.of( readReplicaAddresses.get() );
}
else
{
return Optional.empty();
}
}
}
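
A side note on the fallback in find above: Optional.or only arrived with Java 9, later in 2017, so the explicit if/else chain is what was available to this code. Purely as a sketch of the more compact form on Java 9+, with a hypothetical Address type standing in for CatchupServerAddress (none of the names below are part of the real API):

import java.util.Optional;

// Sketch only: collapsing the "core first, read replica second" fallback
// with Optional.or (Java 9+). "Address" is a hypothetical stand-in.
class TopologyFallbackSketch
{
    interface Address
    {
        String catchupServer();
    }

    static Optional<Address> find( Optional<Address> fromCores, Optional<Address> fromReadReplicas )
    {
        // Prefer the core topology; fall back to the read replica topology.
        return fromCores.or( () -> fromReadReplicas );
    }

    public static void main( String[] args )
    {
        Address replicaAddress = () -> "replica1:6000";
        Optional<Address> core = Optional.empty();
        Optional<Address> replica = Optional.of( replicaAddress );

        System.out.println( find( core, replica ).map( Address::catchupServer ) ); // Optional[replica1:6000]
    }
}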
@@ -21,7 +21,7 @@

import org.neo4j.helpers.AdvertisedSocketAddress;

public class CoreAddresses implements ClientConnector
public class CoreAddresses implements CatchupServerAddress, ClientConnector
{
private final AdvertisedSocketAddress raftServer;
private final AdvertisedSocketAddress catchupServer;
@@ -40,6 +40,7 @@ public AdvertisedSocketAddress getRaftServer()
return raftServer;
}

@Override
public AdvertisedSocketAddress getCatchupServer()
{
return catchupServer;
@@ -21,10 +21,8 @@

import org.neo4j.causalclustering.identity.ClusterId;

public interface CoreTopologyService extends TopologyService
public interface CoreTopologyService extends ReadReplicaTopologyService
{
ReadReplicaTopology readReplicas();

void addCoreTopologyListener( Listener listener );

/**
@@ -30,6 +30,7 @@ public interface DiscoveryServiceFactory
CoreTopologyService coreTopologyService( Config config, MemberId myself, JobScheduler jobScheduler,
LogProvider logProvider, LogProvider userLogProvider );

TopologyService readReplicaDiscoveryService( Config config, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate );
ReadReplicaTopologyService readReplicaTopologyService( Config config, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout,
long readReplicaRefreshRate, MemberId myself );
}
