Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Separated HighAvailabilityMembers into HighAvailabilitySlaves / Members

and introduced abstractions along the way to be able to write mocked
tests.
  • Loading branch information...
commit 2ccaa35b39604fc0ecd4f4b062c6de4a3c8b4b78 1 parent 104c941
@tinwelint tinwelint authored
Showing with 1,975 additions and 490 deletions.
  1. +3 −3 advanced/management/src/main/java/org/neo4j/management/ClusterDatabaseInfo.java
  2. +10 −2 advanced/management/src/main/java/org/neo4j/management/ClusterMemberInfo.java
  3. +3 −0  advanced/management/src/main/java/org/neo4j/management/HighAvailability.java
  4. +27 −0 enterprise/cluster/src/main/java/org/neo4j/cluster/Binding.java
  5. +42 −0 enterprise/cluster/src/main/java/org/neo4j/cluster/ClusterMonitor.java
  6. +11 −3 enterprise/cluster/src/main/java/org/neo4j/cluster/client/ClusterClient.java
  7. +1 −2  enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterConfiguration.java
  8. +1 −1  enterprise/com/src/main/java/org/neo4j/com/Client.java
  9. +6 −4 enterprise/ha/src/main/java/org/neo4j/kernel/HighlyAvailableKernelData.java
  10. +6 −6 enterprise/ha/src/main/java/org/neo4j/kernel/ha/ClusterDatabaseInfoProvider.java
  11. +66 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/DefaultSlaveFactory.java
  12. +3 −3 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaIdGeneratorFactory.java
  13. +7 −32 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighAvailabilityBean.java
  14. +0 −413 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighAvailabilityMembers.java
  15. +13 −9 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighlyAvailableGraphDatabase.java
  16. +27 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/SlaveFactory.java
  17. +2 −2 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/AbstractModeSwitcher.java
  18. +33 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailability.java
  19. +7 −2 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java
  20. +1 −1  enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcher.java
  21. +28 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMonitor.java
  22. +110 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/AbstractHighAvailabilityMembers.java
  23. +205 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilityMembers.java
  24. +155 −0 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlaves.java
  25. +18 −6 enterprise/ha/src/test/java/jmx/HaBeanIT.java
  26. +1 −1  ...ise/ha/src/test/java/org/neo4j/kernel/ha/{TestTxPushStrategyConfig.java → TxPushStrategyConfigIT.java}
  27. +165 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ClusterMemberMatcher.java
  28. +243 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilityMembersTest.java
  29. +91 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesIT.java
  30. +186 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesTest.java
  31. +60 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/MockedBinding.java
  32. +132 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/MockedCluster.java
  33. +119 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/MockedClusterMonitor.java
  34. +73 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/MockedHeartbeat.java
  35. +120 −0 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/MockedHighAvailability.java
View
6 advanced/management/src/main/java/org/neo4j/management/ClusterDatabaseInfo.java
@@ -24,10 +24,10 @@
private final long lastCommittedTxId;
private final long lastUpdateTime;
- public ClusterDatabaseInfo( String instanceId, boolean available, String haRole, String[] clusterRoles, String[] uris,
- long lastCommittedTxId, long lastUpdateTime )
+ public ClusterDatabaseInfo( ClusterMemberInfo memberInfo, long lastCommittedTxId, long lastUpdateTime )
{
- super( instanceId, available, haRole, clusterRoles, uris );
+ super( memberInfo.getInstanceId(), memberInfo.isAvailable(), memberInfo.isAlive(), memberInfo.getHaRole(),
+ memberInfo.getClusterRoles(), memberInfo.getUris() );
this.lastCommittedTxId = lastCommittedTxId;
this.lastUpdateTime = lastUpdateTime;
}
View
12 advanced/management/src/main/java/org/neo4j/management/ClusterMemberInfo.java
@@ -38,15 +38,18 @@
private static final long serialVersionUID = 1L;
private final String instanceId;
private final boolean available;
+ private final boolean alive;
private final String haRole;
private final String[] clusterRoles;
private final String[] uris;
- @ConstructorProperties( { "instanceId", "available", "haRole", "clusterRoles", "uris" } )
- public ClusterMemberInfo( String instanceId, boolean available, String haRole, String[] clusterRoles, String[] uris )
+ @ConstructorProperties( { "instanceId", "available", "alive", "haRole", "clusterRoles", "uris" } )
+ public ClusterMemberInfo( String instanceId, boolean available, boolean alive, String haRole,
+ String[] clusterRoles, String[] uris )
{
this.instanceId = instanceId;
this.available = available;
+ this.alive = alive;
this.haRole = haRole;
this.clusterRoles = clusterRoles;
this.uris = uris;
@@ -62,6 +65,11 @@ public boolean isAvailable()
return available;
}
+ public boolean isAlive()
+ {
+ return alive;
+ }
+
public String getHaRole()
{
return haRole;
View
3  advanced/management/src/main/java/org/neo4j/management/HighAvailability.java
@@ -34,6 +34,9 @@
@Description( "Whether this instance is available or not" )
boolean isAvailable();
+ @Description( "Whether this instance is alive or not" )
+ boolean isAlive();
+
@Description( "The role this instance has in the cluster" )
String getRole();
View
27 enterprise/cluster/src/main/java/org/neo4j/cluster/Binding.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.cluster;
+
+public interface Binding
+{
+ void addBindingListener( BindingListener listener );
+
+ void removeBindingListener( BindingListener listener );
+}
View
42 enterprise/cluster/src/main/java/org/neo4j/cluster/ClusterMonitor.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.cluster;
+
+import org.neo4j.cluster.protocol.cluster.ClusterListener;
+import org.neo4j.cluster.protocol.heartbeat.Heartbeat;
+
+/**
+ * Bundles up different ways of listening in on events going on
+ * in a cluster.
+ *
+ * {@link Binding} for notifications about which URI is used
+ * for sending events of the network.
+ * {@link Heartbeat} for notifications about failed/alive members.
+ * {@link #addClusterListener(ClusterListener)}, {@link #removeClusterListener(ClusterListener)
+ * for getting notified about cluster membership events.
+ *
+ * @author Mattias Persson
+ */
+public interface ClusterMonitor extends Binding, Heartbeat
+{
+ void addClusterListener( ClusterListener listener);
+
+ void removeClusterListener( ClusterListener listener);
+}
View
14 enterprise/cluster/src/main/java/org/neo4j/cluster/client/ClusterClient.java
@@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import org.neo4j.cluster.BindingListener;
+import org.neo4j.cluster.ClusterMonitor;
import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.ConnectedStateMachines;
import org.neo4j.cluster.MultiPaxosServerFactory;
@@ -56,7 +57,7 @@
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.logging.Logging;
-public class ClusterClient extends LifecycleAdapter implements Cluster, AtomicBroadcast, Heartbeat
+public class ClusterClient extends LifecycleAdapter implements ClusterMonitor, Cluster, AtomicBroadcast
{
private final LifeSupport life = new LifeSupport();
private final Cluster cluster;
@@ -175,8 +176,8 @@ public String getAddress()
}
}, StringLogger.SYSTEM );
- server = life.add( protocolServerFactory.newProtocolServer( timeoutStrategy, networkNodeTCP, networkNodeTCP,
- acceptorInstanceStore, electionCredentialsProvider ) );
+ server = protocolServerFactory.newProtocolServer( timeoutStrategy, networkNodeTCP, networkNodeTCP,
+ acceptorInstanceStore, electionCredentialsProvider );
networkNodeTCP.addNetworkChannelsListener( new NetworkInstance.NetworkChannelsListener()
{
@@ -344,10 +345,17 @@ public void removeHeartbeatListener( HeartbeatListener listener )
heartbeat.removeHeartbeatListener( listener );
}
+ @Override
public void addBindingListener( BindingListener bindingListener )
{
server.addBindingListener( bindingListener );
}
+
+ @Override
+ public void removeBindingListener( BindingListener listener )
+ {
+ server.removeBindingListener( listener );
+ }
public void dumpDiagnostics( StringBuilder appendTo )
{
View
3  ...rprise/cluster/src/main/java/org/neo4j/cluster/protocol/cluster/ClusterConfiguration.java
@@ -128,8 +128,7 @@ public void setRoles( Map<String, URI> roles )
assert members.contains( uri );
}
- this.roles.clear();
- this.roles.putAll( roles );
+ this.roles = new HashMap<String, URI>( roles );
}
public List<URI> getMembers()
View
2  enterprise/com/src/main/java/org/neo4j/com/Client.java
@@ -20,9 +20,9 @@
package org.neo4j.com;
import static org.neo4j.com.Protocol.addLengthFieldPipes;
+import static org.neo4j.com.Protocol.assertChunkSizeIsWithinFrameSize;
import static org.neo4j.com.Protocol.readString;
import static org.neo4j.com.Protocol.writeString;
-import static org.neo4j.com.Server.assertChunkSizeIsWithinFrameSize;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
View
10 enterprise/ha/src/main/java/org/neo4j/kernel/HighlyAvailableKernelData.java
@@ -19,11 +19,13 @@
*/
package org.neo4j.kernel;
+import static org.neo4j.helpers.collection.IteratorUtil.asCollection;
+
import org.neo4j.kernel.ha.ClusterDatabaseInfoProvider;
-import org.neo4j.kernel.ha.HighAvailabilityMembers;
-import org.neo4j.kernel.ha.HighAvailabilityMembers.MemberInfo;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
+import org.neo4j.kernel.ha.cluster.member.HighAvailabilityMembers;
import org.neo4j.management.ClusterDatabaseInfo;
+import org.neo4j.management.ClusterMemberInfo;
public class HighlyAvailableKernelData extends KernelData
{
@@ -52,9 +54,9 @@ public GraphDatabaseAPI graphDatabase()
return db;
}
- public MemberInfo[] getClusterInfo()
+ public ClusterMemberInfo[] getClusterInfo()
{
- return memberInfo.getMembers();
+ return asCollection( memberInfo.getMembers() ).toArray( new ClusterMemberInfo[0] );
}
public ClusterDatabaseInfo getMemberInfo()
View
12 enterprise/ha/src/main/java/org/neo4j/kernel/ha/ClusterDatabaseInfoProvider.java
@@ -23,7 +23,7 @@
import org.neo4j.cluster.BindingListener;
import org.neo4j.cluster.client.ClusterClient;
-import org.neo4j.kernel.ha.HighAvailabilityMembers.MemberInfo;
+import org.neo4j.kernel.ha.cluster.member.HighAvailabilityMembers;
import org.neo4j.management.ClusterDatabaseInfo;
import org.neo4j.management.ClusterMemberInfo;
@@ -47,13 +47,13 @@ public void listeningAt( URI uri )
public ClusterDatabaseInfo getInfo()
{
- for ( MemberInfo member : members.getMembers() )
- if ( member.getClusterUri().equals( me ) )
+ for ( ClusterMemberInfo member : members.getMembers() )
+ {
+ if ( member.getInstanceId().equals( me.toString() ) )
{
- ClusterMemberInfo info = HighAvailabilityBean.clusterMemberInfo( member );
- return new ClusterDatabaseInfo( info.getInstanceId(), info.isAvailable(),
- info.getHaRole(), info.getClusterRoles(), info.getUris(), 0, 0 );
+ return new ClusterDatabaseInfo( member, 0, 0 );
}
+ }
// TODO return something instead of throwing exception, right?
throw new IllegalStateException( "Couldn't find any information about myself, can't be right" );
View
66 enterprise/ha/src/main/java/org/neo4j/kernel/ha/DefaultSlaveFactory.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha;
+
+import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.getServerId;
+
+import java.net.URI;
+
+import org.neo4j.kernel.configuration.Config;
+import org.neo4j.kernel.impl.nioneo.store.StoreId;
+import org.neo4j.kernel.impl.nioneo.xa.NeoStoreXaDataSource;
+import org.neo4j.kernel.impl.transaction.DataSourceRegistrationListener;
+import org.neo4j.kernel.impl.transaction.XaDataSourceManager;
+import org.neo4j.kernel.impl.transaction.xaframework.XaDataSource;
+import org.neo4j.kernel.impl.util.StringLogger;
+
+public class DefaultSlaveFactory implements SlaveFactory
+{
+ private StringLogger logger;
+ private int maxConcurrentChannelsPerSlave;
+ private int chunkSize;
+ private StoreId storeId;
+
+ public DefaultSlaveFactory( XaDataSourceManager xaDsm, StringLogger logger,
+ int maxConcurrentChannelsPerSlave, int chunkSize )
+ {
+ this.logger = logger;
+ this.maxConcurrentChannelsPerSlave = maxConcurrentChannelsPerSlave;
+ this.chunkSize = chunkSize;
+ xaDsm.addDataSourceRegistrationListener( new StoreIdSettingListener() );
+ }
+
+ @Override
+ public Slave newSlave( URI uri )
+ {
+ return new SlaveClient( getServerId( uri ), uri.getHost(), uri.getPort(), logger, storeId,
+ maxConcurrentChannelsPerSlave, chunkSize );
+ }
+
+ private class StoreIdSettingListener extends DataSourceRegistrationListener.Adapter
+ {
+ @Override
+ public void registeredDataSource( XaDataSource ds )
+ {
+ if ( ds.getName().equals( Config.DEFAULT_DATA_SOURCE_NAME ) )
+ storeId = ((NeoStoreXaDataSource) ds).getStoreId();
+ }
+ }
+}
View
6 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaIdGeneratorFactory.java
@@ -27,10 +27,10 @@
import org.neo4j.kernel.DefaultIdGeneratorFactory;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.IdType;
+import org.neo4j.kernel.ha.cluster.HighAvailability;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberChangeEvent;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberListener;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState;
-import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.impl.nioneo.store.FileSystemAbstraction;
import org.neo4j.kernel.impl.nioneo.store.IdGenerator;
import org.neo4j.kernel.impl.nioneo.store.IdRange;
@@ -42,10 +42,10 @@
private final IdGeneratorFactory localFactory = new DefaultIdGeneratorFactory();
private final Master master;
- public HaIdGeneratorFactory( Master master, HighAvailabilityMemberStateMachine stateHandler )
+ public HaIdGeneratorFactory( Master master, HighAvailability highAvailability )
{
this.master = master;
- stateHandler.addClusterMemberListener( new HaIdGeneratorFactoryClusterMemberListener() );
+ highAvailability.addHighAvailabilityMemberListener( new HaIdGeneratorFactoryClusterMemberListener() );
}
@Override
View
39 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighAvailabilityBean.java
@@ -19,18 +19,14 @@
*/
package org.neo4j.kernel.ha;
-import java.net.URI;
-
import javax.management.NotCompliantMBeanException;
-import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Format;
import org.neo4j.helpers.Service;
import org.neo4j.jmx.impl.ManagementBeanProvider;
import org.neo4j.jmx.impl.ManagementData;
import org.neo4j.jmx.impl.Neo4jMBean;
import org.neo4j.kernel.HighlyAvailableKernelData;
-import org.neo4j.kernel.ha.HighAvailabilityMembers.MemberInfo;
import org.neo4j.management.ClusterMemberInfo;
import org.neo4j.management.HighAvailability;
@@ -93,19 +89,7 @@ public String getInstanceId()
@Override
public ClusterMemberInfo[] getInstancesInCluster()
{
- try
- {
- MemberInfo[] members = kernelData.getClusterInfo();
- ClusterMemberInfo[] result = new ClusterMemberInfo[members.length];
- for ( int i = 0; i < result.length; i++ )
- result[i] = clusterMemberInfo( members[i] );
- return result;
- }
- catch ( Throwable e )
- {
- e.printStackTrace();
- throw Exceptions.launderedException( e );
- }
+ return kernelData.getClusterInfo();
}
@Override
@@ -119,6 +103,12 @@ public boolean isAvailable()
{
return kernelData.getMemberInfo().isAvailable();
}
+
+ @Override
+ public boolean isAlive()
+ {
+ return kernelData.getMemberInfo().isAlive();
+ }
@Override
public String getLastUpdateTime()
@@ -142,19 +132,4 @@ public String update()
return "Update completed in " + time + "ms";
}
}
-
- private static String[] urisAsStrings( URI[] uris )
- {
- String[] strings = new String[uris.length];
- for ( int i = 0; i < strings.length; i++ )
- strings[i] = uris[i].toString();
- return strings;
- }
-
- public static ClusterMemberInfo clusterMemberInfo( MemberInfo member )
- {
- return new ClusterMemberInfo( "" + member.getServerId(),
- member.isAvailable(), member.getHaRole(), member.getClusterRoles(),
- HighAvailabilityBean.urisAsStrings( member.getUris() ) );
- }
}
View
413 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighAvailabilityMembers.java
@@ -1,413 +0,0 @@
-/**
- * Copyright (c) 2002-2012 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package org.neo4j.kernel.ha;
-
-import static org.neo4j.helpers.collection.IteratorUtil.asCollection;
-import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.getServerId;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.neo4j.cluster.client.ClusterClient;
-import org.neo4j.cluster.protocol.cluster.ClusterConfiguration;
-import org.neo4j.cluster.protocol.cluster.ClusterListener;
-import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener;
-import org.neo4j.com.ComSettings;
-import org.neo4j.helpers.Predicate;
-import org.neo4j.helpers.collection.FilteringIterable;
-import org.neo4j.helpers.collection.IterableWrapper;
-import org.neo4j.kernel.configuration.Config;
-import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberChangeEvent;
-import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberListener;
-import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState;
-import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
-import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher;
-import org.neo4j.kernel.impl.nioneo.store.StoreId;
-import org.neo4j.kernel.impl.nioneo.xa.NeoStoreXaDataSource;
-import org.neo4j.kernel.impl.transaction.DataSourceRegistrationListener;
-import org.neo4j.kernel.impl.transaction.XaDataSourceManager;
-import org.neo4j.kernel.impl.transaction.xaframework.XaDataSource;
-import org.neo4j.kernel.impl.util.StringLogger;
-import org.neo4j.kernel.lifecycle.LifeSupport;
-import org.neo4j.kernel.lifecycle.Lifecycle;
-
-/**
- * Keeps an accurate list of alive and available slaves in the cluster.
- * If we are the master in the cluster then the {@link Slave}s will have
- * active connections to the slaves themselves, otherwise they will just
- * be something that holds information about a slave.
- */
-public class HighAvailabilityMembers implements Slaves, Lifecycle
-{
- private final Map<URI /*Member cluster URI, not HA URI*/, Member> members;
- protected final StringLogger msgLog;
- private StoreId storeId;
- private final Config config;
- private final LifeSupport life;
- private final ClusterClient clusterClient;
-
- public HighAvailabilityMembers( ClusterClient clusterClient, HighAvailabilityMemberStateMachine clusterEvents,
- StringLogger msgLog, Config config, XaDataSourceManager xaDsm )
- {
- this.clusterClient = clusterClient;
- this.msgLog = msgLog;
- this.config = config;
- this.life = new LifeSupport();
- this.members = Collections.synchronizedMap( new HashMap<URI, Member>() );
- clusterEvents.addClusterMemberListener( new MembersListener() );
- xaDsm.addDataSourceRegistrationListener( new StoreIdSettingListener() );
- clusterClient.addClusterListener( new MembersClusterListener() );
- clusterClient.addHeartbeatListener( new MembersHeartbeatListener() );
- }
-
- @Override
- public void init() throws Throwable
- {
- life.init();
- }
-
- @Override
- public void start() throws Throwable
- {
- life.start();
- }
-
- @Override
- public void stop() throws Throwable
- {
- life.stop();
- }
-
- @Override
- public void shutdown() throws Throwable
- {
- life.shutdown();
- }
-
- public MemberInfo[] getMembers()
- {
- return this.members.values()
- .toArray( new MemberInfo[0] );
- }
-
- @Override
- public Iterable<Slave> getSlaves()
- {
- List<Member> members = Arrays.asList( internalGetMembers() );
- Iterable<Member> aliveMembers = new FilteringIterable<Member>( members,
- ALIVE_SLAVE );
- return new IterableWrapper<Slave, Member>( aliveMembers )
- {
- @Override
- protected Slave underlyingObjectToObject( Member member )
- {
- return member.slaveClient;
- }
- };
- }
-
- private Member[] internalGetMembers()
- {
- return members.values()
- .toArray( new Member[0] );
- }
-
- private Member getMember( URI uri )
- {
- Member member = members.get( uri );
- if ( member == null )
- {
- throw new IllegalStateException( "Member " + uri
- + " doesn't exist amongst "
- + members );
- }
- return member;
- }
-
- private static final Predicate<Member> ALIVE_SLAVE = new Predicate<HighAvailabilityMembers.Member>()
- {
- @Override
- public boolean accept( Member item )
- {
- return item.available && item.isSlave();
- }
- };
-
- public interface MemberInfo
- {
- int getServerId();
-
- URI getClusterUri();
-
- URI[] getUris();
-
- boolean isSlave();
-
- boolean isMaster();
-
- String getHaRole();
-
- String[] getClusterRoles();
-
- boolean isAvailable();
- }
-
- private class Member implements MemberInfo
- {
- private final URI clusterUri;
- private final Set<String> clusterRoles;
- private HighAvailabilityMemberState role = HighAvailabilityMemberState.PENDING; // TODO reuse that enum here really?
- private URI haUri;
- private boolean available;
-
- // Only assigned if I'm the master
- private SlaveClient slaveClient;
-
- public Member( URI clusterUri, Iterable<String> initialClusterRoles )
- {
- this.clusterRoles = new HashSet<String>( asCollection( initialClusterRoles ) );
- this.clusterUri = clusterUri;
- }
-
- @Override
- public boolean isSlave()
- {
- return this.role == HighAvailabilityMemberState.SLAVE;
- }
-
- @Override
- public boolean isMaster()
- {
- return this.role == HighAvailabilityMemberState.MASTER;
- }
-
- void setSlaveClient( SlaveClient client )
- {
- this.slaveClient = life.add( client );
- }
-
- void becomeAvailable( URI haUri, HighAvailabilityMemberState role )
- {
- this.haUri = haUri;
- this.available = true;
- this.role = role;
- }
-
- @Override
- public int getServerId()
- {
- return haUri != null ? HighAvailabilityModeSwitcher.getServerId( haUri ) : -1;
- }
-
- @Override
- public String toString()
- {
- return "Member [clusterUri=" + clusterUri + ", role=" + role + ", haUri=" + haUri +
- ", alive=" + available + ", slaveClient=" + slaveClient + "]";
- }
-
- @Override
- public URI getClusterUri()
- {
- return this.clusterUri;
- }
-
- @Override
- public URI[] getUris()
- {
- Collection<URI> uris = new ArrayList<URI>();
- uris.add( clusterUri );
- URI haUri = this.haUri;
- if ( haUri != null )
- uris.add( haUri );
- return uris.toArray( new URI[0] );
- }
-
- @Override
- public boolean isAvailable()
- {
- return this.available;
- }
-
- @Override
- public String getHaRole()
- {
- if ( isMaster() )
- return HighAvailabilityMemberState.MASTER.name();
- if ( isSlave() )
- return HighAvailabilityMemberState.SLAVE.name();
- return "";
- }
-
- @Override
- public synchronized String[] getClusterRoles()
- {
- return clusterRoles.toArray( new String[0] );
- }
-
- public void becomeUnavailable()
- {
- available = false;
- this.role = HighAvailabilityMemberState.PENDING;
- }
- }
-
- private class MembersListener extends HighAvailabilityMemberListener.Adapter
- {
- private volatile boolean instantiateFullSlaveClients;
- private volatile URI previouslyElectedMaster;
-
- @Override
- public void masterIsElected( HighAvailabilityMemberChangeEvent event )
- {
- URI masterUri = event.getServerClusterUri();
- instantiateFullSlaveClients = masterUri.equals( clusterClient.getServerUri() );
- if ( previouslyElectedMaster == null || !previouslyElectedMaster.equals( masterUri ) )
- {
- life.clear();
- for ( Member member : internalGetMembers() )
- member.becomeUnavailable();
- previouslyElectedMaster = masterUri;
- }
- }
-
- @Override
- public void slaveIsAvailable( HighAvailabilityMemberChangeEvent event )
- {
- Member member = getMember( event.getServerClusterUri() );
- URI haUri = event.getServerHaUri();
- if ( instantiateFullSlaveClients )
- {
- SlaveClient client = new SlaveClient(
- getServerId( haUri ),
- haUri.getHost(), haUri.getPort(), msgLog, storeId,
- config.get( HaSettings.max_concurrent_channels_per_slave ),
- config.get( ComSettings.com_chunk_size ) );
- member.setSlaveClient( client );
- }
- member.becomeAvailable( haUri, HighAvailabilityMemberState.SLAVE );
- }
-
- @Override
- public void masterIsAvailable( HighAvailabilityMemberChangeEvent event )
- {
- // TODO anything special for master?
- Member member = getMember( event.getServerClusterUri() );
- member.becomeAvailable( event.getServerHaUri(), HighAvailabilityMemberState.MASTER );
- }
-
- @Override
- public void instanceStops( HighAvailabilityMemberChangeEvent event )
- {
- // Here event.getServerClusterURI() seems to be null, so don't
- // update any member here
- }
- }
-
- private class StoreIdSettingListener extends DataSourceRegistrationListener.Adapter
- {
- @Override
- public void registeredDataSource( XaDataSource ds )
- {
- if ( ds.getName().equals( Config.DEFAULT_DATA_SOURCE_NAME ) )
- {
- NeoStoreXaDataSource neoXaDs = (NeoStoreXaDataSource) ds;
- storeId = neoXaDs.getStoreId();
- }
- }
- }
-
- private class MembersClusterListener extends ClusterListener.Adapter
- {
- private ClusterConfiguration clusterConfiguration;
-
- @Override
- public void enteredCluster( ClusterConfiguration clusterConfiguration )
- {
- this.clusterConfiguration = clusterConfiguration;
- // This should be the first thing we get back, i.e.
- // the initial full cluster configuration
- for ( URI memberClusterUri : clusterConfiguration.getMembers() )
- {
- Member member = new Member( memberClusterUri, clusterConfiguration.getRolesOf( memberClusterUri ) );
- members.put( memberClusterUri, member );
- }
- }
-
- @Override
- public void leftCluster( URI member )
- {
- memberLeft( member );
- }
-
- @Override
- public void leftCluster()
- {
- memberLeft( clusterClient.getServerUri() );
- }
-
- @Override
- public void joinedCluster( URI memberClusterUri )
- {
- members.put( memberClusterUri, new Member( memberClusterUri,
- clusterConfiguration.getRolesOf( memberClusterUri ) ) );
- }
- }
-
- private class MembersHeartbeatListener implements HeartbeatListener
- {
- @Override
- public void failed( URI server )
- {
- getMember( server ).available = false;
- }
-
- @Override
- public void alive( URI server )
- {
- getMember( server ).available = true;
- }
- }
-
- private void memberLeft( URI memberClusterUri )
- {
- Member removedMember = members.remove( memberClusterUri );
- assert removedMember != null : "Tried to remove member that wasn't there";
- life.remove( removedMember );
- }
-
- // public void debug( String string )
- // {
- // System.out.println( clusterClient.getServerUri() + ":" + string );
- // System.out.println( "{" );
- // for ( Member member : internalGetMembers() )
- // System.out.println( " " + member );
- // System.out.println( "}" );
- // }
-}
View
22 enterprise/ha/src/main/java/org/neo4j/kernel/ha/HighlyAvailableGraphDatabase.java
@@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
-import ch.qos.logback.classic.LoggerContext;
import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.client.ClusterClient;
import org.neo4j.cluster.com.NetworkInstance;
@@ -46,6 +45,8 @@
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberState;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher;
+import org.neo4j.kernel.ha.cluster.member.HighAvailabilityMembers;
+import org.neo4j.kernel.ha.cluster.member.HighAvailabilitySlaves;
import org.neo4j.kernel.ha.cluster.paxos.PaxosHighAvailabilityEvents;
import org.neo4j.kernel.impl.cache.CacheProvider;
import org.neo4j.kernel.impl.core.Caches;
@@ -62,10 +63,13 @@
import org.neo4j.kernel.logging.LogbackService;
import org.neo4j.kernel.logging.Logging;
+import ch.qos.logback.classic.LoggerContext;
+
public class HighlyAvailableGraphDatabase extends InternalAbstractGraphDatabase
{
private RequestContextFactory requestContextFactory;
- private HighAvailabilityMembers slaves;
+ private Slaves slaves;
+ private HighAvailabilityMembers members;
private DelegateInvocationHandler delegateInvocationHandler;
private LoggerContext loggerContext;
private DefaultTransactionSupport transactionSupport;
@@ -177,8 +181,6 @@ protected TxHook createTxHook()
memberContext = new HighAvailabilityMemberContext( clusterClient );
- life.add( memberContext );
-
memberStateMachine = new HighAvailabilityMemberStateMachine( memberContext, accessGuard, clusterEvents,
logging.getLogger( HighAvailabilityMemberStateMachine.class ) );
life.add( new HighAvailabilityModeSwitcher( delegateInvocationHandler, clusterEvents, memberStateMachine, this,
@@ -206,8 +208,10 @@ protected TxIdGenerator createTxIdGenerator()
TxIdGenerator txIdGenerator =
(TxIdGenerator) Proxy.newProxyInstance( TxIdGenerator.class.getClassLoader(),
new Class[]{TxIdGenerator.class}, txIdGeneratorDelegate );
- slaves = life.add( new HighAvailabilityMembers( clusterClient, memberStateMachine, msgLog, config,
- xaDataSourceManager ) );
+ slaves = life.add( new HighAvailabilitySlaves( clusterClient, memberStateMachine, new DefaultSlaveFactory(
+ xaDataSourceManager, msgLog, config.get( HaSettings.max_concurrent_channels_per_slave ),
+ config.get( ComSettings.com_chunk_size ) ), logging ) );
+ members = new HighAvailabilityMembers( clusterClient, memberStateMachine );
new TxIdGeneratorModeSwitcher( memberStateMachine, txIdGeneratorDelegate,
(HaXaDataSourceManager) xaDataSourceManager, master, requestContextFactory, msgLog, config, slaves );
return txIdGenerator;
@@ -262,14 +266,14 @@ protected void createNeoDataSource()
@Override
protected KernelData createKernelData()
{
- return new HighlyAvailableKernelData( this, this.slaves,
- new ClusterDatabaseInfoProvider( clusterClient, slaves ) );
+ return new HighlyAvailableKernelData( this, members,
+ new ClusterDatabaseInfoProvider( clusterClient, members ) );
}
@Override
protected void registerRecovery()
{
- memberStateMachine.addClusterMemberListener( new HighAvailabilityMemberListener()
+ memberStateMachine.addHighAvailabilityMemberListener( new HighAvailabilityMemberListener()
{
@Override
public void masterIsElected( HighAvailabilityMemberChangeEvent event )
View
27 enterprise/ha/src/main/java/org/neo4j/kernel/ha/SlaveFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha;
+
+import java.net.URI;
+
+public interface SlaveFactory
+{
+ Slave newSlave( URI uri );
+}
View
4 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/AbstractModeSwitcher.java
@@ -37,11 +37,11 @@
private final DelegateInvocationHandler<T> delegate;
private LifeSupport life;
- protected AbstractModeSwitcher( HighAvailabilityMemberStateMachine stateMachine, DelegateInvocationHandler<T> delegate )
+ protected AbstractModeSwitcher( HighAvailability highAvailability, DelegateInvocationHandler<T> delegate )
{
this.delegate = delegate;
this.life = new LifeSupport();
- stateMachine.addClusterMemberListener( new DelegateStateSwitcher() );
+ highAvailability.addHighAvailabilityMemberListener( new DelegateStateSwitcher() );
}
@Override
View
33 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailability.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster;
+
+/**
+ * A place to register {@link HighAvailabilityMemberListener listeners}
+ * that will receive events about (high) availability and roles in a cluster.
+ *
+ * @author Mattias Persson
+ */
+public interface HighAvailability
+{
+ void addHighAvailabilityMemberListener( HighAvailabilityMemberListener listener );
+
+ void removeHighAvailabilityMemberListener( HighAvailabilityMemberListener listener );
+}
View
9 ...rise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMemberStateMachine.java
@@ -35,7 +35,7 @@
* that wants to know what is going on should register ClusterMemberListener implementations
* which will receive callbacks on state changes.
*/
-public class HighAvailabilityMemberStateMachine extends LifecycleAdapter
+public class HighAvailabilityMemberStateMachine extends LifecycleAdapter implements HighAvailability
{
private final HighAvailabilityMemberContext context;
private final InstanceAccessGuard accessGuard;
@@ -73,11 +73,16 @@ public void notify( HighAvailabilityMemberListener listener )
accessGuard.setState( state );
}
- public void addClusterMemberListener( HighAvailabilityMemberListener toAdd )
+ public void addHighAvailabilityMemberListener( HighAvailabilityMemberListener toAdd )
{
memberListeners = Listeners.addListener( toAdd, memberListeners );
}
+ public void removeHighAvailabilityMemberListener( HighAvailabilityMemberListener toRemove )
+ {
+ memberListeners = Listeners.removeListener( toRemove, memberListeners );
+ }
+
public HighAvailabilityMemberState getCurrentState()
{
return state;
View
2  enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityModeSwitcher.java
@@ -119,7 +119,7 @@ public HighAvailabilityModeSwitcher( DelegateInvocationHandler delegateHandler,
this.config = config;
this.msgLog = msgLog;
this.life = new LifeSupport();
- stateHandler.addClusterMemberListener( this );
+ stateHandler.addHighAvailabilityMemberListener( this );
}
@Override
View
28 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/HighAvailabilityMonitor.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster;
+
+import org.neo4j.cluster.Binding;
+import org.neo4j.cluster.protocol.cluster.Cluster;
+import org.neo4j.cluster.protocol.heartbeat.Heartbeat;
+
+public interface HighAvailabilityMonitor extends HighAvailability, Cluster, Heartbeat, Binding
+{
+}
View
110 .../ha/src/main/java/org/neo4j/kernel/ha/cluster/member/AbstractHighAvailabilityMembers.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import java.net.URI;
+
+import org.neo4j.cluster.Binding;
+import org.neo4j.cluster.BindingListener;
+import org.neo4j.kernel.ha.cluster.HighAvailability;
+import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberChangeEvent;
+import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberListener;
+
+/**
+ * Basic foundation for listening to events going on in the cluster and
+ * keep up to date information for when requested.
+ *
+ * @author Mattias Persson
+ */
+public abstract class AbstractHighAvailabilityMembers
+{
+ private volatile URI myServerUri;
+ private volatile URI lastKnownElectedMaster;
+
+ protected AbstractHighAvailabilityMembers( Binding binding, HighAvailability highAvailability )
+ {
+ binding.addBindingListener( new LocalBindingListener() );
+ highAvailability.addHighAvailabilityMemberListener( new LocalHighAvailabilityMemberListener() );
+ }
+
+ private class LocalBindingListener implements BindingListener
+ {
+ @Override
+ public void listeningAt( URI me )
+ {
+ myServerUri = me;
+ }
+ }
+
+ private class LocalHighAvailabilityMemberListener extends HighAvailabilityMemberListener.Adapter
+ {
+ @Override
+ public void masterIsElected( HighAvailabilityMemberChangeEvent event )
+ {
+ URI electedMaster = event.getServerClusterUri();
+ if ( changed( electedMaster ) )
+ {
+ newMasterElected();
+ lastKnownElectedMaster = electedMaster;
+ }
+ }
+
+ private boolean changed( URI electedMaster )
+ {
+ return lastKnownElectedMaster == null || !electedMaster.equals( lastKnownElectedMaster );
+ }
+
+ @Override
+ public void slaveIsAvailable( HighAvailabilityMemberChangeEvent event )
+ {
+ AbstractHighAvailabilityMembers.this.slaveIsAvailable( event.getServerClusterUri(), event.getServerHaUri(), iAmMaster() );
+ }
+
+ @Override
+ public void masterIsAvailable( HighAvailabilityMemberChangeEvent event )
+ {
+ AbstractHighAvailabilityMembers.this.masterIsAvailable( event.getServerClusterUri(), event.getServerHaUri(), iAmMaster() );
+ }
+
+ private boolean iAmMaster()
+ {
+ return lastKnownElectedMaster.equals( getMyServerUri() );
+ }
+ }
+
+ protected URI getMyServerUri()
+ {
+ if ( myServerUri == null )
+ throw new IllegalStateException( "My server URI not retreived yet" );
+ return myServerUri;
+ }
+
+ protected void newMasterElected()
+ {
+ }
+
+ protected void slaveIsAvailable( URI serverClusterUri, URI serverHaUri, boolean iAmMaster )
+ {
+ }
+
+ protected void masterIsAvailable( URI serverClusterUri, URI serverHaUri, boolean iAmMaster )
+ {
+ }
+}
View
205 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilityMembers.java
@@ -0,0 +1,205 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import static org.neo4j.helpers.collection.Iterables.map;
+import static org.neo4j.helpers.collection.IteratorUtil.asCollection;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.neo4j.cluster.ClusterMonitor;
+import org.neo4j.cluster.protocol.cluster.ClusterConfiguration;
+import org.neo4j.cluster.protocol.cluster.ClusterListener;
+import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener;
+import org.neo4j.helpers.Function;
+import org.neo4j.kernel.ha.cluster.HighAvailability;
+import org.neo4j.kernel.impl.util.CopyOnWriteHashMap;
+import org.neo4j.management.ClusterMemberInfo;
+
+/**
+ * Keeps an up to date list of members, their roles and availability for
+ * display for example in JMX.
+ *
+ * @author Mattias Persson
+ */
+public class HighAvailabilityMembers extends AbstractHighAvailabilityMembers
+{
+ private final Map<URI, Member> members = new CopyOnWriteHashMap<URI, Member>();
+ private ClusterConfiguration clusterConfiguration;
+
+ public HighAvailabilityMembers( ClusterMonitor clusterMonitor, HighAvailability highAvailability )
+ {
+ super( clusterMonitor, highAvailability );
+ clusterMonitor.addClusterListener( new LocalClusterListener() );
+ clusterMonitor.addHeartbeatListener( new LocalHeartbeatListener() );
+ }
+
+ public Iterable<ClusterMemberInfo> getMembers()
+ {
+ return map( new Function<Member, ClusterMemberInfo>()
+ {
+ @Override
+ public ClusterMemberInfo map( Member from )
+ {
+ return from.asClusterMemberInfo();
+ }
+ }, members.values() );
+ }
+
+ private class LocalHeartbeatListener extends HeartbeatListener.Adapter
+ {
+ @Override
+ public void failed( URI server )
+ {
+ members.put( server, getMember( server ).setAlive( false ) );
+ }
+
+ @Override
+ public void alive( URI server )
+ {
+ members.put( server, getMember( server ).setAlive( true ) );
+ }
+ }
+
+ private class LocalClusterListener extends ClusterListener.Adapter
+ {
+ @Override
+ public void enteredCluster( ClusterConfiguration configuration )
+ {
+ clusterConfiguration = configuration;
+ initializeMembersFromClusterConfiguration( configuration );
+ }
+
+ @Override
+ public void leftCluster()
+ {
+ members.remove( getMyServerUri() );
+ }
+
+ @Override
+ public void joinedCluster( URI member )
+ {
+ members.put( member, new Member( member ) );
+ }
+
+ @Override
+ public void leftCluster( URI member )
+ {
+ members.remove( member );
+ }
+ }
+
+ private enum HighAvailabilityRole
+ {
+ UNKNOWN,
+ SLAVE,
+ MASTER;
+ }
+
+ private class Member
+ {
+ private final URI clusterUri;
+ private final HighAvailabilityRole haRole;
+ private final URI haUri;
+ private final boolean available;
+ private final boolean alive;
+
+ public Member( URI clusterUri )
+ {
+ this( clusterUri, HighAvailabilityRole.UNKNOWN, null, false, true );
+ }
+
+ private Member( URI clusterUri, HighAvailabilityRole haRole, URI haUri, boolean available, boolean alive )
+ {
+ this.clusterUri = clusterUri;
+ this.haRole = haRole;
+ this.haUri = haUri;
+ this.available = available;
+ this.alive = alive;
+ }
+
+ protected ClusterMemberInfo asClusterMemberInfo()
+ {
+ String[] clusterRoles = asCollection( clusterConfiguration.getRolesOf( clusterUri ) ).toArray( new String[0] );
+ return new ClusterMemberInfo( clusterUri.toString(), available, alive, haRole.name(),
+ clusterRoles, new String[] { clusterUri.toString(), nullSafeUriToString( haUri ) } );
+ }
+
+ Member becomeAvailable( HighAvailabilityRole haRole, URI haUri )
+ {
+ return new Member( this.clusterUri, haRole, haUri, true, this.alive );
+ }
+
+ Member setAlive( boolean alive )
+ {
+ return new Member( this.clusterUri, this.haRole, this.haUri, this.available, alive );
+ }
+
+ @Override
+ public String toString()
+ {
+ return asClusterMemberInfo().toString();
+ }
+ }
+
+ private void initializeMembersFromClusterConfiguration( ClusterConfiguration configuration )
+ {
+ Map<URI, Member> newMembers = new HashMap<URI, Member>();
+ for ( URI memberClusterUri : clusterConfiguration.getMembers() )
+ newMembers.put( memberClusterUri, new Member( memberClusterUri ) );
+ members.clear();
+ members.putAll( newMembers );
+ }
+
+ @Override
+ protected void slaveIsAvailable( URI serverClusterUri, URI serverHaUri, boolean iAmMaster )
+ {
+ members.put( serverClusterUri, getMember( serverClusterUri )
+ .becomeAvailable( HighAvailabilityRole.SLAVE, serverHaUri ) );
+ }
+
+ @Override
+ protected void masterIsAvailable( URI serverClusterUri, URI serverHaUri, boolean iAmMaster )
+ {
+ members.put( serverClusterUri, getMember( serverClusterUri )
+ .becomeAvailable( HighAvailabilityRole.MASTER, serverHaUri ) );
+ }
+
+ @Override
+ protected void newMasterElected()
+ {
+ initializeMembersFromClusterConfiguration( clusterConfiguration );
+ }
+
+ protected Member getMember( URI server )
+ {
+ Member member = members.get( server );
+ if ( member == null )
+ throw new IllegalStateException( "Member " + server + " not found in " + members );
+ return member;
+ }
+
+ private static String nullSafeUriToString( URI uri )
+ {
+ return uri != null ? uri.toString() : null;
+ }
+}
View
155 enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlaves.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.neo4j.cluster.ClusterMonitor;
+import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener;
+import org.neo4j.helpers.Predicate;
+import org.neo4j.helpers.collection.FilteringIterable;
+import org.neo4j.helpers.collection.IterableWrapper;
+import org.neo4j.kernel.ha.Slave;
+import org.neo4j.kernel.ha.SlaveFactory;
+import org.neo4j.kernel.ha.Slaves;
+import org.neo4j.kernel.ha.cluster.HighAvailability;
+import org.neo4j.kernel.impl.util.CopyOnWriteHashMap;
+import org.neo4j.kernel.lifecycle.LifeSupport;
+import org.neo4j.kernel.lifecycle.Lifecycle;
+import org.neo4j.kernel.logging.Logging;
+
+/**
+ * Keeps active connections to {@link Slave slaves} for a master to communicate to
+ * when so needed.
+ *
+ * @author Mattias Persson
+ */
+public class HighAvailabilitySlaves extends AbstractHighAvailabilityMembers implements Lifecycle, Slaves
+{
+ private static final Predicate<SlaveContext> AVAILABLE = new Predicate<HighAvailabilitySlaves.SlaveContext>()
+ {
+ @Override
+ public boolean accept( SlaveContext item )
+ {
+ return item.available;
+ }
+ };
+
+ private final LifeSupport life = new LifeSupport();
+ private final Map<URI, SlaveContext> slaves = new CopyOnWriteHashMap<URI, SlaveContext>();
+ private SlaveFactory slaveFactory;
+
+ public HighAvailabilitySlaves( ClusterMonitor clusterMonitor, HighAvailability highAvailability,
+ SlaveFactory slaveFactory, Logging logging )
+ {
+ super( clusterMonitor, highAvailability );
+ this.slaveFactory = slaveFactory;
+ clusterMonitor.addHeartbeatListener( new LocalHeartbeatListener() );
+ }
+
+ private static class SlaveContext
+ {
+ private final Slave slave;
+ private volatile boolean available = true;
+
+ protected SlaveContext( Slave slave )
+ {
+ this.slave = slave;
+ }
+ }
+
+ private class LocalHeartbeatListener extends HeartbeatListener.Adapter
+ {
+ @Override
+ public void failed( URI server )
+ {
+ getSlave( server ).available = false;
+ }
+
+ @Override
+ public void alive( URI server )
+ {
+ getSlave( server ).available = true;
+ }
+ }
+
+ @Override
+ public Iterable<Slave> getSlaves()
+ {
+ return new IterableWrapper<Slave, SlaveContext>( new FilteringIterable<SlaveContext>( slaves.values(), AVAILABLE ) )
+ {
+ @Override
+ protected Slave underlyingObjectToObject( SlaveContext context )
+ {
+ return context.slave;
+ }
+ };
+ }
+
+ public SlaveContext getSlave( URI server )
+ {
+ SlaveContext slave = slaves.get( server );
+ if ( slave == null )
+ throw new IllegalStateException( "Slave for '" + server + "' not found" );
+ return slave;
+ }
+
+ @Override
+ protected void slaveIsAvailable( URI serverClusterUri, URI serverHaUri, boolean iAmMaster )
+ {
+ if ( iAmMaster )
+ {
+ Slave slave = life.add( slaveFactory.newSlave( serverHaUri ) );
+ slaves.put( serverClusterUri, new SlaveContext( slave ) );
+ }
+ }
+
+ @Override
+ public void init() throws Throwable
+ {
+ life.init();
+ }
+
+ @Override
+ public void start() throws Throwable
+ {
+ life.start();
+ }
+
+ @Override
+ public void stop() throws Throwable
+ {
+ life.stop();
+ }
+
+ @Override
+ public void shutdown() throws Throwable
+ {
+ life.shutdown();
+ }
+
+ @Override
+ protected void newMasterElected()
+ {
+ life.clear();
+ slaves.clear();
+ }
+}
View
24 enterprise/ha/src/test/java/jmx/HaBeanIT.java
@@ -105,7 +105,7 @@ private void assertMasterInformation( HighAvailability ha )
assertTrue( "single instance should be master and available", ha.isAvailable() );
assertEquals( "single instance should be master", HighAvailabilityMemberState.MASTER.name(), ha.getRole() );
ClusterMemberInfo info = ha.getInstancesInCluster()[0];
- assertEquals( "single instance should be the returned instance id", "1", info.getInstanceId() );
+ assertTrue( "single instance should be the returned instance id", info.getInstanceId().endsWith( ":5001" ) );
assertTrue( "single instance should have coordinator cluster role", Arrays.equals( info.getClusterRoles(),
new String[]{ClusterConfiguration.COORDINATOR} ) );
}
@@ -186,7 +186,7 @@ public void leftInstanceDisappearsFromMemberList() throws Throwable
}
@Test
- public void failedMemberIsStillInMemberListAlthoughUnavailable() throws Throwable
+ public void failedMemberIsStillInMemberListAlthoughFailed() throws Throwable
{
startCluster( 3 );
assertEquals( 3, ha( cluster.getAnySlave() ).getInstancesInCluster().length );
@@ -194,8 +194,8 @@ public void failedMemberIsStillInMemberListAlthoughUnavailable() throws Throwabl
// Fail the instance
HighlyAvailableGraphDatabase failedDb = cluster.getAnySlave();
RepairKit dbFailure = cluster.fail( failedDb );
- await( ha( cluster.getMaster() ), dbAvailability( false ) );
- await( ha( cluster.getAnySlave( failedDb )), dbAvailability( false ) );
+ await( ha( cluster.getMaster() ), dbAlive( false ) );
+ await( ha( cluster.getAnySlave( failedDb )), dbAlive( false ) );
// Repair the failure and come back
dbFailure.repair();
@@ -206,14 +206,14 @@ public void failedMemberIsStillInMemberListAlthoughUnavailable() throws Throwabl
private void assertMasterAndSlaveInformation( ClusterMemberInfo[] instancesInCluster ) throws Exception
{
ClusterMemberInfo master = member( instancesInCluster, 5001 );
- assertEquals( "1", master.getInstanceId() );
+ assertTrue( master.getInstanceId().endsWith( ":5001" ) );
assertEquals( HighAvailabilityMemberState.MASTER.name(), master.getHaRole() );
assertTrue( "Unexpected start of HA URI " + uri( "ha", master.getUris() ),
uri( "ha", master.getUris() ).startsWith( "ha://" + InetAddress.getLocalHost().getHostAddress() + ":1137" ) );
assertTrue( "Master not available", master.isAvailable() );
ClusterMemberInfo slave = member( instancesInCluster, 5002 );
- assertEquals( "2", slave.getInstanceId() );
+ assertTrue( slave.getInstanceId().endsWith( ":5002" ) );
assertEquals( HighAvailabilityMemberState.SLAVE.name(), slave.getHaRole() );
assertTrue( "Unexpected start of HA URI" + uri( "ha", slave.getUris() ),
uri( "ha", slave.getUris() ).startsWith( "ha://" + InetAddress.getLocalHost().getHostAddress() + ":1138" ) );
@@ -263,4 +263,16 @@ public boolean accept( ClusterMemberInfo item )
}
};
}
+
+ private Predicate<ClusterMemberInfo> dbAlive( final boolean alive )
+ {
+ return new Predicate<ClusterMemberInfo>()
+ {
+ @Override
+ public boolean accept( ClusterMemberInfo item )
+ {
+ return item.isAlive() == alive;
+ }
+ };
+ }
}
View
2  ...j/kernel/ha/TestTxPushStrategyConfig.java → ...o4j/kernel/ha/TxPushStrategyConfigIT.java
@@ -38,7 +38,7 @@
import org.neo4j.test.ha.ClusterManager;
import org.neo4j.test.ha.ClusterManager.ManagedCluster;
-public class TestTxPushStrategyConfig
+public class TxPushStrategyConfigIT
{
private LifeSupport life = new LifeSupport();
private ManagedCluster cluster;
View
165 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/ClusterMemberMatcher.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import static java.util.Arrays.asList;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.neo4j.management.ClusterMemberInfo;
+
+public class ClusterMemberMatcher extends BaseMatcher<Iterable<ClusterMemberInfo>>
+{
+ private boolean trueForExactFalseForContains;
+ private ClusterMemberMatch[] expectedMembers;
+
+ public ClusterMemberMatcher( boolean trueForExactFalseForContains, ClusterMemberMatch[] expected )
+ {
+ this.trueForExactFalseForContains = trueForExactFalseForContains;
+ this.expectedMembers = expected;
+ }
+
+ @Override
+ public void describeTo( Description description )
+ {
+ description.appendText( Arrays.toString( expectedMembers ) );
+ }
+
+ @Override
+ public boolean matches( Object item )
+ {
+ if ( !( item instanceof Iterable ) )
+ return false;
+
+ @SuppressWarnings( "unchecked" )
+ Iterable<ClusterMemberInfo> other = (Iterable<ClusterMemberInfo>) item;
+ int foundCount = 0;
+ for ( ClusterMemberMatch expectedMember : expectedMembers )
+ {
+ boolean found = false;
+ for ( ClusterMemberInfo member : other )
+ {
+ if ( expectedMember.match( member ) )
+ {
+ found = true;
+ foundCount++;
+ break;
+ }
+ }
+ if ( !found )
+ return false;
+ }
+
+ if ( trueForExactFalseForContains == true && foundCount != expectedMembers.length )
+ return false;
+
+ return true;
+ }
+
+ public static ClusterMemberMatch member( URI member )
+ {
+ return new ClusterMemberMatch( member );
+ }
+
+ public static ClusterMemberMatcher containsMembers( ClusterMemberMatch... expected )
+ {
+ return new ClusterMemberMatcher( false, expected );
+ }
+
+ public static ClusterMemberMatcher containsOnlyMembers( ClusterMemberMatch... expected )
+ {
+ return new ClusterMemberMatcher( true, expected );
+ }
+
+ public static class ClusterMemberMatch
+ {
+ private URI member;
+ private Boolean available;
+ private Boolean alive;
+ private String haRole;
+ private Set<String> uris;
+
+ ClusterMemberMatch( URI member )
+ {
+ this.member = member;
+ }
+
+ public ClusterMemberMatch available( boolean available )
+ {
+ this.available = available;
+ return this;
+ }
+
+ public ClusterMemberMatch alive( boolean alive )
+ {
+ this.alive = alive;
+ return this;
+ }
+
+ private boolean match( ClusterMemberInfo toMatch )
+ {
+ if ( !member.toString().equals( toMatch.getInstanceId() ) )
+ return false;
+ if ( available != null && toMatch.isAvailable() != available )
+ return false;
+ if ( alive != null && toMatch.isAlive() != alive )
+ return false;
+ if ( haRole != null && !haRole.equals( toMatch.getHaRole() ) )
+ return false;
+ if ( uris != null && !uris.equals( new HashSet<String>( asList( toMatch.getUris() ) ) ) )
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder( "Member[" + member );
+ if ( available != null )
+ builder.append( ", available:" + available );
+ if ( alive != null )
+ builder.append( ", alive:" + alive );
+ if ( haRole != null )
+ builder.append( ", haRole:" + haRole );
+ if ( uris != null )
+ builder.append( ", uris:" + uris );
+ return builder.append( "]" ).toString();
+ }
+
+ public ClusterMemberMatch haRole( String role )
+ {
+ this.haRole = role;
+ return this;
+ }
+
+ public ClusterMemberMatch uris( URI... uris )
+ {
+ this.uris = new HashSet<String>();
+ for ( URI uri : uris )
+ this.uris.add( uri.toString() );
+ return this;
+ }
+ }
+}
View
243 ...rise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilityMembersTest.java
@@ -0,0 +1,243 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.neo4j.kernel.ha.cluster.member.ClusterMemberMatcher.containsMembers;
+import static org.neo4j.kernel.ha.cluster.member.ClusterMemberMatcher.containsOnlyMembers;
+import static org.neo4j.kernel.ha.cluster.member.ClusterMemberMatcher.member;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.neo4j.cluster.BindingListener;
+import org.neo4j.cluster.ClusterMonitor;
+import org.neo4j.cluster.protocol.cluster.ClusterConfiguration;
+import org.neo4j.cluster.protocol.cluster.ClusterListener;
+import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener;
+import org.neo4j.kernel.ha.cluster.HighAvailability;
+import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberListener;
+
+public class HighAvailabilityMembersTest
+{
+ @Test
+ public void shouldRegisterItselfOnListeners() throws Exception
+ {
+ // given
+ ClusterMonitor clusterMonitor = mock( ClusterMonitor.class );
+ HighAvailability highAvailability = mock( HighAvailability.class );
+
+ // when
+ new HighAvailabilityMembers( clusterMonitor, highAvailability );
+
+ // then
+ verify( clusterMonitor ).addBindingListener( Mockito.<BindingListener>any() );
+ verify( clusterMonitor ).addClusterListener( Mockito.<ClusterListener>any() );
+ verify( clusterMonitor ).addHeartbeatListener( Mockito.<HeartbeatListener>any() );
+ verify( highAvailability ).addHighAvailabilityMemberListener( Mockito.<HighAvailabilityMemberListener>any() );
+ }
+
+ @Test
+ public void shouldContainMemberListAfterEnteringCluster() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+
+ // when
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) );
+
+ // then
+ assertThat( members.getMembers(), containsOnlyMembers(
+ member( clusterUri1 ).available( false ).alive( true ),
+ member( clusterUri2 ).available( false ).alive( true ),
+ member( clusterUri3 ).available( false ).alive( true ) ) );
+ }
+
+ @Test
+ public void joinedMemberShowsInList() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) );
+
+ // when
+ clusterMonitor.joinedCluster( clusterUri3 );
+
+ // then
+ assertThat( members.getMembers(), containsOnlyMembers(
+ member( clusterUri1 ).available( false ),
+ member( clusterUri2 ).available( false ),
+ member( clusterUri3 ).available( false ) ) );
+ }
+
+ @Test
+ public void leftMemberDissappearsFromList() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) );
+
+ // when
+ clusterMonitor.leftCluster( clusterUri2 );
+
+ // then
+ assertThat( members.getMembers(), containsOnlyMembers(
+ member( clusterUri1 ).available( false ),
+ member( clusterUri3 ).available( false ) ) );
+ }
+
+ @Test
+ public void availableMasterShowsProperInformation() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.listeningAt( clusterUri1 );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) );
+
+ // when
+ highAvailability.masterIsElectedAndAvailable( clusterUri1, haUri1 );
+
+ // then
+ assertThat( members.getMembers(), containsMembers(
+ member( clusterUri1 ).available( true ).alive( true ).haRole( "MASTER" ).uris( clusterUri1, haUri1 ) ) );
+ }
+
+ @Test
+ public void availableSlaveShowsProperInformation() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.listeningAt( clusterUri1 );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) );
+ highAvailability.masterIsElectedAndAvailable( clusterUri1, haUri1 );
+
+ // when
+ highAvailability.slaveIsAvailable( clusterUri2, haUri2 );
+
+ // then
+ assertThat( members.getMembers(), containsMembers(
+ member( clusterUri2 ).available( true ).alive( true ).haRole( "SLAVE" ).uris( clusterUri2, haUri2 ) ) );
+ }
+
+ @Test
+ public void membersShowsAsUnavailableWhenNewMasterElectedBeforeTheyBecomeAvailable() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.listeningAt( clusterUri1 );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2, clusterUri3 ) );
+ highAvailability.masterIsElectedAndAvailable( clusterUri1, haUri1 );
+ highAvailability.slaveIsAvailable( clusterUri2, haUri2 );
+ highAvailability.slaveIsAvailable( clusterUri3, haUri3 );
+
+ // when
+ highAvailability.masterIsElected( clusterUri2, haUri2 );
+
+ // then
+ assertThat( members.getMembers(), containsMembers(
+ member( clusterUri1 ).available( false ).alive( true ),
+ member( clusterUri2 ).available( false ).alive( true ),
+ member( clusterUri3 ).available( false ).alive( true ) ) );
+ }
+
+ @Test
+ public void failedMemberShowsAsSuch() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.listeningAt( clusterUri1 );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) );
+ highAvailability.masterIsElectedAndAvailable( clusterUri1, haUri1 );
+ highAvailability.slaveIsAvailable( clusterUri2, haUri2 );
+
+ // when
+ clusterMonitor.failed( clusterUri2 );
+
+ // then
+ assertThat( members.getMembers(), containsMembers(
+ member( clusterUri1 ).available( true ).haRole( "MASTER" ),
+ member( clusterUri2 ).available( true ).alive( false ) ) );
+ }
+
+ @Test
+ public void failedThenAliveMemberShowsAsAlive() throws Exception
+ {
+ // given
+ MockedClusterMonitor clusterMonitor = new MockedClusterMonitor();
+ MockedHighAvailability highAvailability = new MockedHighAvailability();
+ HighAvailabilityMembers members = new HighAvailabilityMembers( clusterMonitor, highAvailability );
+ clusterMonitor.listeningAt( clusterUri1 );
+ clusterMonitor.enteredCluster( clusterConfiguration( clusterUri1, clusterUri2 ) );
+ highAvailability.masterIsElectedAndAvailable( clusterUri1, haUri1 );
+ highAvailability.slaveIsAvailable( clusterUri2, haUri2 );
+ clusterMonitor.failed( clusterUri2 );
+
+ // when
+ clusterMonitor.alive( clusterUri2 );
+
+ // then
+ assertThat( members.getMembers(), containsMembers(
+ member( clusterUri1 ).available( true ).alive( true ).haRole( "MASTER" ),
+ member( clusterUri2 ).available( true ).alive( true ).haRole( "SLAVE" ) ) );
+ }
+
+ private static URI clusterUri1 = uri( "cluster://server1" );
+ private static URI clusterUri2 = uri( "cluster://server2" );
+ private static URI clusterUri3 = uri( "cluster://server3" );
+ private static URI haUri1 = uri( "ha://server1?serverId=1" );
+ private static URI haUri2 = uri( "ha://server2?serverId=2" );
+ private static URI haUri3 = uri( "ha://server3?serverId=3" );
+
+ private static URI uri( String string )
+ {
+ try
+ {
+ return new URI( string );
+ }
+ catch ( URISyntaxException e )
+ {
+ throw new RuntimeException( e );
+ }
+ }
+
+ private ClusterConfiguration clusterConfiguration( URI... uris )
+ {
+ return new ClusterConfiguration( "neo4j.ha", asList( uris ) );
+ }
+}
View
91 enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesIT.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.neo4j.helpers.collection.MapUtil.stringMap;
+import static org.neo4j.kernel.ha.HaSettings.tx_push_factor;
+import static org.neo4j.test.ha.ClusterManager.clusterOfSize;
+
+import org.junit.After;
+import org.junit.Test;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Transaction;
+import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
+import org.neo4j.test.TargetDirectory;
+import org.neo4j.test.ha.ClusterManager;
+import org.neo4j.test.ha.ClusterManager.ManagedCluster;
+import org.neo4j.tooling.GlobalGraphOperations;
+
+public class HighAvailabilitySlavesIT
+{
+ private final TargetDirectory DIR = TargetDirectory.forTest( getClass() );
+ private ClusterManager clusterManager;
+
+ @After
+ public void after() throws Throwable
+ {
+ clusterManager.shutdown();
+ }
+
+ @Test
+ public void transactionsGetsPushedToSlaves() throws Throwable
+ {
+ // given
+ clusterManager = new ClusterManager( clusterOfSize( 3 ), DIR.directory( "dbs", true ),
+ stringMap( tx_push_factor.name(), "2" ) );
+ clusterManager.start();
+ ManagedCluster cluster = clusterManager.getDefaultCluster();
+
+ // when
+ String name = "a node";
+ long node = createNode( cluster.getMaster(), name );
+
+ // then
+ for ( HighlyAvailableGraphDatabase db : cluster.getAllMembers() )
+ assertEquals( node, getNodeByName( db, name ) );
+ }
+
+ private long getNodeByName( HighlyAvailableGraphDatabase db, String name )
+ {
+ for ( Node node : GlobalGraphOperations.at( db ).getAllNodes() )
+ if ( name.equals( node.getProperty( "name", null ) ) )
+ return node.getId();
+ fail( "No node '" + name + "' found in " + db );
+ return 0; // Never called
+ }
+
+ private long createNode( HighlyAvailableGraphDatabase db, String name )
+ {
+ Transaction tx = db.beginTx();
+ try
+ {
+ Node node = db.createNode();
+ node.setProperty( "name", name );
+ tx.success();
+ return node.getId();
+ }
+ finally
+ {
+ tx.finish();
+ }
+ }
+}
View
186 ...prise/ha/src/test/java/org/neo4j/kernel/ha/cluster/member/HighAvailabilitySlavesTest.java
@@ -0,0 +1,186 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.kernel.ha.cluster.member;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.neo4j.helpers.collection.IteratorUtil.asCollection;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.neo4j.cluster.BindingListener;
+import org.neo4j.cluster.ClusterMonitor;
+import org.neo4j.cluster.protocol.heartbeat.HeartbeatListener;
+import org.neo4j.kernel.ha.SlaveFactory;
+import org.neo4j.kernel.ha.cluster.HighAvailability;
+import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberListener;
+import org.neo4j.kernel.logging.DevNullLoggingService;
+
+public class HighAvailabilitySlavesTest
+{
+ private static URI uri( String string )
+ {
+ try
+ {
+ return new URI( string );
+ }
+ catch ( URISyntaxException e )
+ {
+ throw new Error( e );
+ }
+ }
+
+ private static URI clusterUri1 = uri( "cluster://server1" );
+ private static URI haUri1 = uri( "ha://server1?serverId=1" );
+ private static URI clusterUri2 = uri( "cluster://server2" );
+ private static URI haUri2 = uri( "ha://server2?serverId=2" );
+ private static URI clusterUri3 = uri( "cluster://server3" );
+ private static URI haUri3 = uri( "ha://server3?serverId=3" );
+
+ @Test
+ public void shouldRegisterItselfOnMonitors() throws Exception
+ {
+ // given
+ ClusterMonitor clusterMonitor = mock( ClusterMonitor.class );
+ HighAvailability highAvailability = mock( HighAvailability.class );
+ SlaveFactory slaveFactory = mock( SlaveFactory.class );
+
+ // when
+ new HighAvailabilitySlaves( clusterMonitor, highAvailability, slaveFactory, new DevNullLoggingService() );