Skip to content

Commit

Permalink
Use upstream strategy for store copy
Browse files Browse the repository at this point in the history
* Move upstream strategies to a separate package that is accessible for
 both cores and read replicas.

* Read replicas are bound to use upstream strategies for any store copy

* Cores will use leader for the primary address and upstream strategy
for the secondary address.

* Store copy client now uses secondary for individual files and index
requests.

* Check active connection before sending on CatchupClient
  • Loading branch information
RagnarW committed Mar 22, 2018
1 parent 9fde820 commit 1cb430b
Show file tree
Hide file tree
Showing 51 changed files with 713 additions and 411 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -40,6 +41,7 @@
import org.neo4j.causalclustering.core.SupportedProtocolCreator;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol.ApplicationProtocols;
Expand Down Expand Up @@ -112,7 +114,9 @@ private BackupDelegator backupDelegatorFromConfig( PageCache pageCache, Config c
{
CatchUpClient catchUpClient = new CatchUpClient( logProvider, clock, INACTIVITY_TIMEOUT_MILLIS, channelInitializer( config ) );
TxPullClient txPullClient = new TxPullClient( catchUpClient, monitors );
StoreCopyClient storeCopyClient = new StoreCopyClient( catchUpClient, logProvider );
ExponentialBackoffStrategy backOffStrategy =
new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS );
StoreCopyClient storeCopyClient = new StoreCopyClient( catchUpClient, logProvider, backOffStrategy );

RemoteStore remoteStore = new RemoteStore(
logProvider, fileSystemAbstraction, pageCache, storeCopyClient,
Expand Down
Expand Up @@ -39,10 +39,6 @@
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.IpFamily;
import org.neo4j.causalclustering.discovery.SharedDiscoveryServiceFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.kernel.impl.store.format.standard.Standard;
import org.neo4j.test.DbRepresentation;
import org.neo4j.test.rule.SuppressOutput;
Expand Down Expand Up @@ -76,14 +72,14 @@ public class ClusterSeedingIT
private File baseBackupDir;

@Parameterized.Parameters( name = "{0}" )
public static Object[][] data() throws Exception
public static Object[][] data()
{
return new Object[][]{{new NoStore(), true}, {new EmptyBackupStore(), false}, {new BackupStoreWithSomeData(), false},
{new BackupStoreWithSomeDataButNoTransactionLogs(), false}};
}

@Before
public void setup() throws Exception
public void setup()
{
this.fileCopyDetector = new FileCopyDetector();
backupCluster = new Cluster( testDir.directory( "cluster-for-backup" ), 3, 0,
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.causalclustering.catchup;

import java.net.ConnectException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -43,27 +44,51 @@ class CatchUpChannelPool<CHANNEL extends CatchUpChannelPool.Channel>
this.factory = factory;
}

CHANNEL acquire( AdvertisedSocketAddress catchUpAddress )
CHANNEL acquire( AdvertisedSocketAddress catchUpAddress ) throws Exception
{
CHANNEL channel = getIdleChannel( catchUpAddress );

if ( channel == null )
{
channel = factory.apply( catchUpAddress );
try
{
channel.connect();
assertActive( channel, catchUpAddress );
}
catch ( Exception e )
{
channel.close();
throw e;
}
}

addActiveChannel( channel );

return channel;
}

private void assertActive( CHANNEL channel, AdvertisedSocketAddress address ) throws ConnectException
{
if ( !channel.isActive() )
{
throw new ConnectException( "Unable to connect to " + address );
}
}

private synchronized CHANNEL getIdleChannel( AdvertisedSocketAddress catchUpAddress )
{
CHANNEL channel = null;
LinkedList<CHANNEL> channels = idleChannels.get( catchUpAddress );
if ( channels != null )
{
channel = channels.poll();
while ( (channel = channels.poll()) != null )
{
if ( channel.isActive() )
{
break;
}
}
if ( channels.isEmpty() )
{
idleChannels.remove( catchUpAddress );
Expand Down Expand Up @@ -116,6 +141,10 @@ interface Channel
{
AdvertisedSocketAddress destination();

void connect() throws Exception;

boolean isActive();

void close();
}
}
Expand Up @@ -22,15 +22,19 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.IOException;
import java.net.ConnectException;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.BiConsumer;

import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
Expand Down Expand Up @@ -68,56 +72,79 @@ public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpReque
{
CompletableFuture<T> future = new CompletableFuture<>();

CatchUpChannel channel = pool.acquire( upstream );

future.whenComplete( ( result, e ) ->
CatchUpChannel channel = null;
try
{
if ( e == null )
{
pool.release( channel );
}
else
channel = pool.acquire( upstream );
channel.setResponseHandler( responseHandler, future );
future.whenComplete( new ReleaseOnComplete( channel ) );
channel.send( request );
}
catch ( Exception e )
{
if ( channel != null )
{
pool.dispose( channel );
}
} );
throw new CatchUpClientException( "Failed to send request", e );
}
String operation = format( "Completed exceptionally when executing operation %s on %s ", request, upstream );
return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log );
}

channel.setResponseHandler( responseHandler, future );
channel.send( request );
private class ReleaseOnComplete implements BiConsumer<Object,Throwable>
{
private CatchUpChannel catchUpChannel;

String operation = format( "Completed exceptionally when executing operation %s on %s ", request, upstream );
ReleaseOnComplete( CatchUpChannel catchUpChannel )
{
this.catchUpChannel = catchUpChannel;
}

return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log );
@Override
public void accept( Object o, Throwable throwable )
{
if ( throwable == null )
{
pool.release( catchUpChannel );
}
else
{
pool.dispose( catchUpChannel );
}
}
}

private class CatchUpChannel implements CatchUpChannelPool.Channel
{
private final TrackingResponseHandler handler;
private final AdvertisedSocketAddress destination;
private Channel nettyChannel;
private final Bootstrap bootstrap;

CatchUpChannel( AdvertisedSocketAddress destination )
{
this.destination = destination;
handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock );
Bootstrap bootstrap = new Bootstrap()
bootstrap = new Bootstrap()
.group( eventLoopGroup )
.channel( NioSocketChannel.class )
.handler( channelInitializer.apply( handler ) );

ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() );
nettyChannel = channelFuture.awaitUninterruptibly().channel();
}

void setResponseHandler( CatchUpResponseCallback responseHandler, CompletableFuture<?> requestOutcomeSignal )
{
handler.setResponseHandler( responseHandler, requestOutcomeSignal );
}

void send( CatchUpRequest request )
void send( CatchUpRequest request ) throws ConnectException
{
if ( !isActive() )
{
throw new ConnectException( "Channel is not connected" );
}
nettyChannel.write( request.messageType() );
nettyChannel.writeAndFlush( request );
nettyChannel.writeAndFlush( request ).addListener( ChannelFutureListener.CLOSE_ON_FAILURE );
}

Optional<Long> millisSinceLastResponse()
Expand All @@ -131,10 +158,26 @@ public AdvertisedSocketAddress destination()
return destination;
}

@Override
public void connect() throws Exception
{
ChannelFuture channelFuture = bootstrap.connect( destination.socketAddress() );
nettyChannel = channelFuture.sync().channel();
}

@Override
public boolean isActive()
{
return nettyChannel.isActive();
}

@Override
public void close()
{
nettyChannel.close();
if ( nettyChannel != null )
{
nettyChannel.close();
}
}
}

Expand Down

0 comments on commit 1cb430b

Please sign in to comment.