diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighlyAvailableGraphDatabase.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighlyAvailableGraphDatabase.java index 1e6d5476d509d..758d7ea37ee8e 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighlyAvailableGraphDatabase.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighlyAvailableGraphDatabase.java @@ -73,6 +73,7 @@ import org.neo4j.kernel.ha.cluster.SwitchToSlave; import org.neo4j.kernel.ha.cluster.member.ClusterMembers; import org.neo4j.kernel.ha.cluster.member.HighAvailabilitySlaves; +import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.DefaultSlaveFactory; import org.neo4j.kernel.ha.com.master.Master; @@ -370,11 +371,16 @@ public void elected( String role, InstanceId instanceId, URI electedMember ) clusterEventsDelegateInvocationHandler.setDelegate( localClusterEvents ); clusterMemberAvailabilityDelegateInvocationHandler.setDelegate( localClusterMemberAvailability ); - members = dependencies.satisfyDependency( new ClusterMembers( clusterClient, clusterClient, clusterEvents, - config.get( ClusterSettings.server_id ) ) ); - memberStateMachine = paxosLife.add( new HighAvailabilityMemberStateMachine( memberContext, availabilityGuard, - members, clusterEvents, clusterClient, logging.getMessagesLog( HighAvailabilityMemberStateMachine.class - ) ) ); + ObservedClusterMembers observedMembers = new ObservedClusterMembers( logging, clusterClient, clusterClient, + clusterEvents, config.get( ClusterSettings.server_id ) ); + + HighAvailabilityMemberStateMachine stateMachine = new HighAvailabilityMemberStateMachine( memberContext, + availabilityGuard, observedMembers, clusterEvents, clusterClient, + logging.getMessagesLog( HighAvailabilityMemberStateMachine.class ) ); + + members = dependencies.satisfyDependency( new ClusterMembers( observedMembers, stateMachine ) ); + + memberStateMachine = paxosLife.add( stateMachine ); HighAvailabilityConsoleLogger highAvailabilityConsoleLogger = new HighAvailabilityConsoleLogger( logging .getConsoleLog( HighAvailabilityConsoleLogger.class ), config.get( ClusterSettings @@ -714,12 +720,12 @@ public HighAvailabilityMemberState getInstanceState() public String role() { - return members.getSelf().getHARole(); + return members.getCurrentMemberRole(); } public boolean isMaster() { - return memberStateMachine.getCurrentState() == HighAvailabilityMemberState.MASTER; + return HighAvailabilityModeSwitcher.MASTER.equals( role() ); } @Override diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java index f58bfd7e0a2d0..f488f176a6d5b 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java @@ -28,7 +28,7 @@ import org.neo4j.helpers.Listeners; import org.neo4j.helpers.collection.Iterables; import org.neo4j.kernel.AvailabilityGuard; -import org.neo4j.kernel.ha.cluster.member.ClusterMembers; +import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers; import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -48,7 +48,7 @@ public class HighAvailabilityMemberStateMachine extends LifecycleAdapter impleme private final AvailabilityGuard availabilityGuard; private final ClusterMemberEvents events; private final StringLogger logger; - private final ClusterMembers members; + private final ObservedClusterMembers observedMembers; private final Election election; private Iterable memberListeners = Listeners.newListeners(); @@ -57,12 +57,14 @@ public class HighAvailabilityMemberStateMachine extends LifecycleAdapter impleme public HighAvailabilityMemberStateMachine( HighAvailabilityMemberContext context, AvailabilityGuard availabilityGuard, - ClusterMembers members, ClusterMemberEvents events, Election election, + ObservedClusterMembers observedMembers, + ClusterMemberEvents events, + Election election, StringLogger logger ) { this.context = context; this.availabilityGuard = availabilityGuard; - this.members = members; + this.observedMembers = observedMembers; this.events = events; this.election = election; this.logger = logger; @@ -337,12 +339,12 @@ public void notify( HighAvailabilityMemberListener listener ) private long getAliveCount() { - return Iterables.count( Iterables.filter( ClusterMembers.ALIVE, members.getMembers() ) ); + return Iterables.count( observedMembers.getAliveMembers() ); } private long getTotalCount() { - return Iterables.count( members.getMembers() ); + return Iterables.count( observedMembers.getMembers() ); } } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ClusterMembers.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ClusterMembers.java index 5f77d9b34e75c..a0588d7434d45 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ClusterMembers.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ClusterMembers.java @@ -19,40 +19,22 @@ */ package org.neo4j.kernel.ha.cluster.member; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - import org.neo4j.cluster.InstanceId; -import org.neo4j.cluster.member.ClusterMemberEvents; -import org.neo4j.cluster.member.ClusterMemberListener; -import org.neo4j.cluster.protocol.cluster.Cluster; -import org.neo4j.cluster.protocol.cluster.ClusterConfiguration; -import org.neo4j.cluster.protocol.cluster.ClusterListener; -import org.neo4j.cluster.protocol.heartbeat.Heartbeat; -import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener; +import org.neo4j.function.Function; import org.neo4j.helpers.Predicate; +import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState; +import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine; import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher; -import org.neo4j.kernel.impl.store.StoreId; -import org.neo4j.kernel.impl.util.CopyOnWriteHashMap; /** - * Keeps an up to date list of members, their roles and availability for - * display for example in JMX. + * Keeps a list of members, their roles and availability for display for example in JMX or REST. + *

+ * Member state info is based on {@link ObservedClusterMembers} and {@link HighAvailabilityMemberStateMachine}. + * State of the current member is always valid, all other instances are only 'best effort'. */ public class ClusterMembers { - public static final Predicate ALIVE = new Predicate() - { - @Override - public boolean accept( ClusterMember item ) - { - return item.isAlive(); - } - }; - - private final InstanceId me; - public static Predicate inRole( final String role ) { return new Predicate() @@ -77,161 +59,70 @@ public boolean accept( ClusterMember item ) }; } - private final Map members = new CopyOnWriteHashMap<>(); - - public ClusterMembers( Cluster cluster, Heartbeat heartbeat, ClusterMemberEvents events, InstanceId me ) - { - this.me = me; - cluster.addClusterListener( new HAMClusterListener() ); - heartbeat.addHeartbeatListener( new HAMHeartbeatListener() ); - events.addClusterMemberListener( new HAMClusterMemberListener() ); - } + private final ObservedClusterMembers observedClusterMembers; + private final HighAvailabilityMemberStateMachine stateMachine; - public Iterable getMembers() + public ClusterMembers( ObservedClusterMembers observedClusterMembers, + HighAvailabilityMemberStateMachine stateMachine ) { - return members.values(); + this.observedClusterMembers = observedClusterMembers; + this.stateMachine = stateMachine; } - public ClusterMember getSelf() + public ClusterMember getCurrentMember() { - for ( ClusterMember clusterMember : getMembers() ) + ClusterMember currentMember = observedClusterMembers.getCurrentMember(); + if ( currentMember == null ) { - if ( clusterMember.getInstanceId().equals( me ) ) - { - return clusterMember; - } + return null; } - return null; - } - - public synchronized void waitForEvent( long timeout ) throws InterruptedException - { - wait( timeout ); + String currentRole = roleOf( stateMachine.getCurrentState() ); + return currentMember.availableAs( currentRole, currentMember.getHAUri(), currentMember.getStoreId() ); } - private synchronized void eventOccurred() + public String getCurrentMemberRole() { - notifyAll(); + ClusterMember currentMember = getCurrentMember(); + return (currentMember == null) ? HighAvailabilityModeSwitcher.UNKNOWN : currentMember.getHARole(); } - private ClusterMember getMember( InstanceId server ) + public Iterable getMembers() { - ClusterMember clusterMember = members.get( server ); - if ( clusterMember == null ) - { - throw new IllegalStateException( "Member " + server + " not found in " + new HashMap<>( members ) ); - } - return clusterMember; + return getActualMembers( observedClusterMembers.getMembers() ); } - private class HAMClusterListener extends ClusterListener.Adapter + public Iterable getAliveMembers() { - @Override - public void enteredCluster( ClusterConfiguration configuration ) - { - Map newMembers = new HashMap<>(); - for ( InstanceId memberClusterId : configuration.getMemberIds() ) - { - newMembers.put( memberClusterId, new ClusterMember( memberClusterId ) ); - } - members.clear(); - members.putAll( newMembers ); - } - - @Override - public void leftCluster() - { - members.clear(); - } - - @Override - public void joinedCluster( InstanceId member, URI memberUri ) - { - members.put( member, new ClusterMember( member ) ); - } - - @Override - public void leftCluster( InstanceId instanceId, URI member ) - { - members.remove( instanceId ); - } + return getActualMembers( observedClusterMembers.getAliveMembers() ); } - private class HAMClusterMemberListener extends ClusterMemberListener.Adapter + private Iterable getActualMembers( Iterable members ) { - private InstanceId masterId = null; - - @Override - public void coordinatorIsElected( InstanceId coordinatorId ) - { - if ( coordinatorId.equals( this.masterId ) ) - { - return; - } - this.masterId = coordinatorId; - Map newMembers = new HashMap<>(); - for ( Map.Entry memberEntry : members.entrySet() ) - { - newMembers.put( memberEntry.getKey(), memberEntry.getValue().unavailableAs( - HighAvailabilityModeSwitcher.MASTER ).unavailableAs( HighAvailabilityModeSwitcher.SLAVE ) ); - } - members.clear(); - members.putAll( newMembers ); - } - - @Override - public void memberIsAvailable( String role, InstanceId instanceId, URI roleUri, StoreId storeId ) - { - members.put( instanceId, getMember( instanceId ).availableAs( role, roleUri, storeId ) ); - eventOccurred(); - } - - @Override - public void memberIsUnavailable( String role, InstanceId unavailableId ) + final ClusterMember currentMember = getCurrentMember(); + if ( currentMember == null ) { - ClusterMember member; - try - { - member = getMember( unavailableId ); - members.put( unavailableId, member.unavailableAs( role ) ); - } - catch ( IllegalStateException e ) - { - // Unknown member - } + return members; } - - @Override - public void memberIsFailed( InstanceId instanceId ) + return Iterables.map( new Function() { - // Make it unavailable for all its current roles - ClusterMember member = getMember( instanceId ); - for ( String role : member.getRoles() ) + @Override + public ClusterMember apply( ClusterMember member ) throws RuntimeException { - member = member.unavailableAs( role ); // ClusterMember is copy-on-write + return currentMember.getInstanceId().equals( member.getInstanceId() ) ? currentMember : member; } - members.put( instanceId, member ); // replace with the new copy - } + }, members ); } - private class HAMHeartbeatListener extends HeartbeatListener.Adapter + private static String roleOf( HighAvailabilityMemberState state ) { - @Override - public void failed( InstanceId server ) + switch ( state ) { - if ( members.containsKey( server ) ) - { - members.put( server, getMember( server ).failed() ); - } - } - - @Override - public void alive( InstanceId server ) - { - if ( members.containsKey( server ) ) - { - members.put( server, getMember( server ).alive() ); - } + case MASTER: + return HighAvailabilityModeSwitcher.MASTER; + case SLAVE: + return HighAvailabilityModeSwitcher.SLAVE; + default: + return HighAvailabilityModeSwitcher.UNKNOWN; } } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlaves.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlaves.java index 1cc01c6f4e3c3..6b8f4bf7f34d8 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlaves.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlaves.java @@ -89,9 +89,8 @@ public Iterable getSlaves() // Return all cluster members which are currently SLAVEs, // are alive, and convert to Slave with a cache if possible return map( withDefaults( slaveForMember(), Functions.map( slaves ) ), - filter( ClusterMembers.ALIVE, filter( inRole( HighAvailabilityModeSwitcher.SLAVE ), - clusterMembers.getMembers() ) ) ); + clusterMembers.getAliveMembers() ) ); } @Override diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ObservedClusterMembers.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ObservedClusterMembers.java new file mode 100644 index 0000000000000..9c9d044119d6e --- /dev/null +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/ObservedClusterMembers.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.ha.cluster.member; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.neo4j.cluster.InstanceId; +import org.neo4j.cluster.member.ClusterMemberEvents; +import org.neo4j.cluster.member.ClusterMemberListener; +import org.neo4j.cluster.protocol.cluster.Cluster; +import org.neo4j.cluster.protocol.cluster.ClusterConfiguration; +import org.neo4j.cluster.protocol.cluster.ClusterListener; +import org.neo4j.cluster.protocol.heartbeat.Heartbeat; +import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener; +import org.neo4j.helpers.Predicate; +import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher; +import org.neo4j.kernel.impl.store.StoreId; +import org.neo4j.kernel.impl.util.CopyOnWriteHashMap; +import org.neo4j.kernel.impl.util.StringLogger; +import org.neo4j.kernel.logging.Logging; + +/** + * Keeps list of members, their roles and availability. + * List is based on different notifications about cluster and HA events. + * List is basically a 'best guess' of the cluster state because message ordering is not guaranteed. + * This class should be used only when imprecise state is acceptable. + *

+ * For up-to-date cluster state use {@link ClusterMembers}. + * + * @see ClusterMembers + */ +public class ObservedClusterMembers +{ + private static final Predicate ALIVE = new Predicate() + { + @Override + public boolean accept( ClusterMember item ) + { + return item.isAlive(); + } + }; + + private final StringLogger log; + private final InstanceId me; + private final Map members = new CopyOnWriteHashMap<>(); + + public ObservedClusterMembers( Logging logging, Cluster cluster, Heartbeat heartbeat, ClusterMemberEvents events, + InstanceId me ) + { + this.me = me; + this.log = logging.getMessagesLog( getClass() ); + cluster.addClusterListener( new HAMClusterListener() ); + heartbeat.addHeartbeatListener( new HAMHeartbeatListener() ); + events.addClusterMemberListener( new HAMClusterMemberListener() ); + } + + public Iterable getMembers() + { + return members.values(); + } + + public Iterable getAliveMembers() + { + return Iterables.filter( ALIVE, members.values() ); + } + + public ClusterMember getCurrentMember() + { + for ( ClusterMember clusterMember : getMembers() ) + { + if ( clusterMember.getInstanceId().equals( me ) ) + { + return clusterMember; + } + } + return null; + } + + private ClusterMember getMember( InstanceId server ) + { + ClusterMember clusterMember = members.get( server ); + if ( clusterMember == null ) + { + throw new IllegalStateException( "Member " + server + " not found in " + new HashMap<>( members ) ); + } + return clusterMember; + } + + private class HAMClusterListener extends ClusterListener.Adapter + { + @Override + public void enteredCluster( ClusterConfiguration configuration ) + { + Map newMembers = new HashMap<>(); + for ( InstanceId memberClusterId : configuration.getMemberIds() ) + { + newMembers.put( memberClusterId, new ClusterMember( memberClusterId ) ); + } + members.clear(); + members.putAll( newMembers ); + } + + @Override + public void leftCluster() + { + members.clear(); + } + + @Override + public void joinedCluster( InstanceId member, URI memberUri ) + { + members.put( member, new ClusterMember( member ) ); + } + + @Override + public void leftCluster( InstanceId instanceId, URI member ) + { + members.remove( instanceId ); + } + } + + private class HAMClusterMemberListener extends ClusterMemberListener.Adapter + { + private InstanceId masterId = null; + + @Override + public void coordinatorIsElected( InstanceId coordinatorId ) + { + if ( coordinatorId.equals( this.masterId ) ) + { + return; + } + this.masterId = coordinatorId; + Map newMembers = new HashMap<>(); + for ( Map.Entry memberEntry : members.entrySet() ) + { + newMembers.put( memberEntry.getKey(), memberEntry.getValue().unavailableAs( + HighAvailabilityModeSwitcher.MASTER ).unavailableAs( HighAvailabilityModeSwitcher.SLAVE ) ); + } + members.clear(); + members.putAll( newMembers ); + } + + @Override + public void memberIsAvailable( String role, InstanceId instanceId, URI roleUri, StoreId storeId ) + { + members.put( instanceId, getMember( instanceId ).availableAs( role, roleUri, storeId ) ); + } + + @Override + public void memberIsUnavailable( String role, InstanceId unavailableId ) + { + ClusterMember member; + try + { + member = getMember( unavailableId ); + members.put( unavailableId, member.unavailableAs( role ) ); + } + catch ( IllegalStateException e ) + { + log.warn( "Unknown member with id '" + unavailableId + "' reported unavailable as '" + role + "'" ); + } + } + + @Override + public void memberIsFailed( InstanceId instanceId ) + { + // Make it unavailable for all its current roles + ClusterMember member = getMember( instanceId ); + for ( String role : member.getRoles() ) + { + member = member.unavailableAs( role ); + } + members.put( instanceId, member ); + } + } + + private class HAMHeartbeatListener extends HeartbeatListener.Adapter + { + @Override + public void failed( InstanceId server ) + { + if ( members.containsKey( server ) ) + { + members.put( server, getMember( server ).failed() ); + } + } + + @Override + public void alive( InstanceId server ) + { + if ( members.containsKey( server ) ) + { + members.put( server, getMember( server ).alive() ); + } + } + } +} diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/management/ClusterDatabaseInfoProvider.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/management/ClusterDatabaseInfoProvider.java index 8b921bf3c0b20..208bd41fad42f 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/management/ClusterDatabaseInfoProvider.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/management/ClusterDatabaseInfoProvider.java @@ -44,14 +44,16 @@ public ClusterDatabaseInfoProvider( ClusterMembers members, LastTxIdGetter txIdG public ClusterDatabaseInfo getInfo() { - ClusterMember self = members.getSelf(); - if (self == null) + ClusterMember currentMember = members.getCurrentMember(); + if (currentMember == null) + { return null; + } - return new ClusterDatabaseInfo( new ClusterMemberInfo( self.getInstanceId().toString(), self.getHAUri() != null, - true, self.getHARole(), - Iterables.toArray(String.class, Iterables.map( Functions.TO_STRING, self.getRoleURIs() ) ), - Iterables.toArray(String.class, Iterables.map( Functions.TO_STRING, self.getRoles() ) ) ), + return new ClusterDatabaseInfo( new ClusterMemberInfo( currentMember.getInstanceId().toString(), + currentMember.getHAUri() != null, true, currentMember.getHARole(), + Iterables.toArray(String.class, Iterables.map( Functions.TO_STRING, currentMember.getRoleURIs() ) ), + Iterables.toArray(String.class, Iterables.map( Functions.TO_STRING, currentMember.getRoles() ) ) ), txIdGetter.getLastTxId(), lastUpdateTime.getLastUpdateTime() ); } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/impl/ha/ClusterManager.java b/enterprise/ha/src/main/java/org/neo4j/kernel/impl/ha/ClusterManager.java index 929f9b394deef..f66d3517faffa 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/impl/ha/ClusterManager.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/impl/ha/ClusterManager.java @@ -40,7 +40,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -75,6 +74,7 @@ import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher; import org.neo4j.kernel.ha.cluster.member.ClusterMember; import org.neo4j.kernel.ha.cluster.member.ClusterMembers; +import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers; import org.neo4j.kernel.ha.com.master.Slaves; import org.neo4j.kernel.impl.transaction.log.LogRotationControl; import org.neo4j.kernel.impl.util.StringLogger; @@ -89,7 +89,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; - import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.io.fs.FileUtils.copyRecursively; @@ -435,7 +434,7 @@ public boolean accept( ManagedCluster cluster ) } } - for ( ClusterMembers clusterMembers : cluster.getArbiters() ) + for ( ObservedClusterMembers clusterMembers : cluster.getArbiters() ) { if ( count( clusterMembers.getMembers() ) < nrOfMembers ) { @@ -790,7 +789,7 @@ public class ManagedCluster extends LifecycleAdapter private final Clusters.Cluster spec; private final String name; private final Map members = new ConcurrentHashMap<>(); - private final List arbiters = new ArrayList<>(); + private final List arbiters = new ArrayList<>(); ManagedCluster( Clusters.Cluster spec ) throws URISyntaxException, IOException { @@ -842,7 +841,7 @@ public HighlyAvailableGraphDatabase apply( HighlyAvailableGraphDatabaseProxy fro }, members.values() ); } - public Iterable getArbiters() + public Iterable getArbiters() { return arbiters; } @@ -1028,20 +1027,21 @@ public void stop() throws Throwable clientLogging, new NotElectableElectionCredentialsProvider(), objectStreamFactory, objectStreamFactory ); - arbiters.add( new ClusterMembers( clusterClient, clusterClient, new ClusterMemberEvents() - { - @Override - public void addClusterMemberListener( ClusterMemberListener listener ) - { - // noop - } - - @Override - public void removeClusterMemberListener( ClusterMemberListener listener ) - { - // noop - } - }, clusterClient.getServerId() ) ); + arbiters.add( new ObservedClusterMembers( clientLogging, clusterClient, clusterClient, + new ClusterMemberEvents() + { + @Override + public void addClusterMemberListener( ClusterMemberListener listener ) + { + // noop + } + + @Override + public void removeClusterMemberListener( ClusterMemberListener listener ) + { + // noop + } + }, clusterClient.getServerId() ) ); life.add( new FutureLifecycleAdapter<>( clusterClient ) ); } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachineTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachineTest.java index db086ff64697f..221673a07ebf7 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachineTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachineTest.java @@ -29,8 +29,6 @@ import java.net.URI; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedList; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -58,6 +56,7 @@ import org.neo4j.kernel.ha.UpdatePuller; import org.neo4j.kernel.ha.cluster.member.ClusterMember; import org.neo4j.kernel.ha.cluster.member.ClusterMembers; +import org.neo4j.kernel.ha.cluster.member.ObservedClusterMembers; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.HandshakeResult; import org.neo4j.kernel.ha.com.master.Master; @@ -104,7 +103,6 @@ public void shouldStartFromPending() throws Exception assertThat( memberStateMachine.getCurrentState(), equalTo( HighAvailabilityMemberState.PENDING ) ); } - @Test public void shouldMoveToToMasterFromPendingOnMasterElectedForItself() throws Throwable { @@ -258,7 +256,7 @@ public void whenInMasterStateLosingQuorumShouldPutInPending() throws Throwable HighAvailabilityMemberContext context = new SimpleHighAvailabilityMemberContext( me, false ); AvailabilityGuard guard = mock( AvailabilityGuard.class ); - ClusterMembers members = mockClusterMembers( me, other ); + ObservedClusterMembers members = mockClusterMembers( me, other ); ClusterMemberEvents events = mock( ClusterMemberEvents.class ); ClusterMemberListenerContainer memberListenerContainer = mockAddClusterMemberListener( events ); @@ -293,7 +291,7 @@ public void whenInSlaveStateLosingQuorumShouldPutInPending() throws Throwable InstanceId other = new InstanceId( 2 ); HighAvailabilityMemberContext context = new SimpleHighAvailabilityMemberContext( me, false ); AvailabilityGuard guard = mock( AvailabilityGuard.class ); - ClusterMembers members = mockClusterMembers( me, other ); + ObservedClusterMembers members = mockClusterMembers( me, other ); ClusterMemberEvents events = mock( ClusterMemberEvents.class ); ClusterMemberListenerContainer memberListenerContainer = mockAddClusterMemberListener( events ); @@ -328,7 +326,7 @@ public void whenInToMasterStateLosingQuorumShouldPutInPending() throws Throwable InstanceId other = new InstanceId( 2 ); HighAvailabilityMemberContext context = new SimpleHighAvailabilityMemberContext( me, false ); AvailabilityGuard guard = mock( AvailabilityGuard.class ); - ClusterMembers members = mockClusterMembers( me, other ); + ObservedClusterMembers members = mockClusterMembers( me, other ); ClusterMemberEvents events = mock( ClusterMemberEvents.class ); ClusterMemberListenerContainer memberListenerContainer = mockAddClusterMemberListener( events ); @@ -362,7 +360,7 @@ public void whenInToSlaveStateLosingQuorumShouldPutInPending() throws Throwable InstanceId other = new InstanceId( 2 ); HighAvailabilityMemberContext context = new SimpleHighAvailabilityMemberContext( me, false ); AvailabilityGuard guard = mock( AvailabilityGuard.class ); - ClusterMembers members = mockClusterMembers( me, other ); + ObservedClusterMembers members = mockClusterMembers( me, other ); ClusterMemberEvents events = mock( ClusterMemberEvents.class ); ClusterMemberListenerContainer memberListenerContainer = mockAddClusterMemberListener( events ); @@ -417,13 +415,15 @@ public void whenHAModeSwitcherSwitchesToSlaveTheOtherModeSwitcherDoNotGetTheOldM HighAvailabilityMemberContext context = mock( HighAvailabilityMemberContext.class ); when( context.getMyId() ).thenReturn( me ); AvailabilityGuard guard = mock( AvailabilityGuard.class ); - ClusterMembers members = mock( ClusterMembers.class ); + ObservedClusterMembers members = mock( ObservedClusterMembers.class ); ClusterMember masterMember = mock( ClusterMember.class ); when( masterMember.getHARole() ).thenReturn( "master" ); when( masterMember.hasRole( "master" ) ).thenReturn( true ); when( masterMember.getInstanceId() ).thenReturn( new InstanceId( 2 ) ); when( masterMember.getStoreId() ).thenReturn( storeId ); - when( members.getMembers() ).thenReturn( Arrays.asList( new ClusterMember( me ), masterMember ) ); + ClusterMember self = new ClusterMember( me ); + when( members.getMembers() ).thenReturn( Arrays.asList( self, masterMember ) ); + when( members.getCurrentMember() ).thenReturn( self ); DependencyResolver dependencyResolver = mock( DependencyResolver.class ); FileSystemAbstraction fs = mock( FileSystemAbstraction.class ); when( fs.fileExists( any( File.class ) ) ).thenReturn( true ); @@ -435,7 +435,7 @@ public void whenHAModeSwitcherSwitchesToSlaveTheOtherModeSwitcherDoNotGetTheOldM when( dependencyResolver.resolveDependency( NeoStoreDataSource.class ) ).thenReturn( dataSource ); when( dependencyResolver.resolveDependency( TransactionIdStore.class ) ). thenReturn( new DeadSimpleTransactionIdStore() ); - when( dependencyResolver.resolveDependency( ClusterMembers.class ) ).thenReturn( members ); + when( dependencyResolver.resolveDependency( ObservedClusterMembers.class ) ).thenReturn( members ); UpdatePuller updatePuller = mock( UpdatePuller.class ); when( updatePuller.tryPullUpdates() ).thenReturn( true ); when( dependencyResolver.resolveDependency( UpdatePuller.class ) ).thenReturn( updatePuller ); @@ -449,6 +449,9 @@ public void whenHAModeSwitcherSwitchesToSlaveTheOtherModeSwitcherDoNotGetTheOldM HighAvailabilityMemberStateMachine stateMachine = new HighAvailabilityMemberStateMachine( context, guard, members, events, election, logger ); + ClusterMembers clusterMembers = new ClusterMembers( members, stateMachine ); + when( dependencyResolver.resolveDependency( ClusterMembers.class ) ).thenReturn( clusterMembers ); + stateMachine.init(); stateMachine.start(); @@ -561,10 +564,9 @@ protected Object getMasterImpl( LifeSupport life ) // Then latch.await(); assertTrue( "mode switch failed", switchedSuccessfully.get() ); - Master expected = masterClient; Master actual = ref.get(); // let's test the toString()s since there are too many wrappers of proxies - assertEquals( expected.toString(), actual.toString() ); + assertEquals( masterClient.toString(), actual.toString() ); stateMachine.stop(); stateMachine.shutdown(); @@ -574,18 +576,20 @@ protected Object getMasterImpl( LifeSupport life ) otherModeSwitcher.shutdown(); } - private ClusterMembers mockClusterMembers( InstanceId me, InstanceId other ) + private ObservedClusterMembers mockClusterMembers( InstanceId me, InstanceId other ) { - ClusterMembers members = mock( ClusterMembers.class ); - List membersList = new LinkedList<>(); + ObservedClusterMembers members = mock( ObservedClusterMembers.class ); + // we cannot set outside of the package the isAlive to return false. So do it with a mock - ClusterMember otherMemberMock = mock( ClusterMember.class ); - when( otherMemberMock.getInstanceId() ).thenReturn( other ); - when( otherMemberMock.isAlive() ).thenReturn( false ); - membersList.add( otherMemberMock ); + ClusterMember otherMember = mock( ClusterMember.class ); + when( otherMember.getInstanceId() ).thenReturn( other ); + when( otherMember.isAlive() ).thenReturn( false ); + + ClusterMember thisMember = new ClusterMember( me ); + + when( members.getMembers() ).thenReturn( Arrays.asList( otherMember, thisMember ) ); + when( members.getAliveMembers() ).thenReturn( Collections.singleton( thisMember ) ); - membersList.add( new ClusterMember( me ) ); - when( members.getMembers() ).thenReturn( membersList ); return members; } @@ -617,7 +621,7 @@ private HighAvailabilityMemberStateMachine buildMockedStateMachine ( HighAvailab } private HighAvailabilityMemberStateMachine buildMockedStateMachine( HighAvailabilityMemberContext context, - ClusterMemberEvents events, ClusterMembers clusterMembers, AvailabilityGuard guard ) + ClusterMemberEvents events, ObservedClusterMembers clusterMembers, AvailabilityGuard guard ) { return new StateMachineBuilder().withContext( context ).withEvents( events ).withClusterMembers( clusterMembers ).withGuard( guard ).build(); @@ -627,7 +631,7 @@ private class StateMachineBuilder { HighAvailabilityMemberContext context = mock( HighAvailabilityMemberContext.class ); ClusterMemberEvents events = mock( ClusterMemberEvents.class ); - ClusterMembers clusterMembers = mock( ClusterMembers.class ); + ObservedClusterMembers clusterMembers = mock( ObservedClusterMembers.class ); AvailabilityGuard guard = mock( AvailabilityGuard.class ); Election election = mock( Election.class ); StringLogger logger = mock( StringLogger.class ); @@ -644,7 +648,7 @@ public StateMachineBuilder withEvents(ClusterMemberEvents events) return this; } - public StateMachineBuilder withClusterMembers(ClusterMembers clusterMember) + public StateMachineBuilder withClusterMembers(ObservedClusterMembers clusterMember) { this.clusterMembers = clusterMember; return this; diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ClusterMembersTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ClusterMembersTest.java index 9215c188035a6..9383457ffec1a 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ClusterMembersTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ClusterMembersTest.java @@ -20,432 +20,108 @@ package org.neo4j.kernel.ha.cluster.member; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; -import org.neo4j.backup.OnlineBackupKernelExtension; import org.neo4j.cluster.InstanceId; -import org.neo4j.cluster.member.ClusterMemberEvents; -import org.neo4j.cluster.member.ClusterMemberListener; -import org.neo4j.cluster.protocol.cluster.Cluster; -import org.neo4j.cluster.protocol.cluster.ClusterConfiguration; -import org.neo4j.cluster.protocol.cluster.ClusterListener; -import org.neo4j.cluster.protocol.heartbeat.Heartbeat; -import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener; import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState; +import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine; +import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher; import org.neo4j.kernel.impl.store.StoreId; -import org.neo4j.kernel.impl.util.StringLogger; -import static java.net.URI.create; -import static java.util.Arrays.asList; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.hasItems; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.neo4j.helpers.collection.Iterables.count; -import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.MASTER; -import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.SLAVE; -import static org.neo4j.kernel.ha.cluster.member.ClusterMemberMatcher.sameMemberAs; +import static org.mockito.Mockito.when; public class ClusterMembersTest { - private static InstanceId clusterId1 = new InstanceId( 1 ); - private static InstanceId clusterId2 = new InstanceId( 2 ); - private static InstanceId clusterId3 = new InstanceId( 3 ); - private static URI clusterUri1 = create( "cluster://server1" ); - private static URI clusterUri2 = create( "cluster://server2" ); - private static URI clusterUri3 = create( "cluster://server3" ); - private static URI haUri1 = create( "ha://server1?serverId="+clusterId1.toIntegerIndex() ); + private final ObservedClusterMembers observedClusterMembers = mock( ObservedClusterMembers.class ); + private final HighAvailabilityMemberStateMachine stateMachine = mock( HighAvailabilityMemberStateMachine.class ); + private final ClusterMembers clusterMembers = new ClusterMembers( observedClusterMembers, stateMachine ); @Test - public void shouldRegisterItselfOnListeners() throws Exception + public void currentInstanceStateUpdated() { - // given - Cluster cluster = mock( Cluster.class ); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); + ClusterMember currentInstance = createClusterMember( 1, HighAvailabilityModeSwitcher.UNKNOWN ); - // when - new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); + when( observedClusterMembers.getAliveMembers() ).thenReturn( Collections.singletonList( currentInstance ) ); + when( observedClusterMembers.getCurrentMember() ).thenReturn( currentInstance ); + when( stateMachine.getCurrentState() ).thenReturn( HighAvailabilityMemberState.MASTER ); - // then - verify( cluster ).addClusterListener( Mockito.any() ); - verify( heartbeat ).addHeartbeatListener( Mockito.any() ); - verify( clusterMemberEvents ).addClusterMemberListener( Mockito.any() ); + ClusterMember self = clusterMembers.getCurrentMember(); + assertEquals( HighAvailabilityModeSwitcher.MASTER, self.getHARole() ); } @Test - public void shouldContainMemberListAfterEnteringCluster() throws Exception + public void aliveMembersWithValidCurrentInstanceState() { - // given - Cluster cluster = mock( Cluster.class ); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); + ClusterMember currentInstance = createClusterMember( 1, HighAvailabilityModeSwitcher.UNKNOWN ); + ClusterMember otherInstance = createClusterMember( 2, HighAvailabilityModeSwitcher.SLAVE ); + List members = Arrays.asList( currentInstance, otherInstance ); - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); + when( observedClusterMembers.getAliveMembers() ).thenReturn( members ); + when( observedClusterMembers.getCurrentMember() ).thenReturn( currentInstance ); + when( stateMachine.getCurrentState() ).thenReturn( HighAvailabilityMemberState.MASTER ); - // when - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + Iterable currentMembers = clusterMembers.getAliveMembers(); - // then - assertThat( members.getMembers(), hasItems( - sameMemberAs( new ClusterMember( clusterId1 ) ), - sameMemberAs( new ClusterMember( clusterId2 ) ), - sameMemberAs( new ClusterMember( clusterId3 ) ) )); + assertEquals( "Only active members should be available", 2, Iterables.count( currentMembers ) ); + assertEquals( 1, countInstancesWithRole( currentMembers, HighAvailabilityModeSwitcher.MASTER ) ); + assertEquals( 1, countInstancesWithRole( currentMembers, HighAvailabilityModeSwitcher.SLAVE ) ); } @Test - public void joinedMemberShowsInList() throws Exception + public void observedStateDoesNotKnowCurrentInstance() { - // given - Cluster cluster = mock( Cluster.class ); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); + ClusterMember currentInstance = createClusterMember( 1, HighAvailabilityModeSwitcher.SLAVE ); + ClusterMember otherInstance = createClusterMember( 2, HighAvailabilityModeSwitcher.MASTER ); + List members = Arrays.asList( currentInstance, otherInstance ); - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); + when( observedClusterMembers.getMembers() ).thenReturn( members ); + when( observedClusterMembers.getCurrentMember() ).thenReturn( null ); + when( stateMachine.getCurrentState() ).thenReturn( HighAvailabilityMemberState.SLAVE ); - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) ); - - // when - listener.getValue().joinedCluster( clusterId3, clusterUri3 ); - - // then - assertThat( members.getMembers(), hasItems( - sameMemberAs( new ClusterMember( clusterId1 ) ), - sameMemberAs( new ClusterMember( clusterId2 ) ), - sameMemberAs( new ClusterMember( clusterId3 ) ) ) ); - } - - @Test - public void iCanGetToMyself() throws Exception - { - // given - Cluster cluster = mock( Cluster.class ); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, clusterId1 ); - - // when - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) ); - - ClusterMember me = members.getSelf(); - assertNotNull( me ); - assertEquals( 1, me.getInstanceId().toIntegerIndex() ); - assertEquals( clusterId1, me.getInstanceId() ); - } - - @Test - public void leftMemberDisappearsFromList() throws Exception - { - // given - Cluster cluster = mock( Cluster.class ); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - // when - listener.getValue().leftCluster( clusterId3, clusterUri3 ); - - // then - assertThat( - members.getMembers(), - not( hasItems( sameMemberAs( new ClusterMember( clusterId3 ) ) ) )); - } - - @Test - public void availableMasterShowsProperInformation() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - - // when - clusterMemberListener.getValue().memberIsAvailable( MASTER, clusterId1, haUri1, StoreId.DEFAULT ); - - // then - assertThat( - members.getMembers(), - hasItem( sameMemberAs( new ClusterMember( clusterId1 ).availableAs( - MASTER, haUri1, StoreId.DEFAULT ) ) ) ); - } - - @Test - public void availableSlaveShowsProperInformation() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - - // when - clusterMemberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); - - // then - assertThat( - members.getMembers(), - hasItem( sameMemberAs( new ClusterMember( - clusterId1 ).availableAs( SLAVE, haUri1, StoreId.DEFAULT ) ) ) ); - } - - @Test - public void membersShowsAsUnavailableWhenNewMasterElectedBeforeTheyBecomeAvailable() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - clusterMemberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); - - // when - clusterMemberListener.getValue().coordinatorIsElected( clusterId2 ); - - // then - assertThat( - members.getMembers(), - hasItem( sameMemberAs( new ClusterMember( clusterId1 ) ) ) ); - } - - @Test - public void failedMemberShowsAsSuch() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor heartBeatListener = ArgumentCaptor.forClass( HeartbeatListener.class ); - verify( heartbeat ).addHeartbeatListener( heartBeatListener.capture() ); - - // when - heartBeatListener.getValue().failed( clusterId1); - - // then - assertThat( - members.getMembers(), - hasItem( sameMemberAs( new ClusterMember( - clusterId1 ).failed() ) ) ); + assertNull( clusterMembers.getCurrentMember() ); + assertEquals( members, clusterMembers.getMembers() ); } @Test - public void failedThenAliveMemberShowsAsAlive() throws Exception + public void incorrectlyObservedCurrentInstanceStateUpdated() { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, null ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + ClusterMember currentInstance = createClusterMember( 1, HighAvailabilityModeSwitcher.SLAVE ); + ClusterMember otherInstance = createClusterMember( 2, HighAvailabilityModeSwitcher.MASTER ); + List members = Arrays.asList( currentInstance, otherInstance ); - ArgumentCaptor heartBeatListener = ArgumentCaptor.forClass( HeartbeatListener.class ); - verify( heartbeat ).addHeartbeatListener( heartBeatListener.capture() ); + when( observedClusterMembers.getMembers() ).thenReturn( members ); + when( observedClusterMembers.getCurrentMember() ).thenReturn( currentInstance ); + when( stateMachine.getCurrentState() ).thenReturn( HighAvailabilityMemberState.MASTER ); - // when - heartBeatListener.getValue().failed( clusterId1 ); - heartBeatListener.getValue().alive( clusterId1 ); + Iterable currentMembers = clusterMembers.getMembers(); - // then - assertThat( - members.getMembers(), - hasItem( sameMemberAs( new ClusterMember( clusterId1 ) ) ) ); + assertEquals( "All members should be available", 2, Iterables.count( currentMembers ) ); + assertEquals( 2, countInstancesWithRole( currentMembers, HighAvailabilityModeSwitcher.MASTER ) ); } - @Test - public void missingMasterUnavailabilityEventDoesNotClobberState() throws Exception + private static int countInstancesWithRole( Iterable currentMembers, String role ) { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, clusterId1 ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - - // when - // first we are available as slaves - clusterMemberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); - // and then for some reason as master, without an unavailable message in between - clusterMemberListener.getValue().memberIsAvailable( MASTER, clusterId1, haUri1, StoreId.DEFAULT ); - - // then - assertThat( members.getSelf().getHARole(), equalTo( MASTER ) ); - } - - @Test - public void missingSlaveUnavailabilityEventDoesNotClobberState() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, clusterId1 ); - - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - - // when - // first we are available as master - clusterMemberListener.getValue().memberIsAvailable( MASTER, clusterId1, haUri1, StoreId.DEFAULT ); - // and then for some reason as slave, without an unavailable message in between - clusterMemberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); - - // then - assertThat( members.getSelf().getHARole(), equalTo( SLAVE ) ); - } - - @Test - public void missingMasterUnavailabilityEventForOtherInstanceStillRemovesBackupRole() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, clusterId1 ); - // initialized with the members of the cluster - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - - // instance 2 is available as MASTER and BACKUP - clusterMemberListener.getValue().memberIsAvailable( - OnlineBackupKernelExtension.BACKUP, clusterId2, clusterUri2, StoreId.DEFAULT ); - clusterMemberListener.getValue().memberIsAvailable( MASTER, clusterId2, clusterUri2, StoreId.DEFAULT ); - - // when - instance 2 becomes available as SLAVE - clusterMemberListener.getValue().memberIsAvailable( SLAVE, clusterId2, clusterUri2, StoreId.DEFAULT ); - - // then - instance 2 should be available ONLY as SLAVE - for ( ClusterMember clusterMember : members.getMembers() ) - { - if ( clusterMember.getInstanceId().equals( clusterId2 ) ) - { - assertThat( count( clusterMember.getRoles() ), equalTo( 1l ) ); - assertThat( Iterables.single( clusterMember.getRoles() ), equalTo( SLAVE ) ); - break; // that's the only member we care about - } - } - } - - @Test - public void receivingInstanceFailureEventRemovesAllRolesForIt() throws Exception - { - // given - Cluster cluster = mock(Cluster.class); - Heartbeat heartbeat = mock( Heartbeat.class ); - ClusterMemberEvents clusterMemberEvents = mock(ClusterMemberEvents.class); - - ClusterMembers members = new ClusterMembers( cluster, heartbeat, clusterMemberEvents, clusterId1 ); - // initialized with the members of the cluster - ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); - verify( cluster ).addClusterListener( listener.capture() ); - listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); - - ArgumentCaptor clusterMemberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); - verify( clusterMemberEvents ).addClusterMemberListener( clusterMemberListener.capture() ); - - // instance 2 is available as MASTER and BACKUP - clusterMemberListener.getValue().memberIsAvailable( - OnlineBackupKernelExtension.BACKUP, clusterId2, clusterUri2, StoreId.DEFAULT ); - clusterMemberListener.getValue().memberIsAvailable( MASTER, clusterId2, clusterUri2, StoreId.DEFAULT ); - - // when - instance 2 becomes failed - clusterMemberListener.getValue().memberIsFailed( clusterId2 ); - - // then - instance 2 should not be available as any roles - for ( ClusterMember clusterMember : members.getMembers() ) + int counter = 0; + for ( ClusterMember clusterMember : currentMembers ) { - if ( clusterMember.getInstanceId().equals( clusterId2 ) ) + if ( role.equals( clusterMember.getHARole() ) ) { - assertThat( count( clusterMember.getRoles() ), equalTo( 0l ) ); - break; // that's the only member we care about + counter++; } } + return counter; } - private ClusterConfiguration clusterConfiguration( URI... uris ) + private static ClusterMember createClusterMember( int id, String role ) { - ClusterConfiguration toReturn = new ClusterConfiguration( "neo4j.ha", StringLogger.SYSTEM, asList( uris ) ); - toReturn.joined( clusterId1, clusterUri1 ); - toReturn.joined( clusterId2, clusterUri2 ); - if ( uris.length == 3 ) - { - toReturn.joined( clusterId3, clusterUri3 ); - } - return toReturn; + ClusterMember member = new ClusterMember( new InstanceId( id ) ); + return member.availableAs( role, null, StoreId.DEFAULT ); } } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesTest.java index dc1483e2345f4..cea9a236f3a74 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesTest.java @@ -87,28 +87,7 @@ public void shouldNotReturnUnavailableSlaves() // given Cluster cluster = mock( Cluster.class ); ClusterMembers clusterMembers = mock( ClusterMembers.class ); - when( clusterMembers.getMembers() ).thenReturn( Iterables.option( new ClusterMember( INSTANCE_ID ) ) ); - - SlaveFactory slaveFactory = mock( SlaveFactory.class ); - - HighAvailabilitySlaves slaves = new HighAvailabilitySlaves( clusterMembers, cluster, slaveFactory ); - slaves.init(); - - // when - Iterable memberSlaves = slaves.getSlaves(); - - // then - assertThat( count( memberSlaves ), equalTo( 0L ) ); - } - - @Test - public void shouldNotReturnAvailableButFailedSlaves() - { - // given - Cluster cluster = mock( Cluster.class ); - ClusterMembers clusterMembers = mock( ClusterMembers.class ); - when( clusterMembers.getMembers() ).thenReturn( Iterables.option( - new ClusterMember( INSTANCE_ID ).availableAs( SLAVE, HA_URI, StoreId.DEFAULT ).failed() ) ); + when( clusterMembers.getAliveMembers() ).thenReturn( Iterables.option( new ClusterMember( INSTANCE_ID ) ) ); SlaveFactory slaveFactory = mock( SlaveFactory.class ); @@ -128,7 +107,7 @@ public void shouldReturnAvailableAndAliveSlaves() // given Cluster cluster = mock( Cluster.class ); ClusterMembers clusterMembers = mock( ClusterMembers.class ); - when( clusterMembers.getMembers() ).thenReturn( Iterables.option( + when( clusterMembers.getAliveMembers() ).thenReturn( Iterables.option( new ClusterMember( INSTANCE_ID ).availableAs( SLAVE, HA_URI, StoreId.DEFAULT ) ) ); SlaveFactory slaveFactory = mock( SlaveFactory.class ); @@ -151,7 +130,7 @@ public void shouldClearSlavesWhenNewMasterElected() // given Cluster cluster = mock( Cluster.class ); ClusterMembers clusterMembers = mock( ClusterMembers.class ); - when( clusterMembers.getMembers() ).thenReturn( Iterables.option( + when( clusterMembers.getAliveMembers() ).thenReturn( Iterables.option( new ClusterMember( INSTANCE_ID ).availableAs( SLAVE, HA_URI, StoreId.DEFAULT ) ) ); SlaveFactory slaveFactory = mock( SlaveFactory.class ); @@ -214,7 +193,7 @@ private static ClusterMembers clusterMembersOfSize( int size ) } ClusterMembers clusterMembers = mock( ClusterMembers.class ); - when( clusterMembers.getMembers() ).thenReturn( members ); + when( clusterMembers.getAliveMembers() ).thenReturn( members ); return clusterMembers; } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ObservedClusterMembersTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ObservedClusterMembersTest.java new file mode 100644 index 0000000000000..633f9e37510ae --- /dev/null +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ObservedClusterMembersTest.java @@ -0,0 +1,459 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.ha.cluster.member; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.net.URI; + +import org.neo4j.backup.OnlineBackupKernelExtension; +import org.neo4j.cluster.InstanceId; +import org.neo4j.cluster.member.ClusterMemberEvents; +import org.neo4j.cluster.member.ClusterMemberListener; +import org.neo4j.cluster.protocol.cluster.Cluster; +import org.neo4j.cluster.protocol.cluster.ClusterConfiguration; +import org.neo4j.cluster.protocol.cluster.ClusterListener; +import org.neo4j.cluster.protocol.heartbeat.Heartbeat; +import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener; +import org.neo4j.helpers.collection.Iterables; +import org.neo4j.kernel.impl.store.StoreId; +import org.neo4j.kernel.impl.util.StringLogger; +import org.neo4j.kernel.logging.DevNullLoggingService; +import org.neo4j.kernel.logging.Logging; + +import static java.net.URI.create; +import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.neo4j.helpers.collection.Iterables.count; +import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.MASTER; +import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.SLAVE; +import static org.neo4j.kernel.ha.cluster.member.ClusterMemberMatcher.sameMemberAs; + +public class ObservedClusterMembersTest +{ + private static final Logging logging = new DevNullLoggingService(); + private static final InstanceId clusterId1 = new InstanceId( 1 ); + private static final InstanceId clusterId2 = new InstanceId( 2 ); + private static final InstanceId clusterId3 = new InstanceId( 3 ); + private static final URI clusterUri1 = create( "cluster://server1" ); + private static final URI clusterUri2 = create( "cluster://server2" ); + private static final URI clusterUri3 = create( "cluster://server3" ); + private static final URI haUri1 = create( "ha://server1?serverId=" + clusterId1.toIntegerIndex() ); + + @Test + public void shouldRegisterItselfOnListeners() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents clusterMemberEvents = mock( ClusterMemberEvents.class ); + + // when + new ObservedClusterMembers( logging, cluster, heartbeat, clusterMemberEvents, null ); + + // then + verify( cluster ).addClusterListener( Mockito.any() ); + verify( heartbeat ).addHeartbeatListener( Mockito.any() ); + verify( clusterMemberEvents ).addClusterMemberListener( Mockito.any() ); + } + + @Test + public void shouldContainMemberListAfterEnteringCluster() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + // when + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + // then + assertThat( members.getMembers(), hasItems( + sameMemberAs( new ClusterMember( clusterId1 ) ), + sameMemberAs( new ClusterMember( clusterId2 ) ), + sameMemberAs( new ClusterMember( clusterId3 ) ) ) ); + } + + @Test + public void joinedMemberShowsInList() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) ); + + // when + listener.getValue().joinedCluster( clusterId3, clusterUri3 ); + + // then + assertThat( members.getMembers(), hasItems( + sameMemberAs( new ClusterMember( clusterId1 ) ), + sameMemberAs( new ClusterMember( clusterId2 ) ), + sameMemberAs( new ClusterMember( clusterId3 ) ) ) ); + } + + @Test + public void iCanGetToMyself() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, + clusterId1 ); + + // when + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) ); + + ClusterMember me = members.getCurrentMember(); + assertNotNull( me ); + assertEquals( 1, me.getInstanceId().toIntegerIndex() ); + assertEquals( clusterId1, me.getInstanceId() ); + } + + @Test + public void leftMemberDisappearsFromList() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + // when + listener.getValue().leftCluster( clusterId3, clusterUri3 ); + + // then + assertThat( + members.getMembers(), + not( hasItems( sameMemberAs( new ClusterMember( clusterId3 ) ) ) ) ); + } + + @Test + public void availableMasterShowsProperInformation() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + + // when + memberListener.getValue().memberIsAvailable( MASTER, clusterId1, haUri1, StoreId.DEFAULT ); + + // then + assertThat( + members.getMembers(), + hasItem( sameMemberAs( new ClusterMember( clusterId1 ).availableAs( + MASTER, haUri1, StoreId.DEFAULT ) ) ) ); + } + + @Test + public void availableSlaveShowsProperInformation() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + + // when + memberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); + + // then + assertThat( + members.getMembers(), + hasItem( sameMemberAs( new ClusterMember( + clusterId1 ).availableAs( SLAVE, haUri1, StoreId.DEFAULT ) ) ) ); + } + + @Test + public void membersShowsAsUnavailableWhenNewMasterElectedBeforeTheyBecomeAvailable() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + memberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); + + // when + memberListener.getValue().coordinatorIsElected( clusterId2 ); + + // then + assertThat( + members.getMembers(), + hasItem( sameMemberAs( new ClusterMember( clusterId1 ) ) ) ); + } + + @Test + public void failedMemberShowsAsSuch() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor heartBeatListener = ArgumentCaptor.forClass( HeartbeatListener.class ); + verify( heartbeat ).addHeartbeatListener( heartBeatListener.capture() ); + + // when + heartBeatListener.getValue().failed( clusterId1 ); + + // then + assertThat( + members.getMembers(), + hasItem( sameMemberAs( new ClusterMember( + clusterId1 ).failed() ) ) ); + } + + @Test + public void failedThenAliveMemberShowsAsAlive() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, null ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor heartBeatListener = ArgumentCaptor.forClass( HeartbeatListener.class ); + verify( heartbeat ).addHeartbeatListener( heartBeatListener.capture() ); + + // when + heartBeatListener.getValue().failed( clusterId1 ); + heartBeatListener.getValue().alive( clusterId1 ); + + // then + assertThat( + members.getMembers(), + hasItem( sameMemberAs( new ClusterMember( clusterId1 ) ) ) ); + } + + @Test + public void missingMasterUnavailabilityEventDoesNotClobberState() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, + clusterId1 ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + + // when + // first we are available as slaves + memberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); + // and then for some reason as master, without an unavailable message in between + memberListener.getValue().memberIsAvailable( MASTER, clusterId1, haUri1, StoreId.DEFAULT ); + + // then + assertThat( members.getCurrentMember().getHARole(), equalTo( MASTER ) ); + } + + @Test + public void missingSlaveUnavailabilityEventDoesNotClobberState() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, + clusterId1 ); + + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + + // when + // first we are available as master + memberListener.getValue().memberIsAvailable( MASTER, clusterId1, haUri1, StoreId.DEFAULT ); + // and then for some reason as slave, without an unavailable message in between + memberListener.getValue().memberIsAvailable( SLAVE, clusterId1, haUri1, StoreId.DEFAULT ); + + // then + assertThat( members.getCurrentMember().getHARole(), equalTo( SLAVE ) ); + } + + @Test + public void missingMasterUnavailabilityEventForOtherInstanceStillRemovesBackupRole() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, + clusterId1 ); + // initialized with the members of the cluster + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + + // instance 2 is available as MASTER and BACKUP + memberListener.getValue().memberIsAvailable( + OnlineBackupKernelExtension.BACKUP, clusterId2, clusterUri2, StoreId.DEFAULT ); + memberListener.getValue().memberIsAvailable( MASTER, clusterId2, clusterUri2, StoreId.DEFAULT ); + + // when - instance 2 becomes available as SLAVE + memberListener.getValue().memberIsAvailable( SLAVE, clusterId2, clusterUri2, StoreId.DEFAULT ); + + // then - instance 2 should be available ONLY as SLAVE + for ( ClusterMember clusterMember : members.getMembers() ) + { + if ( clusterMember.getInstanceId().equals( clusterId2 ) ) + { + assertThat( count( clusterMember.getRoles() ), equalTo( 1l ) ); + assertThat( Iterables.single( clusterMember.getRoles() ), equalTo( SLAVE ) ); + break; // that's the only member we care about + } + } + } + + @Test + public void receivingInstanceFailureEventRemovesAllRolesForIt() throws Exception + { + // given + Cluster cluster = mock( Cluster.class ); + Heartbeat heartbeat = mock( Heartbeat.class ); + ClusterMemberEvents memberEvents = mock( ClusterMemberEvents.class ); + + ObservedClusterMembers members = new ObservedClusterMembers( logging, cluster, heartbeat, memberEvents, + clusterId1 ); + // initialized with the members of the cluster + ArgumentCaptor listener = ArgumentCaptor.forClass( ClusterListener.class ); + verify( cluster ).addClusterListener( listener.capture() ); + listener.getValue().enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) ); + + ArgumentCaptor memberListener = ArgumentCaptor.forClass( ClusterMemberListener.class ); + verify( memberEvents ).addClusterMemberListener( memberListener.capture() ); + + // instance 2 is available as MASTER and BACKUP + memberListener.getValue().memberIsAvailable( + OnlineBackupKernelExtension.BACKUP, clusterId2, clusterUri2, StoreId.DEFAULT ); + memberListener.getValue().memberIsAvailable( MASTER, clusterId2, clusterUri2, StoreId.DEFAULT ); + + // when - instance 2 becomes failed + memberListener.getValue().memberIsFailed( clusterId2 ); + + // then - instance 2 should not be available as any roles + for ( ClusterMember clusterMember : members.getMembers() ) + { + if ( clusterMember.getInstanceId().equals( clusterId2 ) ) + { + assertThat( count( clusterMember.getRoles() ), equalTo( 0L ) ); + break; // that's the only member we care about + } + } + } + + private ClusterConfiguration clusterConfiguration( URI... uris ) + { + ClusterConfiguration toReturn = new ClusterConfiguration( "neo4j.ha", StringLogger.SYSTEM, asList( uris ) ); + toReturn.joined( clusterId1, clusterUri1 ); + toReturn.joined( clusterId2, clusterUri2 ); + if ( uris.length == 3 ) + { + toReturn.joined( clusterId3, clusterUri3 ); + } + return toReturn; + } +}