Skip to content

Commit

Permalink
Simplified handling of ComExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
lutovich committed Feb 5, 2015
1 parent e92e078 commit 3ff5b7f
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 76 deletions.
17 changes: 6 additions & 11 deletions enterprise/com/src/main/java/org/neo4j/com/Client.java
Expand Up @@ -26,9 +26,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -85,7 +83,7 @@ public abstract class Client<T> extends LifecycleAdapter implements ChannelPipel
private final int maxUnusedChannels;
private final StoreId storeId;
private ResourceReleaser resourcePoolReleaser;
private final List<ComExceptionHandler> comExceptionHandlers;
private ComExceptionHandler comExceptionHandler;

private final RequestMonitor requestMonitor;

Expand All @@ -104,7 +102,7 @@ public Client( String hostNameOrIp, int port, Logging logging, Monitors monitors
this.readTimeout = readTimeout;
// ResourcePool no longer controls max concurrent channels. Use this value for the pool size
this.maxUnusedChannels = maxConcurrentChannels;
this.comExceptionHandlers = new ArrayList<ComExceptionHandler>( 2 );
this.comExceptionHandler = ComExceptionHandler.NO_OP;
this.address = new InetSocketAddress( hostNameOrIp, port );
this.protocol = new Protocol( chunkSize, applicationProtocolVersion, getInternalProtocolVersion() );

Expand Down Expand Up @@ -189,7 +187,7 @@ public void stop()
bootstrap.releaseExternalResources();
bossExecutor.shutdownNow();
workerExecutor.shutdownNow();
comExceptionHandlers.clear();
comExceptionHandler = ComExceptionHandler.NO_OP;
msgLog.logMessage( toString() + " shutdown", true );
}

Expand Down Expand Up @@ -249,10 +247,7 @@ protected <R> Response<R> sendRequest( RequestType<T> type, RequestContext conte
{
failure = e;
success = false;
for ( ComExceptionHandler handler : comExceptionHandlers )
{
handler.handle( e );
}
comExceptionHandler.handle( e );
throw e;
}
catch ( Throwable e )
Expand Down Expand Up @@ -335,9 +330,9 @@ public ChannelPipeline getPipeline() throws Exception
return pipeline;
}

public void addComExceptionHandler( ComExceptionHandler handler )
public void setComExceptionHandler( ComExceptionHandler handler )
{
comExceptionHandlers.add( handler );
comExceptionHandler = (handler == null) ? ComExceptionHandler.NO_OP : handler;
}

protected byte getInternalProtocolVersion()
Expand Down
Expand Up @@ -21,5 +21,13 @@

public interface ComExceptionHandler
{
static ComExceptionHandler NO_OP = new ComExceptionHandler()
{
@Override
public void handle( ComException exception )
{
}
};

void handle( ComException exception );
}
Expand Up @@ -406,7 +406,7 @@ public Response<?> answer( InvocationOnMock _ ) throws ComException

life.add( builder.server( communication ) );
MadeUpClient client = life.add( builder.client() );
client.addComExceptionHandler( handler );
client.setComExceptionHandler( handler );

life.start();

Expand Down
Expand Up @@ -85,6 +85,8 @@
import org.neo4j.kernel.ha.com.master.DefaultSlaveFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.master.Slaves;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.ha.com.slave.MasterClientResolver;
import org.neo4j.kernel.ha.id.HaIdGeneratorFactory;
import org.neo4j.kernel.ha.lock.LockManagerModeSwitcher;
import org.neo4j.kernel.ha.management.ClusterDatabaseInfoProvider;
Expand Down Expand Up @@ -486,12 +488,33 @@ protected IdGeneratorFactory createIdGeneratorFactory()
{
idGeneratorFactory = new HaIdGeneratorFactory( masterDelegateInvocationHandler, logging,
requestContextFactory );
highAvailabilityModeSwitcher =
new HighAvailabilityModeSwitcher( new SwitchToSlave(logging.getConsoleLog( HighAvailabilityModeSwitcher.class ), config, getDependencyResolver(), (HaIdGeneratorFactory) idGeneratorFactory,
logging, masterDelegateInvocationHandler, clusterMemberAvailability, requestContextFactory, clusterClient ),
new SwitchToMaster( logging, msgLog, this,
(HaIdGeneratorFactory) idGeneratorFactory, config, getDependencyResolver(), masterDelegateInvocationHandler, clusterMemberAvailability ),
clusterClient, clusterMemberAvailability, logging.getMessagesLog( HighAvailabilityModeSwitcher.class ));

InvalidEpochExceptionHandler invalidEpochHandler = new InvalidEpochExceptionHandler()
{
@Override
public void handle()
{
highAvailabilityModeSwitcher.forceElections();
}
};

MasterClientResolver masterClientResolver = new MasterClientResolver( logging, invalidEpochHandler,
config.get( HaSettings.read_timeout ).intValue(),
config.get( HaSettings.lock_read_timeout ).intValue(),
config.get( HaSettings.max_concurrent_channels_per_slave ),
config.get( HaSettings.com_chunk_size ).intValue() );

SwitchToSlave switchToSlave = new SwitchToSlave( logging.getConsoleLog( HighAvailabilityModeSwitcher.class ),
config, getDependencyResolver(), (HaIdGeneratorFactory) idGeneratorFactory, logging,
masterDelegateInvocationHandler, clusterMemberAvailability, requestContextFactory,
masterClientResolver );

SwitchToMaster switchToMaster = new SwitchToMaster( logging, msgLog, this,
(HaIdGeneratorFactory) idGeneratorFactory, config, getDependencyResolver(),
masterDelegateInvocationHandler, clusterMemberAvailability );

highAvailabilityModeSwitcher = new HighAvailabilityModeSwitcher( switchToSlave, switchToMaster, clusterClient,
clusterMemberAvailability, logging.getMessagesLog( HighAvailabilityModeSwitcher.class ) );

clusterClient.addBindingListener( highAvailabilityModeSwitcher );
memberStateMachine.addHighAvailabilityMemberListener( highAvailabilityModeSwitcher );
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.cluster.BindingListener;
Expand Down Expand Up @@ -81,6 +82,7 @@ public static InstanceId getServerId( URI haUri )
private volatile URI me;
private volatile Future<?> modeSwitcherFuture;
private volatile HighAvailabilityMemberState currentTargetState;
private final AtomicBoolean canAskForElections = new AtomicBoolean( true );

public HighAvailabilityModeSwitcher( SwitchToSlave switchToSlave,
SwitchToMaster switchToMaster,
Expand All @@ -105,7 +107,7 @@ public void listeningAt( URI myUri )
@Override
public synchronized void init() throws Throwable
{
modeSwitcherExecutor = Executors.newSingleThreadScheduledExecutor( named( "HA Mode switcher" ) );
modeSwitcherExecutor = createExecutor();

haCommunicationLife.init();
}
Expand Down Expand Up @@ -203,6 +205,15 @@ public void instanceStops( HighAvailabilityMemberChangeEvent event )
}
}

public void forceElections()
{
if ( canAskForElections.compareAndSet( true, false ) )
{
clusterMemberAvailability.memberIsUnavailable( HighAvailabilityModeSwitcher.SLAVE );
election.performRoleElections();
}
}

private void stateChanged( HighAvailabilityMemberChangeEvent event ) throws ExecutionException, InterruptedException
{
availableMasterId = event.getServerHaUri();
Expand Down Expand Up @@ -286,6 +297,7 @@ public void run()
try
{
masterHaURI = switchToMaster.switchToMaster( haCommunicationLife, me, cancellationHandle );
canAskForElections.set( true );
}
catch ( Throwable e )
{
Expand Down Expand Up @@ -346,6 +358,7 @@ public void run()
else
{
slaveHaURI = resultingSlaveHaURI;
canAskForElections.set( true );
}
}
catch ( MismatchingStoreIdException e )
Expand Down Expand Up @@ -396,6 +409,11 @@ private synchronized void startModeSwitching( Runnable switcher, CancellationHan
modeSwitcherFuture = modeSwitcherExecutor.submit( switcher );
}

ScheduledExecutorService createExecutor()
{
return Executors.newSingleThreadScheduledExecutor( named( "HA Mode switcher" ) );
}

private static class CancellationHandle implements CancellationRequest
{
private volatile boolean cancelled = false;
Expand Down
Expand Up @@ -105,14 +105,12 @@ public class SwitchToSlave
private final DelegateInvocationHandler<Master> masterDelegateHandler;
private final ClusterMemberAvailability clusterMemberAvailability;
private final RequestContextFactory requestContextFactory;
private final ClusterClient clusterClient;

private MasterClientResolver masterClientResolver;
private final MasterClientResolver masterClientResolver;

public SwitchToSlave( ConsoleLogger console, Config config, DependencyResolver resolver, HaIdGeneratorFactory
idGeneratorFactory, Logging logging, DelegateInvocationHandler<Master> masterDelegateHandler,
ClusterMemberAvailability clusterMemberAvailability, RequestContextFactory
requestContextFactory, ClusterClient clusterClient )
ClusterMemberAvailability clusterMemberAvailability, RequestContextFactory requestContextFactory,
MasterClientResolver masterClientResolver )
{
this.console = console;
this.config = config;
Expand All @@ -123,7 +121,7 @@ public SwitchToSlave( ConsoleLogger console, Config config, DependencyResolver r
this.requestContextFactory = requestContextFactory;
this.msgLog = logging.getMessagesLog( getClass() );
this.masterDelegateHandler = masterDelegateHandler;
this.clusterClient = clusterClient;
this.masterClientResolver = masterClientResolver;
}

/**
Expand All @@ -146,13 +144,6 @@ public URI switchToSlave( LifeSupport haCommunicationLife, URI me, URI masterUri

assert masterUri != null; // since we are here it must already have been set from outside

this.masterClientResolver = new MasterClientResolver( logging, clusterClient, clusterMemberAvailability, msgLog,
config.get( HaSettings.read_timeout ).intValue(),
config.get( HaSettings.lock_read_timeout ).intValue(),
config.get( HaSettings.max_concurrent_channels_per_slave ).intValue(),
config.get( HaSettings.com_chunk_size ).intValue() );


HaXaDataSourceManager xaDataSourceManager = resolver.resolveDependency(
HaXaDataSourceManager.class );
idGeneratorFactory.switchToSlave();
Expand Down
@@ -0,0 +1,25 @@
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha.com.slave;

public interface InvalidEpochExceptionHandler
{
void handle();
}
Expand Up @@ -93,5 +93,5 @@ public Response<Long> commitSingleResourceTransaction( RequestContext context, f
public Response<Void> copyTransactions( RequestContext context, final String ds, final long startTxId,
final long endTxId );

public void addComExceptionHandler( ComExceptionHandler handler );
public void setComExceptionHandler( ComExceptionHandler handler );
}
Expand Up @@ -22,21 +22,18 @@
import java.util.HashMap;
import java.util.Map;

import org.neo4j.cluster.client.ClusterClient;
import org.neo4j.cluster.member.ClusterMemberAvailability;
import org.neo4j.com.ComException;
import org.neo4j.com.ComExceptionHandler;
import org.neo4j.com.IllegalProtocolVersionException;
import org.neo4j.kernel.ha.MasterClient196;
import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher;
import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.impl.nioneo.store.StoreId;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.logging.Logging;
import org.neo4j.kernel.monitoring.Monitors;

public class MasterClientResolver implements MasterClientFactory
public class MasterClientResolver implements MasterClientFactory, ComExceptionHandler
{
private volatile MasterClientFactory currentFactory;
private volatile ProtocolVersionCombo currentVersion;
Expand All @@ -45,8 +42,7 @@ public class MasterClientResolver implements MasterClientFactory
private final Map<ProtocolVersionCombo,MasterClientFactory> protocolToFactoryMapping;
private final StringLogger log;

private final ClusterClient clusterClient;
private final ClusterMemberAvailability clusterMemberAvailability;
private final InvalidEpochExceptionHandler invalidEpochHandler;

@Override
public MasterClient instantiate( String hostNameOrIp, int port, Monitors monitors, StoreId storeId, LifeSupport life )
Expand All @@ -57,11 +53,28 @@ public MasterClient instantiate( String hostNameOrIp, int port, Monitors monitor
}

MasterClient result = currentFactory.instantiate( hostNameOrIp, port, monitors, storeId, life );
result.addComExceptionHandler( new MismatchingProtocolVersionHandler() );
result.addComExceptionHandler( new InvalidEpochHandler() );
result.setComExceptionHandler( this );
return result;
}

@Override
public void handle( ComException exception )
{
if ( exception instanceof IllegalProtocolVersionException )
{
log.info( "Handling " + exception + ", will pick new master client" );

IllegalProtocolVersionException illegalProtocolVersion = (IllegalProtocolVersionException) exception;
getFor( illegalProtocolVersion.getReceived(), 2 );
}
else if ( exception instanceof InvalidEpochException )
{
log.info( "Handling " + exception + ", will go to PENDING and ask for election" );

invalidEpochHandler.handle();
}
}

private static final class ProtocolVersionCombo implements Comparable<ProtocolVersionCombo>
{
final int applicationProtocol;
Expand Down Expand Up @@ -105,13 +118,11 @@ public int compareTo( ProtocolVersionCombo o )
static final ProtocolVersionCombo PC_196 = new ProtocolVersionCombo( 7, 2 );
}

public MasterClientResolver( Logging logging, ClusterClient clusterClient,
ClusterMemberAvailability clusterMemberAvailability, StringLogger msgLog,
public MasterClientResolver( Logging logging, InvalidEpochExceptionHandler invalidEpochHandler,
int readTimeout, int lockReadTimeout, int channels, int chunkSize )
{
this.log = msgLog;
this.clusterClient = clusterClient;
this.clusterMemberAvailability = clusterMemberAvailability;
this.log = logging.getMessagesLog( getClass() );
this.invalidEpochHandler = invalidEpochHandler;
protocolToFactoryMapping = new HashMap<ProtocolVersionCombo, MasterClientFactory>();
protocolToFactoryMapping.put( ProtocolVersionCombo.PC_18, new F18( logging, readTimeout, lockReadTimeout,
channels, chunkSize ) );
Expand Down Expand Up @@ -195,36 +206,6 @@ public MasterClient instantiate( String hostNameOrIp, int port, Monitors monitor
}
}

private class MismatchingProtocolVersionHandler implements ComExceptionHandler
{
@Override
public void handle( ComException exception )
{
if ( exception instanceof IllegalProtocolVersionException )
{
log.info( "Handling " + exception + ", will pick new master client" );

IllegalProtocolVersionException illegalProtocolVersion = (IllegalProtocolVersionException) exception;
getFor( illegalProtocolVersion.getReceived(), 2 );
}
}
}

private class InvalidEpochHandler implements ComExceptionHandler
{
@Override
public void handle( ComException exception )
{
if ( exception instanceof InvalidEpochException )
{
log.info( "Handling " + exception + ", will go to PENDING and ask for election" );

clusterMemberAvailability.memberIsUnavailable( HighAvailabilityModeSwitcher.SLAVE );
clusterClient.performRoleElections();
}
}
}

public void enableDowngradeBarrier()
{
downgradeForbidden = true;
Expand Down

0 comments on commit 3ff5b7f

Please sign in to comment.